Merge pull request #7425 from emqx/dev/v4.3.14

dev/v4.3.14 -> main-v4.3
This commit is contained in:
Zaiming (Stone) Shi 2022-04-03 18:06:14 +02:00 committed by GitHub
commit d49c8b61e6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
34 changed files with 528 additions and 166 deletions

View File

@ -10,6 +10,22 @@ File format:
- One list item per change topic - One list item per change topic
Change log ends with a list of github PRs Change log ends with a list of github PRs
## v4.3.14
### Enhancements
* In order to fix the execution order of exhook, e.g. before/after other plugins/modules,
ExHook now supports user customizing emqx_hook execute priority.
* add api: PUT /rules/{id}/reset_metrics.
This api reset the metrics of the rule engine of a rule, and reset the metrics of the action related to this rule. [#7474]
* Enhanced rule engine error handling when json parsing error.
### Bug fixes
* Prohibit empty topics in strict mode
* Make sure ehttpc delete useless pool always succeed.
* Update mongodb driver to fix potential process leak.
## v4.3.13 ## v4.3.13
### Important changes ### Important changes

View File

@ -99,6 +99,7 @@ $(PROFILES:%=clean-%):
.PHONY: clean-all .PHONY: clean-all
clean-all: clean-all:
@rm -rf _build @rm -rf _build
@rm rebar.lock
.PHONY: deps-all .PHONY: deps-all
deps-all: $(REBAR) $(PROFILES:%=deps-%) deps-all: $(REBAR) $(PROFILES:%=deps-%)

View File

@ -1,6 +1,6 @@
{application, emqx_auth_jwt, {application, emqx_auth_jwt,
[{description, "EMQ X Authentication with JWT"}, [{description, "EMQ X Authentication with JWT"},
{vsn, "4.3.1"}, % strict semver, bump manually! {vsn, "4.3.2"}, % strict semver, bump manually!
{modules, []}, {modules, []},
{registered, [emqx_auth_jwt_sup]}, {registered, [emqx_auth_jwt_sup]},
{applications, [kernel,stdlib,jose]}, {applications, [kernel,stdlib,jose]},

View File

@ -1,13 +1,13 @@
%% -*-: erlang -*- %% -*-: erlang -*-
{VSN, {VSN,
[ [
{"4.3.0", [ {<<"4\\.3\\.[0-1]">>, [
{load_module, emqx_auth_jwt_svr, brutal_purge, soft_purge, []} {load_module, emqx_auth_jwt_svr, brutal_purge, soft_purge, []}
]}, ]},
{<<".*">>, []} {<<".*">>, []}
], ],
[ [
{"4.3.0", [ {<<"4\\.3\\.[0-1]">>, [
{load_module, emqx_auth_jwt_svr, brutal_purge, soft_purge, []} {load_module, emqx_auth_jwt_svr, brutal_purge, soft_purge, []}
]}, ]},
{<<".*">>, []} {<<".*">>, []}

View File

@ -91,7 +91,7 @@ do_init_jwks(Options) ->
[K, V, Reason]), [K, V, Reason]),
undefined; undefined;
J -> J J -> J
catch T:R:_ -> catch T:R ->
?LOG(warning, "Build ~p JWK ~p failed: {~p, ~p}~n", ?LOG(warning, "Build ~p JWK ~p failed: {~p, ~p}~n",
[K, V, T, R]), [K, V, T, R]),
undefined undefined

View File

@ -1,7 +1,6 @@
{deps, {deps,
%% NOTE: mind poolboy version when updating mongodb-erlang version %% NOTE: mind poolboy version when updating mongodb-erlang version
[{mongodb, {git,"https://github.com/emqx/mongodb-erlang", {tag, "v3.0.12"}}}, [%% mongodb-erlang uses a special fork https://github.com/comtihon/poolboy.git
%% mongodb-erlang uses a special fork https://github.com/comtihon/poolboy.git
%% (which has overflow_ttl feature added). %% (which has overflow_ttl feature added).
%% However, it references `{branch, "master}` (commit 9c06a9a on 2021-04-07). %% However, it references `{branch, "master}` (commit 9c06a9a on 2021-04-07).
%% By accident, We have always been using the upstream fork due to %% By accident, We have always been using the upstream fork due to
@ -29,4 +28,3 @@
{cover_enabled, true}. {cover_enabled, true}.
{cover_opts, [verbose]}. {cover_opts, [verbose]}.
{cover_export_enabled, true}. {cover_export_enabled, true}.

View File

@ -24,6 +24,17 @@
## Value: false | Duration ## Value: false | Duration
#exhook.auto_reconnect = 60s #exhook.auto_reconnect = 60s
## The exhook execution priority on the Chain of the emqx hooks.
##
## Modify the field to fix the exhook execute order before/after other plugins/modules.
## By default, most hooks registered by plugins or modules have a priority of 0.
##
## With the same priority of 0, the execute order depends on hookpoints mount order.
## Scilicet is the loaded order of plugins/ modules.
##
## Default: 0
## Value: Integer
#exhook.hook_priority = 0
##-------------------------------------------------------------------- ##--------------------------------------------------------------------
## The Hook callback servers ## The Hook callback servers

View File

@ -41,4 +41,6 @@
, {'message.dropped', {emqx_exhook_handler, on_message_dropped, []}} , {'message.dropped', {emqx_exhook_handler, on_message_dropped, []}}
]). ]).
-define(DEFAULT_HOOK_PRIORITY, 0).
-endif. -endif.

View File

@ -15,6 +15,11 @@
{datatype, string} {datatype, string}
]}. ]}.
{mapping, "exhook.hook_priority", "emqx_exhook.hook_priority", [
{default, 0},
{datatype, integer}
]}.
{translation, "emqx_exhook.auto_reconnect", fun(Conf) -> {translation, "emqx_exhook.auto_reconnect", fun(Conf) ->
case cuttlefish:conf_get("exhook.auto_reconnect", Conf) of case cuttlefish:conf_get("exhook.auto_reconnect", Conf) of
"false" -> false; "false" -> false;

View File

@ -1,6 +1,6 @@
{application, emqx_exhook, {application, emqx_exhook,
[{description, "EMQ X Extension for Hook"}, [{description, "EMQ X Extension for Hook"},
{vsn, "4.3.4"}, {vsn, "4.3.5"},
{modules, []}, {modules, []},
{registered, []}, {registered, []},
{mod, {emqx_exhook_app, []}}, {mod, {emqx_exhook_app, []}},

View File

@ -1,13 +1,23 @@
%% -*-: erlang -*- %% -*- mode: erlang -*-
{VSN, {VSN,
[ [
{<<"4.3.[0-3]">>, [ {"4.3.4", [
{load_module, emqx_exhook_sup, brutal_purge,soft_purge,[]},
{load_module, emqx_exhook_server, brutal_purge,soft_purge,[]},
{update, emqx_exhook_mngr, {advanced, ["4.3.4"]}}
]},
{<<"4\\.3\\.[0-3]">>, [
{restart_application, emqx_exhook} {restart_application, emqx_exhook}
]}, ]},
{<<".*">>, []} {<<".*">>, []}
], ],
[ [
{<<"4.3.[0-3]">>, [ {"4.3.4", [
{load_module, emqx_exhook_sup, brutal_purge,soft_purge,[]},
{load_module, emqx_exhook_server, brutal_purge,soft_purge,[]},
{update, emqx_exhook_mngr, {advanced, ["4.3.4"]}}
]},
{<<"4\\.3\\.[0-3]">>, [
{restart_application, emqx_exhook} {restart_application, emqx_exhook}
]}, ]},
{<<".*">>, []} {<<".*">>, []}

View File

@ -23,7 +23,7 @@
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
%% APIs %% APIs
-export([start_link/3]). -export([start_link/4]).
%% Mgmt API %% Mgmt API
-export([ enable/2 -export([ enable/2
@ -59,9 +59,14 @@
%% Request options %% Request options
request_options :: grpc_client:options(), request_options :: grpc_client:options(),
%% Timer references %% Timer references
trefs :: map() trefs :: map(),
%% Hooks execute options
hooks_options :: hooks_options()
}). }).
-export_type([ server_options/0
, hooks_options/0]).
-type servers() :: [{Name :: atom(), server_options()}]. -type servers() :: [{Name :: atom(), server_options()}].
-type server_options() :: [ {scheme, http | https} -type server_options() :: [ {scheme, http | https}
@ -69,6 +74,10 @@
| {port, inet:port_number()} | {port, inet:port_number()}
]. ].
-type hooks_options() :: #{hook_priority => integer()}.
-define(DEFAULT_HOOK_OPTS, #{hook_priority => ?DEFAULT_HOOK_PRIORITY}).
-define(DEFAULT_TIMEOUT, 60000). -define(DEFAULT_TIMEOUT, 60000).
-define(CNTER, emqx_exhook_counter). -define(CNTER, emqx_exhook_counter).
@ -77,12 +86,12 @@
%% APIs %% APIs
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-spec start_link(servers(), false | non_neg_integer(), grpc_client:options()) -spec start_link(servers(), false | non_neg_integer(), grpc_client:options(), hooks_options())
->ignore ->ignore
| {ok, pid()} | {ok, pid()}
| {error, any()}. | {error, any()}.
start_link(Servers, AutoReconnect, ReqOpts) -> start_link(Servers, AutoReconnect, ReqOpts, HooksOpts) ->
gen_server:start_link(?MODULE, [Servers, AutoReconnect, ReqOpts], []). gen_server:start_link(?MODULE, [Servers, AutoReconnect, ReqOpts, HooksOpts], []).
-spec enable(pid(), atom()|string()) -> ok | {error, term()}. -spec enable(pid(), atom()|string()) -> ok | {error, term()}.
enable(Pid, Name) -> enable(Pid, Name) ->
@ -102,7 +111,7 @@ call(Pid, Req) ->
%% gen_server callbacks %% gen_server callbacks
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
init([Servers, AutoReconnect, ReqOpts0]) -> init([Servers, AutoReconnect, ReqOpts0, HooksOpts]) ->
process_flag(trap_exit, true), process_flag(trap_exit, true),
%% XXX: Due to the ExHook Module in the enterprise, %% XXX: Due to the ExHook Module in the enterprise,
%% this process may start multiple times and they will share this table %% this process may start multiple times and they will share this table
@ -120,32 +129,33 @@ init([Servers, AutoReconnect, ReqOpts0]) ->
%% Load the hook servers %% Load the hook servers
ReqOpts = maps:without([request_failed_action], ReqOpts0), ReqOpts = maps:without([request_failed_action], ReqOpts0),
{Waiting, Running} = load_all_servers(Servers, ReqOpts), {Waiting, Running} = load_all_servers(Servers, ReqOpts, HooksOpts),
{ok, ensure_reload_timer( {ok, ensure_reload_timer(
#state{waiting = Waiting, #state{waiting = Waiting,
running = Running, running = Running,
stopped = #{}, stopped = #{},
request_options = ReqOpts, request_options = ReqOpts,
auto_reconnect = AutoReconnect, auto_reconnect = AutoReconnect,
trefs = #{} trefs = #{},
hooks_options = HooksOpts
} }
)}. )}.
%% @private %% @private
load_all_servers(Servers, ReqOpts) -> load_all_servers(Servers, ReqOpts, HooksOpts) ->
load_all_servers(Servers, ReqOpts, #{}, #{}). load_all_servers(Servers, ReqOpts, HooksOpts, #{}, #{}).
load_all_servers([], _Request, Waiting, Running) -> load_all_servers([], _Request, _HooksOpts, Waiting, Running) ->
{Waiting, Running}; {Waiting, Running};
load_all_servers([{Name, Options}|More], ReqOpts, Waiting, Running) -> load_all_servers([{Name, Options} | More], ReqOpts, HooksOpts, Waiting, Running) ->
{NWaiting, NRunning} = {NWaiting, NRunning} =
case emqx_exhook_server:load(Name, Options, ReqOpts) of case emqx_exhook_server:load(Name, Options, ReqOpts, HooksOpts) of
{ok, ServerState} -> {ok, ServerState} ->
save(Name, ServerState), save(Name, ServerState),
{Waiting, Running#{Name => Options}}; {Waiting, Running#{Name => Options}};
{error, _} -> {error, _} ->
{Waiting#{Name => Options}, Running} {Waiting#{Name => Options}, Running}
end, end,
load_all_servers(More, ReqOpts, NWaiting, NRunning). load_all_servers(More, ReqOpts, HooksOpts, NWaiting, NRunning).
handle_call({load, Name}, _From, State) -> handle_call({load, Name}, _From, State) ->
{Result, NState} = do_load_server(Name, State), {Result, NState} = do_load_server(Name, State),
@ -199,8 +209,27 @@ terminate(_Reason, State = #state{running = Running}) ->
_ = unload_exhooks(), _ = unload_exhooks(),
ok. ok.
code_change(_OldVsn, State, _Extra) -> %% in the emqx_exhook:v4.3.5, we have added one new field in the state last:
{ok, State}. %% - hooks_options :: map()
code_change({down, _Vsn}, State, [ToVsn]) ->
case re:run(ToVsn, "4\\.3\\.[0-4]") of
{match, _} ->
NState = list_to_tuple(
lists:droplast(
tuple_to_list(State))),
{ok, NState};
_ ->
{ok, State}
end;
code_change(_Vsn, State, [FromVsn]) ->
case re:run(FromVsn, "4\\.3\\.[0-4]") of
{match, _} ->
NState = list_to_tuple(
tuple_to_list(State) ++ [?DEFAULT_HOOK_OPTS]),
{ok, NState};
_ ->
{ok, State}
end.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Internal funcs %% Internal funcs
@ -214,7 +243,8 @@ do_load_server(Name, State0 = #state{
waiting = Waiting, waiting = Waiting,
running = Running, running = Running,
stopped = Stopped, stopped = Stopped,
request_options = ReqOpts}) -> request_options = ReqOpts,
hooks_options = HooksOpts}) ->
State = clean_reload_timer(Name, State0), State = clean_reload_timer(Name, State0),
case maps:get(Name, Running, undefined) of case maps:get(Name, Running, undefined) of
undefined -> undefined ->
@ -223,7 +253,7 @@ do_load_server(Name, State0 = #state{
undefined -> undefined ->
{{error, not_found}, State}; {{error, not_found}, State};
Options -> Options ->
case emqx_exhook_server:load(Name, Options, ReqOpts) of case emqx_exhook_server:load(Name, Options, ReqOpts, HooksOpts) of
{ok, ServerState} -> {ok, ServerState} ->
save(Name, ServerState), save(Name, ServerState),
?LOG(info, "Load exhook callback server " ?LOG(info, "Load exhook callback server "

View File

@ -25,7 +25,7 @@
-define(PB_CLIENT_MOD, emqx_exhook_v_1_hook_provider_client). -define(PB_CLIENT_MOD, emqx_exhook_v_1_hook_provider_client).
%% Load/Unload %% Load/Unload
-export([ load/3 -export([ load/4
, unload/1 , unload/1
]). ]).
@ -81,8 +81,9 @@
%% Load/Unload APIs %% Load/Unload APIs
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-spec load(atom(), list(), map()) -> {ok, server()} | {error, term()} . -spec load(atom(), emqx_exhook_mngr:server_options(), grpc_client:options(), emqx_exhook_mngr:hooks_options())
load(Name0, Opts0, ReqOpts) -> -> {ok, server()} | {error, term()} .
load(Name0, Opts0, ReqOpts, HooksOpts) ->
Name = to_list(Name0), Name = to_list(Name0),
{SvrAddr, ClientOpts} = channel_opts(Opts0), {SvrAddr, ClientOpts} = channel_opts(Opts0),
case emqx_exhook_sup:start_grpc_client_channel( case emqx_exhook_sup:start_grpc_client_channel(
@ -97,7 +98,7 @@ load(Name0, Opts0, ReqOpts) ->
io_lib:format("exhook.~s.", [Name])), io_lib:format("exhook.~s.", [Name])),
ensure_metrics(Prefix, HookSpecs), ensure_metrics(Prefix, HookSpecs),
%% Ensure hooks %% Ensure hooks
ensure_hooks(HookSpecs), ensure_hooks(HookSpecs, maps:get(hook_priority, HooksOpts, ?DEFAULT_HOOK_PRIORITY)),
{ok, #server{name = Name, {ok, #server{name = Name,
options = ReqOpts, options = ReqOpts,
channel = _ChannPoolPid, channel = _ChannPoolPid,
@ -174,7 +175,7 @@ resovle_hookspec(HookSpecs) when is_list(HookSpecs) ->
case maps:get(name, HookSpec, undefined) of case maps:get(name, HookSpec, undefined) of
undefined -> Acc; undefined -> Acc;
Name0 -> Name0 ->
Name = try binary_to_existing_atom(Name0, utf8) catch T:R:_ -> {T,R} end, Name = try binary_to_existing_atom(Name0, utf8) catch T:R -> {T,R} end,
case lists:member(Name, AvailableHooks) of case lists:member(Name, AvailableHooks) of
true -> true ->
case lists:member(Name, MessageHooks) of case lists:member(Name, MessageHooks) of
@ -193,13 +194,13 @@ ensure_metrics(Prefix, HookSpecs) ->
|| Hookpoint <- maps:keys(HookSpecs)], || Hookpoint <- maps:keys(HookSpecs)],
lists:foreach(fun emqx_metrics:ensure/1, Keys). lists:foreach(fun emqx_metrics:ensure/1, Keys).
ensure_hooks(HookSpecs) -> ensure_hooks(HookSpecs, Priority) ->
lists:foreach(fun(Hookpoint) -> lists:foreach(fun(Hookpoint) ->
case lists:keyfind(Hookpoint, 1, ?ENABLED_HOOKS) of case lists:keyfind(Hookpoint, 1, ?ENABLED_HOOKS) of
false -> false ->
?LOG(error, "Unknown name ~s to hook, skip it!", [Hookpoint]); ?LOG(error, "Unknown name ~s to hook, skip it!", [Hookpoint]);
{Hookpoint, {M, F, A}} -> {Hookpoint, {M, F, A}} ->
emqx_hooks:put(Hookpoint, {M, F, A}), emqx_hooks:put(Hookpoint, {M, F, A}, Priority),
ets:update_counter(?CNTER, Hookpoint, {2, 1}, {Hookpoint, 0}) ets:update_counter(?CNTER, Hookpoint, {2, 1}, {Hookpoint, 0})
end end
end, maps:keys(HookSpecs)). end, maps:keys(HookSpecs)).

View File

@ -18,6 +18,8 @@
-behaviour(supervisor). -behaviour(supervisor).
-include("emqx_exhook.hrl").
-export([ start_link/0 -export([ start_link/0
, init/1 , init/1
]). ]).
@ -43,7 +45,7 @@ start_link() ->
init([]) -> init([]) ->
Mngr = ?CHILD(emqx_exhook_mngr, worker, Mngr = ?CHILD(emqx_exhook_mngr, worker,
[servers(), auto_reconnect(), request_options()]), [servers(), auto_reconnect(), request_options(), hooks_options()]),
{ok, {{one_for_one, 10, 100}, [Mngr]}}. {ok, {{one_for_one, 10, 100}, [Mngr]}}.
servers() -> servers() ->
@ -57,6 +59,10 @@ request_options() ->
request_failed_action => env(request_failed_action, deny) request_failed_action => env(request_failed_action, deny)
}. }.
hooks_options() ->
#{hook_priority => env(hook_priority, ?DEFAULT_HOOK_PRIORITY)
}.
env(Key, Def) -> env(Key, Def) ->
application:get_env(emqx_exhook, Key, Def). application:get_env(emqx_exhook, Key, Def).

View File

@ -98,10 +98,31 @@ t_cli_stats(_) ->
_ = emqx_exhook_cli:cli(x), _ = emqx_exhook_cli:cli(x),
unmeck_print(). unmeck_print().
t_priority(_) ->
restart_exhook_with_envs([{emqx_exhook, hook_priority, 1}]),
emqx_exhook:disable(default),
ok = emqx_exhook:enable(default),
[Callback | _] = emqx_hooks:lookup('client.connected'),
1 = emqx_hooks:callback_priority(Callback).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Utils %% Utils
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% TODO: make it more general and move to `emqx_ct_helpers`
restart_exhook_with_envs(Envs) ->
emqx_ct_helpers:stop_apps([emqx_exhook]),
SetPriorityFun
= fun(emqx) ->
set_special_cfgs(emqx);
(emqx_exhook) ->
lists:foreach(fun({App, Key, Val}) ->
application:set_env(App, Key, Val)
end, Envs)
end,
emqx_ct_helpers:start_apps([emqx_exhook], SetPriorityFun).
meck_print() -> meck_print() ->
meck:new(emqx_ctl, [passthrough, no_history, no_link]), meck:new(emqx_ctl, [passthrough, no_history, no_link]),
meck:expect(emqx_ctl, print, fun(_) -> ok end), meck:expect(emqx_ctl, print, fun(_) -> ok end),

View File

@ -1,6 +1,6 @@
{application, emqx_rule_engine, {application, emqx_rule_engine,
[{description, "EMQ X Rule Engine"}, [{description, "EMQ X Rule Engine"},
{vsn, "4.3.8"}, % strict semver, bump manually! {vsn, "4.3.9"}, % strict semver, bump manually!
{modules, []}, {modules, []},
{registered, [emqx_rule_engine_sup, emqx_rule_registry]}, {registered, [emqx_rule_engine_sup, emqx_rule_registry]},
{applications, [kernel,stdlib,rulesql,getopt]}, {applications, [kernel,stdlib,rulesql,getopt]},

View File

@ -1,14 +1,28 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
%% Unless you know what you are doing, DO NOT edit manually!! %% Unless you know what you are doing, DO NOT edit manually!!
{VSN, {VSN,
[{"4.3.7", [{"4.3.8",
[{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]},
{"4.3.7",
[{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
{"4.3.6", {"4.3.6",
[{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
{update,emqx_rule_metrics,{advanced,["4.3.6"]}}, {update,emqx_rule_metrics,{advanced,["4.3.6"]}},
@ -17,7 +31,8 @@
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
{"4.3.5", {"4.3.5",
[{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
{update,emqx_rule_metrics,{advanced,["4.3.5"]}}, {update,emqx_rule_metrics,{advanced,["4.3.5"]}},
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
@ -26,7 +41,8 @@
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
{"4.3.4", {"4.3.4",
[{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
{update,emqx_rule_metrics,{advanced,["4.3.4"]}}, {update,emqx_rule_metrics,{advanced,["4.3.4"]}},
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
@ -35,7 +51,8 @@
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
{"4.3.3", {"4.3.3",
[{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
{update,emqx_rule_metrics,{advanced,["4.3.3"]}}, {update,emqx_rule_metrics,{advanced,["4.3.3"]}},
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
@ -45,7 +62,8 @@
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
{"4.3.2", {"4.3.2",
[{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
{update,emqx_rule_metrics,{advanced,["4.3.2"]}}, {update,emqx_rule_metrics,{advanced,["4.3.2"]}},
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
@ -56,7 +74,8 @@
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
{"4.3.1", {"4.3.1",
[{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
{update,emqx_rule_metrics,{advanced,["4.3.1"]}}, {update,emqx_rule_metrics,{advanced,["4.3.1"]}},
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
@ -67,7 +86,8 @@
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
{"4.3.0", {"4.3.0",
[{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
{update,emqx_rule_metrics,{advanced,["4.3.0"]}}, {update,emqx_rule_metrics,{advanced,["4.3.0"]}},
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
@ -78,14 +98,28 @@
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
{<<".*">>,[]}], {<<".*">>,[]}],
[{"4.3.7", [{"4.3.8",
[{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]},
{"4.3.7",
[{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
{"4.3.6", {"4.3.6",
[{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
{update,emqx_rule_metrics,{advanced,["4.3.6"]}}, {update,emqx_rule_metrics,{advanced,["4.3.6"]}},
@ -94,7 +128,8 @@
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
{"4.3.5", {"4.3.5",
[{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
{update,emqx_rule_metrics,{advanced,["4.3.5"]}}, {update,emqx_rule_metrics,{advanced,["4.3.5"]}},
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
@ -103,7 +138,8 @@
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
{"4.3.4", {"4.3.4",
[{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
{update,emqx_rule_metrics,{advanced,["4.3.4"]}}, {update,emqx_rule_metrics,{advanced,["4.3.4"]}},
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
@ -112,7 +148,8 @@
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
{"4.3.3", {"4.3.3",
[{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
{update,emqx_rule_metrics,{advanced,["4.3.3"]}}, {update,emqx_rule_metrics,{advanced,["4.3.3"]}},
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
@ -122,7 +159,8 @@
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
{"4.3.2", {"4.3.2",
[{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
{update,emqx_rule_metrics,{advanced,["4.3.2"]}}, {update,emqx_rule_metrics,{advanced,["4.3.2"]}},
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
@ -133,7 +171,8 @@
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
{"4.3.1", {"4.3.1",
[{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
{update,emqx_rule_metrics,{advanced,["4.3.1"]}}, {update,emqx_rule_metrics,{advanced,["4.3.1"]}},
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
@ -144,7 +183,8 @@
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
{"4.3.0", {"4.3.0",
[{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
{update,emqx_rule_metrics,{advanced,["4.3.0"]}}, {update,emqx_rule_metrics,{advanced,["4.3.0"]}},
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},

View File

@ -37,6 +37,7 @@
, test_resource/1 , test_resource/1
, start_resource/1 , start_resource/1
, get_resource_status/1 , get_resource_status/1
, is_source_alive/1
, get_resource_params/1 , get_resource_params/1
, delete_resource/1 , delete_resource/1
, update_resource/2 , update_resource/2
@ -314,24 +315,37 @@ start_resource(ResId) ->
end. end.
-spec(test_resource(#{type := _, config := _, _ => _}) -> ok | {error, Reason :: term()}). -spec(test_resource(#{type := _, config := _, _ => _}) -> ok | {error, Reason :: term()}).
test_resource(#{type := Type, config := Config0}) -> test_resource(#{type := Type} = Params) ->
case emqx_rule_registry:find_resource_type(Type) of case emqx_rule_registry:find_resource_type(Type) of
{ok, #resource_type{on_create = {ModC, Create}, {ok, #resource_type{}} ->
on_destroy = {ModD, Destroy}, ResId = maps:get(id, Params, resource_id()),
params_spec = ParamSpec}} ->
Config = emqx_rule_validator:validate_params(Config0, ParamSpec),
ResId = resource_id(),
try try
_ = ?CLUSTER_CALL(init_resource, [ModC, Create, ResId, Config]), _ = create_resource(maps:put(id, ResId, Params)),
_ = ?CLUSTER_CALL(clear_resource, [ModD, Destroy, ResId]), true = is_source_alive(ResId),
ok ok
catch catch E:R:S ->
throw:Reason -> {error, Reason} ?LOG(warning, "test resource failed, ~0p:~0p ~0p", [E, R, S]),
{error, R}
after
_ = ?CLUSTER_CALL(delete_resource, [ResId])
end; end;
not_found -> not_found ->
{error, {resource_type_not_found, Type}} {error, {resource_type_not_found, Type}}
end. end.
is_source_alive(ResId) ->
case rpc:multicall(ekka_mnesia:running_nodes(), ?MODULE, get_resource_status, [ResId], 5000) of
{ResL, []} ->
is_source_alive_(ResL);
{_, _Errors} ->
false
end.
is_source_alive_([]) -> true;
is_source_alive_([{ok, #{is_alive := true}} | ResL]) -> is_source_alive_(ResL);
is_source_alive_([{ok, #{is_alive := false}} | _ResL]) -> false;
is_source_alive_([_Error | _ResL]) -> false.
-spec(get_resource_status(resource_id()) -> {ok, resource_status()} | {error, Reason :: term()}). -spec(get_resource_status(resource_id()) -> {ok, resource_status()} | {error, Reason :: term()}).
get_resource_status(ResId) -> get_resource_status(ResId) ->
case emqx_rule_registry:find_resource_params(ResId) of case emqx_rule_registry:find_resource_params(ResId) of

View File

@ -58,6 +58,13 @@
descr => "Delete a rule" descr => "Delete a rule"
}). }).
-rest_api(#{name => reset_metrics,
method => 'PUT',
path => "/rules/:bin:id/reset_metrics",
func => reset_metrics,
descr => "reset a rule metrics"
}).
-rest_api(#{name => list_actions, -rest_api(#{name => list_actions,
method => 'GET', method => 'GET',
path => "/actions/", path => "/actions/",
@ -154,6 +161,7 @@
, list_rules/2 , list_rules/2
, show_rule/2 , show_rule/2
, delete_rule/2 , delete_rule/2
, reset_metrics/2
]). ]).
-export([ list_actions/2 -export([ list_actions/2
@ -252,6 +260,10 @@ delete_rule(#{id := Id}, _Params) ->
ok = emqx_rule_engine:delete_rule(Id), ok = emqx_rule_engine:delete_rule(Id),
return(ok). return(ok).
reset_metrics(#{id := Id}, _Params) ->
ok = emqx_rule_metrics:reset_metrics(Id),
return(ok).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Actions API %% Actions API
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
@ -296,7 +308,7 @@ do_create_resource(Create, ParsedParams) ->
list_resources(#{}, _Params) -> list_resources(#{}, _Params) ->
Data0 = lists:foldr(fun maybe_record_to_map/2, [], emqx_rule_registry:get_resources()), Data0 = lists:foldr(fun maybe_record_to_map/2, [], emqx_rule_registry:get_resources()),
Data = lists:map(fun(Res = #{id := ResId}) -> Data = lists:map(fun(Res = #{id := ResId}) ->
Status = get_aggregated_status(ResId), Status = emqx_rule_engine:is_source_alive(ResId),
maps:put(status, Status, Res) maps:put(status, Status, Res)
end, Data0), end, Data0),
return({ok, Data}). return({ok, Data}).
@ -304,14 +316,6 @@ list_resources(#{}, _Params) ->
list_resources_by_type(#{type := Type}, _Params) -> list_resources_by_type(#{type := Type}, _Params) ->
return_all(emqx_rule_registry:get_resources_by_type(Type)). return_all(emqx_rule_registry:get_resources_by_type(Type)).
get_aggregated_status(ResId) ->
lists:all(fun(Node) ->
case rpc:call(Node, emqx_rule_engine, get_resource_status, [ResId]) of
{ok, #{is_alive := true}} -> true;
_ -> false
end
end, ekka_mnesia:running_nodes()).
show_resource(#{id := Id}, _Params) -> show_resource(#{id := Id}, _Params) ->
case emqx_rule_registry:find_resource(Id) of case emqx_rule_registry:find_resource(Id) of
{ok, R} -> {ok, R} ->

View File

@ -172,18 +172,18 @@ eventmsg_connected(_ClientInfo = #{
is_bridge := IsBridge, is_bridge := IsBridge,
mountpoint := Mountpoint mountpoint := Mountpoint
}, },
_ConnInfo = #{ ConnInfo = #{
peername := PeerName, peername := PeerName,
sockname := SockName, sockname := SockName,
clean_start := CleanStart, clean_start := CleanStart,
proto_name := ProtoName, proto_name := ProtoName,
proto_ver := ProtoVer, proto_ver := ProtoVer,
keepalive := Keepalive,
connected_at := ConnectedAt, connected_at := ConnectedAt,
conn_props := ConnProps, receive_maximum := RcvMax
receive_maximum := RcvMax,
expiry_interval := ExpiryInterval
}) -> }) ->
Keepalive = maps:get(keepalive, ConnInfo, 0),
ConnProps = maps:get(conn_props, ConnInfo, #{}),
ExpiryInterval = maps:get(expiry_interval, ConnInfo, 0),
with_basic_columns('client.connected', with_basic_columns('client.connected',
#{clientid => ClientId, #{clientid => ClientId,
username => Username, username => Username,

View File

@ -197,6 +197,9 @@
, rfc3339_to_unix_ts/2 , rfc3339_to_unix_ts/2
, now_timestamp/0 , now_timestamp/0
, now_timestamp/1 , now_timestamp/1
, mongo_date/0
, mongo_date/1
, mongo_date/2
]). ]).
%% Proc Dict Func %% Proc Dict Func
@ -900,6 +903,24 @@ time_unit(<<"millisecond">>) -> millisecond;
time_unit(<<"microsecond">>) -> microsecond; time_unit(<<"microsecond">>) -> microsecond;
time_unit(<<"nanosecond">>) -> nanosecond. time_unit(<<"nanosecond">>) -> nanosecond.
mongo_date() ->
erlang:timestamp().
mongo_date(MillisecondsTimestamp) ->
convert_timestamp(MillisecondsTimestamp).
mongo_date(Timestamp, Unit) ->
InsertedTimeUnit = time_unit(Unit),
ScaledEpoch = erlang:convert_time_unit(Timestamp, InsertedTimeUnit, millisecond),
convert_timestamp(ScaledEpoch).
convert_timestamp(MillisecondsTimestamp) ->
MicroTimestamp = MillisecondsTimestamp * 1000,
MegaSecs = MicroTimestamp div 1000_000_000_000,
Secs = MicroTimestamp div 1000_000 - MegaSecs*1000_000,
MicroSecs = MicroTimestamp rem 1000_000,
{MegaSecs, Secs, MicroSecs}.
%% @doc This is for sql funcs that should be handled in the specific modules. %% @doc This is for sql funcs that should be handled in the specific modules.
%% Here the emqx_rule_funcs module acts as a proxy, forwarding %% Here the emqx_rule_funcs module acts as a proxy, forwarding
%% the function handling to the worker module. %% the function handling to the worker module.

View File

@ -63,6 +63,7 @@
, create_metrics/1 , create_metrics/1
, clear_rule_metrics/1 , clear_rule_metrics/1
, clear_metrics/1 , clear_metrics/1
, reset_metrics/1
]). ]).
-export([ get_rule_metrics/1 -export([ get_rule_metrics/1
@ -127,6 +128,45 @@ clear_rule_metrics(Id) ->
clear_metrics(Id) -> clear_metrics(Id) ->
gen_server:call(?MODULE, {delete_metrics, Id}). gen_server:call(?MODULE, {delete_metrics, Id}).
-spec(reset_metrics(rule_id()) -> ok).
reset_metrics(Id) ->
reset_speeds(Id),
reset_metrics(Id, rule_metrics()),
case emqx_rule_registry:get_rule(Id) of
not_found -> ok;
{ok, #rule{actions = Actions}} ->
[ reset_metrics(ActionId, action_metrics())
|| #action_instance{ id = ActionId} <- Actions],
ok
end.
reset_metrics(Id, Metrics) ->
case couters_ref(Id) of
not_found -> ok;
Ref -> [counters:put(Ref, metrics_idx(Idx), 0)
|| Idx <- Metrics],
ok
end.
reset_speeds(Id) ->
gen_server:call(?MODULE, {reset_speeds, Id}).
rule_metrics() ->
[ 'rules.matched'
, 'rules.failed'
, 'rules.passed'
, 'rules.exception'
, 'rules.no_result'
].
action_metrics() ->
[ 'actions.success'
, 'actions.error'
, 'actions.taken'
, 'actions.exception'
, 'actions.retry'
].
-spec(get(rule_id(), atom()) -> number()). -spec(get(rule_id(), atom()) -> number()).
get(Id, Metric) -> get(Id, Metric) ->
case couters_ref(Id) of case couters_ref(Id) of
@ -288,6 +328,9 @@ handle_call({create_rule_metrics, Id}, _From,
_ -> RuleSpeeds#{Id => #rule_speed{}} _ -> RuleSpeeds#{Id => #rule_speed{}}
end}}; end}};
handle_call({reset_speeds, Id}, _From, State = #state{rule_speeds = RuleSpeedMap}) ->
{reply, ok, State#state{rule_speeds = maps:put(Id, #rule_speed{}, RuleSpeedMap)}};
handle_call({delete_metrics, Id}, _From, handle_call({delete_metrics, Id}, _From,
State = #state{metric_ids = MIDs, rule_speeds = undefined}) -> State = #state{metric_ids = MIDs, rule_speeds = undefined}) ->
{reply, delete_counters(Id), State#state{metric_ids = sets:del_element(Id, MIDs)}}; {reply, delete_counters(Id), State#state{metric_ids = sets:del_element(Id, MIDs)}};

View File

@ -50,31 +50,8 @@ apply_rules([], _Input) ->
ok; ok;
apply_rules([#rule{enabled = false}|More], Input) -> apply_rules([#rule{enabled = false}|More], Input) ->
apply_rules(More, Input); apply_rules(More, Input);
apply_rules([Rule = #rule{id = RuleID}|More], Input) -> apply_rules([Rule|More], Input) ->
try apply_rule_discard_result(Rule, Input) apply_rule_discard_result(Rule, Input),
catch
%% ignore the errors if select or match failed
_:{select_and_transform_error, Error} ->
emqx_rule_metrics:inc_rules_exception(RuleID),
?LOG(warning, "SELECT clause exception for ~s failed: ~p",
[RuleID, Error]);
_:{match_conditions_error, Error} ->
emqx_rule_metrics:inc_rules_exception(RuleID),
?LOG(warning, "WHERE clause exception for ~s failed: ~p",
[RuleID, Error]);
_:{select_and_collect_error, Error} ->
emqx_rule_metrics:inc_rules_exception(RuleID),
?LOG(warning, "FOREACH clause exception for ~s failed: ~p",
[RuleID, Error]);
_:{match_incase_error, Error} ->
emqx_rule_metrics:inc_rules_exception(RuleID),
?LOG(warning, "INCASE clause exception for ~s failed: ~p",
[RuleID, Error]);
_:Error:StkTrace ->
emqx_rule_metrics:inc_rules_exception(RuleID),
?LOG(error, "Apply rule ~s failed: ~p. Stacktrace:~n~p",
[RuleID, Error, StkTrace])
end,
apply_rules(More, Input). apply_rules(More, Input).
apply_rule_discard_result(Rule, Input) -> apply_rule_discard_result(Rule, Input) ->
@ -84,7 +61,35 @@ apply_rule_discard_result(Rule, Input) ->
apply_rule(Rule = #rule{id = RuleID}, Input) -> apply_rule(Rule = #rule{id = RuleID}, Input) ->
clear_rule_payload(), clear_rule_payload(),
ok = emqx_rule_metrics:inc_rules_matched(RuleID), ok = emqx_rule_metrics:inc_rules_matched(RuleID),
do_apply_rule(Rule, add_metadata(Input, #{rule_id => RuleID})). try do_apply_rule(Rule, add_metadata(Input, #{rule_id => RuleID}))
catch
%% ignore the errors if select or match failed
_:Reason = {select_and_transform_error, Error} ->
emqx_rule_metrics:inc_rules_exception(RuleID),
?LOG(warning, "SELECT clause exception for ~s failed: ~p",
[RuleID, Error]),
{error, Reason};
_:Reason = {match_conditions_error, Error} ->
emqx_rule_metrics:inc_rules_exception(RuleID),
?LOG(warning, "WHERE clause exception for ~s failed: ~p",
[RuleID, Error]),
{error, Reason};
_:Reason = {select_and_collect_error, Error} ->
emqx_rule_metrics:inc_rules_exception(RuleID),
?LOG(warning, "FOREACH clause exception for ~s failed: ~p",
[RuleID, Error]),
{error, Reason};
_:Reason = {match_incase_error, Error} ->
emqx_rule_metrics:inc_rules_exception(RuleID),
?LOG(warning, "INCASE clause exception for ~s failed: ~p",
[RuleID, Error]),
{error, Reason};
_:Error:StkTrace ->
emqx_rule_metrics:inc_rules_exception(RuleID),
?LOG(error, "Apply rule ~s failed: ~p. Stacktrace:~n~p",
[RuleID, Error, StkTrace]),
{error, {Error, StkTrace}}
end.
do_apply_rule(#rule{id = RuleId, do_apply_rule(#rule{id = RuleId,
is_foreach = true, is_foreach = true,
@ -452,7 +457,8 @@ cache_payload(DecodedP) ->
safe_decode_and_cache(MaybeJson) -> safe_decode_and_cache(MaybeJson) ->
try cache_payload(emqx_json:decode(MaybeJson, [return_maps])) try cache_payload(emqx_json:decode(MaybeJson, [return_maps]))
catch _:_ -> #{} catch
_:_:_-> error({decode_json_failed, MaybeJson})
end. end.
ensure_list(List) when is_list(List) -> List; ensure_list(List) when is_list(List) -> List;

View File

@ -77,7 +77,7 @@ test_rule(Sql, Select, Context, EventTopics) ->
R R
of of
{ok, Data} -> {ok, flatten(Data)}; {ok, Data} -> {ok, flatten(Data)};
{error, nomatch} -> {error, nomatch} {error, Reason} -> {error, Reason}
after after
ok = emqx_rule_registry:remove_action_instance_params(ActInstId) ok = emqx_rule_registry:remove_action_instance_params(ActInstId)
end. end.

View File

@ -49,6 +49,7 @@ groups() ->
[t_register_provider, [t_register_provider,
t_unregister_provider, t_unregister_provider,
t_create_rule, t_create_rule,
t_reset_metrics,
t_create_resource t_create_resource
]}, ]},
{actions, [], {actions, [],
@ -124,7 +125,8 @@ groups() ->
t_sqlparse_array_range_1, t_sqlparse_array_range_1,
t_sqlparse_array_range_2, t_sqlparse_array_range_2,
t_sqlparse_true_false, t_sqlparse_true_false,
t_sqlparse_new_map t_sqlparse_new_map,
t_sqlparse_invalid_json
]}, ]},
{rule_metrics, [], {rule_metrics, [],
[t_metrics, [t_metrics,
@ -351,6 +353,39 @@ t_inspect_action(_Config) ->
emqx_rule_registry:remove_resource(ResId), emqx_rule_registry:remove_resource(ResId),
ok. ok.
t_reset_metrics(_Config) ->
ok = emqx_rule_engine:load_providers(),
{ok, #resource{id = ResId}} = emqx_rule_engine:create_resource(
#{type => built_in,
config => #{},
description => <<"debug resource">>}),
{ok, #rule{id = Id}} = emqx_rule_engine:create_rule(
#{rawsql => "select clientid as c, username as u "
"from \"t1\" ",
actions => [#{name => 'inspect',
args => #{'$resource' => ResId, a=>1, b=>2}}],
type => built_in,
description => <<"Inspect rule">>
}),
{ok, Client} = emqtt:start_link([{username, <<"emqx">>}]),
{ok, _} = emqtt:connect(Client),
[ begin
emqtt:publish(Client, <<"t1">>, <<"{\"id\": 1, \"name\": \"ha\"}">>, 0),
timer:sleep(100)
end
|| _ <- lists:seq(1,10)],
emqx_rule_metrics:reset_metrics(Id),
?assertEqual(#{exception => 0,failed => 0,
matched => 0,no_result => 0,passed => 0,
speed => 0.0,speed_last5m => 0.0,speed_max => 0},
emqx_rule_metrics:get_rule_metrics(Id)),
?assertEqual(#{failed => 0,success => 0,taken => 0},
emqx_rule_metrics:get_action_metrics(ResId)),
emqtt:stop(Client),
emqx_rule_registry:remove_rule(Id),
emqx_rule_registry:remove_resource(ResId),
ok.
t_republish_action(_Config) -> t_republish_action(_Config) ->
Qos0Received = emqx_metrics:val('messages.qos0.received'), Qos0Received = emqx_metrics:val('messages.qos0.received'),
Received = emqx_metrics:val('messages.received'), Received = emqx_metrics:val('messages.received'),
@ -2274,12 +2309,13 @@ t_sqlparse_array_range_1(_Config) ->
Sql02 = "select " Sql02 = "select "
" payload.a[1..4] as c " " payload.a[1..4] as c "
"from \"t/#\" ", "from \"t/#\" ",
?assertThrow({select_and_transform_error, {error,{range_get,non_list_data},_}}, ?assertMatch({error, {select_and_transform_error, {error,{range_get,non_list_data},_}}},
emqx_rule_sqltester:test( emqx_rule_sqltester:test(
#{<<"rawsql">> => Sql02, #{<<"rawsql">> => Sql02,
<<"ctx">> => <<"ctx">> =>
#{<<"payload">> => <<"{\"x\":[0,1,2,3,4,5]}">>, #{<<"payload">> => <<"{\"x\":[0,1,2,3,4,5]}">>,
<<"topic">> => <<"t/a">>}})), <<"topic">> => <<"t/a">>}})),
%% construct a range: %% construct a range:
Sql1 = "select " Sql1 = "select "
" [1..4] as c, " " [1..4] as c, "
@ -2406,6 +2442,29 @@ t_sqlparse_nested_get(_Config) ->
<<"payload">> => <<"{\"a\": {\"b\": 0}}">> <<"payload">> => <<"{\"a\": {\"b\": 0}}">>
}})). }})).
t_sqlparse_invalid_json(_Config) ->
Sql02 = "select "
" payload.a[1..4] as c "
"from \"t/#\" ",
?assertMatch({error, {select_and_transform_error, {error,{decode_json_failed,_},_}}},
emqx_rule_sqltester:test(
#{<<"rawsql">> => Sql02,
<<"ctx">> =>
#{<<"payload">> => <<"{\"x\":[0,1,2,3,}">>,
<<"topic">> => <<"t/a">>}})),
Sql2 = "foreach payload.sensors "
"do item.cmd as msg_type "
"from \"t/#\" ",
?assertMatch({error, {select_and_collect_error, {error,{decode_json_failed,_},_}}},
emqx_rule_sqltester:test(
#{<<"rawsql">> => Sql2,
<<"ctx">> =>
#{<<"payload">> =>
<<"{\"sensors\": [{\"cmd\":\"1\"} {\"cmd\":}]}">>,
<<"topic">> => <<"t/a">>}})).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Internal helpers %% Internal helpers
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------

View File

@ -1,6 +1,6 @@
{application, emqx_dashboard, {application, emqx_dashboard,
[{description, "EMQ X Web Dashboard"}, [{description, "EMQ X Web Dashboard"},
{vsn, "4.3.9"}, % strict semver, bump manually! {vsn, "4.3.10"}, % strict semver, bump manually!
{modules, []}, {modules, []},
{registered, [emqx_dashboard_sup]}, {registered, [emqx_dashboard_sup]},
{applications, [kernel,stdlib,mnesia,minirest]}, {applications, [kernel,stdlib,mnesia,minirest]},

View File

@ -37,7 +37,7 @@
{deps, {deps,
[ {gpb, "4.11.2"} %% gpb only used to build, but not for release, pin it here to avoid fetching a wrong version due to rebar plugins scattered in all the deps [ {gpb, "4.11.2"} %% gpb only used to build, but not for release, pin it here to avoid fetching a wrong version due to rebar plugins scattered in all the deps
, {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.1.14"}}} , {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.1.15"}}}
, {eredis_cluster, {git, "https://github.com/emqx/eredis_cluster", {tag, "0.6.5"}}} , {eredis_cluster, {git, "https://github.com/emqx/eredis_cluster", {tag, "0.6.5"}}}
, {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}} , {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}}
, {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}} , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
@ -57,6 +57,7 @@
, {getopt, "1.0.1"} , {getopt, "1.0.1"}
, {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.15.0"}}} , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.15.0"}}}
, {lc, {git, "https://github.com/emqx/lc.git", {tag, "0.2.0"}}} , {lc, {git, "https://github.com/emqx/lc.git", {tag, "0.2.0"}}}
, {mongodb, {git,"https://github.com/emqx/mongodb-erlang", {tag, "v3.0.13"}}}
]}. ]}.
{xref_ignores, {xref_ignores,

View File

@ -55,6 +55,10 @@ while [ "$#" -gt 0 ]; do
SKIP_BUILD='yes' SKIP_BUILD='yes'
shift shift
;; ;;
--skip-build-base)
SKIP_BUILD_BASE='yes'
shift
;;
--check) --check)
# hijack the --check option # hijack the --check option
IS_CHECK='yes' IS_CHECK='yes'
@ -88,16 +92,20 @@ else
NEW_COPY='no' NEW_COPY='no'
fi fi
pushd "${PREV_DIR_BASE}/${PREV_TAG}" if [ "${SKIP_BUILD_BASE:-no}" = 'yes' ]; then
if [ "$NEW_COPY" = 'no' ]; then echo "not building relup base ${PREV_DIR_BASE}/${PREV_TAG}"
else
pushd "${PREV_DIR_BASE}/${PREV_TAG}"
if [ "$NEW_COPY" = 'no' ]; then
REMOTE="$(git remote -v | grep "${GIT_REPO}" | head -1 | awk '{print $1}')" REMOTE="$(git remote -v | grep "${GIT_REPO}" | head -1 | awk '{print $1}')"
git fetch "$REMOTE" git fetch "$REMOTE"
fi
git reset --hard
git clean -fdx
git checkout "${PREV_TAG}"
make "$PROFILE"
popd
fi fi
git reset --hard
git clean -fdx
git checkout "${PREV_TAG}"
make "$PROFILE"
popd
PREV_REL_DIR="${PREV_DIR_BASE}/${PREV_TAG}/_build/${PROFILE}/lib" PREV_REL_DIR="${PREV_DIR_BASE}/${PREV_TAG}/_build/${PROFILE}/lib"

View File

@ -6,7 +6,7 @@
%% the emqx `release' version, which in turn is comprised of several %% the emqx `release' version, which in turn is comprised of several
%% apps, one of which is this. See `emqx_release.hrl' for more %% apps, one of which is this. See `emqx_release.hrl' for more
%% info. %% info.
{vsn, "4.3.14"}, % strict semver, bump manually! {vsn, "4.3.15"}, % strict semver, bump manually!
{modules, []}, {modules, []},
{registered, []}, {registered, []},
{applications, [ kernel {applications, [ kernel

View File

@ -1,7 +1,10 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
%% Unless you know what you are doing, DO NOT edit manually!! %% Unless you know what you are doing, DO NOT edit manually!!
{VSN, {VSN,
[{"4.3.13", [{"4.3.14",
[{load_module,emqx_frame,brutal_purge,soft_purge,[]},
{load_module,emqx_hooks,brutal_purge,soft_purge,[]}]},
{"4.3.13",
[{load_module,emqx_plugins,brutal_purge,soft_purge,[]}, [{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
{load_module,emqx_flapping,brutal_purge,soft_purge,[]}, {load_module,emqx_flapping,brutal_purge,soft_purge,[]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}, {load_module,emqx_limiter,brutal_purge,soft_purge,[]},
@ -18,7 +21,8 @@
{load_module,emqx_ctl,brutal_purge,soft_purge,[]}, {load_module,emqx_ctl,brutal_purge,soft_purge,[]},
{load_module,emqx_cm,brutal_purge,soft_purge,[]}, {load_module,emqx_cm,brutal_purge,soft_purge,[]},
{load_module,emqx_misc,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]},
{load_module,emqx_connection,brutal_purge,soft_purge,[]}]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_hooks,brutal_purge,soft_purge,[]}]},
{"4.3.12", {"4.3.12",
[{load_module,emqx_flapping,brutal_purge,soft_purge,[]}, [{load_module,emqx_flapping,brutal_purge,soft_purge,[]},
{load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]},
@ -44,7 +48,8 @@
{load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_message,brutal_purge,soft_purge,[]}, {load_module,emqx_message,brutal_purge,soft_purge,[]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {load_module,emqx_limiter,brutal_purge,soft_purge,[]},
{load_module,emqx_hooks,brutal_purge,soft_purge,[]}]},
{"4.3.11", {"4.3.11",
[{load_module,emqx_flapping,brutal_purge,soft_purge,[]}, [{load_module,emqx_flapping,brutal_purge,soft_purge,[]},
{load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]},
@ -72,7 +77,8 @@
{load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]}, {load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
{load_module,emqx_message,brutal_purge,soft_purge,[]}, {load_module,emqx_message,brutal_purge,soft_purge,[]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {load_module,emqx_limiter,brutal_purge,soft_purge,[]},
{load_module,emqx_hooks,brutal_purge,soft_purge,[]}]},
{"4.3.10", {"4.3.10",
[{load_module,emqx_flapping,brutal_purge,soft_purge,[]}, [{load_module,emqx_flapping,brutal_purge,soft_purge,[]},
{load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]},
@ -100,7 +106,8 @@
{load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx,brutal_purge,soft_purge,[]},
{load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_message,brutal_purge,soft_purge,[]}, {load_module,emqx_message,brutal_purge,soft_purge,[]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {load_module,emqx_limiter,brutal_purge,soft_purge,[]},
{load_module,emqx_hooks,brutal_purge,soft_purge,[]}]},
{"4.3.9", {"4.3.9",
[{load_module,emqx_flapping,brutal_purge,soft_purge,[]}, [{load_module,emqx_flapping,brutal_purge,soft_purge,[]},
{load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]},
@ -132,7 +139,8 @@
{load_module,emqx_os_mon,brutal_purge,soft_purge,[]}, {load_module,emqx_os_mon,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_message,brutal_purge,soft_purge,[]}, {load_module,emqx_message,brutal_purge,soft_purge,[]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {load_module,emqx_limiter,brutal_purge,soft_purge,[]},
{load_module,emqx_hooks,brutal_purge,soft_purge,[]}]},
{"4.3.8", {"4.3.8",
[{load_module,emqx_flapping,brutal_purge,soft_purge,[]}, [{load_module,emqx_flapping,brutal_purge,soft_purge,[]},
{load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]},
@ -164,7 +172,8 @@
{load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_message,brutal_purge,soft_purge,[]}, {load_module,emqx_message,brutal_purge,soft_purge,[]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {load_module,emqx_limiter,brutal_purge,soft_purge,[]},
{load_module,emqx_hooks,brutal_purge,soft_purge,[]}]},
{"4.3.7", {"4.3.7",
[{load_module,emqx_flapping,brutal_purge,soft_purge,[]}, [{load_module,emqx_flapping,brutal_purge,soft_purge,[]},
{load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]},
@ -197,7 +206,8 @@
{load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_message,brutal_purge,soft_purge,[]}, {load_module,emqx_message,brutal_purge,soft_purge,[]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {load_module,emqx_limiter,brutal_purge,soft_purge,[]},
{load_module,emqx_hooks,brutal_purge,soft_purge,[]}]},
{"4.3.6", {"4.3.6",
[{load_module,emqx_flapping,brutal_purge,soft_purge,[]}, [{load_module,emqx_flapping,brutal_purge,soft_purge,[]},
{load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]},
@ -230,7 +240,8 @@
{load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_message,brutal_purge,soft_purge,[]}, {load_module,emqx_message,brutal_purge,soft_purge,[]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {load_module,emqx_limiter,brutal_purge,soft_purge,[]},
{load_module,emqx_hooks,brutal_purge,soft_purge,[]}]},
{"4.3.5", {"4.3.5",
[{load_module,emqx_flapping,brutal_purge,soft_purge,[]}, [{load_module,emqx_flapping,brutal_purge,soft_purge,[]},
{load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]},
@ -264,7 +275,8 @@
{load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_message,brutal_purge,soft_purge,[]}, {load_module,emqx_message,brutal_purge,soft_purge,[]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {load_module,emqx_limiter,brutal_purge,soft_purge,[]},
{load_module,emqx_hooks,brutal_purge,soft_purge,[]}]},
{"4.3.4", {"4.3.4",
[{load_module,emqx_flapping,brutal_purge,soft_purge,[]}, [{load_module,emqx_flapping,brutal_purge,soft_purge,[]},
{load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]},
@ -298,7 +310,8 @@
{load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_message,brutal_purge,soft_purge,[]}, {load_module,emqx_message,brutal_purge,soft_purge,[]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {load_module,emqx_limiter,brutal_purge,soft_purge,[]},
{load_module,emqx_hooks,brutal_purge,soft_purge,[]}]},
{"4.3.3", {"4.3.3",
[{load_module,emqx_flapping,brutal_purge,soft_purge,[]}, [{load_module,emqx_flapping,brutal_purge,soft_purge,[]},
{load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]},
@ -333,7 +346,8 @@
{load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_message,brutal_purge,soft_purge,[]}, {load_module,emqx_message,brutal_purge,soft_purge,[]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {load_module,emqx_limiter,brutal_purge,soft_purge,[]},
{load_module,emqx_hooks,brutal_purge,soft_purge,[]}]},
{"4.3.2", {"4.3.2",
[{load_module,emqx_flapping,brutal_purge,soft_purge,[]}, [{load_module,emqx_flapping,brutal_purge,soft_purge,[]},
{load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]},
@ -368,7 +382,8 @@
{load_module,emqx_os_mon,brutal_purge,soft_purge,[]}, {load_module,emqx_os_mon,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_message,brutal_purge,soft_purge,[]}, {load_module,emqx_message,brutal_purge,soft_purge,[]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {load_module,emqx_limiter,brutal_purge,soft_purge,[]},
{load_module,emqx_hooks,brutal_purge,soft_purge,[]}]},
{"4.3.1", {"4.3.1",
[{load_module,emqx_flapping,brutal_purge,soft_purge,[]}, [{load_module,emqx_flapping,brutal_purge,soft_purge,[]},
{load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]},
@ -406,7 +421,8 @@
{load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_message,brutal_purge,soft_purge,[]}, {load_module,emqx_message,brutal_purge,soft_purge,[]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {load_module,emqx_limiter,brutal_purge,soft_purge,[]},
{load_module,emqx_hooks,brutal_purge,soft_purge,[]}]},
{"4.3.0", {"4.3.0",
[{load_module,emqx_flapping,brutal_purge,soft_purge,[]}, [{load_module,emqx_flapping,brutal_purge,soft_purge,[]},
{load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]},
@ -447,14 +463,18 @@
{load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_message,brutal_purge,soft_purge,[]}, {load_module,emqx_message,brutal_purge,soft_purge,[]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]},
{<<".*">>,[]}],
[{"4.3.13",
[{load_module,emqx_flapping,brutal_purge,soft_purge,[]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}, {load_module,emqx_limiter,brutal_purge,soft_purge,[]},
{load_module,emqx_hooks,brutal_purge,soft_purge,[]}]},
{<<".*">>,[]}],
[{"4.3.14",
[{load_module,emqx_frame,brutal_purge,soft_purge,[]},
{load_module,emqx_hooks,brutal_purge,soft_purge,[]}]},
{"4.3.13",
[{load_module,emqx_plugins,brutal_purge,soft_purge,[]},
{load_module,emqx_flapping,brutal_purge,soft_purge,[]},
{load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]},
{load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]},
{load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_limiter,brutal_purge,soft_purge,[]},
{load_module,emqx_frame,brutal_purge,soft_purge,[]}, {load_module,emqx_frame,brutal_purge,soft_purge,[]},
{load_module,emqx_sys_mon,brutal_purge,soft_purge,[]}, {load_module,emqx_sys_mon,brutal_purge,soft_purge,[]},
{load_module,emqx_pmon,brutal_purge,soft_purge,[]}, {load_module,emqx_pmon,brutal_purge,soft_purge,[]},
@ -466,7 +486,8 @@
{load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_ctl,brutal_purge,soft_purge,[]}, {load_module,emqx_ctl,brutal_purge,soft_purge,[]},
{load_module,emqx_cm,brutal_purge,soft_purge,[]}, {load_module,emqx_cm,brutal_purge,soft_purge,[]},
{load_module,emqx_connection,brutal_purge,soft_purge,[]}]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_hooks,brutal_purge,soft_purge,[]}]},
{"4.3.12", {"4.3.12",
[{load_module,emqx_flapping,brutal_purge,soft_purge,[]}, [{load_module,emqx_flapping,brutal_purge,soft_purge,[]},
{load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]},
@ -491,7 +512,8 @@
{load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_message,brutal_purge,soft_purge,[]}, {load_module,emqx_message,brutal_purge,soft_purge,[]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {load_module,emqx_limiter,brutal_purge,soft_purge,[]},
{load_module,emqx_hooks,brutal_purge,soft_purge,[]}]},
{"4.3.11", {"4.3.11",
[{load_module,emqx_flapping,brutal_purge,soft_purge,[]}, [{load_module,emqx_flapping,brutal_purge,soft_purge,[]},
{load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]},
@ -518,7 +540,8 @@
{load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]}, {load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
{load_module,emqx_message,brutal_purge,soft_purge,[]}, {load_module,emqx_message,brutal_purge,soft_purge,[]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {load_module,emqx_limiter,brutal_purge,soft_purge,[]},
{load_module,emqx_hooks,brutal_purge,soft_purge,[]}]},
{"4.3.10", {"4.3.10",
[{load_module,emqx_flapping,brutal_purge,soft_purge,[]}, [{load_module,emqx_flapping,brutal_purge,soft_purge,[]},
{load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]},
@ -545,7 +568,8 @@
{load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx,brutal_purge,soft_purge,[]},
{load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_message,brutal_purge,soft_purge,[]}, {load_module,emqx_message,brutal_purge,soft_purge,[]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {load_module,emqx_limiter,brutal_purge,soft_purge,[]},
{load_module,emqx_hooks,brutal_purge,soft_purge,[]}]},
{"4.3.9", {"4.3.9",
[{load_module,emqx_flapping,brutal_purge,soft_purge,[]}, [{load_module,emqx_flapping,brutal_purge,soft_purge,[]},
{load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]},
@ -576,7 +600,8 @@
{load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_message,brutal_purge,soft_purge,[]}, {load_module,emqx_message,brutal_purge,soft_purge,[]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {load_module,emqx_limiter,brutal_purge,soft_purge,[]},
{load_module,emqx_hooks,brutal_purge,soft_purge,[]}]},
{"4.3.8", {"4.3.8",
[{load_module,emqx_flapping,brutal_purge,soft_purge,[]}, [{load_module,emqx_flapping,brutal_purge,soft_purge,[]},
{load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]},
@ -607,7 +632,8 @@
{load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_message,brutal_purge,soft_purge,[]}, {load_module,emqx_message,brutal_purge,soft_purge,[]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {load_module,emqx_limiter,brutal_purge,soft_purge,[]},
{load_module,emqx_hooks,brutal_purge,soft_purge,[]}]},
{"4.3.7", {"4.3.7",
[{load_module,emqx_flapping,brutal_purge,soft_purge,[]}, [{load_module,emqx_flapping,brutal_purge,soft_purge,[]},
{load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]},
@ -639,7 +665,8 @@
{load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_message,brutal_purge,soft_purge,[]}, {load_module,emqx_message,brutal_purge,soft_purge,[]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {load_module,emqx_limiter,brutal_purge,soft_purge,[]},
{load_module,emqx_hooks,brutal_purge,soft_purge,[]}]},
{"4.3.6", {"4.3.6",
[{load_module,emqx_flapping,brutal_purge,soft_purge,[]}, [{load_module,emqx_flapping,brutal_purge,soft_purge,[]},
{load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]},
@ -671,7 +698,8 @@
{load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_message,brutal_purge,soft_purge,[]}, {load_module,emqx_message,brutal_purge,soft_purge,[]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {load_module,emqx_limiter,brutal_purge,soft_purge,[]},
{load_module,emqx_hooks,brutal_purge,soft_purge,[]}]},
{"4.3.5", {"4.3.5",
[{load_module,emqx_flapping,brutal_purge,soft_purge,[]}, [{load_module,emqx_flapping,brutal_purge,soft_purge,[]},
{load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]},
@ -704,7 +732,8 @@
{load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_message,brutal_purge,soft_purge,[]}, {load_module,emqx_message,brutal_purge,soft_purge,[]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {load_module,emqx_limiter,brutal_purge,soft_purge,[]},
{load_module,emqx_hooks,brutal_purge,soft_purge,[]}]},
{"4.3.4", {"4.3.4",
[{load_module,emqx_flapping,brutal_purge,soft_purge,[]}, [{load_module,emqx_flapping,brutal_purge,soft_purge,[]},
{load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]},
@ -737,7 +766,8 @@
{load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_message,brutal_purge,soft_purge,[]}, {load_module,emqx_message,brutal_purge,soft_purge,[]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {load_module,emqx_limiter,brutal_purge,soft_purge,[]},
{load_module,emqx_hooks,brutal_purge,soft_purge,[]}]},
{"4.3.3", {"4.3.3",
[{load_module,emqx_flapping,brutal_purge,soft_purge,[]}, [{load_module,emqx_flapping,brutal_purge,soft_purge,[]},
{load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]},
@ -771,7 +801,8 @@
{load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_message,brutal_purge,soft_purge,[]}, {load_module,emqx_message,brutal_purge,soft_purge,[]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {load_module,emqx_limiter,brutal_purge,soft_purge,[]},
{load_module,emqx_hooks,brutal_purge,soft_purge,[]}]},
{"4.3.2", {"4.3.2",
[{load_module,emqx_flapping,brutal_purge,soft_purge,[]}, [{load_module,emqx_flapping,brutal_purge,soft_purge,[]},
{load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]},
@ -805,7 +836,8 @@
{load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_message,brutal_purge,soft_purge,[]}, {load_module,emqx_message,brutal_purge,soft_purge,[]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {load_module,emqx_limiter,brutal_purge,soft_purge,[]},
{load_module,emqx_hooks,brutal_purge,soft_purge,[]}]},
{"4.3.1", {"4.3.1",
[{load_module,emqx_flapping,brutal_purge,soft_purge,[]}, [{load_module,emqx_flapping,brutal_purge,soft_purge,[]},
{load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]},
@ -842,7 +874,8 @@
{load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_message,brutal_purge,soft_purge,[]}, {load_module,emqx_message,brutal_purge,soft_purge,[]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {load_module,emqx_limiter,brutal_purge,soft_purge,[]},
{load_module,emqx_hooks,brutal_purge,soft_purge,[]}]},
{"4.3.0", {"4.3.0",
[{load_module,emqx_flapping,brutal_purge,soft_purge,[]}, [{load_module,emqx_flapping,brutal_purge,soft_purge,[]},
{load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]},
@ -881,5 +914,6 @@
{load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx,brutal_purge,soft_purge,[]},
{load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]},
{load_module,emqx_message,brutal_purge,soft_purge,[]}, {load_module,emqx_message,brutal_purge,soft_purge,[]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, {load_module,emqx_limiter,brutal_purge,soft_purge,[]},
{load_module,emqx_hooks,brutal_purge,soft_purge,[]}]},
{<<".*">>,[]}]}. {<<".*">>,[]}]}.

View File

@ -265,7 +265,7 @@ parse_packet(#mqtt_packet_header{type = ?CONNACK}, <<AckFlags:8, ReasonCode:8, R
parse_packet(#mqtt_packet_header{type = ?PUBLISH, qos = QoS}, Bin, parse_packet(#mqtt_packet_header{type = ?PUBLISH, qos = QoS}, Bin,
#{strict_mode := StrictMode, version := Ver}) -> #{strict_mode := StrictMode, version := Ver}) ->
{TopicName, Rest} = parse_utf8_string(Bin, StrictMode), {TopicName, Rest} = parse_topic_name(Bin, StrictMode),
{PacketId, Rest1} = case QoS of {PacketId, Rest1} = case QoS of
?QOS_0 -> {undefined, Rest}; ?QOS_0 -> {undefined, Rest};
_ -> parse_packet_id(Rest) _ -> parse_packet_id(Rest)
@ -357,7 +357,7 @@ parse_will_message(Packet = #mqtt_packet_connect{will_flag = true,
proto_ver = Ver}, proto_ver = Ver},
Bin, StrictMode) -> Bin, StrictMode) ->
{Props, Rest} = parse_properties(Bin, Ver, StrictMode), {Props, Rest} = parse_properties(Bin, Ver, StrictMode),
{Topic, Rest1} = parse_utf8_string(Rest, StrictMode), {Topic, Rest1} = parse_topic_name(Rest, StrictMode),
{Payload, Rest2} = parse_binary_data(Rest1), {Payload, Rest2} = parse_binary_data(Rest1),
{Packet#mqtt_packet_connect{will_props = Props, {Packet#mqtt_packet_connect{will_props = Props,
will_topic = Topic, will_topic = Topic,
@ -524,6 +524,14 @@ parse_binary_data(Bin)
when 2 > byte_size(Bin) -> when 2 > byte_size(Bin) ->
error(malformed_binary_data_length). error(malformed_binary_data_length).
parse_topic_name(Bin, false) ->
parse_utf8_string(Bin, false);
parse_topic_name(Bin, true) ->
case parse_utf8_string(Bin, true) of
{<<>>, _Rest} -> error(empty_topic_name);
Result -> Result
end.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Serialize MQTT Packet %% Serialize MQTT Packet
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------

View File

@ -32,6 +32,8 @@
, add/3 , add/3
, add/4 , add/4
, put/2 , put/2
, put/3
, put/4
, del/2 , del/2
, run/2 , run/2
, run_fold/3 , run_fold/3
@ -75,6 +77,8 @@
priority :: integer() priority :: integer()
}). }).
-type callback() :: #callback{}.
-record(hook, { -record(hook, {
name :: hookpoint(), name :: hookpoint(),
callbacks :: list(#callback{}) callbacks :: list(#callback{})
@ -110,7 +114,7 @@ callback_priority(#callback{priority= P}) -> P.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc Register a callback %% @doc Register a callback
-spec(add(hookpoint(), action() | #callback{}) -> ok_or_error(already_exists)). -spec(add(hookpoint(), action() | callback()) -> ok_or_error(already_exists)).
add(HookPoint, Callback) when is_record(Callback, callback) -> add(HookPoint, Callback) when is_record(Callback, callback) ->
gen_server:call(?SERVER, {add, HookPoint, Callback}, infinity); gen_server:call(?SERVER, {add, HookPoint, Callback}, infinity);
add(HookPoint, Action) when is_function(Action); is_tuple(Action) -> add(HookPoint, Action) when is_function(Action); is_tuple(Action) ->
@ -131,12 +135,24 @@ add(HookPoint, Action, Filter, Priority) when is_integer(Priority) ->
add(HookPoint, #callback{action = Action, filter = Filter, priority = Priority}). add(HookPoint, #callback{action = Action, filter = Filter, priority = Priority}).
%% @doc Like add/2, it register a callback, discard 'already_exists' error. %% @doc Like add/2, it register a callback, discard 'already_exists' error.
-spec(put(hookpoint(), action() | #callback{}) -> ok). -spec put(hookpoint(), action() | callback()) -> ok.
put(HookPoint, Callback) -> put(HookPoint, Callback) when is_record(Callback, callback) ->
case add(HookPoint, Callback) of case add(HookPoint, Callback) of
ok -> ok; ok -> ok;
{error, already_exists} -> ok {error, already_exists} -> ok
end. end;
put(HookPoint, Action) when is_function(Action); is_tuple(Action) ->
?MODULE:put(HookPoint, #callback{action = Action, priority = 0}).
-spec put(hookpoint(), action(), filter() | integer() | list()) -> ok.
put(HookPoint, Action, {_M, _F, _A} = Filter) ->
?MODULE:put(HookPoint, #callback{action = Action, filter = Filter, priority = 0});
put(HookPoint, Action, Priority) when is_integer(Priority) ->
?MODULE:put(HookPoint, #callback{action = Action, priority = Priority}).
-spec put(hookpoint(), action(), filter(), integer()) -> ok.
put(HookPoint, Action, Filter, Priority) when is_integer(Priority) ->
?MODULE:put(HookPoint, #callback{action = Action, filter = Filter, priority = Priority}).
%% @doc Unregister a callback. %% @doc Unregister a callback.
-spec(del(hookpoint(), action() | {module(), atom()}) -> ok). -spec(del(hookpoint(), action() | {module(), atom()}) -> ok).
@ -205,7 +221,7 @@ execute({M, F, A}, Args) ->
erlang:apply(M, F, Args ++ A). erlang:apply(M, F, Args ++ A).
%% @doc Lookup callbacks. %% @doc Lookup callbacks.
-spec(lookup(hookpoint()) -> [#callback{}]). -spec(lookup(hookpoint()) -> [callback()]).
lookup(HookPoint) -> lookup(HookPoint) ->
case ets:lookup(?TAB, HookPoint) of case ets:lookup(?TAB, HookPoint) of
[#hook{callbacks = Callbacks}] -> [#hook{callbacks = Callbacks}] ->
@ -288,4 +304,3 @@ del_callback(Func, [#callback{action = {Func, _A}} | Callbacks], Acc) ->
del_callback(Func, Callbacks, Acc); del_callback(Func, Callbacks, Acc);
del_callback(Action, [Callback | Callbacks], Acc) -> del_callback(Action, [Callback | Callbacks], Acc) ->
del_callback(Action, Callbacks, [Callback | Acc]). del_callback(Action, Callbacks, [Callback | Acc]).

View File

@ -159,7 +159,7 @@ update_overall_limiter(Zone, Capacity, Interval) ->
try try
esockd_limiter:update({Zone, overall_messages_routing}, Capacity, Interval), esockd_limiter:update({Zone, overall_messages_routing}, Capacity, Interval),
true true
catch _:_:_ -> catch _:_ ->
false false
end. end.
@ -167,6 +167,6 @@ delete_overall_limiter(Zone) ->
try try
esockd_limiter:delete({Zone, overall_messages_routing}), esockd_limiter:delete({Zone, overall_messages_routing}),
true true
catch _:_:_ -> catch _:_ ->
false false
end. end.

View File

@ -162,6 +162,14 @@ t_parse_malformed_utf8_string(_) ->
ParseState = emqx_frame:initial_parse_state(#{strict_mode => true}), ParseState = emqx_frame:initial_parse_state(#{strict_mode => true}),
?catch_error(utf8_string_invalid, emqx_frame:parse(MalformedPacket, ParseState)). ?catch_error(utf8_string_invalid, emqx_frame:parse(MalformedPacket, ParseState)).
t_parse_empty_topic_name(_) ->
Packet = <<48, 4, 0, 0, 0, 1>>,
NormalState = emqx_frame:initial_parse_state(#{strict_mode => false}),
?assertMatch({_, _}, emqx_frame:parse(Packet, NormalState)),
StrictState = emqx_frame:initial_parse_state(#{strict_mode => true}),
?catch_error(empty_topic_name, emqx_frame:parse(Packet, StrictState)).
t_parse_frame_proxy_protocol(_) -> t_parse_frame_proxy_protocol(_) ->
BinList = [ <<"PROXY TCP4 ">>, <<"PROXY TCP6 ">>, <<"PROXY UNKNOWN">> BinList = [ <<"PROXY TCP4 ">>, <<"PROXY TCP6 ">>, <<"PROXY UNKNOWN">>
, <<"\r\n\r\n\0\r\nQUIT\n">>], , <<"\r\n\r\n\0\r\nQUIT\n">>],