diff --git a/CHANGES-4.3.md b/CHANGES-4.3.md index fe2988c77..bb33daaba 100644 --- a/CHANGES-4.3.md +++ b/CHANGES-4.3.md @@ -10,6 +10,22 @@ File format: - One list item per change topic Change log ends with a list of github PRs +## v4.3.14 + +### Enhancements + +* In order to fix the execution order of exhook, e.g. before/after other plugins/modules, + ExHook now supports user customizing emqx_hook execute priority. +* add api: PUT /rules/{id}/reset_metrics. + This api reset the metrics of the rule engine of a rule, and reset the metrics of the action related to this rule. [#7474] +* Enhanced rule engine error handling when json parsing error. + +### Bug fixes + +* Prohibit empty topics in strict mode +* Make sure ehttpc delete useless pool always succeed. +* Update mongodb driver to fix potential process leak. + ## v4.3.13 ### Important changes diff --git a/Makefile b/Makefile index e078b9c4e..5499d6dbe 100644 --- a/Makefile +++ b/Makefile @@ -99,6 +99,7 @@ $(PROFILES:%=clean-%): .PHONY: clean-all clean-all: @rm -rf _build + @rm rebar.lock .PHONY: deps-all deps-all: $(REBAR) $(PROFILES:%=deps-%) diff --git a/apps/emqx_auth_jwt/src/emqx_auth_jwt.app.src b/apps/emqx_auth_jwt/src/emqx_auth_jwt.app.src index 8db4ffe84..7ec03e92d 100644 --- a/apps/emqx_auth_jwt/src/emqx_auth_jwt.app.src +++ b/apps/emqx_auth_jwt/src/emqx_auth_jwt.app.src @@ -1,6 +1,6 @@ {application, emqx_auth_jwt, [{description, "EMQ X Authentication with JWT"}, - {vsn, "4.3.1"}, % strict semver, bump manually! + {vsn, "4.3.2"}, % strict semver, bump manually! {modules, []}, {registered, [emqx_auth_jwt_sup]}, {applications, [kernel,stdlib,jose]}, diff --git a/apps/emqx_auth_jwt/src/emqx_auth_jwt.appup.src b/apps/emqx_auth_jwt/src/emqx_auth_jwt.appup.src index b9831bb6f..684b4fa93 100644 --- a/apps/emqx_auth_jwt/src/emqx_auth_jwt.appup.src +++ b/apps/emqx_auth_jwt/src/emqx_auth_jwt.appup.src @@ -1,13 +1,13 @@ %% -*-: erlang -*- {VSN, [ - {"4.3.0", [ + {<<"4\\.3\\.[0-1]">>, [ {load_module, emqx_auth_jwt_svr, brutal_purge, soft_purge, []} ]}, {<<".*">>, []} ], [ - {"4.3.0", [ + {<<"4\\.3\\.[0-1]">>, [ {load_module, emqx_auth_jwt_svr, brutal_purge, soft_purge, []} ]}, {<<".*">>, []} diff --git a/apps/emqx_auth_jwt/src/emqx_auth_jwt_svr.erl b/apps/emqx_auth_jwt/src/emqx_auth_jwt_svr.erl index b9d19bf57..f34cde783 100644 --- a/apps/emqx_auth_jwt/src/emqx_auth_jwt_svr.erl +++ b/apps/emqx_auth_jwt/src/emqx_auth_jwt_svr.erl @@ -91,7 +91,7 @@ do_init_jwks(Options) -> [K, V, Reason]), undefined; J -> J - catch T:R:_ -> + catch T:R -> ?LOG(warning, "Build ~p JWK ~p failed: {~p, ~p}~n", [K, V, T, R]), undefined diff --git a/apps/emqx_auth_mongo/rebar.config b/apps/emqx_auth_mongo/rebar.config index c89c15d3c..8db0176b0 100644 --- a/apps/emqx_auth_mongo/rebar.config +++ b/apps/emqx_auth_mongo/rebar.config @@ -1,7 +1,6 @@ {deps, %% NOTE: mind poolboy version when updating mongodb-erlang version - [{mongodb, {git,"https://github.com/emqx/mongodb-erlang", {tag, "v3.0.12"}}}, - %% mongodb-erlang uses a special fork https://github.com/comtihon/poolboy.git + [%% mongodb-erlang uses a special fork https://github.com/comtihon/poolboy.git %% (which has overflow_ttl feature added). %% However, it references `{branch, "master}` (commit 9c06a9a on 2021-04-07). %% By accident, We have always been using the upstream fork due to @@ -29,4 +28,3 @@ {cover_enabled, true}. {cover_opts, [verbose]}. {cover_export_enabled, true}. - diff --git a/apps/emqx_exhook/etc/emqx_exhook.conf b/apps/emqx_exhook/etc/emqx_exhook.conf index 4d94def06..6a4725e02 100644 --- a/apps/emqx_exhook/etc/emqx_exhook.conf +++ b/apps/emqx_exhook/etc/emqx_exhook.conf @@ -24,6 +24,17 @@ ## Value: false | Duration #exhook.auto_reconnect = 60s +## The exhook execution priority on the Chain of the emqx hooks. +## +## Modify the field to fix the exhook execute order before/after other plugins/modules. +## By default, most hooks registered by plugins or modules have a priority of 0. +## +## With the same priority of 0, the execute order depends on hookpoints mount order. +## Scilicet is the loaded order of plugins/ modules. +## +## Default: 0 +## Value: Integer +#exhook.hook_priority = 0 ##-------------------------------------------------------------------- ## The Hook callback servers diff --git a/apps/emqx_exhook/include/emqx_exhook.hrl b/apps/emqx_exhook/include/emqx_exhook.hrl index 7301fdcbb..58deb707a 100644 --- a/apps/emqx_exhook/include/emqx_exhook.hrl +++ b/apps/emqx_exhook/include/emqx_exhook.hrl @@ -41,4 +41,6 @@ , {'message.dropped', {emqx_exhook_handler, on_message_dropped, []}} ]). +-define(DEFAULT_HOOK_PRIORITY, 0). + -endif. diff --git a/apps/emqx_exhook/priv/emqx_exhook.schema b/apps/emqx_exhook/priv/emqx_exhook.schema index d11001c0d..4f6419c6a 100644 --- a/apps/emqx_exhook/priv/emqx_exhook.schema +++ b/apps/emqx_exhook/priv/emqx_exhook.schema @@ -15,6 +15,11 @@ {datatype, string} ]}. +{mapping, "exhook.hook_priority", "emqx_exhook.hook_priority", [ + {default, 0}, + {datatype, integer} +]}. + {translation, "emqx_exhook.auto_reconnect", fun(Conf) -> case cuttlefish:conf_get("exhook.auto_reconnect", Conf) of "false" -> false; diff --git a/apps/emqx_exhook/src/emqx_exhook.app.src b/apps/emqx_exhook/src/emqx_exhook.app.src index 46223d212..b386bcaca 100644 --- a/apps/emqx_exhook/src/emqx_exhook.app.src +++ b/apps/emqx_exhook/src/emqx_exhook.app.src @@ -1,6 +1,6 @@ {application, emqx_exhook, [{description, "EMQ X Extension for Hook"}, - {vsn, "4.3.4"}, + {vsn, "4.3.5"}, {modules, []}, {registered, []}, {mod, {emqx_exhook_app, []}}, diff --git a/apps/emqx_exhook/src/emqx_exhook.appup.src b/apps/emqx_exhook/src/emqx_exhook.appup.src index d6a699c33..dee9aed5f 100644 --- a/apps/emqx_exhook/src/emqx_exhook.appup.src +++ b/apps/emqx_exhook/src/emqx_exhook.appup.src @@ -1,13 +1,23 @@ -%% -*-: erlang -*- +%% -*- mode: erlang -*- {VSN, [ - {<<"4.3.[0-3]">>, [ + {"4.3.4", [ + {load_module, emqx_exhook_sup, brutal_purge,soft_purge,[]}, + {load_module, emqx_exhook_server, brutal_purge,soft_purge,[]}, + {update, emqx_exhook_mngr, {advanced, ["4.3.4"]}} + ]}, + {<<"4\\.3\\.[0-3]">>, [ {restart_application, emqx_exhook} ]}, {<<".*">>, []} ], [ - {<<"4.3.[0-3]">>, [ + {"4.3.4", [ + {load_module, emqx_exhook_sup, brutal_purge,soft_purge,[]}, + {load_module, emqx_exhook_server, brutal_purge,soft_purge,[]}, + {update, emqx_exhook_mngr, {advanced, ["4.3.4"]}} + ]}, + {<<"4\\.3\\.[0-3]">>, [ {restart_application, emqx_exhook} ]}, {<<".*">>, []} diff --git a/apps/emqx_exhook/src/emqx_exhook_mngr.erl b/apps/emqx_exhook/src/emqx_exhook_mngr.erl index cadd5eb37..c6d89fb12 100644 --- a/apps/emqx_exhook/src/emqx_exhook_mngr.erl +++ b/apps/emqx_exhook/src/emqx_exhook_mngr.erl @@ -23,7 +23,7 @@ -include_lib("emqx/include/logger.hrl"). %% APIs --export([start_link/3]). +-export([start_link/4]). %% Mgmt API -export([ enable/2 @@ -59,9 +59,14 @@ %% Request options request_options :: grpc_client:options(), %% Timer references - trefs :: map() + trefs :: map(), + %% Hooks execute options + hooks_options :: hooks_options() }). +-export_type([ server_options/0 + , hooks_options/0]). + -type servers() :: [{Name :: atom(), server_options()}]. -type server_options() :: [ {scheme, http | https} @@ -69,6 +74,10 @@ | {port, inet:port_number()} ]. +-type hooks_options() :: #{hook_priority => integer()}. + +-define(DEFAULT_HOOK_OPTS, #{hook_priority => ?DEFAULT_HOOK_PRIORITY}). + -define(DEFAULT_TIMEOUT, 60000). -define(CNTER, emqx_exhook_counter). @@ -77,12 +86,12 @@ %% APIs %%-------------------------------------------------------------------- --spec start_link(servers(), false | non_neg_integer(), grpc_client:options()) +-spec start_link(servers(), false | non_neg_integer(), grpc_client:options(), hooks_options()) ->ignore | {ok, pid()} | {error, any()}. -start_link(Servers, AutoReconnect, ReqOpts) -> - gen_server:start_link(?MODULE, [Servers, AutoReconnect, ReqOpts], []). +start_link(Servers, AutoReconnect, ReqOpts, HooksOpts) -> + gen_server:start_link(?MODULE, [Servers, AutoReconnect, ReqOpts, HooksOpts], []). -spec enable(pid(), atom()|string()) -> ok | {error, term()}. enable(Pid, Name) -> @@ -102,7 +111,7 @@ call(Pid, Req) -> %% gen_server callbacks %%-------------------------------------------------------------------- -init([Servers, AutoReconnect, ReqOpts0]) -> +init([Servers, AutoReconnect, ReqOpts0, HooksOpts]) -> process_flag(trap_exit, true), %% XXX: Due to the ExHook Module in the enterprise, %% this process may start multiple times and they will share this table @@ -120,32 +129,33 @@ init([Servers, AutoReconnect, ReqOpts0]) -> %% Load the hook servers ReqOpts = maps:without([request_failed_action], ReqOpts0), - {Waiting, Running} = load_all_servers(Servers, ReqOpts), + {Waiting, Running} = load_all_servers(Servers, ReqOpts, HooksOpts), {ok, ensure_reload_timer( #state{waiting = Waiting, running = Running, stopped = #{}, request_options = ReqOpts, auto_reconnect = AutoReconnect, - trefs = #{} + trefs = #{}, + hooks_options = HooksOpts } )}. %% @private -load_all_servers(Servers, ReqOpts) -> - load_all_servers(Servers, ReqOpts, #{}, #{}). -load_all_servers([], _Request, Waiting, Running) -> +load_all_servers(Servers, ReqOpts, HooksOpts) -> + load_all_servers(Servers, ReqOpts, HooksOpts, #{}, #{}). +load_all_servers([], _Request, _HooksOpts, Waiting, Running) -> {Waiting, Running}; -load_all_servers([{Name, Options}|More], ReqOpts, Waiting, Running) -> +load_all_servers([{Name, Options} | More], ReqOpts, HooksOpts, Waiting, Running) -> {NWaiting, NRunning} = - case emqx_exhook_server:load(Name, Options, ReqOpts) of + case emqx_exhook_server:load(Name, Options, ReqOpts, HooksOpts) of {ok, ServerState} -> save(Name, ServerState), {Waiting, Running#{Name => Options}}; {error, _} -> {Waiting#{Name => Options}, Running} end, - load_all_servers(More, ReqOpts, NWaiting, NRunning). + load_all_servers(More, ReqOpts, HooksOpts, NWaiting, NRunning). handle_call({load, Name}, _From, State) -> {Result, NState} = do_load_server(Name, State), @@ -199,8 +209,27 @@ terminate(_Reason, State = #state{running = Running}) -> _ = unload_exhooks(), ok. -code_change(_OldVsn, State, _Extra) -> - {ok, State}. +%% in the emqx_exhook:v4.3.5, we have added one new field in the state last: +%% - hooks_options :: map() +code_change({down, _Vsn}, State, [ToVsn]) -> + case re:run(ToVsn, "4\\.3\\.[0-4]") of + {match, _} -> + NState = list_to_tuple( + lists:droplast( + tuple_to_list(State))), + {ok, NState}; + _ -> + {ok, State} + end; +code_change(_Vsn, State, [FromVsn]) -> + case re:run(FromVsn, "4\\.3\\.[0-4]") of + {match, _} -> + NState = list_to_tuple( + tuple_to_list(State) ++ [?DEFAULT_HOOK_OPTS]), + {ok, NState}; + _ -> + {ok, State} + end. %%-------------------------------------------------------------------- %% Internal funcs @@ -214,7 +243,8 @@ do_load_server(Name, State0 = #state{ waiting = Waiting, running = Running, stopped = Stopped, - request_options = ReqOpts}) -> + request_options = ReqOpts, + hooks_options = HooksOpts}) -> State = clean_reload_timer(Name, State0), case maps:get(Name, Running, undefined) of undefined -> @@ -223,7 +253,7 @@ do_load_server(Name, State0 = #state{ undefined -> {{error, not_found}, State}; Options -> - case emqx_exhook_server:load(Name, Options, ReqOpts) of + case emqx_exhook_server:load(Name, Options, ReqOpts, HooksOpts) of {ok, ServerState} -> save(Name, ServerState), ?LOG(info, "Load exhook callback server " diff --git a/apps/emqx_exhook/src/emqx_exhook_server.erl b/apps/emqx_exhook/src/emqx_exhook_server.erl index 7df5b643c..d3953ade7 100644 --- a/apps/emqx_exhook/src/emqx_exhook_server.erl +++ b/apps/emqx_exhook/src/emqx_exhook_server.erl @@ -25,7 +25,7 @@ -define(PB_CLIENT_MOD, emqx_exhook_v_1_hook_provider_client). %% Load/Unload --export([ load/3 +-export([ load/4 , unload/1 ]). @@ -81,8 +81,9 @@ %% Load/Unload APIs %%-------------------------------------------------------------------- --spec load(atom(), list(), map()) -> {ok, server()} | {error, term()} . -load(Name0, Opts0, ReqOpts) -> +-spec load(atom(), emqx_exhook_mngr:server_options(), grpc_client:options(), emqx_exhook_mngr:hooks_options()) + -> {ok, server()} | {error, term()} . +load(Name0, Opts0, ReqOpts, HooksOpts) -> Name = to_list(Name0), {SvrAddr, ClientOpts} = channel_opts(Opts0), case emqx_exhook_sup:start_grpc_client_channel( @@ -97,7 +98,7 @@ load(Name0, Opts0, ReqOpts) -> io_lib:format("exhook.~s.", [Name])), ensure_metrics(Prefix, HookSpecs), %% Ensure hooks - ensure_hooks(HookSpecs), + ensure_hooks(HookSpecs, maps:get(hook_priority, HooksOpts, ?DEFAULT_HOOK_PRIORITY)), {ok, #server{name = Name, options = ReqOpts, channel = _ChannPoolPid, @@ -174,7 +175,7 @@ resovle_hookspec(HookSpecs) when is_list(HookSpecs) -> case maps:get(name, HookSpec, undefined) of undefined -> Acc; Name0 -> - Name = try binary_to_existing_atom(Name0, utf8) catch T:R:_ -> {T,R} end, + Name = try binary_to_existing_atom(Name0, utf8) catch T:R -> {T,R} end, case lists:member(Name, AvailableHooks) of true -> case lists:member(Name, MessageHooks) of @@ -193,13 +194,13 @@ ensure_metrics(Prefix, HookSpecs) -> || Hookpoint <- maps:keys(HookSpecs)], lists:foreach(fun emqx_metrics:ensure/1, Keys). -ensure_hooks(HookSpecs) -> +ensure_hooks(HookSpecs, Priority) -> lists:foreach(fun(Hookpoint) -> case lists:keyfind(Hookpoint, 1, ?ENABLED_HOOKS) of false -> ?LOG(error, "Unknown name ~s to hook, skip it!", [Hookpoint]); {Hookpoint, {M, F, A}} -> - emqx_hooks:put(Hookpoint, {M, F, A}), + emqx_hooks:put(Hookpoint, {M, F, A}, Priority), ets:update_counter(?CNTER, Hookpoint, {2, 1}, {Hookpoint, 0}) end end, maps:keys(HookSpecs)). diff --git a/apps/emqx_exhook/src/emqx_exhook_sup.erl b/apps/emqx_exhook/src/emqx_exhook_sup.erl index e9c405de0..1b5b4f7af 100644 --- a/apps/emqx_exhook/src/emqx_exhook_sup.erl +++ b/apps/emqx_exhook/src/emqx_exhook_sup.erl @@ -18,6 +18,8 @@ -behaviour(supervisor). +-include("emqx_exhook.hrl"). + -export([ start_link/0 , init/1 ]). @@ -43,7 +45,7 @@ start_link() -> init([]) -> Mngr = ?CHILD(emqx_exhook_mngr, worker, - [servers(), auto_reconnect(), request_options()]), + [servers(), auto_reconnect(), request_options(), hooks_options()]), {ok, {{one_for_one, 10, 100}, [Mngr]}}. servers() -> @@ -57,6 +59,10 @@ request_options() -> request_failed_action => env(request_failed_action, deny) }. +hooks_options() -> + #{hook_priority => env(hook_priority, ?DEFAULT_HOOK_PRIORITY) + }. + env(Key, Def) -> application:get_env(emqx_exhook, Key, Def). diff --git a/apps/emqx_exhook/test/emqx_exhook_SUITE.erl b/apps/emqx_exhook/test/emqx_exhook_SUITE.erl index b4c58fe62..5aed2f2b8 100644 --- a/apps/emqx_exhook/test/emqx_exhook_SUITE.erl +++ b/apps/emqx_exhook/test/emqx_exhook_SUITE.erl @@ -98,10 +98,31 @@ t_cli_stats(_) -> _ = emqx_exhook_cli:cli(x), unmeck_print(). +t_priority(_) -> + restart_exhook_with_envs([{emqx_exhook, hook_priority, 1}]), + + emqx_exhook:disable(default), + ok = emqx_exhook:enable(default), + [Callback | _] = emqx_hooks:lookup('client.connected'), + 1 = emqx_hooks:callback_priority(Callback). + %%-------------------------------------------------------------------- %% Utils %%-------------------------------------------------------------------- +%% TODO: make it more general and move to `emqx_ct_helpers` +restart_exhook_with_envs(Envs) -> + emqx_ct_helpers:stop_apps([emqx_exhook]), + SetPriorityFun + = fun(emqx) -> + set_special_cfgs(emqx); + (emqx_exhook) -> + lists:foreach(fun({App, Key, Val}) -> + application:set_env(App, Key, Val) + end, Envs) + end, + emqx_ct_helpers:start_apps([emqx_exhook], SetPriorityFun). + meck_print() -> meck:new(emqx_ctl, [passthrough, no_history, no_link]), meck:expect(emqx_ctl, print, fun(_) -> ok end), diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.app.src b/apps/emqx_rule_engine/src/emqx_rule_engine.app.src index cd57630b4..f0d73e581 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.app.src +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.app.src @@ -1,6 +1,6 @@ {application, emqx_rule_engine, [{description, "EMQ X Rule Engine"}, - {vsn, "4.3.8"}, % strict semver, bump manually! + {vsn, "4.3.9"}, % strict semver, bump manually! {modules, []}, {registered, [emqx_rule_engine_sup, emqx_rule_registry]}, {applications, [kernel,stdlib,rulesql,getopt]}, diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src b/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src index 0ed0fee3a..e151086be 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src @@ -1,14 +1,28 @@ %% -*- mode: erlang -*- %% Unless you know what you are doing, DO NOT edit manually!! {VSN, - [{"4.3.7", - [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + [{"4.3.8", + [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, + {"4.3.7", + [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {"4.3.6", - [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {update,emqx_rule_metrics,{advanced,["4.3.6"]}}, @@ -17,7 +31,8 @@ {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.5", - [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {update,emqx_rule_metrics,{advanced,["4.3.5"]}}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, @@ -26,7 +41,8 @@ {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.4", - [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {update,emqx_rule_metrics,{advanced,["4.3.4"]}}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, @@ -35,7 +51,8 @@ {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.3", - [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {update,emqx_rule_metrics,{advanced,["4.3.3"]}}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, @@ -45,7 +62,8 @@ {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.2", - [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {update,emqx_rule_metrics,{advanced,["4.3.2"]}}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, @@ -56,7 +74,8 @@ {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.1", - [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {update,emqx_rule_metrics,{advanced,["4.3.1"]}}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, @@ -67,7 +86,8 @@ {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.0", - [{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {update,emqx_rule_metrics,{advanced,["4.3.0"]}}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, @@ -78,14 +98,28 @@ {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {<<".*">>,[]}], - [{"4.3.7", - [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + [{"4.3.8", + [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}]}, + {"4.3.7", + [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_metrics,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {"4.3.6", - [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {update,emqx_rule_metrics,{advanced,["4.3.6"]}}, @@ -94,7 +128,8 @@ {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.5", - [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {update,emqx_rule_metrics,{advanced,["4.3.5"]}}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, @@ -103,7 +138,8 @@ {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.4", - [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {update,emqx_rule_metrics,{advanced,["4.3.4"]}}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, @@ -112,7 +148,8 @@ {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.3", - [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {update,emqx_rule_metrics,{advanced,["4.3.3"]}}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, @@ -122,7 +159,8 @@ {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.2", - [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {update,emqx_rule_metrics,{advanced,["4.3.2"]}}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, @@ -133,7 +171,8 @@ {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.1", - [{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {update,emqx_rule_metrics,{advanced,["4.3.1"]}}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, @@ -144,7 +183,8 @@ {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.0", - [{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_sqltester,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {update,emqx_rule_metrics,{advanced,["4.3.0"]}}, {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index c8e69a17f..d984d18c1 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -37,6 +37,7 @@ , test_resource/1 , start_resource/1 , get_resource_status/1 + , is_source_alive/1 , get_resource_params/1 , delete_resource/1 , update_resource/2 @@ -314,24 +315,37 @@ start_resource(ResId) -> end. -spec(test_resource(#{type := _, config := _, _ => _}) -> ok | {error, Reason :: term()}). -test_resource(#{type := Type, config := Config0}) -> +test_resource(#{type := Type} = Params) -> case emqx_rule_registry:find_resource_type(Type) of - {ok, #resource_type{on_create = {ModC, Create}, - on_destroy = {ModD, Destroy}, - params_spec = ParamSpec}} -> - Config = emqx_rule_validator:validate_params(Config0, ParamSpec), - ResId = resource_id(), + {ok, #resource_type{}} -> + ResId = maps:get(id, Params, resource_id()), try - _ = ?CLUSTER_CALL(init_resource, [ModC, Create, ResId, Config]), - _ = ?CLUSTER_CALL(clear_resource, [ModD, Destroy, ResId]), + _ = create_resource(maps:put(id, ResId, Params)), + true = is_source_alive(ResId), ok - catch - throw:Reason -> {error, Reason} + catch E:R:S -> + ?LOG(warning, "test resource failed, ~0p:~0p ~0p", [E, R, S]), + {error, R} + after + _ = ?CLUSTER_CALL(delete_resource, [ResId]) end; not_found -> {error, {resource_type_not_found, Type}} end. +is_source_alive(ResId) -> + case rpc:multicall(ekka_mnesia:running_nodes(), ?MODULE, get_resource_status, [ResId], 5000) of + {ResL, []} -> + is_source_alive_(ResL); + {_, _Errors} -> + false + end. + +is_source_alive_([]) -> true; +is_source_alive_([{ok, #{is_alive := true}} | ResL]) -> is_source_alive_(ResL); +is_source_alive_([{ok, #{is_alive := false}} | _ResL]) -> false; +is_source_alive_([_Error | _ResL]) -> false. + -spec(get_resource_status(resource_id()) -> {ok, resource_status()} | {error, Reason :: term()}). get_resource_status(ResId) -> case emqx_rule_registry:find_resource_params(ResId) of diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl index 39ac1e9c2..b2766e0e0 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl @@ -58,6 +58,13 @@ descr => "Delete a rule" }). +-rest_api(#{name => reset_metrics, + method => 'PUT', + path => "/rules/:bin:id/reset_metrics", + func => reset_metrics, + descr => "reset a rule metrics" + }). + -rest_api(#{name => list_actions, method => 'GET', path => "/actions/", @@ -154,6 +161,7 @@ , list_rules/2 , show_rule/2 , delete_rule/2 + , reset_metrics/2 ]). -export([ list_actions/2 @@ -252,6 +260,10 @@ delete_rule(#{id := Id}, _Params) -> ok = emqx_rule_engine:delete_rule(Id), return(ok). +reset_metrics(#{id := Id}, _Params) -> + ok = emqx_rule_metrics:reset_metrics(Id), + return(ok). + %%------------------------------------------------------------------------------ %% Actions API %%------------------------------------------------------------------------------ @@ -296,7 +308,7 @@ do_create_resource(Create, ParsedParams) -> list_resources(#{}, _Params) -> Data0 = lists:foldr(fun maybe_record_to_map/2, [], emqx_rule_registry:get_resources()), Data = lists:map(fun(Res = #{id := ResId}) -> - Status = get_aggregated_status(ResId), + Status = emqx_rule_engine:is_source_alive(ResId), maps:put(status, Status, Res) end, Data0), return({ok, Data}). @@ -304,14 +316,6 @@ list_resources(#{}, _Params) -> list_resources_by_type(#{type := Type}, _Params) -> return_all(emqx_rule_registry:get_resources_by_type(Type)). -get_aggregated_status(ResId) -> - lists:all(fun(Node) -> - case rpc:call(Node, emqx_rule_engine, get_resource_status, [ResId]) of - {ok, #{is_alive := true}} -> true; - _ -> false - end - end, ekka_mnesia:running_nodes()). - show_resource(#{id := Id}, _Params) -> case emqx_rule_registry:find_resource(Id) of {ok, R} -> diff --git a/apps/emqx_rule_engine/src/emqx_rule_events.erl b/apps/emqx_rule_engine/src/emqx_rule_events.erl index fb7649e97..f3044ff74 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_events.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_events.erl @@ -172,18 +172,18 @@ eventmsg_connected(_ClientInfo = #{ is_bridge := IsBridge, mountpoint := Mountpoint }, - _ConnInfo = #{ + ConnInfo = #{ peername := PeerName, sockname := SockName, clean_start := CleanStart, proto_name := ProtoName, proto_ver := ProtoVer, - keepalive := Keepalive, connected_at := ConnectedAt, - conn_props := ConnProps, - receive_maximum := RcvMax, - expiry_interval := ExpiryInterval + receive_maximum := RcvMax }) -> + Keepalive = maps:get(keepalive, ConnInfo, 0), + ConnProps = maps:get(conn_props, ConnInfo, #{}), + ExpiryInterval = maps:get(expiry_interval, ConnInfo, 0), with_basic_columns('client.connected', #{clientid => ClientId, username => Username, diff --git a/apps/emqx_rule_engine/src/emqx_rule_funcs.erl b/apps/emqx_rule_engine/src/emqx_rule_funcs.erl index a9897437e..ae5552159 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_funcs.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_funcs.erl @@ -197,6 +197,9 @@ , rfc3339_to_unix_ts/2 , now_timestamp/0 , now_timestamp/1 + , mongo_date/0 + , mongo_date/1 + , mongo_date/2 ]). %% Proc Dict Func @@ -900,6 +903,24 @@ time_unit(<<"millisecond">>) -> millisecond; time_unit(<<"microsecond">>) -> microsecond; time_unit(<<"nanosecond">>) -> nanosecond. +mongo_date() -> + erlang:timestamp(). + +mongo_date(MillisecondsTimestamp) -> + convert_timestamp(MillisecondsTimestamp). + +mongo_date(Timestamp, Unit) -> + InsertedTimeUnit = time_unit(Unit), + ScaledEpoch = erlang:convert_time_unit(Timestamp, InsertedTimeUnit, millisecond), + convert_timestamp(ScaledEpoch). + +convert_timestamp(MillisecondsTimestamp) -> + MicroTimestamp = MillisecondsTimestamp * 1000, + MegaSecs = MicroTimestamp div 1000_000_000_000, + Secs = MicroTimestamp div 1000_000 - MegaSecs*1000_000, + MicroSecs = MicroTimestamp rem 1000_000, + {MegaSecs, Secs, MicroSecs}. + %% @doc This is for sql funcs that should be handled in the specific modules. %% Here the emqx_rule_funcs module acts as a proxy, forwarding %% the function handling to the worker module. diff --git a/apps/emqx_rule_engine/src/emqx_rule_metrics.erl b/apps/emqx_rule_engine/src/emqx_rule_metrics.erl index c7c38e145..9c6bfe905 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_metrics.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_metrics.erl @@ -63,6 +63,7 @@ , create_metrics/1 , clear_rule_metrics/1 , clear_metrics/1 + , reset_metrics/1 ]). -export([ get_rule_metrics/1 @@ -127,6 +128,45 @@ clear_rule_metrics(Id) -> clear_metrics(Id) -> gen_server:call(?MODULE, {delete_metrics, Id}). +-spec(reset_metrics(rule_id()) -> ok). +reset_metrics(Id) -> + reset_speeds(Id), + reset_metrics(Id, rule_metrics()), + case emqx_rule_registry:get_rule(Id) of + not_found -> ok; + {ok, #rule{actions = Actions}} -> + [ reset_metrics(ActionId, action_metrics()) + || #action_instance{ id = ActionId} <- Actions], + ok + end. + +reset_metrics(Id, Metrics) -> + case couters_ref(Id) of + not_found -> ok; + Ref -> [counters:put(Ref, metrics_idx(Idx), 0) + || Idx <- Metrics], + ok + end. + +reset_speeds(Id) -> + gen_server:call(?MODULE, {reset_speeds, Id}). + +rule_metrics() -> + [ 'rules.matched' + , 'rules.failed' + , 'rules.passed' + , 'rules.exception' + , 'rules.no_result' + ]. + +action_metrics() -> + [ 'actions.success' + , 'actions.error' + , 'actions.taken' + , 'actions.exception' + , 'actions.retry' + ]. + -spec(get(rule_id(), atom()) -> number()). get(Id, Metric) -> case couters_ref(Id) of @@ -288,6 +328,9 @@ handle_call({create_rule_metrics, Id}, _From, _ -> RuleSpeeds#{Id => #rule_speed{}} end}}; +handle_call({reset_speeds, Id}, _From, State = #state{rule_speeds = RuleSpeedMap}) -> + {reply, ok, State#state{rule_speeds = maps:put(Id, #rule_speed{}, RuleSpeedMap)}}; + handle_call({delete_metrics, Id}, _From, State = #state{metric_ids = MIDs, rule_speeds = undefined}) -> {reply, delete_counters(Id), State#state{metric_ids = sets:del_element(Id, MIDs)}}; @@ -325,7 +368,7 @@ handle_info(_Info, State) -> {noreply, State}. code_change({down, _Vsn}, State = #state{metric_ids = MIDs}, [Vsn]) -> - case string:tokens(Vsn, ".") of + case string:tokens(Vsn, ".") of ["4", "3", SVal] -> {Val, []} = string:to_integer(SVal), case Val =< 6 of diff --git a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl index 35121c046..2912bd7b3 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl @@ -50,31 +50,8 @@ apply_rules([], _Input) -> ok; apply_rules([#rule{enabled = false}|More], Input) -> apply_rules(More, Input); -apply_rules([Rule = #rule{id = RuleID}|More], Input) -> - try apply_rule_discard_result(Rule, Input) - catch - %% ignore the errors if select or match failed - _:{select_and_transform_error, Error} -> - emqx_rule_metrics:inc_rules_exception(RuleID), - ?LOG(warning, "SELECT clause exception for ~s failed: ~p", - [RuleID, Error]); - _:{match_conditions_error, Error} -> - emqx_rule_metrics:inc_rules_exception(RuleID), - ?LOG(warning, "WHERE clause exception for ~s failed: ~p", - [RuleID, Error]); - _:{select_and_collect_error, Error} -> - emqx_rule_metrics:inc_rules_exception(RuleID), - ?LOG(warning, "FOREACH clause exception for ~s failed: ~p", - [RuleID, Error]); - _:{match_incase_error, Error} -> - emqx_rule_metrics:inc_rules_exception(RuleID), - ?LOG(warning, "INCASE clause exception for ~s failed: ~p", - [RuleID, Error]); - _:Error:StkTrace -> - emqx_rule_metrics:inc_rules_exception(RuleID), - ?LOG(error, "Apply rule ~s failed: ~p. Stacktrace:~n~p", - [RuleID, Error, StkTrace]) - end, +apply_rules([Rule|More], Input) -> + apply_rule_discard_result(Rule, Input), apply_rules(More, Input). apply_rule_discard_result(Rule, Input) -> @@ -84,7 +61,35 @@ apply_rule_discard_result(Rule, Input) -> apply_rule(Rule = #rule{id = RuleID}, Input) -> clear_rule_payload(), ok = emqx_rule_metrics:inc_rules_matched(RuleID), - do_apply_rule(Rule, add_metadata(Input, #{rule_id => RuleID})). + try do_apply_rule(Rule, add_metadata(Input, #{rule_id => RuleID})) + catch + %% ignore the errors if select or match failed + _:Reason = {select_and_transform_error, Error} -> + emqx_rule_metrics:inc_rules_exception(RuleID), + ?LOG(warning, "SELECT clause exception for ~s failed: ~p", + [RuleID, Error]), + {error, Reason}; + _:Reason = {match_conditions_error, Error} -> + emqx_rule_metrics:inc_rules_exception(RuleID), + ?LOG(warning, "WHERE clause exception for ~s failed: ~p", + [RuleID, Error]), + {error, Reason}; + _:Reason = {select_and_collect_error, Error} -> + emqx_rule_metrics:inc_rules_exception(RuleID), + ?LOG(warning, "FOREACH clause exception for ~s failed: ~p", + [RuleID, Error]), + {error, Reason}; + _:Reason = {match_incase_error, Error} -> + emqx_rule_metrics:inc_rules_exception(RuleID), + ?LOG(warning, "INCASE clause exception for ~s failed: ~p", + [RuleID, Error]), + {error, Reason}; + _:Error:StkTrace -> + emqx_rule_metrics:inc_rules_exception(RuleID), + ?LOG(error, "Apply rule ~s failed: ~p. Stacktrace:~n~p", + [RuleID, Error, StkTrace]), + {error, {Error, StkTrace}} + end. do_apply_rule(#rule{id = RuleId, is_foreach = true, @@ -452,7 +457,8 @@ cache_payload(DecodedP) -> safe_decode_and_cache(MaybeJson) -> try cache_payload(emqx_json:decode(MaybeJson, [return_maps])) - catch _:_ -> #{} + catch + _:_:_-> error({decode_json_failed, MaybeJson}) end. ensure_list(List) when is_list(List) -> List; diff --git a/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl b/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl index 760205d62..26da4cead 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl @@ -77,7 +77,7 @@ test_rule(Sql, Select, Context, EventTopics) -> R of {ok, Data} -> {ok, flatten(Data)}; - {error, nomatch} -> {error, nomatch} + {error, Reason} -> {error, Reason} after ok = emqx_rule_registry:remove_action_instance_params(ActInstId) end. diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index 02e0f607c..96480bfc5 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -49,6 +49,7 @@ groups() -> [t_register_provider, t_unregister_provider, t_create_rule, + t_reset_metrics, t_create_resource ]}, {actions, [], @@ -124,7 +125,8 @@ groups() -> t_sqlparse_array_range_1, t_sqlparse_array_range_2, t_sqlparse_true_false, - t_sqlparse_new_map + t_sqlparse_new_map, + t_sqlparse_invalid_json ]}, {rule_metrics, [], [t_metrics, @@ -351,6 +353,39 @@ t_inspect_action(_Config) -> emqx_rule_registry:remove_resource(ResId), ok. +t_reset_metrics(_Config) -> + ok = emqx_rule_engine:load_providers(), + {ok, #resource{id = ResId}} = emqx_rule_engine:create_resource( + #{type => built_in, + config => #{}, + description => <<"debug resource">>}), + {ok, #rule{id = Id}} = emqx_rule_engine:create_rule( + #{rawsql => "select clientid as c, username as u " + "from \"t1\" ", + actions => [#{name => 'inspect', + args => #{'$resource' => ResId, a=>1, b=>2}}], + type => built_in, + description => <<"Inspect rule">> + }), + {ok, Client} = emqtt:start_link([{username, <<"emqx">>}]), + {ok, _} = emqtt:connect(Client), + [ begin + emqtt:publish(Client, <<"t1">>, <<"{\"id\": 1, \"name\": \"ha\"}">>, 0), + timer:sleep(100) + end + || _ <- lists:seq(1,10)], + emqx_rule_metrics:reset_metrics(Id), + ?assertEqual(#{exception => 0,failed => 0, + matched => 0,no_result => 0,passed => 0, + speed => 0.0,speed_last5m => 0.0,speed_max => 0}, + emqx_rule_metrics:get_rule_metrics(Id)), + ?assertEqual(#{failed => 0,success => 0,taken => 0}, + emqx_rule_metrics:get_action_metrics(ResId)), + emqtt:stop(Client), + emqx_rule_registry:remove_rule(Id), + emqx_rule_registry:remove_resource(ResId), + ok. + t_republish_action(_Config) -> Qos0Received = emqx_metrics:val('messages.qos0.received'), Received = emqx_metrics:val('messages.received'), @@ -2274,12 +2309,13 @@ t_sqlparse_array_range_1(_Config) -> Sql02 = "select " " payload.a[1..4] as c " "from \"t/#\" ", - ?assertThrow({select_and_transform_error, {error,{range_get,non_list_data},_}}, + ?assertMatch({error, {select_and_transform_error, {error,{range_get,non_list_data},_}}}, emqx_rule_sqltester:test( #{<<"rawsql">> => Sql02, <<"ctx">> => #{<<"payload">> => <<"{\"x\":[0,1,2,3,4,5]}">>, <<"topic">> => <<"t/a">>}})), + %% construct a range: Sql1 = "select " " [1..4] as c, " @@ -2406,6 +2442,29 @@ t_sqlparse_nested_get(_Config) -> <<"payload">> => <<"{\"a\": {\"b\": 0}}">> }})). +t_sqlparse_invalid_json(_Config) -> + Sql02 = "select " + " payload.a[1..4] as c " + "from \"t/#\" ", + ?assertMatch({error, {select_and_transform_error, {error,{decode_json_failed,_},_}}}, + emqx_rule_sqltester:test( + #{<<"rawsql">> => Sql02, + <<"ctx">> => + #{<<"payload">> => <<"{\"x\":[0,1,2,3,}">>, + <<"topic">> => <<"t/a">>}})), + + + Sql2 = "foreach payload.sensors " + "do item.cmd as msg_type " + "from \"t/#\" ", + ?assertMatch({error, {select_and_collect_error, {error,{decode_json_failed,_},_}}}, + emqx_rule_sqltester:test( + #{<<"rawsql">> => Sql2, + <<"ctx">> => + #{<<"payload">> => + <<"{\"sensors\": [{\"cmd\":\"1\"} {\"cmd\":}]}">>, + <<"topic">> => <<"t/a">>}})). + %%------------------------------------------------------------------------------ %% Internal helpers %%------------------------------------------------------------------------------ diff --git a/lib-ce/emqx_dashboard/src/emqx_dashboard.app.src b/lib-ce/emqx_dashboard/src/emqx_dashboard.app.src index efd362156..f8bd08677 100644 --- a/lib-ce/emqx_dashboard/src/emqx_dashboard.app.src +++ b/lib-ce/emqx_dashboard/src/emqx_dashboard.app.src @@ -1,6 +1,6 @@ {application, emqx_dashboard, [{description, "EMQ X Web Dashboard"}, - {vsn, "4.3.9"}, % strict semver, bump manually! + {vsn, "4.3.10"}, % strict semver, bump manually! {modules, []}, {registered, [emqx_dashboard_sup]}, {applications, [kernel,stdlib,mnesia,minirest]}, diff --git a/rebar.config b/rebar.config index 2003321bb..484969060 100644 --- a/rebar.config +++ b/rebar.config @@ -37,7 +37,7 @@ {deps, [ {gpb, "4.11.2"} %% gpb only used to build, but not for release, pin it here to avoid fetching a wrong version due to rebar plugins scattered in all the deps - , {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.1.14"}}} + , {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.1.15"}}} , {eredis_cluster, {git, "https://github.com/emqx/eredis_cluster", {tag, "0.6.5"}}} , {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}} , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}} @@ -57,6 +57,7 @@ , {getopt, "1.0.1"} , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.15.0"}}} , {lc, {git, "https://github.com/emqx/lc.git", {tag, "0.2.0"}}} + , {mongodb, {git,"https://github.com/emqx/mongodb-erlang", {tag, "v3.0.13"}}} ]}. {xref_ignores, diff --git a/scripts/update-appup.sh b/scripts/update-appup.sh index d62faee74..55bcc0122 100755 --- a/scripts/update-appup.sh +++ b/scripts/update-appup.sh @@ -55,6 +55,10 @@ while [ "$#" -gt 0 ]; do SKIP_BUILD='yes' shift ;; + --skip-build-base) + SKIP_BUILD_BASE='yes' + shift + ;; --check) # hijack the --check option IS_CHECK='yes' @@ -88,16 +92,20 @@ else NEW_COPY='no' fi -pushd "${PREV_DIR_BASE}/${PREV_TAG}" -if [ "$NEW_COPY" = 'no' ]; then - REMOTE="$(git remote -v | grep "${GIT_REPO}" | head -1 | awk '{print $1}')" - git fetch "$REMOTE" +if [ "${SKIP_BUILD_BASE:-no}" = 'yes' ]; then + echo "not building relup base ${PREV_DIR_BASE}/${PREV_TAG}" +else + pushd "${PREV_DIR_BASE}/${PREV_TAG}" + if [ "$NEW_COPY" = 'no' ]; then + REMOTE="$(git remote -v | grep "${GIT_REPO}" | head -1 | awk '{print $1}')" + git fetch "$REMOTE" + fi + git reset --hard + git clean -fdx + git checkout "${PREV_TAG}" + make "$PROFILE" + popd fi -git reset --hard -git clean -fdx -git checkout "${PREV_TAG}" -make "$PROFILE" -popd PREV_REL_DIR="${PREV_DIR_BASE}/${PREV_TAG}/_build/${PROFILE}/lib" diff --git a/src/emqx.app.src b/src/emqx.app.src index 2f58c52a8..cfa03c2d4 100644 --- a/src/emqx.app.src +++ b/src/emqx.app.src @@ -6,7 +6,7 @@ %% the emqx `release' version, which in turn is comprised of several %% apps, one of which is this. See `emqx_release.hrl' for more %% info. - {vsn, "4.3.14"}, % strict semver, bump manually! + {vsn, "4.3.15"}, % strict semver, bump manually! {modules, []}, {registered, []}, {applications, [ kernel diff --git a/src/emqx.appup.src b/src/emqx.appup.src index 29c060b78..8825da442 100644 --- a/src/emqx.appup.src +++ b/src/emqx.appup.src @@ -1,7 +1,10 @@ %% -*- mode: erlang -*- %% Unless you know what you are doing, DO NOT edit manually!! {VSN, - [{"4.3.13", + [{"4.3.14", + [{load_module,emqx_frame,brutal_purge,soft_purge,[]}, + {load_module,emqx_hooks,brutal_purge,soft_purge,[]}]}, + {"4.3.13", [{load_module,emqx_plugins,brutal_purge,soft_purge,[]}, {load_module,emqx_flapping,brutal_purge,soft_purge,[]}, {load_module,emqx_limiter,brutal_purge,soft_purge,[]}, @@ -18,7 +21,8 @@ {load_module,emqx_ctl,brutal_purge,soft_purge,[]}, {load_module,emqx_cm,brutal_purge,soft_purge,[]}, {load_module,emqx_misc,brutal_purge,soft_purge,[]}, - {load_module,emqx_connection,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_connection,brutal_purge,soft_purge,[]}, + {load_module,emqx_hooks,brutal_purge,soft_purge,[]}]}, {"4.3.12", [{load_module,emqx_flapping,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]}, @@ -44,7 +48,8 @@ {load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_message,brutal_purge,soft_purge,[]}, - {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_limiter,brutal_purge,soft_purge,[]}, + {load_module,emqx_hooks,brutal_purge,soft_purge,[]}]}, {"4.3.11", [{load_module,emqx_flapping,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]}, @@ -72,7 +77,8 @@ {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_http_lib,brutal_purge,soft_purge,[]}, {load_module,emqx_message,brutal_purge,soft_purge,[]}, - {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_limiter,brutal_purge,soft_purge,[]}, + {load_module,emqx_hooks,brutal_purge,soft_purge,[]}]}, {"4.3.10", [{load_module,emqx_flapping,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]}, @@ -100,7 +106,8 @@ {load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_message,brutal_purge,soft_purge,[]}, - {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_limiter,brutal_purge,soft_purge,[]}, + {load_module,emqx_hooks,brutal_purge,soft_purge,[]}]}, {"4.3.9", [{load_module,emqx_flapping,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]}, @@ -132,7 +139,8 @@ {load_module,emqx_os_mon,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_message,brutal_purge,soft_purge,[]}, - {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_limiter,brutal_purge,soft_purge,[]}, + {load_module,emqx_hooks,brutal_purge,soft_purge,[]}]}, {"4.3.8", [{load_module,emqx_flapping,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]}, @@ -164,7 +172,8 @@ {load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_message,brutal_purge,soft_purge,[]}, - {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_limiter,brutal_purge,soft_purge,[]}, + {load_module,emqx_hooks,brutal_purge,soft_purge,[]}]}, {"4.3.7", [{load_module,emqx_flapping,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]}, @@ -197,7 +206,8 @@ {load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_message,brutal_purge,soft_purge,[]}, - {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_limiter,brutal_purge,soft_purge,[]}, + {load_module,emqx_hooks,brutal_purge,soft_purge,[]}]}, {"4.3.6", [{load_module,emqx_flapping,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]}, @@ -230,7 +240,8 @@ {load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_message,brutal_purge,soft_purge,[]}, - {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_limiter,brutal_purge,soft_purge,[]}, + {load_module,emqx_hooks,brutal_purge,soft_purge,[]}]}, {"4.3.5", [{load_module,emqx_flapping,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]}, @@ -264,7 +275,8 @@ {load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_message,brutal_purge,soft_purge,[]}, - {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_limiter,brutal_purge,soft_purge,[]}, + {load_module,emqx_hooks,brutal_purge,soft_purge,[]}]}, {"4.3.4", [{load_module,emqx_flapping,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]}, @@ -298,7 +310,8 @@ {load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_message,brutal_purge,soft_purge,[]}, - {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_limiter,brutal_purge,soft_purge,[]}, + {load_module,emqx_hooks,brutal_purge,soft_purge,[]}]}, {"4.3.3", [{load_module,emqx_flapping,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]}, @@ -333,7 +346,8 @@ {load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_message,brutal_purge,soft_purge,[]}, - {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_limiter,brutal_purge,soft_purge,[]}, + {load_module,emqx_hooks,brutal_purge,soft_purge,[]}]}, {"4.3.2", [{load_module,emqx_flapping,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]}, @@ -368,7 +382,8 @@ {load_module,emqx_os_mon,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_message,brutal_purge,soft_purge,[]}, - {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_limiter,brutal_purge,soft_purge,[]}, + {load_module,emqx_hooks,brutal_purge,soft_purge,[]}]}, {"4.3.1", [{load_module,emqx_flapping,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]}, @@ -406,7 +421,8 @@ {load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_message,brutal_purge,soft_purge,[]}, - {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_limiter,brutal_purge,soft_purge,[]}, + {load_module,emqx_hooks,brutal_purge,soft_purge,[]}]}, {"4.3.0", [{load_module,emqx_flapping,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]}, @@ -447,14 +463,18 @@ {load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_message,brutal_purge,soft_purge,[]}, - {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, - {<<".*">>,[]}], - [{"4.3.13", - [{load_module,emqx_flapping,brutal_purge,soft_purge,[]}, {load_module,emqx_limiter,brutal_purge,soft_purge,[]}, + {load_module,emqx_hooks,brutal_purge,soft_purge,[]}]}, + {<<".*">>,[]}], + [{"4.3.14", + [{load_module,emqx_frame,brutal_purge,soft_purge,[]}, + {load_module,emqx_hooks,brutal_purge,soft_purge,[]}]}, + {"4.3.13", + [{load_module,emqx_plugins,brutal_purge,soft_purge,[]}, + {load_module,emqx_flapping,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]}, {load_module,emqx_shared_sub,brutal_purge,soft_purge,[]}, - {load_module,emqx_plugins,brutal_purge,soft_purge,[]}, + {load_module,emqx_limiter,brutal_purge,soft_purge,[]}, {load_module,emqx_frame,brutal_purge,soft_purge,[]}, {load_module,emqx_sys_mon,brutal_purge,soft_purge,[]}, {load_module,emqx_pmon,brutal_purge,soft_purge,[]}, @@ -466,7 +486,8 @@ {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_ctl,brutal_purge,soft_purge,[]}, {load_module,emqx_cm,brutal_purge,soft_purge,[]}, - {load_module,emqx_connection,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_connection,brutal_purge,soft_purge,[]}, + {load_module,emqx_hooks,brutal_purge,soft_purge,[]}]}, {"4.3.12", [{load_module,emqx_flapping,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]}, @@ -491,7 +512,8 @@ {load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_message,brutal_purge,soft_purge,[]}, - {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_limiter,brutal_purge,soft_purge,[]}, + {load_module,emqx_hooks,brutal_purge,soft_purge,[]}]}, {"4.3.11", [{load_module,emqx_flapping,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]}, @@ -518,7 +540,8 @@ {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_http_lib,brutal_purge,soft_purge,[]}, {load_module,emqx_message,brutal_purge,soft_purge,[]}, - {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_limiter,brutal_purge,soft_purge,[]}, + {load_module,emqx_hooks,brutal_purge,soft_purge,[]}]}, {"4.3.10", [{load_module,emqx_flapping,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]}, @@ -545,7 +568,8 @@ {load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx_connection,brutal_purge,soft_purge,[]}, {load_module,emqx_message,brutal_purge,soft_purge,[]}, - {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_limiter,brutal_purge,soft_purge,[]}, + {load_module,emqx_hooks,brutal_purge,soft_purge,[]}]}, {"4.3.9", [{load_module,emqx_flapping,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]}, @@ -576,7 +600,8 @@ {load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_message,brutal_purge,soft_purge,[]}, - {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_limiter,brutal_purge,soft_purge,[]}, + {load_module,emqx_hooks,brutal_purge,soft_purge,[]}]}, {"4.3.8", [{load_module,emqx_flapping,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]}, @@ -607,7 +632,8 @@ {load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_message,brutal_purge,soft_purge,[]}, - {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_limiter,brutal_purge,soft_purge,[]}, + {load_module,emqx_hooks,brutal_purge,soft_purge,[]}]}, {"4.3.7", [{load_module,emqx_flapping,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]}, @@ -639,7 +665,8 @@ {load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_message,brutal_purge,soft_purge,[]}, - {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_limiter,brutal_purge,soft_purge,[]}, + {load_module,emqx_hooks,brutal_purge,soft_purge,[]}]}, {"4.3.6", [{load_module,emqx_flapping,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]}, @@ -671,7 +698,8 @@ {load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_message,brutal_purge,soft_purge,[]}, - {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_limiter,brutal_purge,soft_purge,[]}, + {load_module,emqx_hooks,brutal_purge,soft_purge,[]}]}, {"4.3.5", [{load_module,emqx_flapping,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]}, @@ -704,7 +732,8 @@ {load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_message,brutal_purge,soft_purge,[]}, - {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_limiter,brutal_purge,soft_purge,[]}, + {load_module,emqx_hooks,brutal_purge,soft_purge,[]}]}, {"4.3.4", [{load_module,emqx_flapping,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]}, @@ -737,7 +766,8 @@ {load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_message,brutal_purge,soft_purge,[]}, - {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_limiter,brutal_purge,soft_purge,[]}, + {load_module,emqx_hooks,brutal_purge,soft_purge,[]}]}, {"4.3.3", [{load_module,emqx_flapping,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]}, @@ -771,7 +801,8 @@ {load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_message,brutal_purge,soft_purge,[]}, - {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_limiter,brutal_purge,soft_purge,[]}, + {load_module,emqx_hooks,brutal_purge,soft_purge,[]}]}, {"4.3.2", [{load_module,emqx_flapping,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]}, @@ -805,7 +836,8 @@ {load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_message,brutal_purge,soft_purge,[]}, - {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_limiter,brutal_purge,soft_purge,[]}, + {load_module,emqx_hooks,brutal_purge,soft_purge,[]}]}, {"4.3.1", [{load_module,emqx_flapping,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]}, @@ -842,7 +874,8 @@ {load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_message,brutal_purge,soft_purge,[]}, - {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_limiter,brutal_purge,soft_purge,[]}, + {load_module,emqx_hooks,brutal_purge,soft_purge,[]}]}, {"4.3.0", [{load_module,emqx_flapping,brutal_purge,soft_purge,[]}, {load_module,emqx_listeners,brutal_purge,soft_purge,[]}, @@ -881,5 +914,6 @@ {load_module,emqx,brutal_purge,soft_purge,[]}, {load_module,emqx_app,brutal_purge,soft_purge,[]}, {load_module,emqx_message,brutal_purge,soft_purge,[]}, - {load_module,emqx_limiter,brutal_purge,soft_purge,[]}]}, + {load_module,emqx_limiter,brutal_purge,soft_purge,[]}, + {load_module,emqx_hooks,brutal_purge,soft_purge,[]}]}, {<<".*">>,[]}]}. diff --git a/src/emqx_frame.erl b/src/emqx_frame.erl index 48e8b71fb..aa71b16ae 100644 --- a/src/emqx_frame.erl +++ b/src/emqx_frame.erl @@ -265,7 +265,7 @@ parse_packet(#mqtt_packet_header{type = ?CONNACK}, < - {TopicName, Rest} = parse_utf8_string(Bin, StrictMode), + {TopicName, Rest} = parse_topic_name(Bin, StrictMode), {PacketId, Rest1} = case QoS of ?QOS_0 -> {undefined, Rest}; _ -> parse_packet_id(Rest) @@ -357,7 +357,7 @@ parse_will_message(Packet = #mqtt_packet_connect{will_flag = true, proto_ver = Ver}, Bin, StrictMode) -> {Props, Rest} = parse_properties(Bin, Ver, StrictMode), - {Topic, Rest1} = parse_utf8_string(Rest, StrictMode), + {Topic, Rest1} = parse_topic_name(Rest, StrictMode), {Payload, Rest2} = parse_binary_data(Rest1), {Packet#mqtt_packet_connect{will_props = Props, will_topic = Topic, @@ -524,6 +524,14 @@ parse_binary_data(Bin) when 2 > byte_size(Bin) -> error(malformed_binary_data_length). +parse_topic_name(Bin, false) -> + parse_utf8_string(Bin, false); +parse_topic_name(Bin, true) -> + case parse_utf8_string(Bin, true) of + {<<>>, _Rest} -> error(empty_topic_name); + Result -> Result + end. + %%-------------------------------------------------------------------- %% Serialize MQTT Packet %%-------------------------------------------------------------------- diff --git a/src/emqx_hooks.erl b/src/emqx_hooks.erl index 95ef33fdd..bd795ee0a 100644 --- a/src/emqx_hooks.erl +++ b/src/emqx_hooks.erl @@ -32,6 +32,8 @@ , add/3 , add/4 , put/2 + , put/3 + , put/4 , del/2 , run/2 , run_fold/3 @@ -75,6 +77,8 @@ priority :: integer() }). +-type callback() :: #callback{}. + -record(hook, { name :: hookpoint(), callbacks :: list(#callback{}) @@ -110,7 +114,7 @@ callback_priority(#callback{priority= P}) -> P. %%-------------------------------------------------------------------- %% @doc Register a callback --spec(add(hookpoint(), action() | #callback{}) -> ok_or_error(already_exists)). +-spec(add(hookpoint(), action() | callback()) -> ok_or_error(already_exists)). add(HookPoint, Callback) when is_record(Callback, callback) -> gen_server:call(?SERVER, {add, HookPoint, Callback}, infinity); add(HookPoint, Action) when is_function(Action); is_tuple(Action) -> @@ -131,12 +135,24 @@ add(HookPoint, Action, Filter, Priority) when is_integer(Priority) -> add(HookPoint, #callback{action = Action, filter = Filter, priority = Priority}). %% @doc Like add/2, it register a callback, discard 'already_exists' error. --spec(put(hookpoint(), action() | #callback{}) -> ok). -put(HookPoint, Callback) -> +-spec put(hookpoint(), action() | callback()) -> ok. +put(HookPoint, Callback) when is_record(Callback, callback) -> case add(HookPoint, Callback) of ok -> ok; {error, already_exists} -> ok - end. + end; +put(HookPoint, Action) when is_function(Action); is_tuple(Action) -> + ?MODULE:put(HookPoint, #callback{action = Action, priority = 0}). + +-spec put(hookpoint(), action(), filter() | integer() | list()) -> ok. +put(HookPoint, Action, {_M, _F, _A} = Filter) -> + ?MODULE:put(HookPoint, #callback{action = Action, filter = Filter, priority = 0}); +put(HookPoint, Action, Priority) when is_integer(Priority) -> + ?MODULE:put(HookPoint, #callback{action = Action, priority = Priority}). + +-spec put(hookpoint(), action(), filter(), integer()) -> ok. +put(HookPoint, Action, Filter, Priority) when is_integer(Priority) -> + ?MODULE:put(HookPoint, #callback{action = Action, filter = Filter, priority = Priority}). %% @doc Unregister a callback. -spec(del(hookpoint(), action() | {module(), atom()}) -> ok). @@ -205,7 +221,7 @@ execute({M, F, A}, Args) -> erlang:apply(M, F, Args ++ A). %% @doc Lookup callbacks. --spec(lookup(hookpoint()) -> [#callback{}]). +-spec(lookup(hookpoint()) -> [callback()]). lookup(HookPoint) -> case ets:lookup(?TAB, HookPoint) of [#hook{callbacks = Callbacks}] -> @@ -288,4 +304,3 @@ del_callback(Func, [#callback{action = {Func, _A}} | Callbacks], Acc) -> del_callback(Func, Callbacks, Acc); del_callback(Action, [Callback | Callbacks], Acc) -> del_callback(Action, Callbacks, [Callback | Acc]). - diff --git a/src/emqx_limiter.erl b/src/emqx_limiter.erl index 1cca7140b..9a13e78f4 100644 --- a/src/emqx_limiter.erl +++ b/src/emqx_limiter.erl @@ -159,7 +159,7 @@ update_overall_limiter(Zone, Capacity, Interval) -> try esockd_limiter:update({Zone, overall_messages_routing}, Capacity, Interval), true - catch _:_:_ -> + catch _:_ -> false end. @@ -167,6 +167,6 @@ delete_overall_limiter(Zone) -> try esockd_limiter:delete({Zone, overall_messages_routing}), true - catch _:_:_ -> + catch _:_ -> false end. diff --git a/test/emqx_frame_SUITE.erl b/test/emqx_frame_SUITE.erl index d98786e99..81c861bdb 100644 --- a/test/emqx_frame_SUITE.erl +++ b/test/emqx_frame_SUITE.erl @@ -162,6 +162,14 @@ t_parse_malformed_utf8_string(_) -> ParseState = emqx_frame:initial_parse_state(#{strict_mode => true}), ?catch_error(utf8_string_invalid, emqx_frame:parse(MalformedPacket, ParseState)). +t_parse_empty_topic_name(_) -> + Packet = <<48, 4, 0, 0, 0, 1>>, + NormalState = emqx_frame:initial_parse_state(#{strict_mode => false}), + ?assertMatch({_, _}, emqx_frame:parse(Packet, NormalState)), + + StrictState = emqx_frame:initial_parse_state(#{strict_mode => true}), + ?catch_error(empty_topic_name, emqx_frame:parse(Packet, StrictState)). + t_parse_frame_proxy_protocol(_) -> BinList = [ <<"PROXY TCP4 ">>, <<"PROXY TCP6 ">>, <<"PROXY UNKNOWN">> , <<"\r\n\r\n\0\r\nQUIT\n">>],