From 260fae296bfff8b025282ba8d2a112b2ad25bc50 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 5 Jun 2023 15:05:21 -0300 Subject: [PATCH] feat(gcp_pubsub): generate jwt tokens on demand without workers (5.1) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fixes https://emqx.atlassian.net/browse/EMQX-9603 Rather than relying on a JWT worker to produce and refresh tokens, we could just produce then on demand when pushing the messages to GCP PubSub. That can generate a bit of extra work (as multiple processes might realize it’s time to refresh the JWT and do so), but that shouldn’t be much. In return, we avoid any possibility of not having a fresh JWT when pushing messages. --- .../src/emqx_bridge_gcp_pubsub_connector.erl | 107 ++++++-------- .../test/emqx_bridge_gcp_pubsub_SUITE.erl | 132 ++++++++++-------- .../emqx_connector/src/emqx_connector_jwt.erl | 87 +++++++++++- .../src/emqx_connector_jwt_worker.erl | 39 +----- .../test/emqx_connector_jwt_SUITE.erl | 66 +++++++++ .../test/emqx_connector_jwt_worker_SUITE.erl | 4 +- changes/ee/feat-10944.en.md | 1 + 7 files changed, 273 insertions(+), 163 deletions(-) create mode 100644 changes/ee/feat-10944.en.md diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_connector.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_connector.erl index d2f6bbee4..6d278c65f 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_connector.erl +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_connector.erl @@ -6,6 +6,7 @@ -behaviour(emqx_resource). +-include_lib("jose/include/jose_jwk.hrl"). -include_lib("emqx_connector/include/emqx_connector_tables.hrl"). -include_lib("emqx_resource/include/emqx_resource.hrl"). -include_lib("typerefl/include/types.hrl"). @@ -26,7 +27,6 @@ ]). -export([reply_delegator/3]). --type jwt_worker() :: binary(). -type service_account_json() :: emqx_bridge_gcp_pubsub:service_account_json(). -type config() :: #{ connect_timeout := emqx_schema:duration_ms(), @@ -38,7 +38,7 @@ }. -type state() :: #{ connect_timeout := timer:time(), - jwt_worker_id := jwt_worker(), + jwt_config := emqx_connector_jwt:jwt_config(), max_retries := non_neg_integer(), payload_template := emqx_plugin_libs_rule:tmpl_token(), pool_name := binary(), @@ -97,12 +97,12 @@ on_start( {enable_pipelining, maps:get(enable_pipelining, Config, ?DEFAULT_PIPELINE_SIZE)} ], #{ - jwt_worker_id := JWTWorkerId, + jwt_config := JWTConfig, project_id := ProjectId - } = ensure_jwt_worker(ResourceId, Config), + } = parse_jwt_config(ResourceId, Config), State = #{ connect_timeout => ConnectTimeout, - jwt_worker_id => JWTWorkerId, + jwt_config => JWTConfig, max_retries => MaxRetries, payload_template => emqx_plugin_libs_rule:preproc_tmpl(PayloadTemplate), pool_name => ResourceId, @@ -136,14 +136,13 @@ on_start( -spec on_stop(resource_id(), state()) -> ok | {error, term()}. on_stop( ResourceId, - _State = #{jwt_worker_id := JWTWorkerId} + _State = #{jwt_config := JWTConfig} ) -> - ?tp(gcp_pubsub_stop, #{resource_id => ResourceId, jwt_worker_id => JWTWorkerId}), + ?tp(gcp_pubsub_stop, #{resource_id => ResourceId, jwt_config => JWTConfig}), ?SLOG(info, #{ msg => "stopping_gcp_pubsub_bridge", connector => ResourceId }), - emqx_connector_jwt_sup:ensure_worker_deleted(JWTWorkerId), emqx_connector_jwt:delete_jwt(?JWT_TABLE, ResourceId), ehttpc_sup:stop_pool(ResourceId). @@ -228,12 +227,12 @@ on_get_status(ResourceId, #{connect_timeout := Timeout} = State) -> %% Helper fns %%------------------------------------------------------------------------------------------------- --spec ensure_jwt_worker(resource_id(), config()) -> +-spec parse_jwt_config(resource_id(), config()) -> #{ - jwt_worker_id := jwt_worker(), + jwt_config := emqx_connector_jwt:jwt_config(), project_id := binary() }. -ensure_jwt_worker(ResourceId, #{ +parse_jwt_config(ResourceId, #{ service_account_json := ServiceAccountJSON }) -> #{ @@ -246,8 +245,32 @@ ensure_jwt_worker(ResourceId, #{ Aud = <<"https://pubsub.googleapis.com/">>, ExpirationMS = timer:hours(1), Alg = <<"RS256">>, - Config = #{ - private_key => PrivateKeyPEM, + JWK = + try jose_jwk:from_pem(PrivateKeyPEM) of + JWK0 = #jose_jwk{} -> + %% Don't wrap the JWK with `emqx_secret:wrap' here; + %% this is stored in mnesia and synchronized among the + %% nodes, and will easily become a bad fun. + JWK0; + [] -> + ?tp(error, gcp_pubsub_connector_startup_error, #{error => empty_key}), + throw("empty private in service account json"); + {error, Reason} -> + Error = {invalid_private_key, Reason}, + ?tp(error, gcp_pubsub_connector_startup_error, #{error => Error}), + throw("invalid private key in service account json"); + Error0 -> + Error = {invalid_private_key, Error0}, + ?tp(error, gcp_pubsub_connector_startup_error, #{error => Error}), + throw("invalid private key in service account json") + catch + Kind:Reason -> + Error = {Kind, Reason}, + ?tp(error, gcp_pubsub_connector_startup_error, #{error => Error}), + throw("invalid private key in service account json") + end, + JWTConfig = #{ + jwk => emqx_secret:wrap(JWK), resource_id => ResourceId, expiration => ExpirationMS, table => ?JWT_TABLE, @@ -257,46 +280,8 @@ ensure_jwt_worker(ResourceId, #{ kid => KId, alg => Alg }, - - JWTWorkerId = <<"gcp_pubsub_jwt_worker:", ResourceId/binary>>, - Worker = - case emqx_connector_jwt_sup:ensure_worker_present(JWTWorkerId, Config) of - {ok, Worker0} -> - Worker0; - Error -> - ?tp(error, "gcp_pubsub_bridge_jwt_worker_failed_to_start", #{ - connector => ResourceId, - reason => Error - }), - _ = emqx_connector_jwt_sup:ensure_worker_deleted(JWTWorkerId), - throw(failed_to_start_jwt_worker) - end, - MRef = monitor(process, Worker), - Ref = emqx_connector_jwt_worker:ensure_jwt(Worker), - - %% to ensure that this resource and its actions will be ready to - %% serve when started, we must ensure that the first JWT has been - %% produced by the worker. - receive - {Ref, token_created} -> - ?tp(gcp_pubsub_bridge_jwt_created, #{resource_id => ResourceId}), - demonitor(MRef, [flush]), - ok; - {'DOWN', MRef, process, Worker, Reason} -> - ?tp(error, "gcp_pubsub_bridge_jwt_worker_failed_to_start", #{ - connector => ResourceId, - reason => Reason - }), - _ = emqx_connector_jwt_sup:ensure_worker_deleted(JWTWorkerId), - throw(failed_to_start_jwt_worker) - after 10_000 -> - ?tp(warning, "gcp_pubsub_bridge_jwt_timeout", #{connector => ResourceId}), - demonitor(MRef, [flush]), - _ = emqx_connector_jwt_sup:ensure_worker_deleted(JWTWorkerId), - throw(timeout_creating_jwt) - end, #{ - jwt_worker_id => JWTWorkerId, + jwt_config => JWTConfig, project_id => ProjectId }. @@ -322,14 +307,10 @@ publish_path( ) -> <<"/v1/projects/", ProjectId/binary, "/topics/", PubSubTopic/binary, ":publish">>. --spec get_jwt_authorization_header(resource_id()) -> [{binary(), binary()}]. -get_jwt_authorization_header(ResourceId) -> - case emqx_connector_jwt:lookup_jwt(?JWT_TABLE, ResourceId) of - %% Since we synchronize the JWT creation during resource start - %% (see `on_start/2'), this will be always be populated. - {ok, JWT} -> - [{<<"Authorization">>, <<"Bearer ", JWT/binary>>}] - end. +-spec get_jwt_authorization_header(emqx_connector_jwt:jwt_config()) -> [{binary(), binary()}]. +get_jwt_authorization_header(JWTConfig) -> + JWT = emqx_connector_jwt:ensure_jwt(JWTConfig), + [{<<"Authorization">>, <<"Bearer ", JWT/binary>>}]. -spec do_send_requests_sync( state(), @@ -342,6 +323,7 @@ get_jwt_authorization_header(ResourceId) -> | {error, term()}. do_send_requests_sync(State, Requests, ResourceId) -> #{ + jwt_config := JWTConfig, pool_name := PoolName, max_retries := MaxRetries, request_ttl := RequestTTL @@ -354,7 +336,7 @@ do_send_requests_sync(State, Requests, ResourceId) -> requests => Requests } ), - Headers = get_jwt_authorization_header(ResourceId), + Headers = get_jwt_authorization_header(JWTConfig), Payloads = lists:map( fun({send_message, Selected}) -> @@ -466,6 +448,7 @@ do_send_requests_sync(State, Requests, ResourceId) -> ) -> {ok, pid()}. do_send_requests_async(State, Requests, ReplyFunAndArgs, ResourceId) -> #{ + jwt_config := JWTConfig, pool_name := PoolName, request_ttl := RequestTTL } = State, @@ -477,7 +460,7 @@ do_send_requests_async(State, Requests, ReplyFunAndArgs, ResourceId) -> requests => Requests } ), - Headers = get_jwt_authorization_header(ResourceId), + Headers = get_jwt_authorization_header(JWTConfig), Payloads = lists:map( fun({send_message, Selected}) -> diff --git a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_SUITE.erl b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_SUITE.erl index b5f22f727..4dd2682fb 100644 --- a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_SUITE.erl +++ b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_SUITE.erl @@ -55,8 +55,9 @@ single_config_tests() -> t_not_of_service_account_type, t_json_missing_fields, t_invalid_private_key, - t_jwt_worker_start_timeout, - t_failed_to_start_jwt_worker, + t_truncated_private_key, + t_jose_error_tuple, + t_jose_other_error, t_stop, t_get_status_ok, t_get_status_down, @@ -580,14 +581,7 @@ t_publish_success(Config) -> ServiceAccountJSON = ?config(service_account_json, Config), TelemetryTable = ?config(telemetry_table, Config), Topic = <<"t/topic">>, - ?check_trace( - create_bridge(Config), - fun(Res, Trace) -> - ?assertMatch({ok, _}, Res), - ?assertMatch([_], ?of_kind(gcp_pubsub_bridge_jwt_created, Trace)), - ok - end - ), + ?assertMatch({ok, _}, create_bridge(Config)), {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config), on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end), assert_empty_metrics(ResourceId), @@ -686,14 +680,7 @@ t_publish_success_local_topic(Config) -> ok. t_create_via_http(Config) -> - ?check_trace( - create_bridge_http(Config), - fun(Res, Trace) -> - ?assertMatch({ok, _}, Res), - ?assertMatch([_, _], ?of_kind(gcp_pubsub_bridge_jwt_created, Trace)), - ok - end - ), + ?assertMatch({ok, _}, create_bridge_http(Config)), ok. t_publish_templated(Config) -> @@ -705,16 +692,12 @@ t_publish_templated(Config) -> "{\"payload\": \"${payload}\"," " \"pub_props\": ${pub_props}}" >>, - ?check_trace( + ?assertMatch( + {ok, _}, create_bridge( Config, #{<<"payload_template">> => PayloadTemplate} - ), - fun(Res, Trace) -> - ?assertMatch({ok, _}, Res), - ?assertMatch([_], ?of_kind(gcp_pubsub_bridge_jwt_created, Trace)), - ok - end + ) ), {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config), on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end), @@ -908,36 +891,26 @@ t_invalid_private_key(Config) -> #{<<"private_key">> => InvalidPrivateKeyPEM} } ), - #{?snk_kind := "gcp_pubsub_bridge_jwt_worker_failed_to_start"}, + #{?snk_kind := gcp_pubsub_connector_startup_error}, 20_000 ), Res end, fun(Res, Trace) -> ?assertMatch({ok, _}, Res), - ?assertMatch( - [#{reason := Reason}] when - Reason =:= noproc orelse - Reason =:= {shutdown, {error, empty_key}}, - ?of_kind("gcp_pubsub_bridge_jwt_worker_failed_to_start", Trace) - ), ?assertMatch( [#{error := empty_key}], - ?of_kind(connector_jwt_worker_startup_error, Trace) + ?of_kind(gcp_pubsub_connector_startup_error, Trace) ), ok end ), ok. -t_jwt_worker_start_timeout(Config) -> - InvalidPrivateKeyPEM = <<"xxxxxx">>, +t_truncated_private_key(Config) -> + InvalidPrivateKeyPEM = <<"-----BEGIN PRIVATE KEY-----\nMIIEvQI...">>, ?check_trace( begin - ?force_ordering( - #{?snk_kind := will_never_happen}, - #{?snk_kind := connector_jwt_worker_make_key} - ), {Res, {ok, _Event}} = ?wait_async_action( create_bridge( @@ -947,14 +920,71 @@ t_jwt_worker_start_timeout(Config) -> #{<<"private_key">> => InvalidPrivateKeyPEM} } ), - #{?snk_kind := "gcp_pubsub_bridge_jwt_timeout"}, + #{?snk_kind := gcp_pubsub_connector_startup_error}, 20_000 ), Res end, fun(Res, Trace) -> ?assertMatch({ok, _}, Res), - ?assertMatch([_], ?of_kind("gcp_pubsub_bridge_jwt_timeout", Trace)), + ?assertMatch( + [#{error := {error, function_clause}}], + ?of_kind(gcp_pubsub_connector_startup_error, Trace) + ), + ok + end + ), + ok. + +t_jose_error_tuple(Config) -> + ?check_trace( + begin + {Res, {ok, _Event}} = + ?wait_async_action( + emqx_common_test_helpers:with_mock( + jose_jwk, + from_pem, + fun(_PrivateKeyPEM) -> {error, some_error} end, + fun() -> create_bridge(Config) end + ), + #{?snk_kind := gcp_pubsub_connector_startup_error}, + 20_000 + ), + Res + end, + fun(Res, Trace) -> + ?assertMatch({ok, _}, Res), + ?assertMatch( + [#{error := {invalid_private_key, some_error}}], + ?of_kind(gcp_pubsub_connector_startup_error, Trace) + ), + ok + end + ), + ok. + +t_jose_other_error(Config) -> + ?check_trace( + begin + {Res, {ok, _Event}} = + ?wait_async_action( + emqx_common_test_helpers:with_mock( + jose_jwk, + from_pem, + fun(_PrivateKeyPEM) -> {unknown, error} end, + fun() -> create_bridge(Config) end + ), + #{?snk_kind := gcp_pubsub_connector_startup_error}, + 20_000 + ), + Res + end, + fun(Res, Trace) -> + ?assertMatch({ok, _}, Res), + ?assertMatch( + [#{error := {invalid_private_key, {unknown, error}}}], + ?of_kind(gcp_pubsub_connector_startup_error, Trace) + ), ok end ), @@ -1309,26 +1339,6 @@ t_unrecoverable_error(Config) -> ), ok. -t_failed_to_start_jwt_worker(Config) -> - ?check_trace( - emqx_common_test_helpers:with_mock( - emqx_connector_jwt_sup, - ensure_worker_present, - fun(_JWTWorkerId, _Config) -> {error, restarting} end, - fun() -> - ?assertMatch({ok, _}, create_bridge(Config)) - end - ), - fun(Trace) -> - ?assertMatch( - [#{reason := {error, restarting}}], - ?of_kind("gcp_pubsub_bridge_jwt_worker_failed_to_start", Trace) - ), - ok - end - ), - ok. - t_stop(Config) -> Name = ?config(gcp_pubsub_name, Config), {ok, _} = create_bridge(Config), diff --git a/apps/emqx_connector/src/emqx_connector_jwt.erl b/apps/emqx_connector/src/emqx_connector_jwt.erl index 1e695faad..49b7b3a0e 100644 --- a/apps/emqx_connector/src/emqx_connector_jwt.erl +++ b/apps/emqx_connector/src/emqx_connector_jwt.erl @@ -19,15 +19,33 @@ -include_lib("emqx_connector/include/emqx_connector_tables.hrl"). -include_lib("emqx_resource/include/emqx_resource.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-include_lib("jose/include/jose_jwt.hrl"). +-include_lib("jose/include/jose_jws.hrl"). %% API -export([ lookup_jwt/1, lookup_jwt/2, - delete_jwt/2 + delete_jwt/2, + ensure_jwt/1 ]). -type jwt() :: binary(). +-type wrapped_jwk() :: fun(() -> jose_jwk:key()). +-type jwk() :: jose_jwk:key(). +-type jwt_config() :: #{ + expiration := timer:time(), + resource_id := resource_id(), + table := ets:table(), + jwk := wrapped_jwk() | jwk(), + iss := binary(), + sub := binary(), + aud := binary(), + kid := binary(), + alg := binary() +}. + +-export_type([jwt_config/0, jwt/0]). -spec lookup_jwt(resource_id()) -> {ok, jwt()} | {error, not_found}. lookup_jwt(ResourceId) -> @@ -57,3 +75,70 @@ delete_jwt(TId, ResourceId) -> error:badarg -> ok end. + +%% @doc Attempts to retrieve a valid JWT from the cache. If there is +%% none or if the cached token is expired, generates an caches a fresh +%% one. +-spec ensure_jwt(jwt_config()) -> jwt(). +ensure_jwt(JWTConfig) -> + #{resource_id := ResourceId, table := Table} = JWTConfig, + case lookup_jwt(Table, ResourceId) of + {error, not_found} -> + JWT = do_generate_jwt(JWTConfig), + store_jwt(JWTConfig, JWT), + JWT; + {ok, JWT0} -> + case is_about_to_expire(JWT0) of + true -> + JWT = do_generate_jwt(JWTConfig), + store_jwt(JWTConfig, JWT), + JWT; + false -> + JWT0 + end + end. + +%%----------------------------------------------------------------------------------------- +%% Helper fns +%%----------------------------------------------------------------------------------------- + +-spec do_generate_jwt(jwt_config()) -> jwt(). +do_generate_jwt(#{ + expiration := ExpirationMS, + iss := Iss, + sub := Sub, + aud := Aud, + kid := KId, + alg := Alg, + jwk := WrappedJWK +}) -> + JWK = emqx_secret:unwrap(WrappedJWK), + Headers = #{ + <<"alg">> => Alg, + <<"kid">> => KId + }, + Now = erlang:system_time(seconds), + ExpirationS = erlang:convert_time_unit(ExpirationMS, millisecond, second), + Claims = #{ + <<"iss">> => Iss, + <<"sub">> => Sub, + <<"aud">> => Aud, + <<"iat">> => Now, + <<"exp">> => Now + ExpirationS + }, + JWT0 = jose_jwt:sign(JWK, Headers, Claims), + {_, JWT} = jose_jws:compact(JWT0), + JWT. + +-spec store_jwt(jwt_config(), jwt()) -> ok. +store_jwt(#{resource_id := ResourceId, table := TId}, JWT) -> + true = ets:insert(TId, {{ResourceId, jwt}, JWT}), + ?tp(emqx_connector_jwt_token_stored, #{resource_id => ResourceId}), + ok. + +-spec is_about_to_expire(jwt()) -> boolean(). +is_about_to_expire(JWT) -> + #jose_jwt{fields = #{<<"exp">> := Exp}} = jose_jwt:peek(JWT), + Now = erlang:system_time(seconds), + GraceExp = Exp - timer:seconds(5), + Now >= GraceExp. diff --git a/apps/emqx_connector/src/emqx_connector_jwt_worker.erl b/apps/emqx_connector/src/emqx_connector_jwt_worker.erl index b13e74a4d..c814b32e1 100644 --- a/apps/emqx_connector/src/emqx_connector_jwt_worker.erl +++ b/apps/emqx_connector/src/emqx_connector_jwt_worker.erl @@ -189,49 +189,14 @@ terminate(_Reason, State) -> %% Helper fns %%----------------------------------------------------------------------------------------- --spec do_generate_jwt(state()) -> jwt(). -do_generate_jwt( - #{ - expiration := ExpirationMS, - iss := Iss, - sub := Sub, - aud := Aud, - kid := KId, - alg := Alg, - jwk := JWK - } = _State -) -> - Headers = #{ - <<"alg">> => Alg, - <<"kid">> => KId - }, - Now = erlang:system_time(seconds), - ExpirationS = erlang:convert_time_unit(ExpirationMS, millisecond, second), - Claims = #{ - <<"iss">> => Iss, - <<"sub">> => Sub, - <<"aud">> => Aud, - <<"iat">> => Now, - <<"exp">> => Now + ExpirationS - }, - JWT0 = jose_jwt:sign(JWK, Headers, Claims), - {_, JWT} = jose_jws:compact(JWT0), - JWT. - -spec generate_and_store_jwt(state()) -> state(). generate_and_store_jwt(State0) -> - JWT = do_generate_jwt(State0), - store_jwt(State0, JWT), + JWTConfig = maps:without([jwt, refresh_timer], State0), + JWT = emqx_connector_jwt:ensure_jwt(JWTConfig), ?tp(connector_jwt_worker_refresh, #{jwt => JWT}), State1 = State0#{jwt := JWT}, ensure_timer(State1). --spec store_jwt(state(), jwt()) -> ok. -store_jwt(#{resource_id := ResourceId, table := TId}, JWT) -> - true = ets:insert(TId, {{ResourceId, jwt}, JWT}), - ?tp(connector_jwt_worker_token_stored, #{resource_id => ResourceId}), - ok. - -spec ensure_timer(state()) -> state(). ensure_timer( State = #{ diff --git a/apps/emqx_connector/test/emqx_connector_jwt_SUITE.erl b/apps/emqx_connector/test/emqx_connector_jwt_SUITE.erl index 3105c95ea..a89d67f4f 100644 --- a/apps/emqx_connector/test/emqx_connector_jwt_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_jwt_SUITE.erl @@ -18,7 +18,10 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). +-include_lib("jose/include/jose_jwt.hrl"). +-include_lib("jose/include/jose_jws.hrl"). -include("emqx_connector_tables.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -compile([export_all, nowarn_export_all]). @@ -51,6 +54,33 @@ end_per_testcase(_TestCase, _Config) -> insert_jwt(TId, ResourceId, JWT) -> ets:insert(TId, {{ResourceId, jwt}, JWT}). +generate_private_key_pem() -> + PublicExponent = 65537, + Size = 2048, + Key = public_key:generate_key({rsa, Size, PublicExponent}), + DERKey = public_key:der_encode('PrivateKeyInfo', Key), + public_key:pem_encode([{'PrivateKeyInfo', DERKey, not_encrypted}]). + +generate_config() -> + PrivateKeyPEM = generate_private_key_pem(), + ResourceID = emqx_guid:gen(), + #{ + private_key => PrivateKeyPEM, + expiration => timer:hours(1), + resource_id => ResourceID, + table => ets:new(test_jwt_table, [ordered_set, public]), + iss => <<"issuer">>, + sub => <<"subject">>, + aud => <<"audience">>, + kid => <<"key id">>, + alg => <<"RS256">> + }. + +is_expired(JWT) -> + #jose_jwt{fields = #{<<"exp">> := Exp}} = jose_jwt:peek(JWT), + Now = erlang:system_time(seconds), + Now >= Exp. + %%----------------------------------------------------------------------------- %% Test cases %%----------------------------------------------------------------------------- @@ -77,3 +107,39 @@ t_delete_jwt(_Config) -> ?assertEqual(ok, emqx_connector_jwt:delete_jwt(TId, ResourceId)), ?assertEqual({error, not_found}, emqx_connector_jwt:lookup_jwt(TId, ResourceId)), ok. + +t_ensure_jwt(_Config) -> + Config0 = + #{ + table := Table, + resource_id := ResourceId, + private_key := PrivateKeyPEM + } = generate_config(), + JWK = jose_jwk:from_pem(PrivateKeyPEM), + Config1 = maps:without([private_key], Config0), + Expiration = timer:seconds(10), + JWTConfig = Config1#{jwk => JWK, expiration := Expiration}, + ?assertEqual({error, not_found}, emqx_connector_jwt:lookup_jwt(Table, ResourceId)), + ?check_trace( + begin + JWT0 = emqx_connector_jwt:ensure_jwt(JWTConfig), + ?assertNot(is_expired(JWT0)), + %% should refresh 5 s before expiration + ct:sleep(Expiration - 5500), + JWT1 = emqx_connector_jwt:ensure_jwt(JWTConfig), + ?assertNot(is_expired(JWT1)), + %% fully expired + ct:sleep(2 * Expiration), + JWT2 = emqx_connector_jwt:ensure_jwt(JWTConfig), + ?assertNot(is_expired(JWT2)), + {JWT0, JWT1, JWT2} + end, + fun({JWT0, JWT1, JWT2}, Trace) -> + ?assertNotEqual(JWT0, JWT1), + ?assertNotEqual(JWT1, JWT2), + ?assertNotEqual(JWT2, JWT0), + ?assertMatch([_, _, _], ?of_kind(emqx_connector_jwt_token_stored, Trace)), + ok + end + ), + ok. diff --git a/apps/emqx_connector/test/emqx_connector_jwt_worker_SUITE.erl b/apps/emqx_connector/test/emqx_connector_jwt_worker_SUITE.erl index a079d632f..fdeb980d4 100644 --- a/apps/emqx_connector/test/emqx_connector_jwt_worker_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_jwt_worker_SUITE.erl @@ -176,7 +176,7 @@ t_refresh(_Config) -> {{ok, _Pid}, {ok, _Event}} = ?wait_async_action( emqx_connector_jwt_worker:start_link(Config), - #{?snk_kind := connector_jwt_worker_token_stored}, + #{?snk_kind := emqx_connector_jwt_token_stored}, 5_000 ), {ok, FirstJWT} = emqx_connector_jwt:lookup_jwt(Table, ResourceId), @@ -209,7 +209,7 @@ t_refresh(_Config) -> fun({FirstJWT, SecondJWT, ThirdJWT}, Trace) -> ?assertMatch( [_, _, _ | _], - ?of_kind(connector_jwt_worker_token_stored, Trace) + ?of_kind(emqx_connector_jwt_token_stored, Trace) ), ?assertNotEqual(FirstJWT, SecondJWT), ?assertNotEqual(SecondJWT, ThirdJWT), diff --git a/changes/ee/feat-10944.en.md b/changes/ee/feat-10944.en.md new file mode 100644 index 000000000..9a6c0b624 --- /dev/null +++ b/changes/ee/feat-10944.en.md @@ -0,0 +1 @@ +Improved the GCP PubSub bridge to avoid a potential issue where messages could fail to be sent when restarting a node.