diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src b/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src index b92d67e4f..a43f0519f 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src @@ -5,18 +5,21 @@ [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {"4.3.9", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, - {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, - {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, - {add_module,emqx_rule_date}, + {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, - {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, - {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, + {add_module,emqx_rule_date}, + {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {"4.3.8", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, @@ -159,18 +162,21 @@ [{"4.3.10", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {"4.3.9", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, - {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, - {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_maps,brutal_purge,soft_purge,[]}, - {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, - {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_events,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_engine,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {delete_module,emqx_rule_date}]}, {"4.3.8", [{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.erl b/apps/emqx_rule_engine/src/emqx_rule_engine.erl index 923f78920..872e314cb 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.erl @@ -509,7 +509,7 @@ refresh_resource_status() -> fun(#resource{id = ResId, type = ResType}) -> case emqx_rule_registry:find_resource_type(ResType) of {ok, #resource_type{on_status = {Mod, OnStatus}}} -> - _ = fetch_resource_status(Mod, OnStatus, ResId); + fetch_resource_status(Mod, OnStatus, ResId); _ -> ok end end, emqx_rule_registry:get_resources()). diff --git a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl index 0bfc5242a..88b2c9143 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl @@ -51,13 +51,9 @@ apply_rules([], _Input) -> apply_rules([#rule{enabled = false}|More], Input) -> apply_rules(More, Input); apply_rules([Rule|More], Input) -> - apply_rule_discard_result(Rule, Input), + apply_rule(Rule, Input), apply_rules(More, Input). -apply_rule_discard_result(Rule, Input) -> - _ = apply_rule(Rule, Input), - ok. - apply_rule(Rule = #rule{id = RuleID}, Input) -> clear_rule_payload(), ok = emqx_rule_metrics:inc_rules_matched(RuleID), diff --git a/apps/emqx_web_hook/src/emqx_web_hook.app.src b/apps/emqx_web_hook/src/emqx_web_hook.app.src index 93db474d2..fd69f9601 100644 --- a/apps/emqx_web_hook/src/emqx_web_hook.app.src +++ b/apps/emqx_web_hook/src/emqx_web_hook.app.src @@ -1,6 +1,6 @@ {application, emqx_web_hook, [{description, "EMQ X WebHook Plugin"}, - {vsn, "4.3.11"}, % strict semver, bump manually! + {vsn, "4.3.12"}, % strict semver, bump manually! {modules, []}, {registered, [emqx_web_hook_sup]}, {applications, [kernel,stdlib,ehttpc]}, diff --git a/apps/emqx_web_hook/src/emqx_web_hook.appup.src b/apps/emqx_web_hook/src/emqx_web_hook.appup.src index 4e3e28a21..23136d849 100644 --- a/apps/emqx_web_hook/src/emqx_web_hook.appup.src +++ b/apps/emqx_web_hook/src/emqx_web_hook.appup.src @@ -12,10 +12,20 @@ {load_module,emqx_web_hook,brutal_purge,soft_purge,[]}, {load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]}, {"4.3.8", - [{load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]}, + [{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}, + {load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_web_hook,brutal_purge,soft_purge,[]}]}, + {"4.3.9", + [{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}, + {load_module,emqx_web_hook,brutal_purge,soft_purge,[]}, + {load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]}]}, + {"4.3.10", + [{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}, + {load_module,emqx_web_hook,brutal_purge,soft_purge,[]}, + {load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]}]}, + {"4.3.11", + [{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_web_hook,brutal_purge,soft_purge,[]}]}, - {"4.3.9",[{load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]}]}, - {"4.3.10",[{load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]}]}, {<<".*">>,[]}], [{<<"4\\.3\\.[0-2]">>, [{apply,{application,stop,[emqx_web_hook]}}, @@ -28,8 +38,18 @@ {load_module,emqx_web_hook,brutal_purge,soft_purge,[]}, {load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]}, {"4.3.8", - [{load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]}, + [{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}, + {load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]}, + {load_module,emqx_web_hook,brutal_purge,soft_purge,[]}]}, + {"4.3.9", + [{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}, + {load_module,emqx_web_hook,brutal_purge,soft_purge,[]}, + {load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]}]}, + {"4.3.10", + [{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}, + {load_module,emqx_web_hook,brutal_purge,soft_purge,[]}, + {load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]}]}, + {"4.3.11", + [{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_web_hook,brutal_purge,soft_purge,[]}]}, - {"4.3.9",[{load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]}]}, - {"4.3.10",[{load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]}]}, {<<".*">>,[]}]}. diff --git a/apps/emqx_web_hook/src/emqx_web_hook.erl b/apps/emqx_web_hook/src/emqx_web_hook.erl index c83cc6101..d02e32bf7 100644 --- a/apps/emqx_web_hook/src/emqx_web_hook.erl +++ b/apps/emqx_web_hook/src/emqx_web_hook.erl @@ -344,15 +344,13 @@ send_http_request(ClientID, Params) -> ok end. -parse_rule(Rules) -> - parse_rule(Rules, []). -parse_rule([], Acc) -> - lists:reverse(Acc); -parse_rule([{Rule, Conf} | Rules], Acc) -> +parse_rule([]) -> + []; +parse_rule([{Rule, Conf} | Rules]) -> Params = emqx_json:decode(iolist_to_binary(Conf)), Action = proplists:get_value(<<"action">>, Params), Filter = proplists:get_value(<<"topic">>, Params), - parse_rule(Rules, [{list_to_atom(Rule), binary_to_existing_atom(Action, utf8), Filter} | Acc]). + [{list_to_atom(Rule), binary_to_existing_atom(Action, utf8), Filter} | parse_rule(Rules)]. with_filter(Fun, _, undefined) -> Fun(), ok; @@ -389,4 +387,3 @@ stringfy(Term) -> maybe(undefined) -> null; maybe(Str) -> Str. - diff --git a/apps/emqx_web_hook/src/emqx_web_hook_actions.erl b/apps/emqx_web_hook/src/emqx_web_hook_actions.erl index 58c8df0f6..aa0face38 100644 --- a/apps/emqx_web_hook/src/emqx_web_hook_actions.erl +++ b/apps/emqx_web_hook/src/emqx_web_hook_actions.erl @@ -258,7 +258,7 @@ on_action_data_to_webserver(Selected, _Envs = 'Pool' := Pool, 'RequestTimeout' := RequestTimeout}, clientid := ClientID}) -> - NBody = format_msg(BodyTokens, Selected), + NBody = format_msg(BodyTokens, clear_user_property_header(Selected)), NPath = emqx_rule_utils:proc_tmpl(PathTokens, Selected), Req = create_req(Method, NPath, Headers, NBody), case ehttpc:request({Pool, ClientID}, Method, Req, RequestTimeout) of @@ -280,7 +280,12 @@ on_action_data_to_webserver(Selected, _Envs = format_msg([], Data) -> emqx_json:encode(Data); format_msg(Tokens, Data) -> - emqx_rule_utils:proc_tmpl(Tokens, Data). + emqx_rule_utils:proc_tmpl(Tokens, Data). + +clear_user_property_header(#{headers := #{properties := #{'User-Property' := _} = P} = H} = S) -> + S#{headers => H#{properties => P#{'User-Property' => []}}}; +clear_user_property_header(S) -> + S. %%------------------------------------------------------------------------------ %% Internal functions @@ -365,9 +370,7 @@ get_ssl_opts(Opts, ResId) -> test_http_connect(Conf) -> Url = fun() -> maps:get(<<"url">>, Conf) end, - try - emqx_rule_utils:http_connectivity(Url()) - of + try emqx_rule_utils:http_connectivity(Url()) of ok -> true; {error, _Reason} -> ?LOG(error, "check http_connectivity failed: ~p", [Url()]), diff --git a/apps/emqx_web_hook/test/emqx_web_hook_SUITE.erl b/apps/emqx_web_hook/test/emqx_web_hook_SUITE.erl index ff171fa9c..95b799568 100644 --- a/apps/emqx_web_hook/test/emqx_web_hook_SUITE.erl +++ b/apps/emqx_web_hook/test/emqx_web_hook_SUITE.erl @@ -19,9 +19,8 @@ -compile(export_all). -compile(nowarn_export_all). --include_lib("emqx/include/emqx.hrl"). -include_lib("eunit/include/eunit.hrl"). --include_lib("common_test/include/ct.hrl"). +-include_lib("emqx_rule_engine/include/rule_engine.hrl"). -define(HOOK_LOOKUP(H), emqx_hooks:lookup(list_to_atom(H))). -define(ACTION(Name), #{<<"action">> := Name}). @@ -35,7 +34,7 @@ all() -> , {group, https} , {group, ipv6http} , {group, ipv6https} - , {group, all} + , test_rule_webhook ]. groups() -> @@ -44,12 +43,18 @@ groups() -> , {https, [sequence], Cases} , {ipv6http, [sequence], Cases} , {ipv6https, [sequence], Cases} - , {all, [sequence], emqx_ct:all(?MODULE)} ]. -start_apps(F) -> emqx_ct_helpers:start_apps(apps(), F). +start_apps() -> + [application:load(App) || App <- apps()], + emqx_ct_helpers:start_apps(apps()). +start_apps(F) -> + [application:load(App) || App <- apps()], + emqx_ct_helpers:start_apps(apps(), F). +init_per_group(rules, Config) -> Config; init_per_group(Name, Config) -> + net_kernel:start(['test@127.0.0.1', longnames]), application:ensure_all_started(emqx_management), set_special_cfgs(), BasePort = @@ -74,6 +79,20 @@ init_per_group(Name, Config) -> end, [{base_port, BasePort}, {transport_opts, Opts} | Config]. +init_per_testcase(test_rule_webhook, Config) -> + net_kernel:start(['test@127.0.0.1', longnames]), + ok = ekka_mnesia:start(), + ok = emqx_rule_registry:mnesia(boot), + Handler = fun(_) -> + application:set_env(emqx_web_hook, rules, []), + application:set_env(emqx_web_hook, url, "http://127.0.0.1:9999/"), + application:set_env(emqx_web_hook, ssl, false), + application:set_env(emqx_web_hook, ssloptions, []) + end, + ok = start_apps(Handler), + Config; +init_per_testcase(_, Config) -> Config. + end_per_group(_Name, Config) -> emqx_ct_helpers:stop_apps(apps()), Config. @@ -119,6 +138,51 @@ set_special_cfgs() -> %% Test cases %%-------------------------------------------------------------------- +test_rule_webhook(_) -> + {ok, ServerPid} = http_server:start_link(self(), 9999, []), + receive {ServerPid, ready} -> ok + after 1000 -> error(timeout) + end, + + ok = emqx_rule_engine:load_providers(), + {ok, #resource{id = ResId}} = emqx_rule_engine:create_resource( + #{type => web_hook, + config => #{<<"url">> => "http://127.0.0.1:9999/"}, + description => <<"For testing">>}), + {ok, #rule{id = Id}} = emqx_rule_engine:create_rule( + #{rawsql => "select * from \"t1\"", + actions => [#{name => 'data_to_webserver', + args => #{<<"$resource">> => ResId}}], + type => web_hook, + description => <<"For testing">> + }), + + Properties = #{'User-Property' => [{<<"user_property_key">>, <<"user_property_value">>}]}, + ClientId = iolist_to_binary(["client-", integer_to_list(erlang:unique_integer([positive]))]), + + {ok, Client} = emqtt:start_link([ {clientid, ClientId} + , {proto_ver, v5} + , {keepalive, 60} + ]), + {ok, _} = emqtt:connect(Client), + {ok, _} = emqtt:publish(Client, <<"t1">>, Properties, <<"Payload...">>, [{qos, 2}]), + + Res = receive {http_server, {Any, _Bool}, _Header} -> {ok, Any} + after 100 -> error + end, + + ?assertMatch({ok, _}, Res), + {ok, Body} = Res, + ?assertNotEqual([], binary:matches(Body, <<"User-Property">>)), + ?assertNotEqual([], binary:matches(Body, <<"user_property_key">>)), + ?assertNotEqual([], binary:matches(Body, <<"user_property_value">>)), + + emqtt:stop(Client), + http_server:stop(ServerPid), + emqx_rule_registry:remove_rule(Id), + emqx_rule_registry:remove_resource(ResId), + ok. + test_full_flow(Config) -> [_|_] = Opts = proplists:get_value(transport_opts, Config), BasePort = proplists:get_value(base_port, Config), @@ -128,7 +192,7 @@ test_full_flow(Config) -> after 1000 -> error(timeout) end, application:set_env(emqx_web_hook, headers, [{"k1","K1"}, {"k2", "K2"}]), - ClientId = iolist_to_binary(["client-", integer_to_list(erlang:system_time())]), + ClientId = iolist_to_binary(["client-", integer_to_list(erlang:unique_integer([positive]))]), {ok, C} = emqtt:start_link([ {clientid, ClientId} , {proto_ver, v5} , {keepalive, 60} @@ -174,10 +238,10 @@ validate_params_and_headers(ClientState, ClientId) -> end after 5000 -> - case ClientState =:= undefined of - true -> error("client_was_never_connected"); - false -> error("terminate_action_is_not_received_in_time") - end + case ClientState =:= undefined of + true -> error("client_was_never_connected"); + false -> error("terminate_action_is_not_received_in_time") + end end. t_check_hooked(_) -> diff --git a/apps/emqx_web_hook/test/http_server.erl b/apps/emqx_web_hook/test/http_server.erl index 791f725d1..28d99d722 100644 --- a/apps/emqx_web_hook/test/http_server.erl +++ b/apps/emqx_web_hook/test/http_server.erl @@ -81,7 +81,7 @@ stop_https() -> compile_router(Parent) -> {ok, _} = application:ensure_all_started(cowboy), cowboy_router:compile([ - {'_', [{"/", ?MODULE, #{parent => Parent}}]} + {'_', [{'_', ?MODULE, #{parent => Parent}}]} ]). init(Req, #{parent := Parent} = State) ->