From 446a4c74d0e4b2734f0603879d7e878fcef8f23c Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 29 Dec 2022 10:46:19 -0300 Subject: [PATCH] fix(gcp_pubsub): fix potential jwt accumulation and lack of refresh (v5.0) https://emqx.atlassian.net/browse/EMQX-8653 Related: - https://emqx.atlassian.net/browse/EEC-737 - https://emqx.atlassian.net/browse/EMQX-8652 Since the rule resource testing mechanism creates a new resource to test the configuration, a new JWT associated with an unique temporary resource was being created and left in the JWT table, leaking it. Also, a wrong case clause when setting the new refresh timer for the JWT worker was preventing it from refreshing from the 2nd refresh onward. --- .../emqx_connector/src/emqx_connector.app.src | 2 +- .../emqx_connector/src/emqx_connector_jwt.erl | 15 +++++++- .../src/emqx_connector_jwt_sup.erl | 2 +- .../src/emqx_connector_jwt_worker.erl | 35 ++++++++++++++----- .../test/emqx_connector_jwt_SUITE.erl | 10 ++++++ .../test/emqx_connector_jwt_worker_SUITE.erl | 34 ++++++++++++++---- changes/v5.0.14-en.md | 8 +++++ changes/v5.0.14-zh.md | 8 +++++ .../test/emqx_ee_bridge_gcp_pubsub_SUITE.erl | 1 + .../src/emqx_ee_connector.app.src | 2 +- .../src/emqx_ee_connector_gcp_pubsub.erl | 1 + 11 files changed, 100 insertions(+), 18 deletions(-) create mode 100644 changes/v5.0.14-en.md create mode 100644 changes/v5.0.14-zh.md diff --git a/apps/emqx_connector/src/emqx_connector.app.src b/apps/emqx_connector/src/emqx_connector.app.src index e73b43751..2a379dbe4 100644 --- a/apps/emqx_connector/src/emqx_connector.app.src +++ b/apps/emqx_connector/src/emqx_connector.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_connector, [ {description, "An OTP application"}, - {vsn, "0.1.10"}, + {vsn, "0.1.11"}, {registered, []}, {mod, {emqx_connector_app, []}}, {applications, [ diff --git a/apps/emqx_connector/src/emqx_connector_jwt.erl b/apps/emqx_connector/src/emqx_connector_jwt.erl index c5cd54cb9..e70326d61 100644 --- a/apps/emqx_connector/src/emqx_connector_jwt.erl +++ b/apps/emqx_connector/src/emqx_connector_jwt.erl @@ -18,11 +18,13 @@ -include_lib("emqx_connector/include/emqx_connector_tables.hrl"). -include_lib("emqx_resource/include/emqx_resource.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). %% API -export([ lookup_jwt/1, - lookup_jwt/2 + lookup_jwt/2, + delete_jwt/2 ]). -type jwt() :: binary(). @@ -44,3 +46,14 @@ lookup_jwt(TId, ResourceId) -> error:badarg -> {error, not_found} end. + +-spec delete_jwt(ets:table(), resource_id()) -> ok. +delete_jwt(TId, ResourceId) -> + try + ets:delete(TId, {ResourceId, jwt}), + ?tp(connector_jwt_deleted, #{}), + ok + catch + error:badarg -> + ok + end. diff --git a/apps/emqx_connector/src/emqx_connector_jwt_sup.erl b/apps/emqx_connector/src/emqx_connector_jwt_sup.erl index ac1d22b71..d90c0b2e2 100644 --- a/apps/emqx_connector/src/emqx_connector_jwt_sup.erl +++ b/apps/emqx_connector/src/emqx_connector_jwt_sup.erl @@ -78,7 +78,7 @@ jwt_worker_child_spec(Id, Config) -> restart => transient, type => worker, significant => false, - shutdown => brutal_kill, + shutdown => 5_000, modules => [emqx_connector_jwt_worker] }. diff --git a/apps/emqx_connector/src/emqx_connector_jwt_worker.erl b/apps/emqx_connector/src/emqx_connector_jwt_worker.erl index cb975ca63..b07bdfe6a 100644 --- a/apps/emqx_connector/src/emqx_connector_jwt_worker.erl +++ b/apps/emqx_connector/src/emqx_connector_jwt_worker.erl @@ -21,7 +21,8 @@ %% API -export([ start_link/1, - ensure_jwt/1 + ensure_jwt/1, + force_refresh/1 ]). %% gen_server API @@ -32,7 +33,8 @@ handle_cast/2, handle_info/2, format_status/1, - format_status/2 + format_status/2, + terminate/2 ]). -include_lib("emqx_resource/include/emqx_resource.hrl"). @@ -52,7 +54,7 @@ }. -type jwt() :: binary(). -type state() :: #{ - refresh_timer := undefined | timer:tref(), + refresh_timer := undefined | timer:tref() | reference(), resource_id := resource_id(), expiration := timer:time(), table := ets:table(), @@ -94,6 +96,11 @@ ensure_jwt(Worker) -> gen_server:cast(Worker, {ensure_jwt, Ref}), Ref. +-spec force_refresh(pid()) -> ok. +force_refresh(Worker) -> + _ = erlang:send(Worker, {timeout, force_refresh, ?refresh_jwt}), + ok. + %%----------------------------------------------------------------------------------------- %% gen_server API %%----------------------------------------------------------------------------------------- @@ -102,6 +109,7 @@ ensure_jwt(Worker) -> {ok, state(), {continue, {make_key, binary()}}} | {stop, {error, term()}}. init(#{private_key := PrivateKeyPEM} = Config) -> + process_flag(trap_exit, true), State0 = maps:without([private_key], Config), State = State0#{ jwk => undefined, @@ -148,7 +156,7 @@ handle_cast({ensure_jwt, From}, State0 = #{jwt := JWT}) -> handle_cast(_Req, State) -> {noreply, State}. -handle_info({timeout, TRef, ?refresh_jwt}, State0 = #{refresh_timer := TRef}) -> +handle_info({timeout, _TRef, ?refresh_jwt}, State0) -> State = generate_and_store_jwt(State0), {noreply, State}; handle_info(_Msg, State) -> @@ -161,6 +169,11 @@ format_status(_Opt, [_PDict, State0]) -> State = censor_secrets(State0), [{data, [{"State", State}]}]. +terminate(_Reason, State) -> + #{resource_id := ResourceId, table := TId} = State, + emqx_connector_jwt:delete_jwt(TId, ResourceId), + ok. + %%----------------------------------------------------------------------------------------- %% Helper fns %%----------------------------------------------------------------------------------------- @@ -211,15 +224,14 @@ store_jwt(#{resource_id := ResourceId, table := TId}, JWT) -> -spec ensure_timer(state()) -> state(). ensure_timer( State = #{ - refresh_timer := undefined, + refresh_timer := OldTimer, expiration := ExpirationMS0 } ) -> + cancel_timer(OldTimer), ExpirationMS = max(5_000, ExpirationMS0 - 5_000), TRef = erlang:start_timer(ExpirationMS, self(), ?refresh_jwt), - State#{refresh_timer => TRef}; -ensure_timer(State) -> - State. + State#{refresh_timer => TRef}. -spec censor_secrets(state()) -> map(). censor_secrets(State = #{jwt := JWT, jwk := JWK}) -> @@ -232,3 +244,10 @@ censor_secret(undefined) -> undefined; censor_secret(_Secret) -> "******". + +-spec cancel_timer(undefined | timer:tref() | reference()) -> ok. +cancel_timer(undefined) -> + ok; +cancel_timer(TRef) -> + _ = erlang:cancel_timer(TRef), + ok. diff --git a/apps/emqx_connector/test/emqx_connector_jwt_SUITE.erl b/apps/emqx_connector/test/emqx_connector_jwt_SUITE.erl index 87ce70d59..d05ff196d 100644 --- a/apps/emqx_connector/test/emqx_connector_jwt_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_jwt_SUITE.erl @@ -67,3 +67,13 @@ t_lookup_jwt_missing(_Config) -> ResourceId = <<"resource id">>, ?assertEqual({error, not_found}, emqx_connector_jwt:lookup_jwt(ResourceId)), ok. + +t_delete_jwt(_Config) -> + TId = ?JWT_TABLE, + JWT = <<"some jwt">>, + ResourceId = <<"resource id">>, + true = insert_jwt(TId, ResourceId, JWT), + {ok, _} = emqx_connector_jwt:lookup_jwt(ResourceId), + ?assertEqual(ok, emqx_connector_jwt:delete_jwt(TId, ResourceId)), + ?assertEqual({error, not_found}, emqx_connector_jwt:lookup_jwt(TId, ResourceId)), + ok. diff --git a/apps/emqx_connector/test/emqx_connector_jwt_worker_SUITE.erl b/apps/emqx_connector/test/emqx_connector_jwt_worker_SUITE.erl index 74075917e..10eb41388 100644 --- a/apps/emqx_connector/test/emqx_connector_jwt_worker_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_jwt_worker_SUITE.erl @@ -186,14 +186,30 @@ t_refresh(_Config) -> {ok, SecondJWT} = emqx_connector_jwt:lookup_jwt(Table, ResourceId), ?assertNot(is_expired(SecondJWT)), ?assert(is_expired(FirstJWT)), - {FirstJWT, SecondJWT} + %% check yet another refresh to ensure the timer was properly + %% reset. + ?block_until( + #{ + ?snk_kind := connector_jwt_worker_refresh, + jwt := JWT1 + } when + JWT1 =/= SecondJWT andalso + JWT1 =/= FirstJWT, + 15_000 + ), + {ok, ThirdJWT} = emqx_connector_jwt:lookup_jwt(Table, ResourceId), + ?assertNot(is_expired(ThirdJWT)), + ?assert(is_expired(SecondJWT)), + {FirstJWT, SecondJWT, ThirdJWT} end, - fun({FirstJWT, SecondJWT}, Trace) -> + fun({FirstJWT, SecondJWT, ThirdJWT}, Trace) -> ?assertMatch( - [_, _ | _], + [_, _, _ | _], ?of_kind(connector_jwt_worker_token_stored, Trace) ), ?assertNotEqual(FirstJWT, SecondJWT), + ?assertNotEqual(SecondJWT, ThirdJWT), + ?assertNotEqual(FirstJWT, ThirdJWT), ok end ), @@ -289,7 +305,7 @@ t_lookup_badarg(_Config) -> t_start_supervised_worker(_Config) -> {ok, _} = emqx_connector_jwt_sup:start_link(), - Config = #{resource_id := ResourceId} = generate_config(), + Config = #{resource_id := ResourceId, table := TId} = generate_config(), {ok, Pid} = emqx_connector_jwt_sup:ensure_worker_present(ResourceId, Config), Ref = emqx_connector_jwt_worker:ensure_jwt(Pid), receive @@ -300,6 +316,7 @@ t_start_supervised_worker(_Config) -> end, MRef = monitor(process, Pid), ?assert(is_process_alive(Pid)), + ?assertMatch({ok, _}, emqx_connector_jwt:lookup_jwt(TId, ResourceId)), ok = emqx_connector_jwt_sup:ensure_worker_deleted(ResourceId), receive {'DOWN', MRef, process, Pid, _} -> @@ -307,6 +324,11 @@ t_start_supervised_worker(_Config) -> after 1_000 -> ct:fail("timeout") end, + %% ensure it cleans up its own tokens to avoid leakage when + %% probing/testing rule resources. + ?assertEqual({error, not_found}, emqx_connector_jwt:lookup_jwt(TId, ResourceId)), + %% ensure the specs are removed from the supervision tree. + ?assertEqual([], supervisor:which_children(emqx_connector_jwt_sup)), ok. t_start_supervised_worker_already_started(_Config) -> @@ -322,9 +344,9 @@ t_start_supervised_worker_already_present(_Config) -> Config = #{resource_id := ResourceId} = generate_config(), {ok, Pid0} = emqx_connector_jwt_sup:ensure_worker_present(ResourceId, Config), Ref = monitor(process, Pid0), - exit(Pid0, {shutdown, normal}), + exit(Pid0, kill), receive - {'DOWN', Ref, process, Pid0, {shutdown, normal}} -> ok + {'DOWN', Ref, process, Pid0, killed} -> ok after 1_000 -> error(worker_didnt_stop) end, {ok, Pid1} = emqx_connector_jwt_sup:ensure_worker_present(ResourceId, Config), diff --git a/changes/v5.0.14-en.md b/changes/v5.0.14-en.md new file mode 100644 index 000000000..214d4e58e --- /dev/null +++ b/changes/v5.0.14-en.md @@ -0,0 +1,8 @@ +# v5.0.14 + +## Enhancements + + +## Bug Fixes + +- Fix an issue where testing the GCP PubSub could leak memory, and an issue where its JWT token would fail to refresh a second time. [#9641](https://github.com/emqx/emqx/pull/9641) diff --git a/changes/v5.0.14-zh.md b/changes/v5.0.14-zh.md new file mode 100644 index 000000000..dc77784a9 --- /dev/null +++ b/changes/v5.0.14-zh.md @@ -0,0 +1,8 @@ +# v5.0.14 + +## 增强 + + +## 修复 + +- 修复了测试GCP PubSub可能泄露内存的问题,以及其JWT令牌第二次刷新失败的问题。 [#9640](https://github.com/emqx/emqx/pull/9640) diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl index b84b7d74b..f83a96cb2 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl @@ -1336,6 +1336,7 @@ t_stop(Config) -> fun(Res, Trace) -> ?assertMatch({ok, {ok, _}}, Res), ?assertMatch([_], ?of_kind(gcp_pubsub_stop, Trace)), + ?assertMatch([_ | _], ?of_kind(connector_jwt_deleted, Trace)), ok end ), diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src b/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src index 84f9bec8b..dfebd75f5 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src @@ -1,6 +1,6 @@ {application, emqx_ee_connector, [ {description, "EMQX Enterprise connectors"}, - {vsn, "0.1.1"}, + {vsn, "0.1.2"}, {registered, []}, {applications, [ kernel, diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_gcp_pubsub.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_gcp_pubsub.erl index 139cb89e9..29170be41 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_gcp_pubsub.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_gcp_pubsub.erl @@ -154,6 +154,7 @@ on_stop( connector => InstanceId }), emqx_connector_jwt_sup:ensure_worker_deleted(JWTWorkerId), + emqx_connector_jwt:delete_jwt(?JWT_TABLE, InstanceId), ehttpc_sup:stop_pool(PoolName). -spec on_query(