Merge pull request #9254 from terry-xiaoyu/webhook_support_placeholders_in_headers
feat: support to use placeholders as http headers in webhook actions
This commit is contained in:
commit
8260c55530
|
@ -34,6 +34,7 @@
|
||||||
, proc_sql/2
|
, proc_sql/2
|
||||||
, proc_sql_param_str/2
|
, proc_sql_param_str/2
|
||||||
, proc_cql_param_str/2
|
, proc_cql_param_str/2
|
||||||
|
, if_contains_placeholder/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
%% type converting
|
%% type converting
|
||||||
|
@ -175,6 +176,14 @@ proc_param_str(Tokens, Data, Quote) ->
|
||||||
iolist_to_binary(
|
iolist_to_binary(
|
||||||
proc_tmpl(Tokens, Data, #{return => rawlist, var_trans => Quote})).
|
proc_tmpl(Tokens, Data, #{return => rawlist, var_trans => Quote})).
|
||||||
|
|
||||||
|
%% return true if the Str contains any placeholder in "${var}" format.
|
||||||
|
-spec(if_contains_placeholder(string() | binary()) -> boolean()).
|
||||||
|
if_contains_placeholder(Str) ->
|
||||||
|
case re:split(Str, ?EX_PLACE_HOLDER, [{return, list}, group, trim]) of
|
||||||
|
[[_]] -> false;
|
||||||
|
_ -> true
|
||||||
|
end.
|
||||||
|
|
||||||
%% backward compatibility for hot upgrading from =< e4.2.1
|
%% backward compatibility for hot upgrading from =< e4.2.1
|
||||||
get_phld_var(Fun, Data) when is_function(Fun) ->
|
get_phld_var(Fun, Data) when is_function(Fun) ->
|
||||||
Fun(Data);
|
Fun(Data);
|
||||||
|
|
|
@ -134,3 +134,18 @@ t_preproc_sql5(_) ->
|
||||||
ParamsTokens = emqx_rule_utils:preproc_tmpl(<<"a:${a},b:${b},c:${c},d:${d}">>),
|
ParamsTokens = emqx_rule_utils:preproc_tmpl(<<"a:${a},b:${b},c:${c},d:${d}">>),
|
||||||
?assertEqual(<<"a:'1''''2',b:1,c:1.0,d:'{\"d1\":\"someone''s phone\"}'">>,
|
?assertEqual(<<"a:'1''''2',b:1,c:1.0,d:'{\"d1\":\"someone''s phone\"}'">>,
|
||||||
emqx_rule_utils:proc_cql_param_str(ParamsTokens, Selected)).
|
emqx_rule_utils:proc_cql_param_str(ParamsTokens, Selected)).
|
||||||
|
|
||||||
|
t_if_contains_placeholder(_) ->
|
||||||
|
?assert(emqx_rule_utils:if_contains_placeholder(<<"${a}">>)),
|
||||||
|
?assert(emqx_rule_utils:if_contains_placeholder(<<"${a}${b}">>)),
|
||||||
|
?assert(emqx_rule_utils:if_contains_placeholder(<<"${a},${b},${c}">>)),
|
||||||
|
?assert(emqx_rule_utils:if_contains_placeholder(<<"a:${a}">>)),
|
||||||
|
?assert(emqx_rule_utils:if_contains_placeholder(<<"a:${a},b:${b}">>)),
|
||||||
|
?assert(emqx_rule_utils:if_contains_placeholder(<<"abc${ab}">>)),
|
||||||
|
?assertNot(emqx_rule_utils:if_contains_placeholder(<<"a">>)),
|
||||||
|
?assertNot(emqx_rule_utils:if_contains_placeholder(<<"abc$">>)),
|
||||||
|
?assertNot(emqx_rule_utils:if_contains_placeholder(<<"abc${">>)),
|
||||||
|
?assertNot(emqx_rule_utils:if_contains_placeholder(<<"abc${a">>)),
|
||||||
|
?assertNot(emqx_rule_utils:if_contains_placeholder(<<"abc${ab">>)),
|
||||||
|
?assertNot(emqx_rule_utils:if_contains_placeholder(<<"a${ab${c${e">>)),
|
||||||
|
ok.
|
||||||
|
|
|
@ -22,7 +22,7 @@
|
||||||
{"4.3.11",
|
{"4.3.11",
|
||||||
[{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_web_hook,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_web_hook,brutal_purge,soft_purge,[]}]},
|
||||||
{<<"4\\.3\\.1[2-3]">>,
|
{<<"4\\.3\\.1[2-4]">>,
|
||||||
[{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]},
|
[{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]},
|
||||||
{<<".*">>,[]}],
|
{<<".*">>,[]}],
|
||||||
[{"4.3.14",[{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]},
|
[{"4.3.14",[{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]},
|
||||||
|
@ -46,6 +46,6 @@
|
||||||
{"4.3.11",
|
{"4.3.11",
|
||||||
[{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]},
|
[{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]},
|
||||||
{load_module,emqx_web_hook,brutal_purge,soft_purge,[]}]},
|
{load_module,emqx_web_hook,brutal_purge,soft_purge,[]}]},
|
||||||
{<<"4\\.3\\.1[2-3]">>,
|
{<<"4\\.3\\.1[2-4]">>,
|
||||||
[{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]},
|
[{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]},
|
||||||
{<<".*">>,[]}]}.
|
{<<".*">>,[]}]}.
|
||||||
|
|
|
@ -26,6 +26,13 @@
|
||||||
, on_action_data_to_webserver/2
|
, on_action_data_to_webserver/2
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
-ifdef(TEST).
|
||||||
|
-export([ preproc_and_normalise_headers/1
|
||||||
|
, maybe_proc_headers/3
|
||||||
|
, maybe_remove_content_type_header/2
|
||||||
|
]).
|
||||||
|
-endif.
|
||||||
|
|
||||||
-export_type([action_fun/0]).
|
-export_type([action_fun/0]).
|
||||||
|
|
||||||
-include_lib("emqx/include/emqx.hrl").
|
-include_lib("emqx/include/emqx.hrl").
|
||||||
|
@ -263,7 +270,8 @@ on_action_data_to_webserver(Selected, _Envs =
|
||||||
metadata := Metadata}) ->
|
metadata := Metadata}) ->
|
||||||
NBody = format_msg(BodyTokens, clear_user_property_header(Selected)),
|
NBody = format_msg(BodyTokens, clear_user_property_header(Selected)),
|
||||||
NPath = emqx_rule_utils:proc_tmpl(PathTokens, Selected),
|
NPath = emqx_rule_utils:proc_tmpl(PathTokens, Selected),
|
||||||
Req = create_req(Method, NPath, Headers, NBody),
|
Headers1 = maybe_proc_headers(Headers, Method, Selected),
|
||||||
|
Req = create_req(Method, NPath, Headers1, NBody),
|
||||||
case ehttpc:request({Pool, ClientID}, Method, Req, RequestTimeout) of
|
case ehttpc:request({Pool, ClientID}, Method, Req, RequestTimeout) of
|
||||||
{ok, StatusCode, _} when StatusCode >= 200 andalso StatusCode < 300 ->
|
{ok, StatusCode, _} when StatusCode >= 200 andalso StatusCode < 300 ->
|
||||||
?LOG_RULE_ACTION(debug, Metadata, "HTTP Request succeeded with path: ~p status code ~p", [NPath, StatusCode]),
|
?LOG_RULE_ACTION(debug, Metadata, "HTTP Request succeeded with path: ~p status code ~p", [NPath, StatusCode]),
|
||||||
|
@ -307,8 +315,9 @@ create_req(_, Path, Headers, Body) ->
|
||||||
parse_action_params(Params = #{<<"url">> := URL}) ->
|
parse_action_params(Params = #{<<"url">> := URL}) ->
|
||||||
{ok, #{path := CommonPath}} = emqx_http_lib:uri_parse(URL),
|
{ok, #{path := CommonPath}} = emqx_http_lib:uri_parse(URL),
|
||||||
Method = method(maps:get(<<"method">>, Params, <<"POST">>)),
|
Method = method(maps:get(<<"method">>, Params, <<"POST">>)),
|
||||||
Headers = headers(maps:get(<<"headers">>, Params, #{})),
|
Headers0 = maps:get(<<"headers">>, Params, #{}),
|
||||||
NHeaders = ensure_content_type_header(Headers, Method),
|
Headers1 = preproc_and_normalise_headers(Headers0),
|
||||||
|
NHeaders = maybe_remove_content_type_header(Headers1, Method),
|
||||||
#{method => Method,
|
#{method => Method,
|
||||||
path => merge_path(CommonPath, maps:get(<<"path">>, Params, <<>>)),
|
path => merge_path(CommonPath, maps:get(<<"path">>, Params, <<>>)),
|
||||||
headers => NHeaders,
|
headers => NHeaders,
|
||||||
|
@ -316,9 +325,17 @@ parse_action_params(Params = #{<<"url">> := URL}) ->
|
||||||
request_timeout => cuttlefish_duration:parse(str(maps:get(<<"request_timeout">>, Params, <<"5s">>))),
|
request_timeout => cuttlefish_duration:parse(str(maps:get(<<"request_timeout">>, Params, <<"5s">>))),
|
||||||
pool => maps:get(<<"pool">>, Params)}.
|
pool => maps:get(<<"pool">>, Params)}.
|
||||||
|
|
||||||
ensure_content_type_header(Headers, Method) when Method =:= post orelse Method =:= put ->
|
%% According to https://www.rfc-editor.org/rfc/rfc7231#section-3.1.1.5, the
|
||||||
|
%% Content-Type HTTP header should be set only for PUT and POST requests.
|
||||||
|
maybe_remove_content_type_header({has_tmpl_token, Headers}, Method) ->
|
||||||
|
{has_tmpl_token, maybe_remove_content_type_header(Headers, Method)};
|
||||||
|
maybe_remove_content_type_header(Headers, Method) when is_map(Headers), (Method =:= post orelse Method =:= put) ->
|
||||||
|
maps:to_list(Headers);
|
||||||
|
maybe_remove_content_type_header(Headers, Method) when is_list(Headers), (Method =:= post orelse Method =:= put) ->
|
||||||
Headers;
|
Headers;
|
||||||
ensure_content_type_header(Headers, _Method) ->
|
maybe_remove_content_type_header(Headers, _Method) when is_map(Headers) ->
|
||||||
|
maps:to_list(maps:remove(<<"content-type">>, Headers));
|
||||||
|
maybe_remove_content_type_header(Headers, _Method) when is_list(Headers) ->
|
||||||
lists:keydelete(<<"content-type">>, 1, Headers).
|
lists:keydelete(<<"content-type">>, 1, Headers).
|
||||||
|
|
||||||
merge_path(CommonPath, <<>>) ->
|
merge_path(CommonPath, <<>>) ->
|
||||||
|
@ -335,8 +352,46 @@ method(POST) when POST == <<"POST">>; POST == <<"post">> -> post;
|
||||||
method(PUT) when PUT == <<"PUT">>; PUT == <<"put">> -> put;
|
method(PUT) when PUT == <<"PUT">>; PUT == <<"put">> -> put;
|
||||||
method(DEL) when DEL == <<"DELETE">>; DEL == <<"delete">> -> delete.
|
method(DEL) when DEL == <<"DELETE">>; DEL == <<"delete">> -> delete.
|
||||||
|
|
||||||
headers(Headers) ->
|
normalize_key(K) ->
|
||||||
emqx_http_lib:normalise_headers(maps:to_list(Headers)).
|
%% see emqx_http_lib:normalise_headers/1 for more info
|
||||||
|
K1 = re:replace(K, "_", "-", [{return, binary}]),
|
||||||
|
string:lowercase(K1).
|
||||||
|
|
||||||
|
preproc_and_normalise_headers(Headers) ->
|
||||||
|
Preproc = fun(Str) -> {tmpl_token, emqx_rule_utils:preproc_tmpl(Str)} end,
|
||||||
|
Res = maps:fold(fun(K, V, {Flag, Acc}) ->
|
||||||
|
case {emqx_rule_utils:if_contains_placeholder(K),
|
||||||
|
emqx_rule_utils:if_contains_placeholder(V)} of
|
||||||
|
{false, false} ->
|
||||||
|
{Flag, Acc#{normalize_key(K) => V}};
|
||||||
|
{false, true} ->
|
||||||
|
{has_tmpl_token, Acc#{normalize_key(K) => Preproc(V)}};
|
||||||
|
{true, false} ->
|
||||||
|
{has_tmpl_token, Acc#{Preproc(K) => V}};
|
||||||
|
{true, true} ->
|
||||||
|
{has_tmpl_token, Acc#{Preproc(K) => Preproc(V)}}
|
||||||
|
end
|
||||||
|
end, {no_token, #{}}, Headers),
|
||||||
|
case Res of
|
||||||
|
{no_token, RHeaders} -> RHeaders;
|
||||||
|
{has_tmpl_token, _} -> Res
|
||||||
|
end.
|
||||||
|
|
||||||
|
maybe_proc_headers({has_tmpl_token, HeadersTks}, Method, Data) ->
|
||||||
|
MaybeProc = fun
|
||||||
|
(key, {tmpl_token, Tokens}) ->
|
||||||
|
normalize_key(emqx_rule_utils:proc_tmpl(Tokens, Data));
|
||||||
|
(val, {tmpl_token, Tokens}) ->
|
||||||
|
emqx_rule_utils:proc_tmpl(Tokens, Data);
|
||||||
|
(_, Str) ->
|
||||||
|
Str
|
||||||
|
end,
|
||||||
|
Headers = [{MaybeProc(key, K), MaybeProc(val, V)} || {K, V} <- HeadersTks],
|
||||||
|
maybe_remove_content_type_header(Headers, Method);
|
||||||
|
maybe_proc_headers(Headers, _, _) ->
|
||||||
|
%% For headers of old emqx versions, and normal header without placeholders,
|
||||||
|
%% the Headers are not pre-processed
|
||||||
|
Headers.
|
||||||
|
|
||||||
str(Str) when is_list(Str) -> Str;
|
str(Str) when is_list(Str) -> Str;
|
||||||
str(Atom) when is_atom(Atom) -> atom_to_list(Atom);
|
str(Atom) when is_atom(Atom) -> atom_to_list(Atom);
|
||||||
|
|
|
@ -35,6 +35,7 @@ all() ->
|
||||||
, {group, ipv6http}
|
, {group, ipv6http}
|
||||||
, {group, ipv6https}
|
, {group, ipv6https}
|
||||||
, test_rule_webhook
|
, test_rule_webhook
|
||||||
|
, test_preproc_headers
|
||||||
].
|
].
|
||||||
|
|
||||||
groups() ->
|
groups() ->
|
||||||
|
@ -138,6 +139,58 @@ set_special_cfgs() ->
|
||||||
%% Test cases
|
%% Test cases
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
test_preproc_headers(_) ->
|
||||||
|
TestTable = [
|
||||||
|
{#{<<"Content_TYPE">> => <<"application/JSON">>, <<"Key">> => <<"Val">>},
|
||||||
|
#{<<"content-type">> => <<"application/JSON">>, <<"key">> => <<"Val">>}
|
||||||
|
},
|
||||||
|
{#{<<"${ContentTypeKey}">> => <<"application/JSON">>},
|
||||||
|
#{<<"content-type">> => <<"application/JSON">>}
|
||||||
|
},
|
||||||
|
{#{<<"content-type">> => <<"${ContentTypeVal}">>},
|
||||||
|
#{<<"content-type">> => <<"application/JSON">>}
|
||||||
|
},
|
||||||
|
{#{<<"Content_type">> => <<"${ContentTypeVal}">>},
|
||||||
|
#{<<"content-type">> => <<"application/JSON">>}
|
||||||
|
},
|
||||||
|
{#{<<"${ContentTypeKey}">> => <<"${ContentTypeVal}">>, <<"Key">> => <<"Val">>},
|
||||||
|
#{<<"content-type">> => <<"application/JSON">>, <<"key">> => <<"Val">>}
|
||||||
|
},
|
||||||
|
{#{<<"${ContentTypeKey}">> => <<"${ContentTypeVal}">>, <<"Key">> => <<"Val">>},
|
||||||
|
#{<<"content-type">> => <<"application/JSON">>, <<"key">> => <<"Val">>}
|
||||||
|
},
|
||||||
|
{#{<<"Content_${TypeKey}">> => <<"application/${TypeVal}">>, <<"Key">> => <<"Val">>},
|
||||||
|
#{<<"content-type">> => <<"application/JSON">>, <<"key">> => <<"Val">>}
|
||||||
|
}
|
||||||
|
],
|
||||||
|
SelectedData1 = #{
|
||||||
|
<<"ContentTypeKey">> => <<"content-type">>,
|
||||||
|
<<"ContentTypeVal">> => <<"application/JSON">>,
|
||||||
|
<<"TypeKey">> => <<"type">>,
|
||||||
|
<<"TypeVal">> => <<"JSON">>
|
||||||
|
},
|
||||||
|
SelectedData2 = #{
|
||||||
|
<<"ContentTypeKey">> => <<"ConTent_Type">>,
|
||||||
|
<<"ContentTypeVal">> => <<"application/JSON">>,
|
||||||
|
<<"TypeKey">> => <<"TYPe">>,
|
||||||
|
<<"TypeVal">> => <<"JSON">>
|
||||||
|
},
|
||||||
|
[begin
|
||||||
|
ct:pal("test_preproc_headers, input: ~p, method: ~p, selected: ~p", [Input, Method, Selected]),
|
||||||
|
Headers0 = emqx_web_hook_actions:preproc_and_normalise_headers(Input),
|
||||||
|
Headers1 = emqx_web_hook_actions:maybe_remove_content_type_header(Headers0, Method),
|
||||||
|
Result0 = emqx_web_hook_actions:maybe_proc_headers(Headers1, Method, Selected),
|
||||||
|
Expected1 = case Method =/= post andalso Method =/= put of
|
||||||
|
true -> maps:remove(<<"content-type">>, Expected);
|
||||||
|
false -> Expected
|
||||||
|
end,
|
||||||
|
?assertEqual(Expected1, maps:from_list(Result0))
|
||||||
|
end ||
|
||||||
|
{Input, Expected} <- TestTable,
|
||||||
|
Selected <- [SelectedData1, SelectedData2],
|
||||||
|
Method <- [post, put, get, delete]
|
||||||
|
].
|
||||||
|
|
||||||
test_rule_webhook(_) ->
|
test_rule_webhook(_) ->
|
||||||
{ok, ServerPid} = http_server:start_link(self(), 9999, []),
|
{ok, ServerPid} = http_server:start_link(self(), 9999, []),
|
||||||
receive {ServerPid, ready} -> ok
|
receive {ServerPid, ready} -> ok
|
||||||
|
|
|
@ -2,6 +2,8 @@
|
||||||
|
|
||||||
## Enhancements
|
## Enhancements
|
||||||
|
|
||||||
|
- Support to use placeholders like `${var}` in the HTTP `Headers` of rule-engine's Webhook actions [#9239](https://github.com/emqx/emqx/pull/9239).
|
||||||
|
|
||||||
- Asynchronously refresh the resources and rules during emqx boot-up [#9199](https://github.com/emqx/emqx/pull/9199).
|
- Asynchronously refresh the resources and rules during emqx boot-up [#9199](https://github.com/emqx/emqx/pull/9199).
|
||||||
This is to avoid slowing down the boot if some resources spend long time establishing the connection.
|
This is to avoid slowing down the boot if some resources spend long time establishing the connection.
|
||||||
|
|
||||||
|
|
|
@ -2,6 +2,8 @@
|
||||||
|
|
||||||
## 增强
|
## 增强
|
||||||
|
|
||||||
|
- 支持在规则引擎的 Webhook 动作的 HTTP Headers 里使用 `${var}` 格式的占位符 [#9239](https://github.com/emqx/emqx/pull/9239)。
|
||||||
|
|
||||||
- 在 emqx 启动时,异步地刷新资源和规则 [#9199](https://github.com/emqx/emqx/pull/9199)。
|
- 在 emqx 启动时,异步地刷新资源和规则 [#9199](https://github.com/emqx/emqx/pull/9199)。
|
||||||
这个改动是为了避免因为一些资源连接建立过慢,而导致启动时间过长。
|
这个改动是为了避免因为一些资源连接建立过慢,而导致启动时间过长。
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue