From 982fc9b4fe0d580bb9dbc383c0fd1a0101cdd69b Mon Sep 17 00:00:00 2001 From: Shawn <506895667@qq.com> Date: Wed, 26 Oct 2022 20:34:59 +0800 Subject: [PATCH] feat: support to use placeholders as http headers in webhook actions --- apps/emqx_rule_engine/src/emqx_rule_utils.erl | 9 +++ .../test/emqx_rule_utils_SUITE.erl | 15 ++++ apps/emqx_web_hook/src/emqx_web_hook.app.src | 2 +- .../emqx_web_hook/src/emqx_web_hook.appup.src | 4 +- .../src/emqx_web_hook_actions.erl | 69 +++++++++++++++++-- .../test/emqx_web_hook_SUITE.erl | 53 ++++++++++++++ changes/v4.3.22-en.md | 2 + changes/v4.3.22-zh.md | 2 + 8 files changed, 146 insertions(+), 10 deletions(-) diff --git a/apps/emqx_rule_engine/src/emqx_rule_utils.erl b/apps/emqx_rule_engine/src/emqx_rule_utils.erl index 5c9472367..cf945d5f5 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_utils.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_utils.erl @@ -34,6 +34,7 @@ , proc_sql/2 , proc_sql_param_str/2 , proc_cql_param_str/2 + , if_contains_placeholder/1 ]). %% type converting @@ -175,6 +176,14 @@ proc_param_str(Tokens, Data, Quote) -> iolist_to_binary( proc_tmpl(Tokens, Data, #{return => rawlist, var_trans => Quote})). +%% return true if the Str contains any placeholder in "${var}" format. +-spec(if_contains_placeholder(string() | binary()) -> boolean()). +if_contains_placeholder(Str) -> + case re:split(Str, ?EX_PLACE_HOLDER, [{return, list}, group, trim]) of + [[_]] -> false; + _ -> true + end. + %% backward compatibility for hot upgrading from =< e4.2.1 get_phld_var(Fun, Data) when is_function(Fun) -> Fun(Data); diff --git a/apps/emqx_rule_engine/test/emqx_rule_utils_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_utils_SUITE.erl index 73c6f821d..95445bf84 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_utils_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_utils_SUITE.erl @@ -134,3 +134,18 @@ t_preproc_sql5(_) -> ParamsTokens = emqx_rule_utils:preproc_tmpl(<<"a:${a},b:${b},c:${c},d:${d}">>), ?assertEqual(<<"a:'1''''2',b:1,c:1.0,d:'{\"d1\":\"someone''s phone\"}'">>, emqx_rule_utils:proc_cql_param_str(ParamsTokens, Selected)). + +t_if_contains_placeholder(_) -> + ?assert(emqx_rule_utils:if_contains_placeholder(<<"${a}">>)), + ?assert(emqx_rule_utils:if_contains_placeholder(<<"${a}${b}">>)), + ?assert(emqx_rule_utils:if_contains_placeholder(<<"${a},${b},${c}">>)), + ?assert(emqx_rule_utils:if_contains_placeholder(<<"a:${a}">>)), + ?assert(emqx_rule_utils:if_contains_placeholder(<<"a:${a},b:${b}">>)), + ?assert(emqx_rule_utils:if_contains_placeholder(<<"abc${ab}">>)), + ?assertNot(emqx_rule_utils:if_contains_placeholder(<<"a">>)), + ?assertNot(emqx_rule_utils:if_contains_placeholder(<<"abc$">>)), + ?assertNot(emqx_rule_utils:if_contains_placeholder(<<"abc${">>)), + ?assertNot(emqx_rule_utils:if_contains_placeholder(<<"abc${a">>)), + ?assertNot(emqx_rule_utils:if_contains_placeholder(<<"abc${ab">>)), + ?assertNot(emqx_rule_utils:if_contains_placeholder(<<"a${ab${c${e">>)), + ok. diff --git a/apps/emqx_web_hook/src/emqx_web_hook.app.src b/apps/emqx_web_hook/src/emqx_web_hook.app.src index efe41b3bb..5726f5d89 100644 --- a/apps/emqx_web_hook/src/emqx_web_hook.app.src +++ b/apps/emqx_web_hook/src/emqx_web_hook.app.src @@ -1,6 +1,6 @@ {application, emqx_web_hook, [{description, "EMQ X WebHook Plugin"}, - {vsn, "4.3.14"}, % strict semver, bump manually! + {vsn, "4.3.15"}, % strict semver, bump manually! {modules, []}, {registered, [emqx_web_hook_sup]}, {applications, [kernel,stdlib,ehttpc]}, diff --git a/apps/emqx_web_hook/src/emqx_web_hook.appup.src b/apps/emqx_web_hook/src/emqx_web_hook.appup.src index a2f72f0e2..27a157bc4 100644 --- a/apps/emqx_web_hook/src/emqx_web_hook.appup.src +++ b/apps/emqx_web_hook/src/emqx_web_hook.appup.src @@ -21,7 +21,7 @@ {"4.3.11", [{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_web_hook,brutal_purge,soft_purge,[]}]}, - {<<"4\\.3\\.1[2-3]">>, + {<<"4\\.3\\.1[2-4]">>, [{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]}, {<<".*">>,[]}], [{<<"4\\.3\\.[0-7]">>, @@ -44,6 +44,6 @@ {"4.3.11", [{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_web_hook,brutal_purge,soft_purge,[]}]}, - {<<"4\\.3\\.1[2-3]">>, + {<<"4\\.3\\.1[2-4]">>, [{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]}, {<<".*">>,[]}]}. diff --git a/apps/emqx_web_hook/src/emqx_web_hook_actions.erl b/apps/emqx_web_hook/src/emqx_web_hook_actions.erl index 29550e1e9..7b605cbfc 100644 --- a/apps/emqx_web_hook/src/emqx_web_hook_actions.erl +++ b/apps/emqx_web_hook/src/emqx_web_hook_actions.erl @@ -26,6 +26,13 @@ , on_action_data_to_webserver/2 ]). +-ifdef(TEST). +-export([ preproc_and_normalise_headers/1 + , maybe_proc_headers/3 + , maybe_remove_content_type_header/2 + ]). +-endif. + -export_type([action_fun/0]). -include_lib("emqx/include/emqx.hrl"). @@ -263,7 +270,8 @@ on_action_data_to_webserver(Selected, _Envs = metadata := Metadata}) -> NBody = format_msg(BodyTokens, clear_user_property_header(Selected)), NPath = emqx_rule_utils:proc_tmpl(PathTokens, Selected), - Req = create_req(Method, NPath, Headers, NBody), + Headers1 = maybe_proc_headers(Headers, Method, Selected), + Req = create_req(Method, NPath, Headers1, NBody), case ehttpc:request({Pool, ClientID}, Method, Req, RequestTimeout) of {ok, StatusCode, _} when StatusCode >= 200 andalso StatusCode < 300 -> ?LOG_RULE_ACTION(debug, Metadata, "HTTP Request succeeded with path: ~p status code ~p", [NPath, StatusCode]), @@ -307,8 +315,9 @@ create_req(_, Path, Headers, Body) -> parse_action_params(Params = #{<<"url">> := URL}) -> {ok, #{path := CommonPath}} = emqx_http_lib:uri_parse(URL), Method = method(maps:get(<<"method">>, Params, <<"POST">>)), - Headers = headers(maps:get(<<"headers">>, Params, #{})), - NHeaders = ensure_content_type_header(Headers, Method), + Headers0 = maps:get(<<"headers">>, Params, #{}), + Headers1 = preproc_and_normalise_headers(Headers0), + NHeaders = maybe_remove_content_type_header(Headers1, Method), #{method => Method, path => merge_path(CommonPath, maps:get(<<"path">>, Params, <<>>)), headers => NHeaders, @@ -316,9 +325,17 @@ parse_action_params(Params = #{<<"url">> := URL}) -> request_timeout => cuttlefish_duration:parse(str(maps:get(<<"request_timeout">>, Params, <<"5s">>))), pool => maps:get(<<"pool">>, Params)}. -ensure_content_type_header(Headers, Method) when Method =:= post orelse Method =:= put -> +%% According to https://www.rfc-editor.org/rfc/rfc7231#section-3.1.1.5, the +%% Content-Type HTTP header should be set only for PUT and POST requests. +maybe_remove_content_type_header({has_tmpl_token, Headers}, Method) -> + {has_tmpl_token, maybe_remove_content_type_header(Headers, Method)}; +maybe_remove_content_type_header(Headers, Method) when is_map(Headers), (Method =:= post orelse Method =:= put) -> + maps:to_list(Headers); +maybe_remove_content_type_header(Headers, Method) when is_list(Headers), (Method =:= post orelse Method =:= put) -> Headers; -ensure_content_type_header(Headers, _Method) -> +maybe_remove_content_type_header(Headers, _Method) when is_map(Headers) -> + maps:to_list(maps:remove(<<"content-type">>, Headers)); +maybe_remove_content_type_header(Headers, _Method) when is_list(Headers) -> lists:keydelete(<<"content-type">>, 1, Headers). merge_path(CommonPath, <<>>) -> @@ -335,8 +352,46 @@ method(POST) when POST == <<"POST">>; POST == <<"post">> -> post; method(PUT) when PUT == <<"PUT">>; PUT == <<"put">> -> put; method(DEL) when DEL == <<"DELETE">>; DEL == <<"delete">> -> delete. -headers(Headers) -> - emqx_http_lib:normalise_headers(maps:to_list(Headers)). +normalize_key(K) -> + %% see emqx_http_lib:normalise_headers/1 for more info + K1 = re:replace(K, "_", "-", [{return, binary}]), + string:lowercase(K1). + +preproc_and_normalise_headers(Headers) -> + Preproc = fun(Str) -> {tmpl_token, emqx_rule_utils:preproc_tmpl(Str)} end, + Res = maps:fold(fun(K, V, {Flag, Acc}) -> + case {emqx_rule_utils:if_contains_placeholder(K), + emqx_rule_utils:if_contains_placeholder(V)} of + {false, false} -> + {Flag, Acc#{normalize_key(K) => V}}; + {false, true} -> + {has_tmpl_token, Acc#{normalize_key(K) => Preproc(V)}}; + {true, false} -> + {has_tmpl_token, Acc#{Preproc(K) => V}}; + {true, true} -> + {has_tmpl_token, Acc#{Preproc(K) => Preproc(V)}} + end + end, {no_token, #{}}, Headers), + case Res of + {no_token, RHeaders} -> RHeaders; + {has_tmpl_token, _} -> Res + end. + +maybe_proc_headers({has_tmpl_token, HeadersTks}, Method, Data) -> + MaybeProc = fun + (key, {tmpl_token, Tokens}) -> + normalize_key(emqx_rule_utils:proc_tmpl(Tokens, Data)); + (val, {tmpl_token, Tokens}) -> + emqx_rule_utils:proc_tmpl(Tokens, Data); + (_, Str) -> + Str + end, + Headers = [{MaybeProc(key, K), MaybeProc(val, V)} || {K, V} <- HeadersTks], + maybe_remove_content_type_header(Headers, Method); +maybe_proc_headers(Headers, _, _) -> + %% For headers of old emqx versions, and normal header without placeholders, + %% the Headers are not pre-processed + Headers. str(Str) when is_list(Str) -> Str; str(Atom) when is_atom(Atom) -> atom_to_list(Atom); diff --git a/apps/emqx_web_hook/test/emqx_web_hook_SUITE.erl b/apps/emqx_web_hook/test/emqx_web_hook_SUITE.erl index 95b799568..3dc4073f7 100644 --- a/apps/emqx_web_hook/test/emqx_web_hook_SUITE.erl +++ b/apps/emqx_web_hook/test/emqx_web_hook_SUITE.erl @@ -35,6 +35,7 @@ all() -> , {group, ipv6http} , {group, ipv6https} , test_rule_webhook + , test_preproc_headers ]. groups() -> @@ -138,6 +139,58 @@ set_special_cfgs() -> %% Test cases %%-------------------------------------------------------------------- +test_preproc_headers(_) -> + TestTable = [ + {#{<<"Content_TYPE">> => <<"application/JSON">>, <<"Key">> => <<"Val">>}, + #{<<"content-type">> => <<"application/JSON">>, <<"key">> => <<"Val">>} + }, + {#{<<"${ContentTypeKey}">> => <<"application/JSON">>}, + #{<<"content-type">> => <<"application/JSON">>} + }, + {#{<<"content-type">> => <<"${ContentTypeVal}">>}, + #{<<"content-type">> => <<"application/JSON">>} + }, + {#{<<"Content_type">> => <<"${ContentTypeVal}">>}, + #{<<"content-type">> => <<"application/JSON">>} + }, + {#{<<"${ContentTypeKey}">> => <<"${ContentTypeVal}">>, <<"Key">> => <<"Val">>}, + #{<<"content-type">> => <<"application/JSON">>, <<"key">> => <<"Val">>} + }, + {#{<<"${ContentTypeKey}">> => <<"${ContentTypeVal}">>, <<"Key">> => <<"Val">>}, + #{<<"content-type">> => <<"application/JSON">>, <<"key">> => <<"Val">>} + }, + {#{<<"Content_${TypeKey}">> => <<"application/${TypeVal}">>, <<"Key">> => <<"Val">>}, + #{<<"content-type">> => <<"application/JSON">>, <<"key">> => <<"Val">>} + } + ], + SelectedData1 = #{ + <<"ContentTypeKey">> => <<"content-type">>, + <<"ContentTypeVal">> => <<"application/JSON">>, + <<"TypeKey">> => <<"type">>, + <<"TypeVal">> => <<"JSON">> + }, + SelectedData2 = #{ + <<"ContentTypeKey">> => <<"ConTent_Type">>, + <<"ContentTypeVal">> => <<"application/JSON">>, + <<"TypeKey">> => <<"TYPe">>, + <<"TypeVal">> => <<"JSON">> + }, + [begin + ct:pal("test_preproc_headers, input: ~p, method: ~p, selected: ~p", [Input, Method, Selected]), + Headers0 = emqx_web_hook_actions:preproc_and_normalise_headers(Input), + Headers1 = emqx_web_hook_actions:maybe_remove_content_type_header(Headers0, Method), + Result0 = emqx_web_hook_actions:maybe_proc_headers(Headers1, Method, Selected), + Expected1 = case Method =/= post andalso Method =/= put of + true -> maps:remove(<<"content-type">>, Expected); + false -> Expected + end, + ?assertEqual(Expected1, maps:from_list(Result0)) + end || + {Input, Expected} <- TestTable, + Selected <- [SelectedData1, SelectedData2], + Method <- [post, put, get, delete] + ]. + test_rule_webhook(_) -> {ok, ServerPid} = http_server:start_link(self(), 9999, []), receive {ServerPid, ready} -> ok diff --git a/changes/v4.3.22-en.md b/changes/v4.3.22-en.md index c859b7e5a..4bbf04b8a 100644 --- a/changes/v4.3.22-en.md +++ b/changes/v4.3.22-en.md @@ -2,6 +2,8 @@ ## Enhancements +- Support to use placeholders like `${var}` in the HTTP `Headers` of rule-engine's Webhook actions [#9239](https://github.com/emqx/emqx/pull/9239). + - Asynchronously refresh the resources and rules during emqx boot-up [#9199](https://github.com/emqx/emqx/pull/9199). This is to avoid slowing down the boot if some resources spend long time establishing the connection. diff --git a/changes/v4.3.22-zh.md b/changes/v4.3.22-zh.md index afe709ffe..53b244163 100644 --- a/changes/v4.3.22-zh.md +++ b/changes/v4.3.22-zh.md @@ -2,6 +2,8 @@ ## 增强 +- 支持在规则引擎的 Webhook 动作的 HTTP Headers 里使用 `${var}` 格式的占位符 [#9239](https://github.com/emqx/emqx/pull/9239)。 + - 在 emqx 启动时,异步地刷新资源和规则 [#9199](https://github.com/emqx/emqx/pull/9199)。 这个改动是为了避免因为一些资源连接建立过慢,而导致启动时间过长。