From 56b1df1c7f0267134db58ae096beba9e9e4fc184 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 29 Dec 2022 09:50:28 -0300 Subject: [PATCH 1/5] fix(gcp_pubsub): ensure jwt token is deleted after worker stops and always ensure refresh timer (v4.4) https://emqx.atlassian.net/browse/EMQX-8652 Since the rule resource testing mechanism creates a new resource to test the configuration, a new JWT associated with an unique temporary resource was being created and left in the JWT table, leaking it. Also, a wrong case clause when setting the new refresh timer for the JWT worker was preventing it from refreshing from the 2nd refresh onward. --- .../src/emqx_rule_engine_jwt.erl | 11 ++++++ .../src/emqx_rule_engine_jwt_sup.erl | 2 +- .../src/emqx_rule_engine_jwt_worker.erl | 34 +++++++++++++++---- .../emqx_rule_engine_jwt_worker_SUITE.erl | 22 +++++++++--- 4 files changed, 57 insertions(+), 12 deletions(-) diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_jwt.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_jwt.erl index 828c77f93..237c08c72 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_jwt.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_jwt.erl @@ -22,6 +22,7 @@ %% API -export([ lookup_jwt/1 , lookup_jwt/2 + , delete_jwt/2 ]). -type jwt() :: binary(). @@ -43,3 +44,13 @@ lookup_jwt(TId, ResourceId) -> error:badarg -> {error, not_found} end. + +-spec delete_jwt(ets:table(), resource_id()) -> ok. +delete_jwt(TId, ResourceId) -> + try + ets:delete(TId, {ResourceId, jwt}), + ok + catch + error:badarg -> + ok + end. diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_jwt_sup.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_jwt_sup.erl index b393dd08b..989be2304 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_jwt_sup.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_jwt_sup.erl @@ -72,7 +72,7 @@ jwt_worker_child_spec(Id, Config) -> , restart => transient , type => worker , significant => false - , shutdown => brutal_kill + , shutdown => 5_000 , modules => [emqx_rule_engine_jwt_worker] }. diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_jwt_worker.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_jwt_worker.erl index 4190a3536..7e3604701 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_jwt_worker.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_jwt_worker.erl @@ -31,8 +31,11 @@ , handle_info/2 , format_status/1 , format_status/2 + , terminate/2 ]). +-export([force_refresh/1]). + -include_lib("jose/include/jose_jwk.hrl"). -include_lib("emqx_rule_engine/include/rule_engine.hrl"). -include_lib("emqx_rule_engine/include/rule_actions.hrl"). @@ -49,7 +52,7 @@ , alg := binary() }. -type jwt() :: binary(). --type state() :: #{ refresh_timer := undefined | timer:tref() +-type state() :: #{ refresh_timer := undefined | timer:tref() | reference() , resource_id := resource_id() , expiration := timer:time() , table := ets:table() @@ -88,6 +91,11 @@ ensure_jwt(Worker) -> gen_server:cast(Worker, {ensure_jwt, Ref}), Ref. +-spec force_refresh(pid()) -> ok. +force_refresh(Worker) -> + _ = erlang:send(Worker, {timeout, force_refresh, ?refresh_jwt}), + ok. + %%----------------------------------------------------------------------------------------- %% gen_server API %%----------------------------------------------------------------------------------------- @@ -95,6 +103,7 @@ ensure_jwt(Worker) -> -spec init(config()) -> {ok, state(), {continue, {make_key, binary()}}} | {stop, {error, term()}}. init(#{private_key := PrivateKeyPEM} = Config) -> + process_flag(trap_exit, true), State0 = maps:without([private_key], Config), State = State0#{ jwk => undefined , jwt => undefined @@ -139,7 +148,7 @@ handle_cast({ensure_jwt, From}, State0 = #{jwt := JWT}) -> handle_cast(_Req, State) -> {noreply, State}. -handle_info({timeout, TRef, ?refresh_jwt}, State0 = #{refresh_timer := TRef}) -> +handle_info({timeout, _TRef, ?refresh_jwt}, State0) -> State = generate_and_store_jwt(State0), {noreply, State}; handle_info(_Msg, State) -> @@ -152,6 +161,11 @@ format_status(_Opt, [_PDict, State0]) -> State = censor_secrets(State0), [{data, [{"State", State}]}]. +terminate(_Reason, State) -> + #{resource_id := ResourceId, table := TId} = State, + emqx_rule_engine_jwt:delete_jwt(TId, ResourceId), + ok. + %%----------------------------------------------------------------------------------------- %% Helper fns %%----------------------------------------------------------------------------------------- @@ -195,14 +209,13 @@ store_jwt(#{resource_id := ResourceId, table := TId}, JWT) -> ok. -spec ensure_timer(state()) -> state(). -ensure_timer(State = #{ refresh_timer := undefined +ensure_timer(State = #{ refresh_timer := OldTimer , expiration := ExpirationMS0 }) -> - ExpirationMS = max(5_000, ExpirationMS0 - 5_000), + ok = cancel_timer(OldTimer), + ExpirationMS = max(5_000, ExpirationMS0 - 60_000), TRef = erlang:start_timer(ExpirationMS, self(), ?refresh_jwt), - State#{refresh_timer => TRef}; -ensure_timer(State) -> - State. + State#{refresh_timer => TRef}. -spec censor_secrets(state()) -> map(). censor_secrets(State) -> @@ -214,3 +227,10 @@ censor_secrets(State) -> Value end, State). + +-spec cancel_timer(undefined | timer:tref() | reference()) -> ok. +cancel_timer(undefined) -> + ok; +cancel_timer(TRef) -> + _ = erlang:cancel_timer(TRef), + ok. diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_jwt_worker_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_jwt_worker_SUITE.erl index fc84293e3..0de58df40 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_jwt_worker_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_jwt_worker_SUITE.erl @@ -140,12 +140,22 @@ t_refresh(_Config) -> {ok, SecondJWT} = emqx_rule_engine_jwt:lookup_jwt(Table, ResourceId), ?assertNot(is_expired(SecondJWT)), ?assert(is_expired(FirstJWT)), - {FirstJWT, SecondJWT} + %% check yet another refresh to ensure the timer was properly + %% reset. + ?block_until(#{?snk_kind := rule_engine_jwt_worker_refresh, + jwt := JWT1} when JWT1 =/= SecondJWT + andalso JWT1 =/= FirstJWT, 15_000), + {ok, ThirdJWT} = emqx_rule_engine_jwt:lookup_jwt(Table, ResourceId), + ?assertNot(is_expired(ThirdJWT)), + ?assert(is_expired(SecondJWT)), + {FirstJWT, SecondJWT, ThirdJWT} end, - fun({FirstJWT, SecondJWT}, Trace) -> - ?assertMatch([_, _ | _], + fun({FirstJWT, SecondJWT, ThirdJWT}, Trace) -> + ?assertMatch([_, _, _ | _], ?of_kind(rule_engine_jwt_worker_token_stored, Trace)), ?assertNotEqual(FirstJWT, SecondJWT), + ?assertNotEqual(SecondJWT, ThirdJWT), + ?assertNotEqual(FirstJWT, ThirdJWT), ok end), ok. @@ -225,7 +235,7 @@ t_lookup_badarg(_Config) -> t_start_supervised_worker(_Config) -> {ok, _} = emqx_rule_engine_jwt_sup:start_link(), - Config = #{resource_id := ResourceId} = generate_config(), + Config = #{resource_id := ResourceId, table := TId} = generate_config(), {ok, Pid} = emqx_rule_engine_jwt_sup:ensure_worker_present(ResourceId, Config), Ref = emqx_rule_engine_jwt_worker:ensure_jwt(Pid), receive @@ -237,6 +247,7 @@ t_start_supervised_worker(_Config) -> end, MRef = monitor(process, Pid), ?assert(is_process_alive(Pid)), + ?assertMatch({ok, _}, emqx_rule_engine_jwt:lookup_jwt(TId, ResourceId)), ok = emqx_rule_engine_jwt_sup:ensure_worker_deleted(ResourceId), receive {'DOWN', MRef, process, Pid, _} -> @@ -245,4 +256,7 @@ t_start_supervised_worker(_Config) -> 1_000 -> ct:fail("timeout") end, + %% ensure it cleans up its own tokens to avoid leakage when + %% probing/testing rule resources. + ?assertEqual({error, not_found}, emqx_rule_engine_jwt:lookup_jwt(TId, ResourceId)), ok. From 7f8c5dcf0185247f44467b1268cda5dec11d9f76 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 29 Dec 2022 10:25:35 -0300 Subject: [PATCH 2/5] fix(jwt_sup): delete the worker spec from the supervision tree --- apps/emqx_rule_engine/src/emqx_rule_engine_jwt_sup.erl | 6 ++++-- .../test/emqx_rule_engine_jwt_worker_SUITE.erl | 2 ++ 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_jwt_sup.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_jwt_sup.erl index 989be2304..d564097dd 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_jwt_sup.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_jwt_sup.erl @@ -62,8 +62,10 @@ ensure_worker_present(Id, Config) -> -spec ensure_worker_deleted(worker_id()) -> ok. ensure_worker_deleted(Id) -> case supervisor:terminate_child(?MODULE, Id) of - ok -> ok; - {error, not_found} -> ok + ok -> + ok = supervisor:delete_child(?MODULE, Id); + {error, not_found} -> + ok end. jwt_worker_child_spec(Id, Config) -> diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_jwt_worker_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_jwt_worker_SUITE.erl index 0de58df40..4236913fb 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_jwt_worker_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_jwt_worker_SUITE.erl @@ -259,4 +259,6 @@ t_start_supervised_worker(_Config) -> %% ensure it cleans up its own tokens to avoid leakage when %% probing/testing rule resources. ?assertEqual({error, not_found}, emqx_rule_engine_jwt:lookup_jwt(TId, ResourceId)), + %% ensure the specs are removed from the supervision tree. + ?assertEqual([], supervisor:which_children(emqx_rule_engine_jwt_sup)), ok. From d345ccf4b8a25a255ddc94581e3aa93ac35888c9 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 29 Dec 2022 11:45:45 -0300 Subject: [PATCH 3/5] chore: update appups --- .../src/emqx_rule_engine.appup.src | 22 +++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src b/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src index c3273d701..3c7ae983f 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src @@ -1,9 +1,16 @@ %% -*- mode: erlang -*- %% Unless you know what you are doing, DO NOT edit manually!! {VSN, - [{"4.4.12",[{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, + [{"4.4.12", + [{load_module,emqx_rule_engine_jwt_worker,brutal_purge,soft_purge,[]}, + {update,emqx_rule_engine_jwt_sup,supervisor}, + {load_module,emqx_rule_engine_jwt,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {"4.4.11", - [{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_engine_jwt_worker,brutal_purge,soft_purge,[]}, + {update,emqx_rule_engine_jwt_sup,supervisor}, + {load_module,emqx_rule_engine_jwt,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {apply,{emqx_rule_engine_sup,ensure_api_delegator_started,[]}}, @@ -213,9 +220,16 @@ {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}]}, {<<".*">>,[]}], - [{"4.4.12",[{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, + [{"4.4.12", + [{load_module,emqx_rule_engine_jwt_worker,brutal_purge,soft_purge,[]}, + {update,emqx_rule_engine_jwt_sup,supervisor}, + {load_module,emqx_rule_engine_jwt,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {"4.4.11", - [{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_engine_jwt_worker,brutal_purge,soft_purge,[]}, + {update,emqx_rule_engine_jwt_sup,supervisor}, + {load_module,emqx_rule_engine_jwt,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {apply,{emqx_rule_engine_sup,ensure_api_delegator_stopped,[]}}, {load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, From abcc6263bb6df7cec1f753bf5ac914de29c04527 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 29 Dec 2022 13:04:29 -0300 Subject: [PATCH 4/5] docs: update changelog --- changes/v4.4.13-en.md | 8 ++++++++ changes/v4.4.13-zh.md | 8 ++++++++ 2 files changed, 16 insertions(+) create mode 100644 changes/v4.4.13-en.md create mode 100644 changes/v4.4.13-zh.md diff --git a/changes/v4.4.13-en.md b/changes/v4.4.13-en.md new file mode 100644 index 000000000..af4485ed7 --- /dev/null +++ b/changes/v4.4.13-en.md @@ -0,0 +1,8 @@ +# v4.4.13 + +## Enhancements + + +## Bug Fixes + +- Fix an issue where testing the GCP PubSub could leak memory, and an issue where its JWT token would fail to refresh a second time. [#9640](https://github.com/emqx/emqx/pull/9640) diff --git a/changes/v4.4.13-zh.md b/changes/v4.4.13-zh.md new file mode 100644 index 000000000..62270e9a8 --- /dev/null +++ b/changes/v4.4.13-zh.md @@ -0,0 +1,8 @@ +# v4.4.13 + +## 增强 + + +## 修复 + +- 修复了测试GCP PubSub可能泄露内存的问题,以及其JWT令牌第二次刷新失败的问题。 [#9640](https://github.com/emqx/emqx/pull/9640) From 93fc8ff1e5068edcb06d73c3160f19ab59aff5b2 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 29 Dec 2022 16:45:59 -0300 Subject: [PATCH 5/5] docs: improve changelog Co-authored-by: Zaiming (Stone) Shi --- changes/v4.4.13-en.md | 2 +- changes/v4.4.13-zh.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/changes/v4.4.13-en.md b/changes/v4.4.13-en.md index af4485ed7..d3c125600 100644 --- a/changes/v4.4.13-en.md +++ b/changes/v4.4.13-en.md @@ -5,4 +5,4 @@ ## Bug Fixes -- Fix an issue where testing the GCP PubSub could leak memory, and an issue where its JWT token would fail to refresh a second time. [#9640](https://github.com/emqx/emqx/pull/9640) +- Fix an issue where testing the GCP PubSub could leak memory, and an issue where its JWT token would fail to refresh a second time [#9640](https://github.com/emqx/emqx/pull/9640). diff --git a/changes/v4.4.13-zh.md b/changes/v4.4.13-zh.md index 62270e9a8..61bc33e24 100644 --- a/changes/v4.4.13-zh.md +++ b/changes/v4.4.13-zh.md @@ -5,4 +5,4 @@ ## 修复 -- 修复了测试GCP PubSub可能泄露内存的问题,以及其JWT令牌第二次刷新失败的问题。 [#9640](https://github.com/emqx/emqx/pull/9640) +- 修复了测试 GCP PubSub 可能泄露内存的问题,以及其 JWT 令牌第二次刷新失败的问题 [#9640](https://github.com/emqx/emqx/pull/9640)。