feat: support to use placeholders as http headers in webhook actions

This commit is contained in:
Shawn 2022-10-26 20:34:59 +08:00
parent f5c0ef3e56
commit 982fc9b4fe
8 changed files with 146 additions and 10 deletions

View File

@ -34,6 +34,7 @@
, proc_sql/2 , proc_sql/2
, proc_sql_param_str/2 , proc_sql_param_str/2
, proc_cql_param_str/2 , proc_cql_param_str/2
, if_contains_placeholder/1
]). ]).
%% type converting %% type converting
@ -175,6 +176,14 @@ proc_param_str(Tokens, Data, Quote) ->
iolist_to_binary( iolist_to_binary(
proc_tmpl(Tokens, Data, #{return => rawlist, var_trans => Quote})). proc_tmpl(Tokens, Data, #{return => rawlist, var_trans => Quote})).
%% return true if the Str contains any placeholder in "${var}" format.
-spec(if_contains_placeholder(string() | binary()) -> boolean()).
if_contains_placeholder(Str) ->
case re:split(Str, ?EX_PLACE_HOLDER, [{return, list}, group, trim]) of
[[_]] -> false;
_ -> true
end.
%% backward compatibility for hot upgrading from =< e4.2.1 %% backward compatibility for hot upgrading from =< e4.2.1
get_phld_var(Fun, Data) when is_function(Fun) -> get_phld_var(Fun, Data) when is_function(Fun) ->
Fun(Data); Fun(Data);

View File

@ -134,3 +134,18 @@ t_preproc_sql5(_) ->
ParamsTokens = emqx_rule_utils:preproc_tmpl(<<"a:${a},b:${b},c:${c},d:${d}">>), ParamsTokens = emqx_rule_utils:preproc_tmpl(<<"a:${a},b:${b},c:${c},d:${d}">>),
?assertEqual(<<"a:'1''''2',b:1,c:1.0,d:'{\"d1\":\"someone''s phone\"}'">>, ?assertEqual(<<"a:'1''''2',b:1,c:1.0,d:'{\"d1\":\"someone''s phone\"}'">>,
emqx_rule_utils:proc_cql_param_str(ParamsTokens, Selected)). emqx_rule_utils:proc_cql_param_str(ParamsTokens, Selected)).
t_if_contains_placeholder(_) ->
?assert(emqx_rule_utils:if_contains_placeholder(<<"${a}">>)),
?assert(emqx_rule_utils:if_contains_placeholder(<<"${a}${b}">>)),
?assert(emqx_rule_utils:if_contains_placeholder(<<"${a},${b},${c}">>)),
?assert(emqx_rule_utils:if_contains_placeholder(<<"a:${a}">>)),
?assert(emqx_rule_utils:if_contains_placeholder(<<"a:${a},b:${b}">>)),
?assert(emqx_rule_utils:if_contains_placeholder(<<"abc${ab}">>)),
?assertNot(emqx_rule_utils:if_contains_placeholder(<<"a">>)),
?assertNot(emqx_rule_utils:if_contains_placeholder(<<"abc$">>)),
?assertNot(emqx_rule_utils:if_contains_placeholder(<<"abc${">>)),
?assertNot(emqx_rule_utils:if_contains_placeholder(<<"abc${a">>)),
?assertNot(emqx_rule_utils:if_contains_placeholder(<<"abc${ab">>)),
?assertNot(emqx_rule_utils:if_contains_placeholder(<<"a${ab${c${e">>)),
ok.

View File

@ -1,6 +1,6 @@
{application, emqx_web_hook, {application, emqx_web_hook,
[{description, "EMQ X WebHook Plugin"}, [{description, "EMQ X WebHook Plugin"},
{vsn, "4.3.14"}, % strict semver, bump manually! {vsn, "4.3.15"}, % strict semver, bump manually!
{modules, []}, {modules, []},
{registered, [emqx_web_hook_sup]}, {registered, [emqx_web_hook_sup]},
{applications, [kernel,stdlib,ehttpc]}, {applications, [kernel,stdlib,ehttpc]},

View File

@ -21,7 +21,7 @@
{"4.3.11", {"4.3.11",
[{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}, [{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]},
{load_module,emqx_web_hook,brutal_purge,soft_purge,[]}]}, {load_module,emqx_web_hook,brutal_purge,soft_purge,[]}]},
{<<"4\\.3\\.1[2-3]">>, {<<"4\\.3\\.1[2-4]">>,
[{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]}, [{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]},
{<<".*">>,[]}], {<<".*">>,[]}],
[{<<"4\\.3\\.[0-7]">>, [{<<"4\\.3\\.[0-7]">>,
@ -44,6 +44,6 @@
{"4.3.11", {"4.3.11",
[{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}, [{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]},
{load_module,emqx_web_hook,brutal_purge,soft_purge,[]}]}, {load_module,emqx_web_hook,brutal_purge,soft_purge,[]}]},
{<<"4\\.3\\.1[2-3]">>, {<<"4\\.3\\.1[2-4]">>,
[{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]}, [{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]},
{<<".*">>,[]}]}. {<<".*">>,[]}]}.

View File

@ -26,6 +26,13 @@
, on_action_data_to_webserver/2 , on_action_data_to_webserver/2
]). ]).
-ifdef(TEST).
-export([ preproc_and_normalise_headers/1
, maybe_proc_headers/3
, maybe_remove_content_type_header/2
]).
-endif.
-export_type([action_fun/0]). -export_type([action_fun/0]).
-include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx.hrl").
@ -263,7 +270,8 @@ on_action_data_to_webserver(Selected, _Envs =
metadata := Metadata}) -> metadata := Metadata}) ->
NBody = format_msg(BodyTokens, clear_user_property_header(Selected)), NBody = format_msg(BodyTokens, clear_user_property_header(Selected)),
NPath = emqx_rule_utils:proc_tmpl(PathTokens, Selected), NPath = emqx_rule_utils:proc_tmpl(PathTokens, Selected),
Req = create_req(Method, NPath, Headers, NBody), Headers1 = maybe_proc_headers(Headers, Method, Selected),
Req = create_req(Method, NPath, Headers1, NBody),
case ehttpc:request({Pool, ClientID}, Method, Req, RequestTimeout) of case ehttpc:request({Pool, ClientID}, Method, Req, RequestTimeout) of
{ok, StatusCode, _} when StatusCode >= 200 andalso StatusCode < 300 -> {ok, StatusCode, _} when StatusCode >= 200 andalso StatusCode < 300 ->
?LOG_RULE_ACTION(debug, Metadata, "HTTP Request succeeded with path: ~p status code ~p", [NPath, StatusCode]), ?LOG_RULE_ACTION(debug, Metadata, "HTTP Request succeeded with path: ~p status code ~p", [NPath, StatusCode]),
@ -307,8 +315,9 @@ create_req(_, Path, Headers, Body) ->
parse_action_params(Params = #{<<"url">> := URL}) -> parse_action_params(Params = #{<<"url">> := URL}) ->
{ok, #{path := CommonPath}} = emqx_http_lib:uri_parse(URL), {ok, #{path := CommonPath}} = emqx_http_lib:uri_parse(URL),
Method = method(maps:get(<<"method">>, Params, <<"POST">>)), Method = method(maps:get(<<"method">>, Params, <<"POST">>)),
Headers = headers(maps:get(<<"headers">>, Params, #{})), Headers0 = maps:get(<<"headers">>, Params, #{}),
NHeaders = ensure_content_type_header(Headers, Method), Headers1 = preproc_and_normalise_headers(Headers0),
NHeaders = maybe_remove_content_type_header(Headers1, Method),
#{method => Method, #{method => Method,
path => merge_path(CommonPath, maps:get(<<"path">>, Params, <<>>)), path => merge_path(CommonPath, maps:get(<<"path">>, Params, <<>>)),
headers => NHeaders, headers => NHeaders,
@ -316,9 +325,17 @@ parse_action_params(Params = #{<<"url">> := URL}) ->
request_timeout => cuttlefish_duration:parse(str(maps:get(<<"request_timeout">>, Params, <<"5s">>))), request_timeout => cuttlefish_duration:parse(str(maps:get(<<"request_timeout">>, Params, <<"5s">>))),
pool => maps:get(<<"pool">>, Params)}. pool => maps:get(<<"pool">>, Params)}.
ensure_content_type_header(Headers, Method) when Method =:= post orelse Method =:= put -> %% According to https://www.rfc-editor.org/rfc/rfc7231#section-3.1.1.5, the
%% Content-Type HTTP header should be set only for PUT and POST requests.
maybe_remove_content_type_header({has_tmpl_token, Headers}, Method) ->
{has_tmpl_token, maybe_remove_content_type_header(Headers, Method)};
maybe_remove_content_type_header(Headers, Method) when is_map(Headers), (Method =:= post orelse Method =:= put) ->
maps:to_list(Headers);
maybe_remove_content_type_header(Headers, Method) when is_list(Headers), (Method =:= post orelse Method =:= put) ->
Headers; Headers;
ensure_content_type_header(Headers, _Method) -> maybe_remove_content_type_header(Headers, _Method) when is_map(Headers) ->
maps:to_list(maps:remove(<<"content-type">>, Headers));
maybe_remove_content_type_header(Headers, _Method) when is_list(Headers) ->
lists:keydelete(<<"content-type">>, 1, Headers). lists:keydelete(<<"content-type">>, 1, Headers).
merge_path(CommonPath, <<>>) -> merge_path(CommonPath, <<>>) ->
@ -335,8 +352,46 @@ method(POST) when POST == <<"POST">>; POST == <<"post">> -> post;
method(PUT) when PUT == <<"PUT">>; PUT == <<"put">> -> put; method(PUT) when PUT == <<"PUT">>; PUT == <<"put">> -> put;
method(DEL) when DEL == <<"DELETE">>; DEL == <<"delete">> -> delete. method(DEL) when DEL == <<"DELETE">>; DEL == <<"delete">> -> delete.
headers(Headers) -> normalize_key(K) ->
emqx_http_lib:normalise_headers(maps:to_list(Headers)). %% see emqx_http_lib:normalise_headers/1 for more info
K1 = re:replace(K, "_", "-", [{return, binary}]),
string:lowercase(K1).
preproc_and_normalise_headers(Headers) ->
Preproc = fun(Str) -> {tmpl_token, emqx_rule_utils:preproc_tmpl(Str)} end,
Res = maps:fold(fun(K, V, {Flag, Acc}) ->
case {emqx_rule_utils:if_contains_placeholder(K),
emqx_rule_utils:if_contains_placeholder(V)} of
{false, false} ->
{Flag, Acc#{normalize_key(K) => V}};
{false, true} ->
{has_tmpl_token, Acc#{normalize_key(K) => Preproc(V)}};
{true, false} ->
{has_tmpl_token, Acc#{Preproc(K) => V}};
{true, true} ->
{has_tmpl_token, Acc#{Preproc(K) => Preproc(V)}}
end
end, {no_token, #{}}, Headers),
case Res of
{no_token, RHeaders} -> RHeaders;
{has_tmpl_token, _} -> Res
end.
maybe_proc_headers({has_tmpl_token, HeadersTks}, Method, Data) ->
MaybeProc = fun
(key, {tmpl_token, Tokens}) ->
normalize_key(emqx_rule_utils:proc_tmpl(Tokens, Data));
(val, {tmpl_token, Tokens}) ->
emqx_rule_utils:proc_tmpl(Tokens, Data);
(_, Str) ->
Str
end,
Headers = [{MaybeProc(key, K), MaybeProc(val, V)} || {K, V} <- HeadersTks],
maybe_remove_content_type_header(Headers, Method);
maybe_proc_headers(Headers, _, _) ->
%% For headers of old emqx versions, and normal header without placeholders,
%% the Headers are not pre-processed
Headers.
str(Str) when is_list(Str) -> Str; str(Str) when is_list(Str) -> Str;
str(Atom) when is_atom(Atom) -> atom_to_list(Atom); str(Atom) when is_atom(Atom) -> atom_to_list(Atom);

View File

@ -35,6 +35,7 @@ all() ->
, {group, ipv6http} , {group, ipv6http}
, {group, ipv6https} , {group, ipv6https}
, test_rule_webhook , test_rule_webhook
, test_preproc_headers
]. ].
groups() -> groups() ->
@ -138,6 +139,58 @@ set_special_cfgs() ->
%% Test cases %% Test cases
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
test_preproc_headers(_) ->
TestTable = [
{#{<<"Content_TYPE">> => <<"application/JSON">>, <<"Key">> => <<"Val">>},
#{<<"content-type">> => <<"application/JSON">>, <<"key">> => <<"Val">>}
},
{#{<<"${ContentTypeKey}">> => <<"application/JSON">>},
#{<<"content-type">> => <<"application/JSON">>}
},
{#{<<"content-type">> => <<"${ContentTypeVal}">>},
#{<<"content-type">> => <<"application/JSON">>}
},
{#{<<"Content_type">> => <<"${ContentTypeVal}">>},
#{<<"content-type">> => <<"application/JSON">>}
},
{#{<<"${ContentTypeKey}">> => <<"${ContentTypeVal}">>, <<"Key">> => <<"Val">>},
#{<<"content-type">> => <<"application/JSON">>, <<"key">> => <<"Val">>}
},
{#{<<"${ContentTypeKey}">> => <<"${ContentTypeVal}">>, <<"Key">> => <<"Val">>},
#{<<"content-type">> => <<"application/JSON">>, <<"key">> => <<"Val">>}
},
{#{<<"Content_${TypeKey}">> => <<"application/${TypeVal}">>, <<"Key">> => <<"Val">>},
#{<<"content-type">> => <<"application/JSON">>, <<"key">> => <<"Val">>}
}
],
SelectedData1 = #{
<<"ContentTypeKey">> => <<"content-type">>,
<<"ContentTypeVal">> => <<"application/JSON">>,
<<"TypeKey">> => <<"type">>,
<<"TypeVal">> => <<"JSON">>
},
SelectedData2 = #{
<<"ContentTypeKey">> => <<"ConTent_Type">>,
<<"ContentTypeVal">> => <<"application/JSON">>,
<<"TypeKey">> => <<"TYPe">>,
<<"TypeVal">> => <<"JSON">>
},
[begin
ct:pal("test_preproc_headers, input: ~p, method: ~p, selected: ~p", [Input, Method, Selected]),
Headers0 = emqx_web_hook_actions:preproc_and_normalise_headers(Input),
Headers1 = emqx_web_hook_actions:maybe_remove_content_type_header(Headers0, Method),
Result0 = emqx_web_hook_actions:maybe_proc_headers(Headers1, Method, Selected),
Expected1 = case Method =/= post andalso Method =/= put of
true -> maps:remove(<<"content-type">>, Expected);
false -> Expected
end,
?assertEqual(Expected1, maps:from_list(Result0))
end ||
{Input, Expected} <- TestTable,
Selected <- [SelectedData1, SelectedData2],
Method <- [post, put, get, delete]
].
test_rule_webhook(_) -> test_rule_webhook(_) ->
{ok, ServerPid} = http_server:start_link(self(), 9999, []), {ok, ServerPid} = http_server:start_link(self(), 9999, []),
receive {ServerPid, ready} -> ok receive {ServerPid, ready} -> ok

View File

@ -2,6 +2,8 @@
## Enhancements ## Enhancements
- Support to use placeholders like `${var}` in the HTTP `Headers` of rule-engine's Webhook actions [#9239](https://github.com/emqx/emqx/pull/9239).
- Asynchronously refresh the resources and rules during emqx boot-up [#9199](https://github.com/emqx/emqx/pull/9199). - Asynchronously refresh the resources and rules during emqx boot-up [#9199](https://github.com/emqx/emqx/pull/9199).
This is to avoid slowing down the boot if some resources spend long time establishing the connection. This is to avoid slowing down the boot if some resources spend long time establishing the connection.

View File

@ -2,6 +2,8 @@
## 增强 ## 增强
- 支持在规则引擎的 Webhook 动作的 HTTP Headers 里使用 `${var}` 格式的占位符 [#9239](https://github.com/emqx/emqx/pull/9239)。
- 在 emqx 启动时,异步地刷新资源和规则 [#9199](https://github.com/emqx/emqx/pull/9199)。 - 在 emqx 启动时,异步地刷新资源和规则 [#9199](https://github.com/emqx/emqx/pull/9199)。
这个改动是为了避免因为一些资源连接建立过慢,而导致启动时间过长。 这个改动是为了避免因为一些资源连接建立过慢,而导致启动时间过长。