Merge branch 'dev/v4.3.14' into fix_patter_match_v4.3

This commit is contained in:
Xinyu Liu 2022-04-01 09:14:34 +08:00 committed by GitHub
commit 3cc0bb8284
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
33 changed files with 1121 additions and 755 deletions

View File

@ -89,11 +89,6 @@ emqx_test(){
"rpm") "rpm")
packagename=$(basename "${PACKAGE_PATH}/${EMQX_NAME}"-*.rpm) packagename=$(basename "${PACKAGE_PATH}/${EMQX_NAME}"-*.rpm)
if [[ "${ARCH}" == "amd64" && $(rpm -E '%{rhel}') == 7 ]] ; then
# EMQX OTP requires openssl11 to have TLS1.3 support
yum install -y openssl11
fi
rpm -ivh "${PACKAGE_PATH}/${packagename}" rpm -ivh "${PACKAGE_PATH}/${packagename}"
if ! rpm -q emqx | grep -q emqx; then if ! rpm -q emqx | grep -q emqx; then
echo "package install error" echo "package install error"

View File

@ -10,12 +10,28 @@ File format:
- One list item per change topic - One list item per change topic
Change log ends with a list of github PRs Change log ends with a list of github PRs
## v4.3.14
### Enhancements
* In order to fix the execution order of exhook, e.g. before/after other plugins/modules,
ExHook now supports user customizing emqx_hook execute priority.
### Bug fixes
* Prohibit empty topics in strict mode
## v4.3.13 ## v4.3.13
### Important changes ### Important changes
* For docker image, /opt/emqx/etc has been removed from the VOLUME list, * For docker image, /opt/emqx/etc has been removed from the VOLUME list,
this made it easier for the users to rebuild image on top with changed configs. this made it easier for the users to rebuild image on top with changed configs.
* CentOS 7 Erlang runtime is rebuilt on OpenSSL-1.1.1n (previously on 1.0),
Prior to v4.3.13, EMQX pick certain cipher suites proposed by the clients,
but then fail to handshake resulting in a `malformed_handshake_data` exception.
* CentOS 8 Erlang runtime is rebuilt on RockyLinux 8.
'centos8' will remain in the package name to keep it backward compatible.
### Enhancements ### Enhancements

View File

@ -99,6 +99,7 @@ $(PROFILES:%=clean-%):
.PHONY: clean-all .PHONY: clean-all
clean-all: clean-all:
@rm -rf _build @rm -rf _build
@rm rebar.lock
.PHONY: deps-all .PHONY: deps-all
deps-all: $(REBAR) $(PROFILES:%=deps-%) deps-all: $(REBAR) $(PROFILES:%=deps-%)

View File

