Merge remote-tracking branch 'origin/release-v44' into 1104-ci-split-ct-runs

This commit is contained in:
Zaiming (Stone) Shi 2022-11-04 14:34:24 +01:00
commit 3346f2a26c
13 changed files with 802 additions and 34 deletions

View File

@ -28,3 +28,5 @@
-define(bound_v(Key, ENVS0),
maps:get(Key,
maps:get(?BINDING_KEYS, ENVS0, #{}))).
-define(JWT_TABLE, emqx_rule_engine_jwt_table).

View File

@ -1,5 +1,6 @@
%% -*- mode: erlang -*-
{deps, []}.
{deps, [ {jose, {git, "https://github.com/emqx/erlang-jose", {tag, "emqx-1.11.3"}}}
]}.
%% Comple Opts
{erl_opts, [warn_unused_vars,

View File

@ -2,8 +2,8 @@
[{description, "EMQ X Rule Engine"},
{vsn, "4.4.11"}, % strict semver, bump manually!
{modules, []},
{registered, [emqx_rule_engine_sup, emqx_rule_registry]},
{applications, [kernel,stdlib,rulesql,getopt]},
{registered, [emqx_rule_engine_sup, emqx_rule_registry, emqx_rule_engine_jwt_sup]},
{applications, [kernel,stdlib,rulesql,getopt,jose]},
{mod, {emqx_rule_engine_app, []}},
{env, []},
{licenses, ["Apache-2.0"]},

View File

@ -2,7 +2,12 @@
%% Unless you know what you are doing, DO NOT edit manually!!
{VSN,
[{"4.4.10",
[{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
[{add_module,emqx_rule_engine_jwt},
{add_module,emqx_rule_engine_jwt_worker},
{add_module,emqx_rule_engine_jwt_sup},
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]},
{apply,{emqx_rule_engine_sup,start_jwt_sup,[]}},
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
@ -13,7 +18,12 @@
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
{"4.4.9",
[{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
[{add_module,emqx_rule_engine_jwt},
{add_module,emqx_rule_engine_jwt_worker},
{add_module,emqx_rule_engine_jwt_sup},
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]},
{apply,{emqx_rule_engine_sup,start_jwt_sup,[]}},
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
@ -25,7 +35,12 @@
{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]},
{"4.4.8",
[{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
[{add_module,emqx_rule_engine_jwt},
{add_module,emqx_rule_engine_jwt_worker},
{add_module,emqx_rule_engine_jwt_sup},
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]},
{apply,{emqx_rule_engine_sup,start_jwt_sup,[]}},
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
@ -38,7 +53,12 @@
{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]},
{<<"4\\.4\\.[6-7]">>,
[{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]},
[{add_module,emqx_rule_engine_jwt},
{add_module,emqx_rule_engine_jwt_worker},
{add_module,emqx_rule_engine_jwt_sup},
{load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]},
{apply,{emqx_rule_engine_sup,start_jwt_sup,[]}},
{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
@ -51,7 +71,12 @@
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
{"4.4.5",
[{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]},
[{add_module,emqx_rule_engine_jwt},
{add_module,emqx_rule_engine_jwt_worker},
{add_module,emqx_rule_engine_jwt_sup},
{load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]},
{apply,{emqx_rule_engine_sup,start_jwt_sup,[]}},
{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
@ -65,7 +90,12 @@
{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
{"4.4.4",
[{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]},
[{add_module,emqx_rule_engine_jwt},
{add_module,emqx_rule_engine_jwt_worker},
{add_module,emqx_rule_engine_jwt_sup},
{load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]},
{apply,{emqx_rule_engine_sup,start_jwt_sup,[]}},
{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
@ -79,7 +109,12 @@
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}]},
{"4.4.3",
[{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]},
[{add_module,emqx_rule_engine_jwt},
{add_module,emqx_rule_engine_jwt_worker},
{add_module,emqx_rule_engine_jwt_sup},
{load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]},
{apply,{emqx_rule_engine_sup,start_jwt_sup,[]}},
{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
@ -95,7 +130,12 @@
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
{"4.4.2",
[{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]},
[{add_module,emqx_rule_engine_jwt},
{add_module,emqx_rule_engine_jwt_worker},
{add_module,emqx_rule_engine_jwt_sup},
{load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]},
{apply,{emqx_rule_engine_sup,start_jwt_sup,[]}},
{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
@ -112,7 +152,12 @@
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]},
{"4.4.1",
[{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]},
[{add_module,emqx_rule_engine_jwt},
{add_module,emqx_rule_engine_jwt_worker},
{add_module,emqx_rule_engine_jwt_sup},
{load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]},
{apply,{emqx_rule_engine_sup,start_jwt_sup,[]}},
{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
@ -129,7 +174,12 @@
{add_module,emqx_rule_date},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
{"4.4.0",
[{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]},
[{add_module,emqx_rule_engine_jwt},
{add_module,emqx_rule_engine_jwt_worker},
{add_module,emqx_rule_engine_jwt_sup},
{load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]},
{apply,{emqx_rule_engine_sup,start_jwt_sup,[]}},
{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
@ -153,39 +203,58 @@
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
{apply,{supervisor,terminate_child,
[emqx_rule_engine_sup,emqx_rule_engine_jwt_sup]}},
{delete_module,emqx_rule_engine_jwt_sup},
{delete_module,emqx_rule_engine_jwt_worker},
{delete_module,emqx_rule_engine_jwt}]},
{"4.4.9",
[{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]},
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
{apply,{supervisor,terminate_child,
[emqx_rule_engine_sup,emqx_rule_engine_jwt_sup]}},
{delete_module,emqx_rule_engine_jwt_sup},
{delete_module,emqx_rule_engine_jwt_worker},
{delete_module,emqx_rule_engine_jwt}]},
{"4.4.8",
[{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]},
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
{apply,{supervisor,terminate_child,
[emqx_rule_engine_sup,emqx_rule_engine_jwt_sup]}},
{delete_module,emqx_rule_engine_jwt_sup},
{delete_module,emqx_rule_engine_jwt_worker},
{delete_module,emqx_rule_engine_jwt}]},
{<<"4\\.4\\.[6-7]">>,
[{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
@ -194,11 +263,17 @@
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
{apply,{supervisor,terminate_child,
[emqx_rule_engine_sup,emqx_rule_engine_jwt_sup]}},
{delete_module,emqx_rule_engine_jwt_sup},
{delete_module,emqx_rule_engine_jwt_worker},
{delete_module,emqx_rule_engine_jwt}]},
{"4.4.5",
[{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
@ -208,11 +283,17 @@
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
{apply,{supervisor,terminate_child,
[emqx_rule_engine_sup,emqx_rule_engine_jwt_sup]}},
{delete_module,emqx_rule_engine_jwt_sup},
{delete_module,emqx_rule_engine_jwt_worker},
{delete_module,emqx_rule_engine_jwt}]},
{"4.4.4",
[{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
@ -222,10 +303,16 @@
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}]},
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
{apply,{supervisor,terminate_child,
[emqx_rule_engine_sup,emqx_rule_engine_jwt_sup]}},
{delete_module,emqx_rule_engine_jwt_sup},
{delete_module,emqx_rule_engine_jwt_worker},
{delete_module,emqx_rule_engine_jwt}]},
{"4.4.3",
[{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
@ -238,10 +325,16 @@
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
{apply,{supervisor,terminate_child,
[emqx_rule_engine_sup,emqx_rule_engine_jwt_sup]}},
{delete_module,emqx_rule_engine_jwt_sup},
{delete_module,emqx_rule_engine_jwt_worker},
{delete_module,emqx_rule_engine_jwt}]},
{"4.4.2",
[{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
@ -255,10 +348,16 @@
{delete_module,emqx_rule_date},
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]},
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
{apply,{supervisor,terminate_child,
[emqx_rule_engine_sup,emqx_rule_engine_jwt_sup]}},
{delete_module,emqx_rule_engine_jwt_sup},
{delete_module,emqx_rule_engine_jwt_worker},
{delete_module,emqx_rule_engine_jwt}]},
{"4.4.1",
[{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
@ -272,10 +371,16 @@
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
{delete_module,emqx_rule_date},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
{apply,{supervisor,terminate_child,
[emqx_rule_engine_sup,emqx_rule_engine_jwt_sup]}},
{delete_module,emqx_rule_engine_jwt_sup},
{delete_module,emqx_rule_engine_jwt_worker},
{delete_module,emqx_rule_engine_jwt}]},
{"4.4.0",
[{load_module,emqx_rule_monitor,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_app,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_sup,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
@ -289,5 +394,10 @@
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
{delete_module,emqx_rule_date}]},
{apply,{supervisor,terminate_child,
[emqx_rule_engine_sup,emqx_rule_engine_jwt_sup]}},
{delete_module,emqx_rule_date},
{delete_module,emqx_rule_engine_jwt_sup},
{delete_module,emqx_rule_engine_jwt_worker},
{delete_module,emqx_rule_engine_jwt}]},
{<<".*">>,[]}]}.

View File

@ -0,0 +1,45 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_rule_engine_jwt).
-include("rule_engine.hrl").
-include("rule_actions.hrl").
%% API
-export([ lookup_jwt/1
, lookup_jwt/2
]).
-type jwt() :: binary().
-spec lookup_jwt(resource_id()) -> {ok, jwt()} | {error, not_found}.
lookup_jwt(ResourceId) ->
?MODULE:lookup_jwt(?JWT_TABLE, ResourceId).
-spec lookup_jwt(ets:table(), resource_id()) -> {ok, jwt()} | {error, not_found}.
lookup_jwt(TId, ResourceId) ->
try
case ets:lookup(TId, {ResourceId, jwt}) of
[{{ResourceId, jwt}, JWT}] ->
{ok, JWT};
[] ->
{error, not_found}
end
catch
error:badarg ->
{error, not_found}
end.

View File

@ -0,0 +1,89 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_rule_engine_jwt_sup).
-behaviour(supervisor).
-export([ start_link/0
, ensure_worker_present/2
, ensure_worker_deleted/1
]).
-export([init/1]).
-include_lib("emqx_rule_engine/include/rule_actions.hrl").
-type worker_id() :: term().
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
init([]) ->
ensure_jwt_table(),
SupFlags = #{ strategy => one_for_one
, intensity => 10
, period => 5
, auto_shutdown => never
},
ChildSpecs = [],
{ok, {SupFlags, ChildSpecs}}.
%% @doc Starts a new JWT worker. The caller should use
%% `emqx_rule_engine_jwt_sup:ensure_jwt/1' to ensure that a JWT has
%% been stored, if synchronization is needed.
-spec ensure_worker_present(worker_id(), map()) ->
{ok, supervisor:child()}.
ensure_worker_present(Id, Config) ->
ChildSpec = jwt_worker_child_spec(Id, Config),
case supervisor:start_child(?MODULE, ChildSpec) of
{ok, Pid} ->
{ok, Pid};
{error, {already_started, Pid}} ->
{ok, Pid};
{error, already_present} ->
supervisor:restart_child(?MODULE, Id)
end.
%% @doc Stops a given JWT worker by its id.
-spec ensure_worker_deleted(worker_id()) -> ok.
ensure_worker_deleted(Id) ->
case supervisor:terminate_child(?MODULE, Id) of
ok -> ok;
{error, not_found} -> ok
end.
jwt_worker_child_spec(Id, Config) ->
#{ id => Id
, start => {emqx_rule_engine_jwt_worker, start_link, [Config]}
, restart => transient
, type => worker
, significant => false
, shutdown => brutal_kill
, modules => [emqx_rule_engine_jwt_worker]
}.
-spec ensure_jwt_table() -> ok.
ensure_jwt_table() ->
case ets:whereis(?JWT_TABLE) of
undefined ->
Opts = [named_table, public,
{read_concurrency, true}, ordered_set],
_ = ets:new(?JWT_TABLE, Opts),
ok;
_ ->
ok
end.

View File

@ -0,0 +1,216 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_rule_engine_jwt_worker).
-behaviour(gen_server).
%% API
-export([ start_link/1
, ensure_jwt/1
]).
%% gen_server API
-export([ init/1
, handle_continue/2
, handle_call/3
, handle_cast/2
, handle_info/2
, format_status/1
, format_status/2
]).
-include_lib("jose/include/jose_jwk.hrl").
-include_lib("emqx_rule_engine/include/rule_engine.hrl").
-include_lib("emqx_rule_engine/include/rule_actions.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-type config() :: #{ private_key := binary()
, resource_id := resource_id()
, expiration := timer:time()
, table := ets:table()
, iss := binary()
, sub := binary()
, aud := binary()
, kid := binary()
, alg := binary()
}.
-type jwt() :: binary().
-type state() :: #{ refresh_timer := undefined | timer:tref()
, resource_id := resource_id()
, expiration := timer:time()
, table := ets:table()
, jwt := undefined | jwt()
%% only undefined during startup
, jwk := undefined | jose_jwk:key()
, iss := binary()
, sub := binary()
, aud := binary()
, kid := binary()
, alg := binary()
}.
-define(refresh_jwt, refresh_jwt).
%%-----------------------------------------------------------------------------------------
%% API
%%-----------------------------------------------------------------------------------------
-spec start_link(config()) -> gen_server:start_ret().
start_link(#{ private_key := _
, expiration := _
, resource_id := _
, table := _
, iss := _
, sub := _
, aud := _
, kid := _
, alg := _
} = Config) ->
gen_server:start_link(?MODULE, Config, []).
-spec ensure_jwt(pid()) -> reference().
ensure_jwt(Worker) ->
Ref = alias([reply]),
gen_server:cast(Worker, {ensure_jwt, Ref}),
Ref.
%%-----------------------------------------------------------------------------------------
%% gen_server API
%%-----------------------------------------------------------------------------------------
-spec init(config()) -> {ok, state(), {continue, {make_key, binary()}}}
| {stop, {error, term()}}.
init(#{private_key := PrivateKeyPEM} = Config) ->
State0 = maps:without([private_key], Config),
State = State0#{ jwk => undefined
, jwt => undefined
, refresh_timer => undefined
},
{ok, State, {continue, {make_key, PrivateKeyPEM}}}.
handle_continue({make_key, PrivateKeyPEM}, State0) ->
case jose_jwk:from_pem(PrivateKeyPEM) of
JWK = #jose_jwk{} ->
State = State0#{jwk := JWK},
{noreply, State, {continue, create_token}};
[] ->
?tp(rule_engine_jwt_worker_startup_error, #{error => empty_key}),
{stop, {shutdown, {error, empty_key}}, State0};
{error, Reason} ->
Error = {invalid_private_key, Reason},
?tp(rule_engine_jwt_worker_startup_error, #{error => Error}),
{stop, {shutdown, {error, Error}}, State0};
Error0 ->
Error = {invalid_private_key, Error0},
?tp(rule_engine_jwt_worker_startup_error, #{error => Error}),
{stop, {shutdown, {error, Error}}, State0}
end;
handle_continue(create_token, State0) ->
State = generate_and_store_jwt(State0),
{noreply, State}.
handle_call(_Req, _From, State) ->
{reply, {error, bad_call}, State}.
handle_cast({ensure_jwt, From}, State0 = #{jwt := JWT}) ->
State =
case JWT of
undefined ->
generate_and_store_jwt(State0);
_ ->
State0
end,
From ! {From, token_created},
{noreply, State};
handle_cast(_Req, State) ->
{noreply, State}.
handle_info({timeout, TRef, ?refresh_jwt}, State0 = #{refresh_timer := TRef}) ->
State = generate_and_store_jwt(State0),
{noreply, State};
handle_info(_Msg, State) ->
{noreply, State}.
format_status(State) ->
censor_secrets(State).
format_status(_Opt, [_PDict, State0]) ->
State = censor_secrets(State0),
[{data, [{"State", State}]}].
%%-----------------------------------------------------------------------------------------
%% Helper fns
%%-----------------------------------------------------------------------------------------
-spec do_generate_jwt(state()) -> jwt().
do_generate_jwt(#{ expiration := ExpirationMS
, iss := Iss
, sub := Sub
, aud := Aud
, kid := KId
, alg := Alg
, jwk := JWK
} = _State) ->
Headers = #{ <<"alg">> => Alg
, <<"kid">> => KId
},
Now = erlang:system_time(seconds),
ExpirationS = erlang:convert_time_unit(ExpirationMS, millisecond, second),
Claims = #{ <<"iss">> => Iss
, <<"sub">> => Sub
, <<"aud">> => Aud
, <<"iat">> => Now
, <<"exp">> => Now + ExpirationS
},
JWT0 = jose_jwt:sign(JWK, Headers, Claims),
{_, JWT} = jose_jws:compact(JWT0),
JWT.
-spec generate_and_store_jwt(state()) -> state().
generate_and_store_jwt(State0) ->
JWT = do_generate_jwt(State0),
store_jwt(State0, JWT),
?tp(rule_engine_jwt_worker_refresh, #{jwt => JWT}),
State1 = State0#{jwt := JWT},
ensure_timer(State1).
-spec store_jwt(state(), jwt()) -> ok.
store_jwt(#{resource_id := ResourceId, table := TId}, JWT) ->
true = ets:insert(TId, {{ResourceId, jwt}, JWT}),
?tp(rule_engine_jwt_worker_token_stored, #{resource_id => ResourceId}),
ok.
-spec ensure_timer(state()) -> state().
ensure_timer(State = #{ refresh_timer := undefined
, expiration := ExpirationMS0
}) ->
ExpirationMS = max(5_000, ExpirationMS0 - 5_000),
TRef = erlang:start_timer(ExpirationMS, self(), ?refresh_jwt),
State#{refresh_timer => TRef};
ensure_timer(State) ->
State.
-spec censor_secrets(state()) -> map().
censor_secrets(State) ->
maps:map(
fun(Key, _Value) when Key =:= jwt;
Key =:= jwk ->
"******";
(_Key, Value) ->
Value
end,
State).

View File

@ -22,7 +22,9 @@
-export([start_link/0]).
-export([start_locker/0]).
-export([ start_locker/0
, start_jwt_sup/0
]).
-export([init/1]).
@ -31,8 +33,12 @@ start_link() ->
init([]) ->
Opts = [public, named_table, set, {read_concurrency, true}],
_ = ets:new(?ACTION_INST_PARAMS_TAB, [{keypos, #action_instance_params.id}|Opts]),
_ = ets:new(?RES_PARAMS_TAB, [{keypos, #resource_params.id}|Opts]),
ensure_table(?ACTION_INST_PARAMS_TAB, [{keypos, #action_instance_params.id}|Opts]),
ensure_table(?RES_PARAMS_TAB, [{keypos, #resource_params.id}|Opts]),
SupFlags = #{ strategy => one_for_one
, intensity => 10
, period => 10
},
Registry = #{id => emqx_rule_registry,
start => {emqx_rule_registry, start_link, []},
restart => permanent,
@ -51,7 +57,8 @@ init([]) ->
shutdown => 5000,
type => worker,
modules => [emqx_rule_monitor]},
{ok, {{one_for_one, 10, 10}, [Registry, Metrics, Monitor]}}.
JWTSup = jwt_sup_child_spec(),
{ok, {SupFlags, [Registry, Metrics, Monitor, JWTSup]}}.
start_locker() ->
Locker = #{id => emqx_rule_locker,
@ -61,3 +68,32 @@ start_locker() ->
type => worker,
modules => [emqx_rule_locker]},
supervisor:start_child(?MODULE, Locker).
start_jwt_sup() ->
JWTSup = jwt_sup_child_spec(),
supervisor:start_child(?MODULE, JWTSup).
jwt_sup_child_spec() ->
#{ id => emqx_rule_engine_jwt_sup
, start => {emqx_rule_engine_jwt_sup, start_link, []}
, type => supervisor
, restart => permanent
, shutdown => 5_000
, modules => [emqx_rule_engine_jwt_sup]
}.
ensure_table(Name, Opts) ->
try
case ets:whereis(name) of
undefined ->
_ = ets:new(Name, Opts),
ok;
_ ->
ok
end
catch
%% stil the table exists (somehow can happen in hot-upgrade,
%% it seems).
error:badarg ->
ok
end.

View File

@ -19,6 +19,7 @@
-behaviour(gen_server).
-include("rule_engine.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
%% API functions
-export([ start_link/0
@ -222,7 +223,9 @@ inc(Id, Metric, Val) ->
counters:add(couters_ref(Id), metrics_idx(Metric), Val);
Ref ->
counters:add(Ref, metrics_idx(Metric), Val)
end.
end,
?tp(rule_metrics_inc, #{id => Id, metric => Metric, value => Val}),
ok.
inc_actions_taken(Id) ->
inc_actions_taken(Id, 1).

View File

@ -0,0 +1,248 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_rule_engine_jwt_worker_SUITE).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("emqx_rule_engine/include/rule_engine.hrl").
-include_lib("jose/include/jose_jwt.hrl").
-include_lib("jose/include/jose_jws.hrl").
-compile([export_all, nowarn_export_all]).
%%-----------------------------------------------------------------------------
%% CT boilerplate
%%-----------------------------------------------------------------------------
all() ->
emqx_ct:all(?MODULE).
init_per_suite(Config) ->
Config.
end_per_suite(_Config) ->
ok.
%%-----------------------------------------------------------------------------
%% Helper fns
%%-----------------------------------------------------------------------------
generate_private_key_pem() ->
PublicExponent = 65537,
Size = 2048,
Key = public_key:generate_key({rsa, Size, PublicExponent}),
DERKey = public_key:der_encode('PrivateKeyInfo', Key),
public_key:pem_encode([{'PrivateKeyInfo', DERKey, not_encrypted}]).
generate_config() ->
PrivateKeyPEM = generate_private_key_pem(),
ResourceID = emqx_guid:gen(),
#{ private_key => PrivateKeyPEM
, expiration => timer:hours(1)
, resource_id => ResourceID
, table => ets:new(test_jwt_table, [ordered_set, public])
, iss => <<"issuer">>
, sub => <<"subject">>
, aud => <<"audience">>
, kid => <<"key id">>
, alg => <<"RS256">>
}.
is_expired(JWT) ->
#jose_jwt{fields = #{<<"exp">> := Exp}} = jose_jwt:peek(JWT),
Now = erlang:system_time(seconds),
Now >= Exp.
%%-----------------------------------------------------------------------------
%% Test cases
%%-----------------------------------------------------------------------------
t_create_success(_Config) ->
Config = generate_config(),
Res = emqx_rule_engine_jwt_worker:start_link(Config),
?assertMatch({ok, _}, Res),
{ok, Worker} = Res,
Ref = emqx_rule_engine_jwt_worker:ensure_jwt(Worker),
receive
{Ref, token_created} ->
ok
after
1_000 ->
ct:fail("should have confirmed token creation; msgs: ~0p",
[process_info(self(), messages)])
end,
ok.
t_empty_key(_Config) ->
Config0 = generate_config(),
Config = Config0#{private_key := <<>>},
process_flag(trap_exit, true),
?check_trace(
?wait_async_action(
?assertMatch({ok, _}, emqx_rule_engine_jwt_worker:start_link(Config)),
#{?snk_kind := rule_engine_jwt_worker_startup_error},
1_000),
fun(Trace) ->
?assertMatch([#{error := empty_key}],
?of_kind(rule_engine_jwt_worker_startup_error, Trace)),
ok
end),
ok.
t_invalid_pem(_Config) ->
Config0 = generate_config(),
InvalidPEM = public_key:pem_encode([{'PrivateKeyInfo', <<"xxxxxx">>, not_encrypted},
{'PrivateKeyInfo', <<"xxxxxx">>, not_encrypted}]),
Config = Config0#{private_key := InvalidPEM},
process_flag(trap_exit, true),
?check_trace(
?wait_async_action(
?assertMatch({ok, _}, emqx_rule_engine_jwt_worker:start_link(Config)),
#{?snk_kind := rule_engine_jwt_worker_startup_error},
1_000),
fun(Trace) ->
?assertMatch([#{error := {invalid_private_key, _}}],
?of_kind(rule_engine_jwt_worker_startup_error, Trace)),
ok
end),
ok.
t_refresh(_Config) ->
Config0 = #{ table := Table
, resource_id := ResourceId
} = generate_config(),
Config = Config0#{expiration => 5_000},
?check_trace(
begin
{{ok, _Pid}, {ok, _Event}} =
?wait_async_action(
emqx_rule_engine_jwt_worker:start_link(Config),
#{?snk_kind := rule_engine_jwt_worker_token_stored},
5_000),
{ok, FirstJWT} = emqx_rule_engine_jwt:lookup_jwt(Table, ResourceId),
?block_until(#{?snk_kind := rule_engine_jwt_worker_refresh,
jwt := JWT0} when JWT0 =/= FirstJWT, 15_000),
{ok, SecondJWT} = emqx_rule_engine_jwt:lookup_jwt(Table, ResourceId),
?assertNot(is_expired(SecondJWT)),
?assert(is_expired(FirstJWT)),
{FirstJWT, SecondJWT}
end,
fun({FirstJWT, SecondJWT}, Trace) ->
?assertMatch([_, _ | _],
?of_kind(rule_engine_jwt_worker_token_stored, Trace)),
?assertNotEqual(FirstJWT, SecondJWT),
ok
end),
ok.
t_format_status(_Config) ->
Config = generate_config(),
{ok, Pid} = emqx_rule_engine_jwt_worker:start_link(Config),
{status, _, _, Props} = sys:get_status(Pid),
[State] = [State
|| Info = [_ | _] <- Props,
{data, Data = [_ | _]} <- Info,
{"State", State} <- Data],
?assertMatch(
#{ jwt := "******"
, jwk := "******"
},
State),
ok.
t_lookup_ok(_Config) ->
Config = #{ table := Table
, resource_id := ResourceId
, private_key := PrivateKeyPEM
, aud := Aud
, iss := Iss
, sub := Sub
, kid := KId
} = generate_config(),
{ok, Worker} = emqx_rule_engine_jwt_worker:start_link(Config),
Ref = emqx_rule_engine_jwt_worker:ensure_jwt(Worker),
receive
{Ref, token_created} ->
ok
after
500 ->
error(timeout)
end,
Res = emqx_rule_engine_jwt:lookup_jwt(Table, ResourceId),
?assertMatch({ok, _}, Res),
{ok, JWT} = Res,
?assert(is_binary(JWT)),
JWK = jose_jwk:from_pem(PrivateKeyPEM),
{IsValid, ParsedJWT, JWS} = jose_jwt:verify_strict(JWK, [<<"RS256">>], JWT),
?assertMatch(
#jose_jwt{
fields = #{ <<"aud">> := Aud
, <<"iss">> := Iss
, <<"sub">> := Sub
, <<"exp">> := _
, <<"iat">> := _
}},
ParsedJWT),
?assertNot(is_expired(JWT)),
?assertMatch(
#jose_jws{
alg = {_, 'RS256'},
fields = #{ <<"kid">> := KId
, <<"typ">> := <<"JWT">>
}},
JWS),
?assert(IsValid),
ok.
t_lookup_not_found(_Config) ->
Table = ets:new(test_jwt_table, [ordered_set, public]),
InexistentResource = <<"xxx">>,
?assertEqual({error, not_found},
emqx_rule_engine_jwt:lookup_jwt(Table, InexistentResource)),
ok.
t_lookup_badarg(_Config) ->
InexistentTable = i_dont_exist,
InexistentResource = <<"xxx">>,
?assertEqual({error, not_found},
emqx_rule_engine_jwt:lookup_jwt(InexistentTable, InexistentResource)),
ok.
t_start_supervised_worker(_Config) ->
{ok, _} = emqx_rule_engine_jwt_sup:start_link(),
Config = #{resource_id := ResourceId} = generate_config(),
{ok, Pid} = emqx_rule_engine_jwt_sup:ensure_worker_present(ResourceId, Config),
Ref = emqx_rule_engine_jwt_worker:ensure_jwt(Pid),
receive
{Ref, token_created} ->
ok
after
5_000 ->
ct:fail("timeout")
end,
MRef = monitor(process, Pid),
?assert(is_process_alive(Pid)),
ok = emqx_rule_engine_jwt_sup:ensure_worker_deleted(ResourceId),
receive
{'DOWN', MRef, process, Pid, _} ->
ok
after
1_000 ->
ct:fail("timeout")
end,
ok.

View File

@ -6,6 +6,8 @@
- Added support for OCSP (Online Certificate Status Protocol) Stapling
- Added CRL (Certificate Revocation List) cache auto refresh
- Added a JWT worker for creating and refreshing JWT tokens in rule engine actions. [#9241](https://github.com/emqx/emqx/pull/9241)
### Bug fixes
- Fix get trace list crash when trace not initialize. [#9156](https://github.com/emqx/emqx/pull/9156)

View File

@ -5,7 +5,9 @@
- Erlang/OTP [SSL库漏洞修复](https://nvd.nist.gov/vuln/detail/CVE-2022-37026)
- 增加了对 OCSP (Online Certificate Status Protocol) Stapling 的支持
- 增加了 CRL证书吊销列表缓存的自动刷新功能
- 增加了一个JWT工作者用于在规则引擎动作中创建和刷新JWT令牌。[#9241](https://github.com/emqx/emqx/pull/9241)
### 修复
- 修复日志追踪模块没开启时GET Trace 列表接口报错的问题。[#9156](https://github.com/emqx/emqx/pull/9156)

View File

@ -31,12 +31,23 @@ start_slave(Name) ->
start_slave(Name, #{}).
start_slave(Name, Opts) ->
SlaveMod = maps:get(slave_mod, Opts, ct_slave),
Node = make_node_name(Name),
case ct_slave:start(Node, [{kill_if_fail, true},
{monitor_master, true},
{init_timeout, 10000},
{startup_timeout, 10000},
{erl_flags, ebin_path()}]) of
DoStart =
fun() ->
case SlaveMod of
ct_slave ->
ct_slave:start(Node,
[{kill_if_fail, true},
{monitor_master, true},
{init_timeout, 10000},
{startup_timeout, 10000},
{erl_flags, ebin_path()}]);
slave ->
slave:start_link(host(), Name, ebin_path())
end
end,
case DoStart() of
{ok, _} ->
ok;
{error, started_not_connected, _} ->
@ -115,6 +126,9 @@ setup_node(Node, #{} = Opts) ->
?assertEqual( node()
, gen_rpc:call(Node, gen_rpc, call, [node(), erlang, node, []])
),
ok = snabbkaffe:forward_trace(Node),
ok.
%% Routes are replicated async.