Merge pull request #9239 from emqx/webhook_support_placeholders_in_headers

feat: support to use placeholders as http headers in webhook actions
This commit is contained in:
Xinyu Liu 2022-10-28 09:01:29 +08:00 committed by GitHub
commit eeb87fd253
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 145 additions and 9 deletions

View File

@ -34,6 +34,7 @@
, proc_sql/2 , proc_sql/2
, proc_sql_param_str/2 , proc_sql_param_str/2
, proc_cql_param_str/2 , proc_cql_param_str/2
, if_contains_placeholder/1
]). ]).
%% type converting %% type converting
@ -175,6 +176,14 @@ proc_param_str(Tokens, Data, Quote) ->
iolist_to_binary( iolist_to_binary(
proc_tmpl(Tokens, Data, #{return => rawlist, var_trans => Quote})). proc_tmpl(Tokens, Data, #{return => rawlist, var_trans => Quote})).
%% return true if the Str contains any placeholder in "${var}" format.
-spec(if_contains_placeholder(string() | binary()) -> boolean()).
if_contains_placeholder(Str) ->
case re:split(Str, ?EX_PLACE_HOLDER, [{return, list}, group, trim]) of
[[_]] -> false;
_ -> true
end.
%% backward compatibility for hot upgrading from =< e4.2.1 %% backward compatibility for hot upgrading from =< e4.2.1
get_phld_var(Fun, Data) when is_function(Fun) -> get_phld_var(Fun, Data) when is_function(Fun) ->
Fun(Data); Fun(Data);

View File

@ -134,3 +134,18 @@ t_preproc_sql5(_) ->
ParamsTokens = emqx_rule_utils:preproc_tmpl(<<"a:${a},b:${b},c:${c},d:${d}">>), ParamsTokens = emqx_rule_utils:preproc_tmpl(<<"a:${a},b:${b},c:${c},d:${d}">>),
?assertEqual(<<"a:'1''''2',b:1,c:1.0,d:'{\"d1\":\"someone''s phone\"}'">>, ?assertEqual(<<"a:'1''''2',b:1,c:1.0,d:'{\"d1\":\"someone''s phone\"}'">>,
emqx_rule_utils:proc_cql_param_str(ParamsTokens, Selected)). emqx_rule_utils:proc_cql_param_str(ParamsTokens, Selected)).
t_if_contains_placeholder(_) ->
?assert(emqx_rule_utils:if_contains_placeholder(<<"${a}">>)),
?assert(emqx_rule_utils:if_contains_placeholder(<<"${a}${b}">>)),
?assert(emqx_rule_utils:if_contains_placeholder(<<"${a},${b},${c}">>)),
?assert(emqx_rule_utils:if_contains_placeholder(<<"a:${a}">>)),
?assert(emqx_rule_utils:if_contains_placeholder(<<"a:${a},b:${b}">>)),
?assert(emqx_rule_utils:if_contains_placeholder(<<"abc${ab}">>)),
?assertNot(emqx_rule_utils:if_contains_placeholder(<<"a">>)),
?assertNot(emqx_rule_utils:if_contains_placeholder(<<"abc$">>)),
?assertNot(emqx_rule_utils:if_contains_placeholder(<<"abc${">>)),
?assertNot(emqx_rule_utils:if_contains_placeholder(<<"abc${a">>)),
?assertNot(emqx_rule_utils:if_contains_placeholder(<<"abc${ab">>)),
?assertNot(emqx_rule_utils:if_contains_placeholder(<<"a${ab${c${e">>)),
ok.

View File

@ -22,7 +22,7 @@
{"4.3.11", {"4.3.11",
[{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}, [{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]},
{load_module,emqx_web_hook,brutal_purge,soft_purge,[]}]}, {load_module,emqx_web_hook,brutal_purge,soft_purge,[]}]},
{<<"4\\.3\\.1[2-3]">>, {<<"4\\.3\\.1[2-4]">>,
[{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]}, [{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]},
{<<".*">>,[]}], {<<".*">>,[]}],
[{"4.3.14",[{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]}, [{"4.3.14",[{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]},
@ -46,6 +46,6 @@
{"4.3.11", {"4.3.11",
[{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}, [{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]},
{load_module,emqx_web_hook,brutal_purge,soft_purge,[]}]}, {load_module,emqx_web_hook,brutal_purge,soft_purge,[]}]},
{<<"4\\.3\\.1[2-3]">>, {<<"4\\.3\\.1[2-4]">>,
[{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]}, [{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]},
{<<".*">>,[]}]}. {<<".*">>,[]}]}.

View File

@ -26,6 +26,13 @@
, on_action_data_to_webserver/2 , on_action_data_to_webserver/2
]). ]).
-ifdef(TEST).
-export([ preproc_and_normalise_headers/1
, maybe_proc_headers/3
, maybe_remove_content_type_header/2
]).
-endif.
-export_type([action_fun/0]). -export_type([action_fun/0]).
-include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/emqx.hrl").
@ -263,7 +270,8 @@ on_action_data_to_webserver(Selected, _Envs =
metadata := Metadata}) -> metadata := Metadata}) ->
NBody = format_msg(BodyTokens, clear_user_property_header(Selected)), NBody = format_msg(BodyTokens, clear_user_property_header(Selected)),
NPath = emqx_rule_utils:proc_tmpl(PathTokens, Selected), NPath = emqx_rule_utils:proc_tmpl(PathTokens, Selected),
Req = create_req(Method, NPath, Headers, NBody), Headers1 = maybe_proc_headers(Headers, Method, Selected),
Req = create_req(Method, NPath, Headers1, NBody),
case ehttpc:request({Pool, ClientID}, Method, Req, RequestTimeout) of case ehttpc:request({Pool, ClientID}, Method, Req, RequestTimeout) of
{ok, StatusCode, _} when StatusCode >= 200 andalso StatusCode < 300 -> {ok, StatusCode, _} when StatusCode >= 200 andalso StatusCode < 300 ->
?LOG_RULE_ACTION(debug, Metadata, "HTTP Request succeeded with path: ~p status code ~p", [NPath, StatusCode]), ?LOG_RULE_ACTION(debug, Metadata, "HTTP Request succeeded with path: ~p status code ~p", [NPath, StatusCode]),
@ -307,8 +315,9 @@ create_req(_, Path, Headers, Body) ->
parse_action_params(Params = #{<<"url">> := URL}) -> parse_action_params(Params = #{<<"url">> := URL}) ->
{ok, #{path := CommonPath}} = emqx_http_lib:uri_parse(URL), {ok, #{path := CommonPath}} = emqx_http_lib:uri_parse(URL),
Method = method(maps:get(<<"method">>, Params, <<"POST">>)), Method = method(maps:get(<<"method">>, Params, <<"POST">>)),
Headers = headers(maps:get(<<"headers">>, Params, #{})), Headers0 = maps:get(<<"headers">>, Params, #{}),
NHeaders = ensure_content_type_header(Headers, Method), Headers1 = preproc_and_normalise_headers(Headers0),
NHeaders = maybe_remove_content_type_header(Headers1, Method),
#{method => Method, #{method => Method,
path => merge_path(CommonPath, maps:get(<<"path">>, Params, <<>>)), path => merge_path(CommonPath, maps:get(<<"path">>, Params, <<>>)),
headers => NHeaders, headers => NHeaders,
@ -316,9 +325,17 @@ parse_action_params(Params = #{<<"url">> := URL}) ->
request_timeout => cuttlefish_duration:parse(str(maps:get(<<"request_timeout">>, Params, <<"5s">>))), request_timeout => cuttlefish_duration:parse(str(maps:get(<<"request_timeout">>, Params, <<"5s">>))),
pool => maps:get(<<"pool">>, Params)}. pool => maps:get(<<"pool">>, Params)}.
ensure_content_type_header(Headers, Method) when Method =:= post orelse Method =:= put -> %% According to https://www.rfc-editor.org/rfc/rfc7231#section-3.1.1.5, the
%% Content-Type HTTP header should be set only for PUT and POST requests.
maybe_remove_content_type_header({has_tmpl_token, Headers}, Method) ->
{has_tmpl_token, maybe_remove_content_type_header(Headers, Method)};
maybe_remove_content_type_header(Headers, Method) when is_map(Headers), (Method =:= post orelse Method =:= put) ->
maps:to_list(Headers);
maybe_remove_content_type_header(Headers, Method) when is_list(Headers), (Method =:= post orelse Method =:= put) ->
Headers; Headers;
ensure_content_type_header(Headers, _Method) -> maybe_remove_content_type_header(Headers, _Method) when is_map(Headers) ->
maps:to_list(maps:remove(<<"content-type">>, Headers));
maybe_remove_content_type_header(Headers, _Method) when is_list(Headers) ->
lists:keydelete(<<"content-type">>, 1, Headers). lists:keydelete(<<"content-type">>, 1, Headers).
merge_path(CommonPath, <<>>) -> merge_path(CommonPath, <<>>) ->
@ -335,8 +352,46 @@ method(POST) when POST == <<"POST">>; POST == <<"post">> -> post;
method(PUT) when PUT == <<"PUT">>; PUT == <<"put">> -> put; method(PUT) when PUT == <<"PUT">>; PUT == <<"put">> -> put;
method(DEL) when DEL == <<"DELETE">>; DEL == <<"delete">> -> delete. method(DEL) when DEL == <<"DELETE">>; DEL == <<"delete">> -> delete.
headers(Headers) -> normalize_key(K) ->
emqx_http_lib:normalise_headers(maps:to_list(Headers)). %% see emqx_http_lib:normalise_headers/1 for more info
K1 = re:replace(K, "_", "-", [{return, binary}]),
string:lowercase(K1).
preproc_and_normalise_headers(Headers) ->
Preproc = fun(Str) -> {tmpl_token, emqx_rule_utils:preproc_tmpl(Str)} end,
Res = maps:fold(fun(K, V, {Flag, Acc}) ->
case {emqx_rule_utils:if_contains_placeholder(K),
emqx_rule_utils:if_contains_placeholder(V)} of
{false, false} ->
{Flag, Acc#{normalize_key(K) => V}};
{false, true} ->
{has_tmpl_token, Acc#{normalize_key(K) => Preproc(V)}};
{true, false} ->
{has_tmpl_token, Acc#{Preproc(K) => V}};
{true, true} ->
{has_tmpl_token, Acc#{Preproc(K) => Preproc(V)}}
end
end, {no_token, #{}}, Headers),
case Res of
{no_token, RHeaders} -> RHeaders;
{has_tmpl_token, _} -> Res
end.
maybe_proc_headers({has_tmpl_token, HeadersTks}, Method, Data) ->
MaybeProc = fun
(key, {tmpl_token, Tokens}) ->
normalize_key(emqx_rule_utils:proc_tmpl(Tokens, Data));
(val, {tmpl_token, Tokens}) ->
emqx_rule_utils:proc_tmpl(Tokens, Data);
(_, Str) ->
Str
end,
Headers = [{MaybeProc(key, K), MaybeProc(val, V)} || {K, V} <- HeadersTks],
maybe_remove_content_type_header(Headers, Method);
maybe_proc_headers(Headers, _, _) ->
%% For headers of old emqx versions, and normal header without placeholders,
%% the Headers are not pre-processed
Headers.
str(Str) when is_list(Str) -> Str; str(Str) when is_list(Str) -> Str;
str(Atom) when is_atom(Atom) -> atom_to_list(Atom); str(Atom) when is_atom(Atom) -> atom_to_list(Atom);

View File

@ -35,6 +35,7 @@ all() ->
, {group, ipv6http} , {group, ipv6http}
, {group, ipv6https} , {group, ipv6https}
, test_rule_webhook , test_rule_webhook
, test_preproc_headers
]. ].
groups() -> groups() ->
@ -138,6 +139,58 @@ set_special_cfgs() ->
%% Test cases %% Test cases
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
test_preproc_headers(_) ->
TestTable = [
{#{<<"Content_TYPE">> => <<"application/JSON">>, <<"Key">> => <<"Val">>},
#{<<"content-type">> => <<"application/JSON">>, <<"key">> => <<"Val">>}
},
{#{<<"${ContentTypeKey}">> => <<"application/JSON">>},
#{<<"content-type">> => <<"application/JSON">>}
},
{#{<<"content-type">> => <<"${ContentTypeVal}">>},
#{<<"content-type">> => <<"application/JSON">>}
},
{#{<<"Content_type">> => <<"${ContentTypeVal}">>},
#{<<"content-type">> => <<"application/JSON">>}
},
{#{<<"${ContentTypeKey}">> => <<"${ContentTypeVal}">>, <<"Key">> => <<"Val">>},
#{<<"content-type">> => <<"application/JSON">>, <<"key">> => <<"Val">>}
},
{#{<<"${ContentTypeKey}">> => <<"${ContentTypeVal}">>, <<"Key">> => <<"Val">>},
#{<<"content-type">> => <<"application/JSON">>, <<"key">> => <<"Val">>}
},
{#{<<"Content_${TypeKey}">> => <<"application/${TypeVal}">>, <<"Key">> => <<"Val">>},
#{<<"content-type">> => <<"application/JSON">>, <<"key">> => <<"Val">>}
}
],
SelectedData1 = #{
<<"ContentTypeKey">> => <<"content-type">>,
<<"ContentTypeVal">> => <<"application/JSON">>,
<<"TypeKey">> => <<"type">>,
<<"TypeVal">> => <<"JSON">>
},
SelectedData2 = #{
<<"ContentTypeKey">> => <<"ConTent_Type">>,
<<"ContentTypeVal">> => <<"application/JSON">>,
<<"TypeKey">> => <<"TYPe">>,
<<"TypeVal">> => <<"JSON">>
},
[begin
ct:pal("test_preproc_headers, input: ~p, method: ~p, selected: ~p", [Input, Method, Selected]),
Headers0 = emqx_web_hook_actions:preproc_and_normalise_headers(Input),
Headers1 = emqx_web_hook_actions:maybe_remove_content_type_header(Headers0, Method),
Result0 = emqx_web_hook_actions:maybe_proc_headers(Headers1, Method, Selected),
Expected1 = case Method =/= post andalso Method =/= put of
true -> maps:remove(<<"content-type">>, Expected);
false -> Expected
end,
?assertEqual(Expected1, maps:from_list(Result0))
end ||
{Input, Expected} <- TestTable,
Selected <- [SelectedData1, SelectedData2],
Method <- [post, put, get, delete]
].
test_rule_webhook(_) -> test_rule_webhook(_) ->
{ok, ServerPid} = http_server:start_link(self(), 9999, []), {ok, ServerPid} = http_server:start_link(self(), 9999, []),
receive {ServerPid, ready} -> ok receive {ServerPid, ready} -> ok

View File

@ -2,6 +2,8 @@
## Enhancements ## Enhancements
- Support to use placeholders like `${var}` in the HTTP `Headers` of rule-engine's Webhook actions [#9239](https://github.com/emqx/emqx/pull/9239).
- Asynchronously refresh the resources and rules during emqx boot-up [#9199](https://github.com/emqx/emqx/pull/9199). - Asynchronously refresh the resources and rules during emqx boot-up [#9199](https://github.com/emqx/emqx/pull/9199).
This is to avoid slowing down the boot if some resources spend long time establishing the connection. This is to avoid slowing down the boot if some resources spend long time establishing the connection.

View File

@ -2,6 +2,8 @@
## 增强 ## 增强
- 支持在规则引擎的 Webhook 动作的 HTTP Headers 里使用 `${var}` 格式的占位符 [#9239](https://github.com/emqx/emqx/pull/9239)。
- 在 emqx 启动时,异步地刷新资源和规则 [#9199](https://github.com/emqx/emqx/pull/9199)。 - 在 emqx 启动时,异步地刷新资源和规则 [#9199](https://github.com/emqx/emqx/pull/9199)。
这个改动是为了避免因为一些资源连接建立过慢,而导致启动时间过长。 这个改动是为了避免因为一些资源连接建立过慢,而导致启动时间过长。