@ -1,6 +1,6 @@
{application, emqx_auth_jwt, {application, emqx_auth_jwt,
[{description, "EMQ X Authentication with JWT"}, [{description, "EMQ X Authentication with JWT"},
{vsn, "4.3.1"}, % strict semver, bump manually! {vsn, "4.3.2"}, % strict semver, bump manually!
{modules, []}, {modules, []},
{registered, [emqx_auth_jwt_sup]}, {registered, [emqx_auth_jwt_sup]},
{applications, [kernel,stdlib,jose]}, {applications, [kernel,stdlib,jose]},

View File

@ -1,13 +1,13 @@
%% -*-: erlang -*- %% -*-: erlang -*-
{VSN, {VSN,
[ [
{"4.3.0", [ {"4.3.[0-1]", [
{load_module, emqx_auth_jwt_svr, brutal_purge, soft_purge, []} {load_module, emqx_auth_jwt_svr, brutal_purge, soft_purge, []}
]}, ]},
{<<".*">>, []} {<<".*">>, []}
], ],
[ [
{"4.3.0", [ {"4.3.[0-1]", [
{load_module, emqx_auth_jwt_svr, brutal_purge, soft_purge, []} {load_module, emqx_auth_jwt_svr, brutal_purge, soft_purge, []}
]}, ]},
{<<".*">>, []} {<<".*">>, []}

View File

@ -91,7 +91,7 @@ do_init_jwks(Options) ->
[K, V, Reason]), [K, V, Reason]),
undefined; undefined;
J -> J J -> J
catch T:R:_ -> catch T:R ->
?LOG(warning, "Build ~p JWK ~p failed: {~p, ~p}~n", ?LOG(warning, "Build ~p JWK ~p failed: {~p, ~p}~n",
[K, V, T, R]), [K, V, T, R]),
undefined undefined

View File

@ -24,6 +24,17 @@
## Value: false | Duration ## Value: false | Duration
#exhook.auto_reconnect = 60s #exhook.auto_reconnect = 60s
## The exhook execution priority on the Chain of the emqx hooks.
##
## Modify the field to fix the exhook execute order before/after other plugins/modules.
## By default, most hooks registered by plugins or modules have a priority of 0.
##
## With the same priority of 0, the execute order depends on hookpoints mount order.
## Scilicet is the loaded order of plugins/ modules.
##
## Default: 0
## Value: Integer
#exhook.hook_priority = 0
##-------------------------------------------------------------------- ##--------------------------------------------------------------------
## The Hook callback servers ## The Hook callback servers

View File

@ -41,4 +41,6 @@
, {'message.dropped', {emqx_exhook_handler, on_message_dropped, []}} , {'message.dropped', {emqx_exhook_handler, on_message_dropped, []}}
]). ]).
-define(DEFAULT_HOOK_PRIORITY, 0).
-endif. -endif.

View File

@ -15,6 +15,11 @@
{datatype, string} {datatype, string}
]}. ]}.
{mapping, "exhook.hook_priority", "emqx_exhook.hook_priority", [
{default, 0},
{datatype, integer}
]}.
{translation, "emqx_exhook.auto_reconnect", fun(Conf) -> {translation, "emqx_exhook.auto_reconnect", fun(Conf) ->
case cuttlefish:conf_get("exhook.auto_reconnect", Conf) of case cuttlefish:conf_get("exhook.auto_reconnect", Conf) of
"false" -> false; "false" -> false;

View File

@ -1,6 +1,6 @@
{application, emqx_exhook, {application, emqx_exhook,
[{description, "EMQ X Extension for Hook"}, [{description, "EMQ X Extension for Hook"},
{vsn, "4.3.4"}, {vsn, "4.3.5"},
{modules, []}, {modules, []},
{registered, []}, {registered, []},
{mod, {emqx_exhook_app, []}}, {mod, {emqx_exhook_app, []}},

View File

@ -1,13 +1,23 @@
%% -*-: erlang -*- %% -*- mode: erlang -*-
{VSN, {VSN,
[ [
{<<"4.3.[0-3]">>, [ {"4.3.4", [
{load_module, emqx_exhook_sup, brutal_purge,soft_purge,[]},
{load_module, emqx_exhook_server, brutal_purge,soft_purge,[]},
{update, emqx_exhook_mngr, {advanced, ["4.3.4"]}}
]},
{<<"4\\.3\\.[0-3]">>, [
{restart_application, emqx_exhook} {restart_application, emqx_exhook}
]}, ]},
{<<".*">>, []} {<<".*">>, []}
], ],
[ [
{<<"4.3.[0-3]">>, [ {"4.3.4", [
{load_module, emqx_exhook_sup, brutal_purge,soft_purge,[]},
{load_module, emqx_exhook_server, brutal_purge,soft_purge,[]},
{update, emqx_exhook_mngr, {advanced, ["4.3.4"]}}
]},
{<<"4\\.3\\.[0-3]">>, [
{restart_application, emqx_exhook} {restart_application, emqx_exhook}
]}, ]},
{<<".*">>, []} {<<".*">>, []}

View File

@ -23,7 +23,7 @@
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
%% APIs %% APIs
-export([start_link/3]). -export([start_link/4]).
%% Mgmt API %% Mgmt API
-export([ enable/2 -export([ enable/2
@ -59,9 +59,14 @@
%% Request options %% Request options
request_options :: grpc_client:options(), request_options :: grpc_client:options(),
%% Timer references %% Timer references
trefs :: map() trefs :: map(),
%% Hooks execute options
hooks_options :: hooks_options()
}). }).
-export_type([ server_options/0
, hooks_options/0]).
-type servers() :: [{Name :: atom(), server_options()}]. -type servers() :: [{Name :: atom(), server_options()}].
-type server_options() :: [ {scheme, http | https} -type server_options() :: [ {scheme, http | https}
@ -69,6 +74,10 @@
| {port, inet:port_number()} | {port, inet:port_number()}
]. ].
-type hooks_options() :: #{hook_priority => integer()}.
-define(DEFAULT_HOOK_OPTS, #{hook_priority => ?DEFAULT_HOOK_PRIORITY}).
-define(DEFAULT_TIMEOUT, 60000). -define(DEFAULT_TIMEOUT, 60000).
-define(CNTER, emqx_exhook_counter). -define(CNTER, emqx_exhook_counter).
@ -77,12 +86,12 @@
%% APIs %% APIs
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-spec start_link(servers(), false | non_neg_integer(), grpc_client:options()) -spec start_link(servers(), false | non_neg_integer(), grpc_client:options(), hooks_options())
->ignore ->ignore
| {ok, pid()} | {ok, pid()}
| {error, any()}. | {error, any()}.
start_link(Servers, AutoReconnect, ReqOpts) -> start_link(Servers, AutoReconnect, ReqOpts, HooksOpts) ->
gen_server:start_link(?MODULE, [Servers, AutoReconnect, ReqOpts], []). gen_server:start_link(?MODULE, [Servers, AutoReconnect, ReqOpts, HooksOpts], []).
-spec enable(pid(), atom()|string()) -> ok | {error, term()}. -spec enable(pid(), atom()|string()) -> ok | {error, term()}.
enable(Pid, Name) -> enable(Pid, Name) ->
@ -102,7 +111,7 @@ call(Pid, Req) ->
%% gen_server callbacks %% gen_server callbacks
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
init([Servers, AutoReconnect, ReqOpts0]) -> init([Servers, AutoReconnect, ReqOpts0, HooksOpts]) ->
process_flag(trap_exit, true), process_flag(trap_exit, true),
%% XXX: Due to the ExHook Module in the enterprise, %% XXX: Due to the ExHook Module in the enterprise,
%% this process may start multiple times and they will share this table %% this process may start multiple times and they will share this table
@ -120,32 +129,33 @@ init([Servers, AutoReconnect, ReqOpts0]) ->
%% Load the hook servers %% Load the hook servers
ReqOpts = maps:without([request_failed_action], ReqOpts0), ReqOpts = maps:without([request_failed_action], ReqOpts0),
{Waiting, Running} = load_all_servers(Servers, ReqOpts), {Waiting, Running} = load_all_servers(Servers, ReqOpts, HooksOpts),
{ok, ensure_reload_timer( {ok, ensure_reload_timer(
#state{waiting = Waiting, #state{waiting = Waiting,
running = Running, running = Running,
stopped = #{}, stopped = #{},
request_options = ReqOpts, request_options = ReqOpts,
auto_reconnect = AutoReconnect, auto_reconnect = AutoReconnect,
trefs = #{} trefs = #{},
hooks_options = HooksOpts
} }
)}. )}.
%% @private %% @private
load_all_servers(Servers, ReqOpts) -> load_all_servers(Servers, ReqOpts, HooksOpts) ->
load_all_servers(Servers, ReqOpts, #{}, #{}). load_all_servers(Servers, ReqOpts, HooksOpts, #{}, #{}).
load_all_servers([], _Request, Waiting, Running) -> load_all_servers([], _Request, _HooksOpts, Waiting, Running) ->
{Waiting, Running}; {Waiting, Running};
load_all_servers([{Name, Options}|More], ReqOpts, Waiting, Running) -> load_all_servers([{Name, Options} | More], ReqOpts, HooksOpts, Waiting, Running) ->
{NWaiting, NRunning} = {NWaiting, NRunning} =
case emqx_exhook_server:load(Name, Options, ReqOpts) of case emqx_exhook_server:load(Name, Options, ReqOpts, HooksOpts) of
{ok, ServerState} -> {ok, ServerState} ->
save(Name, ServerState), save(Name, ServerState),
{Waiting, Running#{Name => Options}}; {Waiting, Running#{Name => Options}};
{error, _} -> {error, _} ->
{Waiting#{Name => Options}, Running} {Waiting#{Name => Options}, Running}
end, end,
load_all_servers(More, ReqOpts, NWaiting, NRunning). load_all_servers(More, ReqOpts, HooksOpts, NWaiting, NRunning).
handle_call({load, Name}, _From, State) -> handle_call({load, Name}, _From, State) ->
{Result, NState} = do_load_server(Name, State), {Result, NState} = do_load_server(Name, State),
@ -199,8 +209,27 @@ terminate(_Reason, State = #state{running = Running}) ->
_ = unload_exhooks(), _ = unload_exhooks(),
ok. ok.
code_change(_OldVsn, State, _Extra) -> %% in the emqx_exhook:v4.3.5, we have added one new field in the state last:
{ok, State}. %% - hooks_options :: map()
code_change({down, _Vsn}, State, [ToVsn]) ->
case re:run(ToVsn, "4\\.3\\.[0-4]") of
{match, _} ->
NState = list_to_tuple(
lists:droplast(
tuple_to_list(State))),
{ok, NState};
_ ->
{ok, State}
end;
code_change(_Vsn, State, [FromVsn]) ->
case re:run(FromVsn, "4\\.3\\.[0-4]") of
{match, _} ->
NState = list_to_tuple(
tuple_to_list(State) ++ [?DEFAULT_HOOK_OPTS]),
{ok, NState};
_ ->
{ok, State}
end.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Internal funcs %% Internal funcs
@ -214,7 +243,8 @@ do_load_server(Name, State0 = #state{
waiting = Waiting, waiting = Waiting,
running = Running, running = Running,
stopped = Stopped, stopped = Stopped,
request_options = ReqOpts}) -> request_options = ReqOpts,
hooks_options = HooksOpts}) ->
State = clean_reload_timer(Name, State0), State = clean_reload_timer(Name, State0),
case maps:get(Name, Running, undefined) of case maps:get(Name, Running, undefined) of
undefined -> undefined ->
@ -223,7 +253,7 @@ do_load_server(Name, State0 = #state{
undefined -> undefined ->
{{error, not_found}, State}; {{error, not_found}, State};
Options -> Options ->
case emqx_exhook_server:load(Name, Options, ReqOpts) of case emqx_exhook_server:load(Name, Options, ReqOpts, HooksOpts) of
{ok, ServerState} -> {ok, ServerState} ->
save(Name, ServerState), save(Name, ServerState),
?LOG(info, "Load exhook callback server " ?LOG(info, "Load exhook callback server "

View File

@ -25,7 +25,7 @@
-define(PB_CLIENT_MOD, emqx_exhook_v_1_hook_provider_client). -define(PB_CLIENT_MOD, emqx_exhook_v_1_hook_provider_client).
%% Load/Unload %% Load/Unload
-export([ load/3 -export([ load/4
, unload/1 , unload/1
]). ]).
@ -81,8 +81,9 @@
%% Load/Unload APIs %% Load/Unload APIs
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-spec load(atom(), list(), map()) -> {ok, server()} | {error, term()} . -spec load(atom(), emqx_exhook_mngr:server_options(), grpc_client:options(), emqx_exhook_mngr:hooks_options())
load(Name0, Opts0, ReqOpts) -> -> {ok, server()} | {error, term()} .
load(Name0, Opts0, ReqOpts, HooksOpts) ->
Name = to_list(Name0), Name = to_list(Name0),
{SvrAddr, ClientOpts} = channel_opts(Opts0), {SvrAddr, ClientOpts} = channel_opts(Opts0),
case emqx_exhook_sup:start_grpc_client_channel( case emqx_exhook_sup:start_grpc_client_channel(
@ -97,7 +98,7 @@ load(Name0, Opts0, ReqOpts) ->
io_lib:format("exhook.~s.", [Name])), io_lib:format("exhook.~s.", [Name])),
ensure_metrics(Prefix, HookSpecs), ensure_metrics(Prefix, HookSpecs),
%% Ensure hooks %% Ensure hooks
ensure_hooks(HookSpecs), ensure_hooks(HookSpecs, maps:get(hook_priority, HooksOpts, ?DEFAULT_HOOK_PRIORITY)),
{ok, #server{name = Name, {ok, #server{name = Name,
options = ReqOpts, options = ReqOpts,
channel = _ChannPoolPid, channel = _ChannPoolPid,
@ -174,7 +175,7 @@ resovle_hookspec(HookSpecs) when is_list(HookSpecs) ->
case maps:get(name, HookSpec, undefined) of case maps:get(name, HookSpec, undefined) of
undefined -> Acc; undefined -> Acc;
Name0 -> Name0 ->
Name = try binary_to_existing_atom(Name0, utf8) catch T:R:_ -> {T,R} end, Name = try binary_to_existing_atom(Name0, utf8) catch T:R -> {T,R} end,
case lists:member(Name, AvailableHooks) of case lists:member(Name, AvailableHooks) of
true -> true ->
case lists:member(Name, MessageHooks) of case lists:member(Name, MessageHooks) of
@ -193,13 +194,13 @@ ensure_metrics(Prefix, HookSpecs) ->
|| Hookpoint <- maps:keys(HookSpecs)], || Hookpoint <- maps:keys(HookSpecs)],
lists:foreach(fun emqx_metrics:ensure/1, Keys). lists:foreach(fun emqx_metrics:ensure/1, Keys).
ensure_hooks(HookSpecs) -> ensure_hooks(HookSpecs, Priority) ->
lists:foreach(fun(Hookpoint) -> lists:foreach(fun(Hookpoint) ->
case lists:keyfind(Hookpoint, 1, ?ENABLED_HOOKS) of case lists:keyfind(Hookpoint, 1, ?ENABLED_HOOKS) of
false -> false ->
?LOG(error, "Unknown name ~s to hook, skip it!", [Hookpoint]); ?LOG(error, "Unknown name ~s to hook, skip it!", [Hookpoint]);
{Hookpoint, {M, F, A}} -> {Hookpoint, {M, F, A}} ->
emqx_hooks:put(Hookpoint, {M, F, A}), emqx_hooks:put(Hookpoint, {M, F, A}, Priority),
ets:update_counter(?CNTER, Hookpoint, {2, 1}, {Hookpoint, 0}) ets:update_counter(?CNTER, Hookpoint, {2, 1}, {Hookpoint, 0})
end end
end, maps:keys(HookSpecs)). end, maps:keys(HookSpecs)).

View File

@ -18,6 +18,8 @@
-behaviour(supervisor). -behaviour(supervisor).
-include("emqx_exhook.hrl").
-export([ start_link/0 -export([ start_link/0
, init/1 , init/1
]). ]).
@ -43,7 +45,7 @@ start_link() ->
init([]) -> init([]) ->
Mngr = ?CHILD(emqx_exhook_mngr, worker, Mngr = ?CHILD(emqx_exhook_mngr, worker,
[servers(), auto_reconnect(), request_options()]), [servers(), auto_reconnect(), request_options(), hooks_options()]),
{ok, {{one_for_one, 10, 100}, [Mngr]}}. {ok, {{one_for_one, 10, 100}, [Mngr]}}.
servers() -> servers() ->
@ -57,6 +59,10 @@ request_options() ->
request_failed_action => env(request_failed_action, deny) request_failed_action => env(request_failed_action, deny)
}. }.
hooks_options() ->
#{hook_priority => env(hook_priority, ?DEFAULT_HOOK_PRIORITY)
}.
env(Key, Def) -> env(Key, Def) ->
application:get_env(emqx_exhook, Key, Def). application:get_env(emqx_exhook, Key, Def).

View File

@ -98,10 +98,31 @@ t_cli_stats(_) ->
_ = emqx_exhook_cli:cli(x), _ = emqx_exhook_cli:cli(x),
unmeck_print(). unmeck_print().
t_priority(_) ->
restart_exhook_with_envs([{emqx_exhook, hook_priority, 1}]),
emqx_exhook:disable(default),
ok = emqx_exhook:enable(default),
[Callback | _] = emqx_hooks:lookup('client.connected'),
1 = emqx_hooks:callback_priority(Callback).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Utils %% Utils
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% TODO: make it more general and move to `emqx_ct_helpers`
restart_exhook_with_envs(Envs) ->
emqx_ct_helpers:stop_apps([emqx_exhook]),
SetPriorityFun
= fun(emqx) ->
set_special_cfgs(emqx);
(emqx_exhook) ->
lists:foreach(fun({App, Key, Val}) ->
application:set_env(App, Key, Val)
end, Envs)
end,
emqx_ct_helpers:start_apps([emqx_exhook], SetPriorityFun).
meck_print() -> meck_print() ->
meck:new(emqx_ctl, [passthrough, no_history, no_link]), meck:new(emqx_ctl, [passthrough, no_history, no_link]),
meck:expect(emqx_ctl, print, fun(_) -> ok end), meck:expect(emqx_ctl, print, fun(_) -> ok end),

View File

@ -1,6 +1,6 @@
{application, emqx_rule_engine, {application, emqx_rule_engine,
[{description, "EMQ X Rule Engine"}, [{description, "EMQ X Rule Engine"},
{vsn, "4.3.8"}, % strict semver, bump manually! {vsn, "4.3.9"}, % strict semver, bump manually!
{modules, []}, {modules, []},
{registered, [emqx_rule_engine_sup, emqx_rule_registry]}, {registered, [emqx_rule_engine_sup, emqx_rule_registry]},
{applications, [kernel,stdlib,rulesql,getopt]}, {applications, [kernel,stdlib,rulesql,getopt]},

View File

@ -1,7 +1,14 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
%% Unless you know what you are doing, DO NOT edit manually!! %% Unless you know what you are doing, DO NOT edit manually!!
{VSN, {VSN,
[{"4.3.7", [{"4.3.8",
[{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}]},
{"4.3.7",
[{load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, [{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
@ -79,7 +86,14 @@
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
{<<".*">>,[]}], {<<".*">>,[]}],
[{"4.3.7", [{"4.3.8",
[{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}]},
{"4.3.7",
[{load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, [{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},

View File

@ -37,6 +37,7 @@
, test_resource/1 , test_resource/1
, start_resource/1 , start_resource/1
, get_resource_status/1 , get_resource_status/1
, is_source_alive/1
, get_resource_params/1 , get_resource_params/1
, delete_resource/1 , delete_resource/1
, update_resource/2 , update_resource/2
@ -314,24 +315,37 @@ start_resource(ResId) ->
end. end.
-spec(test_resource(#{type := _, config := _, _ => _}) -> ok | {error, Reason :: term()}). -spec(test_resource(#{type := _, config := _, _ => _}) -> ok | {error, Reason :: term()}).
test_resource(#{type := Type, config := Config0}) -> test_resource(#{type := Type} = Params) ->
case emqx_rule_registry:find_resource_type(Type) of case emqx_rule_registry:find_resource_type(Type) of
{ok, #resource_type{on_create = {ModC, Create}, {ok, #resource_type{}} ->
on_destroy = {ModD, Destroy}, ResId = maps:get(id, Params, resource_id()),
params_spec = ParamSpec}} ->
Config = emqx_rule_validator:validate_params(Config0, ParamSpec),
ResId = resource_id(),
try try
_ = ?CLUSTER_CALL(init_resource, [ModC, Create, ResId, Config]), _ = create_resource(maps:put(id, ResId, Params)),
_ = ?CLUSTER_CALL(clear_resource, [ModD, Destroy, ResId]), true = is_source_alive(ResId),
ok ok
catch catch E:R:S ->
throw:Reason -> {error, Reason} ?LOG(warning, "test resource failed, ~0p:~0p ~0p", [E, R, S]),
{error, R}
after
_ = ?CLUSTER_CALL(delete_resource, [ResId])
end; end;
not_found -> not_found ->
{error, {resource_type_not_found, Type}} {error, {resource_type_not_found, Type}}
end. end.
is_source_alive(ResId) ->
case rpc:multicall(ekka_mnesia:running_nodes(), ?MODULE, get_resource_status, [ResId], 5000) of
{ResL, []} ->
is_source_alive_(ResL);
{_, _Errors} ->
false
end.
is_source_alive_([]) -> true;
is_source_alive_([{ok, #{is_alive := true}} | ResL]) -> is_source_alive_(ResL);
is_source_alive_([{ok, #{is_alive := false}} | _ResL]) -> false;
is_source_alive_([_Error | _ResL]) -> false.
-spec(get_resource_status(resource_id()) -> {ok, resource_status()} | {error, Reason :: term()}). -spec(get_resource_status(resource_id()) -> {ok, resource_status()} | {error, Reason :: term()}).
get_resource_status(ResId) -> get_resource_status(ResId) ->
case emqx_rule_registry:find_resource_params(ResId) of case emqx_rule_registry:find_resource_params(ResId) of

View File

@ -296,7 +296,7 @@ do_create_resource(Create, ParsedParams) ->
list_resources(#{}, _Params) -> list_resources(#{}, _Params) ->
Data0 = lists:foldr(fun maybe_record_to_map/2, [], emqx_rule_registry:get_resources()), Data0 = lists:foldr(fun maybe_record_to_map/2, [], emqx_rule_registry:get_resources()),
Data = lists:map(fun(Res = #{id := ResId}) -> Data = lists:map(fun(Res = #{id := ResId}) ->
Status = get_aggregated_status(ResId), Status = emqx_rule_engine:is_source_alive(ResId),
maps:put(status, Status, Res) maps:put(status, Status, Res)
end, Data0), end, Data0),
return({ok, Data}). return({ok, Data}).
@ -304,14 +304,6 @@ list_resources(#{}, _Params) ->
list_resources_by_type(#{type := Type}, _Params) -> list_resources_by_type(#{type := Type}, _Params) ->
return_all(emqx_rule_registry:get_resources_by_type(Type)). return_all(emqx_rule_registry:get_resources_by_type(Type)).
get_aggregated_status(ResId) ->
lists:all(fun(Node) ->
case rpc:call(Node, emqx_rule_engine, get_resource_status, [ResId]) of
{ok, #{is_alive := true}} -> true;
_ -> false
end
end, ekka_mnesia:running_nodes()).
show_resource(#{id := Id}, _Params) -> show_resource(#{id := Id}, _Params) ->
case emqx_rule_registry:find_resource(Id) of case emqx_rule_registry:find_resource(Id) of
{ok, R} -> {ok, R} ->

View File

@ -197,6 +197,9 @@
, rfc3339_to_unix_ts/2 , rfc3339_to_unix_ts/2
, now_timestamp/0 , now_timestamp/0
, now_timestamp/1 , now_timestamp/1
, mongo_date/0
, mongo_date/1
, mongo_date/2
]). ]).
%% Proc Dict Func %% Proc Dict Func
@ -900,6 +903,24 @@ time_unit(<<"millisecond">>) -> millisecond;
time_unit(<<"microsecond">>) -> microsecond; time_unit(<<"microsecond">>) -> microsecond;
time_unit(<<"nanosecond">>) -> nanosecond. time_unit(<<"nanosecond">>) -> nanosecond.
mongo_date() ->
erlang:timestamp().
mongo_date(MillisecondsTimestamp) ->
convert_timestamp(MillisecondsTimestamp).
mongo_date(Timestamp, Unit) ->
InsertedTimeUnit = time_unit(Unit),
ScaledEpoch = erlang:convert_time_unit(Timestamp, InsertedTimeUnit, millisecond),
convert_timestamp(ScaledEpoch).
convert_timestamp(MillisecondsTimestamp) ->
MicroTimestamp = MillisecondsTimestamp * 1000,
MegaSecs = MicroTimestamp div 1000_000_000_000,
Secs = MicroTimestamp div 1000_000 - MegaSecs*1000_000,
MicroSecs = MicroTimestamp rem 1000_000,
{MegaSecs, Secs, MicroSecs}.
%% @doc This is for sql funcs that should be handled in the specific modules. %% @doc This is for sql funcs that should be handled in the specific modules.
%% Here the emqx_rule_funcs module acts as a proxy, forwarding %% Here the emqx_rule_funcs module acts as a proxy, forwarding
%% the function handling to the worker module. %% the function handling to the worker module.

View File

@ -29,7 +29,7 @@
-ifndef(EMQX_ENTERPRISE). -ifndef(EMQX_ENTERPRISE).
-define(EMQX_RELEASE, {opensource, "4.3.13-rc.2"}). -define(EMQX_RELEASE, {opensource, "4.3.13-rc.3"}).
-else. -else.

View File

@ -258,6 +258,28 @@ end}.
{validator, "range4ports", "must be 1024 to 134217727", {validator, "range4ports", "must be 1024 to 134217727",
fun(X) -> X >= 1024 andalso X =< 134217727 end}. fun(X) -> X >= 1024 andalso X =< 134217727 end}.
{validator, "range:0-2", "must be 0 to 2",
fun(X) -> X >= 0 andalso X =< 2 end}.
{validator, "range:0-128", "must be 0 to 128",
fun(X) -> X >= 0 andalso X =< 128 end}.
{validator, "range:0-65535", "must be 0 to 65535",
fun(X) -> X >= 0 andalso X =< 65535 end}.
{validator, "range:1-65535", "must be 1 to 65535",
fun(X) -> X >= 1 andalso X =< 65535 end}.
{validator, "range:1-9", "must be 1 to 9",
fun(X) -> X >= 1 andalso X =< 9 end}.
{validator, "range:8-15", "must be 8 to 15",
fun(X) -> X >= 8 andalso X =< 15 end}.
{validator, "range:0-1024", "must be 0 to 1024",
fun(X) -> X >= 0 andalso X =< 1024 end}.
%% @doc http://www.erlang.org/doc/man/erl.html#%2bzdbbl %% @doc http://www.erlang.org/doc/man/erl.html#%2bzdbbl
{mapping, "node.dist_buffer_size", "vm_args.+zdbbl", [ {mapping, "node.dist_buffer_size", "vm_args.+zdbbl", [
{datatype, bytesize}, {datatype, bytesize},
@ -293,10 +315,10 @@ end}.
{default, 1000}, {default, 1000},
{datatype, integer}, {datatype, integer},
hidden, hidden,
{validators, ["positive_integer"]} {validators, ["range:0-inf"]}
]}. ]}.
{validator, "positive_integer", "must be a positive integer", {validator, "range:0-inf", "must be a non neg_integer",
fun(X) -> X >= 0 end}. fun(X) -> X >= 0 end}.
%% Note: OTP R15 and earlier uses -env ERL_MAX_ETS_TABLES, %% Note: OTP R15 and earlier uses -env ERL_MAX_ETS_TABLES,
@ -801,7 +823,8 @@ end}.
%% @doc Set the Maximum topic levels. %% @doc Set the Maximum topic levels.
{mapping, "mqtt.max_topic_levels", "emqx.max_topic_levels", [ {mapping, "mqtt.max_topic_levels", "emqx.max_topic_levels", [
{default, 128}, {default, 128},
{datatype, integer} {datatype, integer},
{validators, ["range:0-inf"]}
]}. ]}.
%% @doc Set the Maximum QoS allowed. %% @doc Set the Maximum QoS allowed.
@ -814,7 +837,8 @@ end}.
%% @doc Set the Maximum Topic Alias. %% @doc Set the Maximum Topic Alias.
{mapping, "mqtt.max_topic_alias", "emqx.max_topic_alias", [ {mapping, "mqtt.max_topic_alias", "emqx.max_topic_alias", [
{default, 65535}, {default, 65535},
{datatype, integer} {datatype, integer},
{validators, ["range:0-65535"]}
]}. ]}.
%% @doc Whether the server supports MQTT retained messages. %% @doc Whether the server supports MQTT retained messages.
@ -911,7 +935,8 @@ end}.
%% @doc Set the Maximum topic levels. %% @doc Set the Maximum topic levels.
{mapping, "zone.$name.max_topic_levels", "emqx.zones", [ {mapping, "zone.$name.max_topic_levels", "emqx.zones", [
{datatype, integer} {datatype, integer},
{validators, ["range:0-128"]}
]}. ]}.
%% @doc Set the Maximum QoS allowed. %% @doc Set the Maximum QoS allowed.

View File

@ -93,6 +93,7 @@ if [ "$NEW_COPY" = 'no' ]; then
REMOTE="$(git remote -v | grep "${GIT_REPO}" | head -1 | awk '{print $1}')" REMOTE="$(git remote -v | grep "${GIT_REPO}" | head -1 | awk '{print $1}')"
git fetch "$REMOTE" git fetch "$REMOTE"
fi fi
git reset --hard
git clean -fdx git clean -fdx
git checkout "${PREV_TAG}" git checkout "${PREV_TAG}"
make "$PROFILE" make "$PROFILE"

View File

@ -6,7 +6,7 @@
%% the emqx `release' version, which in turn is comprised of several %% the emqx `release' version, which in turn is comprised of several
%% apps, one of which is this. See `emqx_release.hrl' for more %% apps, one of which is this. See `emqx_release.hrl' for more
%% info. %% info.
{vsn, "4.3.14"}, % strict semver, bump manually! {vsn, "4.3.15"}, % strict semver, bump manually!
{modules, []}, {modules, []},
{registered, []}, {registered, []},
{applications, [ kernel {applications, [ kernel

File diff suppressed because it is too large Load Diff

View File

@ -28,6 +28,11 @@
, unlock/1 , unlock/1
]). ]).
%% for testing
-ifdef(TEST).
-export([strategy/0]).
-endif.
-spec(start_link() -> startlink_ret()). -spec(start_link() -> startlink_ret()).
start_link() -> start_link() ->
ekka_locker:start_link(?MODULE). ekka_locker:start_link(?MODULE).
@ -63,4 +68,3 @@ unlock(ClientId) ->
-spec(strategy() -> local | leader | quorum | all). -spec(strategy() -> local | leader | quorum | all).
strategy() -> strategy() ->
emqx:get_env(session_locking_strategy, quorum). emqx:get_env(session_locking_strategy, quorum).

View File

@ -38,6 +38,10 @@
, code_change/3 , code_change/3
]). ]).
%% for testing
-ifdef(TEST).
-export([get_policy/0]).
-endif.
%% Tab %% Tab
-define(FLAPPING_TAB, ?MODULE). -define(FLAPPING_TAB, ?MODULE).
%% Default Policy %% Default Policy

View File

@ -265,7 +265,7 @@ parse_packet(#mqtt_packet_header{type = ?CONNACK}, <<AckFlags:8, ReasonCode:8, R
parse_packet(#mqtt_packet_header{type = ?PUBLISH, qos = QoS}, Bin, parse_packet(#mqtt_packet_header{type = ?PUBLISH, qos = QoS}, Bin,
#{strict_mode := StrictMode, version := Ver}) -> #{strict_mode := StrictMode, version := Ver}) ->
{TopicName, Rest} = parse_utf8_string(Bin, StrictMode), {TopicName, Rest} = parse_topic_name(Bin, StrictMode),
{PacketId, Rest1} = case QoS of {PacketId, Rest1} = case QoS of
?QOS_0 -> {undefined, Rest}; ?QOS_0 -> {undefined, Rest};
_ -> parse_packet_id(Rest) _ -> parse_packet_id(Rest)
@ -357,7 +357,7 @@ parse_will_message(Packet = #mqtt_packet_connect{will_flag = true,
proto_ver = Ver}, proto_ver = Ver},
Bin, StrictMode) -> Bin, StrictMode) ->
{Props, Rest} = parse_properties(Bin, Ver, StrictMode), {Props, Rest} = parse_properties(Bin, Ver, StrictMode),
{Topic, Rest1} = parse_utf8_string(Rest, StrictMode), {Topic, Rest1} = parse_topic_name(Rest, StrictMode),
{Payload, Rest2} = parse_binary_data(Rest1), {Payload, Rest2} = parse_binary_data(Rest1),
{Packet#mqtt_packet_connect{will_props = Props, {Packet#mqtt_packet_connect{will_props = Props,
will_topic = Topic, will_topic = Topic,
@ -524,6 +524,14 @@ parse_binary_data(Bin)
when 2 > byte_size(Bin) -> when 2 > byte_size(Bin) ->
error(malformed_binary_data_length). error(malformed_binary_data_length).
parse_topic_name(Bin, false) ->
parse_utf8_string(Bin, false);
parse_topic_name(Bin, true) ->
case parse_utf8_string(Bin, true) of
{<<>>, _Rest} -> error(empty_topic_name);
Result -> Result
end.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Serialize MQTT Packet %% Serialize MQTT Packet
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------

View File

@ -32,6 +32,8 @@
, add/3 , add/3
, add/4 , add/4
, put/2 , put/2
, put/3
, put/4
, del/2 , del/2
, run/2 , run/2
, run_fold/3 , run_fold/3
@ -75,6 +77,8 @@
priority :: integer() priority :: integer()
}). }).
-type callback() :: #callback{}.
-record(hook, { -record(hook, {
name :: hookpoint(), name :: hookpoint(),
callbacks :: list(#callback{}) callbacks :: list(#callback{})
@ -110,7 +114,7 @@ callback_priority(#callback{priority= P}) -> P.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc Register a callback %% @doc Register a callback
-spec(add(hookpoint(), action() | #callback{}) -> ok_or_error(already_exists)). -spec(add(hookpoint(), action() | callback()) -> ok_or_error(already_exists)).
add(HookPoint, Callback) when is_record(Callback, callback) -> add(HookPoint, Callback) when is_record(Callback, callback) ->
gen_server:call(?SERVER, {add, HookPoint, Callback}, infinity); gen_server:call(?SERVER, {add, HookPoint, Callback}, infinity);
add(HookPoint, Action) when is_function(Action); is_tuple(Action) -> add(HookPoint, Action) when is_function(Action); is_tuple(Action) ->
@ -131,12 +135,24 @@ add(HookPoint, Action, Filter, Priority) when is_integer(Priority) ->
add(HookPoint, #callback{action = Action, filter = Filter, priority = Priority}). add(HookPoint, #callback{action = Action, filter = Filter, priority = Priority}).
%% @doc Like add/2, it register a callback, discard 'already_exists' error. %% @doc Like add/2, it register a callback, discard 'already_exists' error.
-spec(put(hookpoint(), action() | #callback{}) -> ok). -spec put(hookpoint(), action() | callback()) -> ok.
put(HookPoint, Callback) -> put(HookPoint, Callback) when is_record(Callback, callback) ->
case add(HookPoint, Callback) of case add(HookPoint, Callback) of
ok -> ok; ok -> ok;
{error, already_exists} -> ok {error, already_exists} -> ok
end. end;
put(HookPoint, Action) when is_function(Action); is_tuple(Action) ->
?MODULE:put(HookPoint, #callback{action = Action, priority = 0}).
-spec put(hookpoint(), action(), filter() | integer() | list()) -> ok.
put(HookPoint, Action, {_M, _F, _A} = Filter) ->
?MODULE:put(HookPoint, #callback{action = Action, filter = Filter, priority = 0});
put(HookPoint, Action, Priority) when is_integer(Priority) ->
?MODULE:put(HookPoint, #callback{action = Action, priority = Priority}).
-spec put(hookpoint(), action(), filter(), integer()) -> ok.
put(HookPoint, Action, Filter, Priority) when is_integer(Priority) ->
?MODULE:put(HookPoint, #callback{action = Action, filter = Filter, priority = Priority}).
%% @doc Unregister a callback. %% @doc Unregister a callback.
-spec(del(hookpoint(), action() | {module(), atom()}) -> ok). -spec(del(hookpoint(), action() | {module(), atom()}) -> ok).
@ -205,7 +221,7 @@ execute({M, F, A}, Args) ->
erlang:apply(M, F, Args ++ A). erlang:apply(M, F, Args ++ A).
%% @doc Lookup callbacks. %% @doc Lookup callbacks.
-spec(lookup(hookpoint()) -> [#callback{}]). -spec(lookup(hookpoint()) -> [callback()]).
lookup(HookPoint) -> lookup(HookPoint) ->
case ets:lookup(?TAB, HookPoint) of case ets:lookup(?TAB, HookPoint) of
[#hook{callbacks = Callbacks}] -> [#hook{callbacks = Callbacks}] ->
@ -288,4 +304,3 @@ del_callback(Func, [#callback{action = {Func, _A}} | Callbacks], Acc) ->
del_callback(Func, Callbacks, Acc); del_callback(Func, Callbacks, Acc);
del_callback(Action, [Callback | Callbacks], Acc) -> del_callback(Action, [Callback | Callbacks], Acc) ->
del_callback(Action, Callbacks, [Callback | Acc]). del_callback(Action, Callbacks, [Callback | Acc]).

View File

@ -23,7 +23,8 @@
, init/4 %% XXX: Compatible with before 4.2 version , init/4 %% XXX: Compatible with before 4.2 version
, info/1 , info/1
, check/2 , check/2
, update_overall_limiter/4 , update_overall_limiter/3
, delete_overall_limiter/1
]). ]).
-record(limiter, { -record(limiter, {
@ -154,14 +155,18 @@ is_message_limiter(conn_messages_routing) -> true;
is_message_limiter(overall_messages_routing) -> true; is_message_limiter(overall_messages_routing) -> true;
is_message_limiter(_) -> false. is_message_limiter(_) -> false.
update_overall_limiter(Zone, Name, Capacity, Interval) -> update_overall_limiter(Zone, Capacity, Interval) ->
case is_overall_limiter(Name) of
false -> false;
_ ->
try try
esockd_limiter:update({Zone, Name}, Capacity, Interval), esockd_limiter:update({Zone, overall_messages_routing}, Capacity, Interval),
true true
catch _:_:_ -> catch _:_ ->
false
end.
delete_overall_limiter(Zone) ->
try
esockd_limiter:delete({Zone, overall_messages_routing}),
true
catch _:_ ->
false false
end
end. end.

View File

@ -29,6 +29,7 @@
-export([ start_listener/1 -export([ start_listener/1
, start_listener/3 , start_listener/3
, stop_listener/1 , stop_listener/1
, update_listeners_env/2
, restart_listener/1 , restart_listener/1
, restart_listener/3 , restart_listener/3
]). ]).
@ -187,6 +188,20 @@ with_port(Port, Opts = #{socket_opts := SocketOption}) when is_integer(Port) ->
with_port({Addr, Port}, Opts = #{socket_opts := SocketOption}) -> with_port({Addr, Port}, Opts = #{socket_opts := SocketOption}) ->
Opts#{socket_opts => [{ip, Addr}, {port, Port}| SocketOption]}. Opts#{socket_opts => [{ip, Addr}, {port, Port}| SocketOption]}.
update_listeners_env(Action, NewConf = #{name := NewName, proto := NewProto}) ->
Listener = emqx:get_env(listeners, []),
Listener1 = lists:filter(
fun(#{name := Name, proto := Proto}) ->
not (Name =:= NewName andalso Proto =:= NewProto)
end, Listener),
Listener2 =
case Action of
update -> [NewConf | Listener1];
delete -> Listener1
end,
application:set_env(emqx, listeners, Listener2),
ok.
%% @doc Restart all listeners %% @doc Restart all listeners
-spec(restart() -> ok). -spec(restart() -> ok).
restart() -> restart() ->

View File

@ -47,7 +47,7 @@
]). ]).
%% for testing %% for testing
-export([subscribers/2]). -export([subscribers/2, ack_enabled/0]).
%% gen_server callbacks %% gen_server callbacks
-export([ init/1 -export([ init/1

View File

@ -162,6 +162,14 @@ t_parse_malformed_utf8_string(_) ->
ParseState = emqx_frame:initial_parse_state(#{strict_mode => true}), ParseState = emqx_frame:initial_parse_state(#{strict_mode => true}),
?catch_error(utf8_string_invalid, emqx_frame:parse(MalformedPacket, ParseState)). ?catch_error(utf8_string_invalid, emqx_frame:parse(MalformedPacket, ParseState)).
t_parse_empty_topic_name(_) ->
Packet = <<48, 4, 0, 0, 0, 1>>,
NormalState = emqx_frame:initial_parse_state(#{strict_mode => false}),
?assertMatch({_, _}, emqx_frame:parse(Packet, NormalState)),
StrictState = emqx_frame:initial_parse_state(#{strict_mode => true}),
?catch_error(empty_topic_name, emqx_frame:parse(Packet, StrictState)).
t_parse_frame_proxy_protocol(_) -> t_parse_frame_proxy_protocol(_) ->
BinList = [ <<"PROXY TCP4 ">>, <<"PROXY TCP6 ">>, <<"PROXY UNKNOWN">> BinList = [ <<"PROXY TCP4 ">>, <<"PROXY TCP6 ">>, <<"PROXY UNKNOWN">>
, <<"\r\n\r\n\0\r\nQUIT\n">>], , <<"\r\n\r\n\0\r\nQUIT\n">>],