Merge pull request #8160 from gsychev/webhook_json_fix

fix(web_hook): proper json encoding
This commit is contained in:
Zaiming (Stone) Shi 2022-06-16 20:11:28 +01:00 committed by GitHub
commit b4cdfcb709
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 131 additions and 45 deletions

View File

@ -5,18 +5,21 @@
[{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
{"4.3.9",
[{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
{add_module,emqx_rule_date},
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
{add_module,emqx_rule_date},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
{"4.3.8",
[{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
@ -159,18 +162,21 @@
[{"4.3.10",
[{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
{"4.3.9",
[{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_maps,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
{delete_module,emqx_rule_date}]},
{"4.3.8",
[{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},

View File

@ -509,7 +509,7 @@ refresh_resource_status() ->
fun(#resource{id = ResId, type = ResType}) ->
case emqx_rule_registry:find_resource_type(ResType) of
{ok, #resource_type{on_status = {Mod, OnStatus}}} ->
_ = fetch_resource_status(Mod, OnStatus, ResId);
fetch_resource_status(Mod, OnStatus, ResId);
_ -> ok
end
end, emqx_rule_registry:get_resources()).

View File

@ -51,13 +51,9 @@ apply_rules([], _Input) ->
apply_rules([#rule{enabled = false}|More], Input) ->
apply_rules(More, Input);
apply_rules([Rule|More], Input) ->
apply_rule_discard_result(Rule, Input),
apply_rule(Rule, Input),
apply_rules(More, Input).
apply_rule_discard_result(Rule, Input) ->
_ = apply_rule(Rule, Input),
ok.
apply_rule(Rule = #rule{id = RuleID}, Input) ->
clear_rule_payload(),
ok = emqx_rule_metrics:inc_rules_matched(RuleID),

View File

@ -1,6 +1,6 @@
{application, emqx_web_hook,
[{description, "EMQ X WebHook Plugin"},
{vsn, "4.3.11"}, % strict semver, bump manually!
{vsn, "4.3.12"}, % strict semver, bump manually!
{modules, []},
{registered, [emqx_web_hook_sup]},
{applications, [kernel,stdlib,ehttpc]},

View File

@ -12,10 +12,20 @@
{load_module,emqx_web_hook,brutal_purge,soft_purge,[]},
{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]},
{"4.3.8",
[{load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]},
[{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]},
{load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]},
{load_module,emqx_web_hook,brutal_purge,soft_purge,[]}]},
{"4.3.9",
[{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]},
{load_module,emqx_web_hook,brutal_purge,soft_purge,[]},
{load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]}]},
{"4.3.10",
[{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]},
{load_module,emqx_web_hook,brutal_purge,soft_purge,[]},
{load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]}]},
{"4.3.11",
[{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]},
{load_module,emqx_web_hook,brutal_purge,soft_purge,[]}]},
{"4.3.9",[{load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]}]},
{"4.3.10",[{load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]}]},
{<<".*">>,[]}],
[{<<"4\\.3\\.[0-2]">>,
[{apply,{application,stop,[emqx_web_hook]}},
@ -28,8 +38,18 @@
{load_module,emqx_web_hook,brutal_purge,soft_purge,[]},
{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]},
{"4.3.8",
[{load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]},
[{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]},
{load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]},
{load_module,emqx_web_hook,brutal_purge,soft_purge,[]}]},
{"4.3.9",
[{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]},
{load_module,emqx_web_hook,brutal_purge,soft_purge,[]},
{load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]}]},
{"4.3.10",
[{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]},
{load_module,emqx_web_hook,brutal_purge,soft_purge,[]},
{load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]}]},
{"4.3.11",
[{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]},
{load_module,emqx_web_hook,brutal_purge,soft_purge,[]}]},
{"4.3.9",[{load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]}]},
{"4.3.10",[{load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]}]},
{<<".*">>,[]}]}.

View File

@ -344,15 +344,13 @@ send_http_request(ClientID, Params) ->
ok
end.
parse_rule(Rules) ->
parse_rule(Rules, []).
parse_rule([], Acc) ->
lists:reverse(Acc);
parse_rule([{Rule, Conf} | Rules], Acc) ->
parse_rule([]) ->
[];
parse_rule([{Rule, Conf} | Rules]) ->
Params = emqx_json:decode(iolist_to_binary(Conf)),
Action = proplists:get_value(<<"action">>, Params),
Filter = proplists:get_value(<<"topic">>, Params),
parse_rule(Rules, [{list_to_atom(Rule), binary_to_existing_atom(Action, utf8), Filter} | Acc]).
[{list_to_atom(Rule), binary_to_existing_atom(Action, utf8), Filter} | parse_rule(Rules)].
with_filter(Fun, _, undefined) ->
Fun(), ok;
@ -389,4 +387,3 @@ stringfy(Term) ->
maybe(undefined) -> null;
maybe(Str) -> Str.

View File

@ -258,7 +258,7 @@ on_action_data_to_webserver(Selected, _Envs =
'Pool' := Pool,
'RequestTimeout' := RequestTimeout},
clientid := ClientID}) ->
NBody = format_msg(BodyTokens, Selected),
NBody = format_msg(BodyTokens, clear_user_property_header(Selected)),
NPath = emqx_rule_utils:proc_tmpl(PathTokens, Selected),
Req = create_req(Method, NPath, Headers, NBody),
case ehttpc:request({Pool, ClientID}, Method, Req, RequestTimeout) of
@ -280,7 +280,12 @@ on_action_data_to_webserver(Selected, _Envs =
format_msg([], Data) ->
emqx_json:encode(Data);
format_msg(Tokens, Data) ->
emqx_rule_utils:proc_tmpl(Tokens, Data).
emqx_rule_utils:proc_tmpl(Tokens, Data).
clear_user_property_header(#{headers := #{properties := #{'User-Property' := _} = P} = H} = S) ->
S#{headers => H#{properties => P#{'User-Property' => []}}};
clear_user_property_header(S) ->
S.
%%------------------------------------------------------------------------------
%% Internal functions
@ -365,9 +370,7 @@ get_ssl_opts(Opts, ResId) ->
test_http_connect(Conf) ->
Url = fun() -> maps:get(<<"url">>, Conf) end,
try
emqx_rule_utils:http_connectivity(Url())
of
try emqx_rule_utils:http_connectivity(Url()) of
ok -> true;
{error, _Reason} ->
?LOG(error, "check http_connectivity failed: ~p", [Url()]),

View File

@ -19,9 +19,8 @@
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("emqx/include/emqx.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("emqx_rule_engine/include/rule_engine.hrl").
-define(HOOK_LOOKUP(H), emqx_hooks:lookup(list_to_atom(H))).
-define(ACTION(Name), #{<<"action">> := Name}).
@ -35,7 +34,7 @@ all() ->
, {group, https}
, {group, ipv6http}
, {group, ipv6https}
, {group, all}
, test_rule_webhook
].
groups() ->
@ -44,12 +43,18 @@ groups() ->
, {https, [sequence], Cases}
, {ipv6http, [sequence], Cases}
, {ipv6https, [sequence], Cases}
, {all, [sequence], emqx_ct:all(?MODULE)}
].
start_apps(F) -> emqx_ct_helpers:start_apps(apps(), F).
start_apps() ->
[application:load(App) || App <- apps()],
emqx_ct_helpers:start_apps(apps()).
start_apps(F) ->
[application:load(App) || App <- apps()],
emqx_ct_helpers:start_apps(apps(), F).
init_per_group(rules, Config) -> Config;
init_per_group(Name, Config) ->
net_kernel:start(['test@127.0.0.1', longnames]),
application:ensure_all_started(emqx_management),
set_special_cfgs(),
BasePort =
@ -74,6 +79,20 @@ init_per_group(Name, Config) ->
end,
[{base_port, BasePort}, {transport_opts, Opts} | Config].
init_per_testcase(test_rule_webhook, Config) ->
net_kernel:start(['test@127.0.0.1', longnames]),
ok = ekka_mnesia:start(),
ok = emqx_rule_registry:mnesia(boot),
Handler = fun(_) ->
application:set_env(emqx_web_hook, rules, []),
application:set_env(emqx_web_hook, url, "http://127.0.0.1:9999/"),
application:set_env(emqx_web_hook, ssl, false),
application:set_env(emqx_web_hook, ssloptions, [])
end,
ok = start_apps(Handler),
Config;
init_per_testcase(_, Config) -> Config.
end_per_group(_Name, Config) ->
emqx_ct_helpers:stop_apps(apps()),
Config.
@ -119,6 +138,51 @@ set_special_cfgs() ->
%% Test cases
%%--------------------------------------------------------------------
test_rule_webhook(_) ->
{ok, ServerPid} = http_server:start_link(self(), 9999, []),
receive {ServerPid, ready} -> ok
after 1000 -> error(timeout)
end,
ok = emqx_rule_engine:load_providers(),
{ok, #resource{id = ResId}} = emqx_rule_engine:create_resource(
#{type => web_hook,
config => #{<<"url">> => "http://127.0.0.1:9999/"},
description => <<"For testing">>}),
{ok, #rule{id = Id}} = emqx_rule_engine:create_rule(
#{rawsql => "select * from \"t1\"",
actions => [#{name => 'data_to_webserver',
args => #{<<"$resource">> => ResId}}],
type => web_hook,
description => <<"For testing">>
}),
Properties = #{'User-Property' => [{<<"user_property_key">>, <<"user_property_value">>}]},
ClientId = iolist_to_binary(["client-", integer_to_list(erlang:unique_integer([positive]))]),
{ok, Client} = emqtt:start_link([ {clientid, ClientId}
, {proto_ver, v5}
, {keepalive, 60}
]),
{ok, _} = emqtt:connect(Client),
{ok, _} = emqtt:publish(Client, <<"t1">>, Properties, <<"Payload...">>, [{qos, 2}]),
Res = receive {http_server, {Any, _Bool}, _Header} -> {ok, Any}
after 100 -> error
end,
?assertMatch({ok, _}, Res),
{ok, Body} = Res,
?assertNotEqual([], binary:matches(Body, <<"User-Property">>)),
?assertNotEqual([], binary:matches(Body, <<"user_property_key">>)),
?assertNotEqual([], binary:matches(Body, <<"user_property_value">>)),
emqtt:stop(Client),
http_server:stop(ServerPid),
emqx_rule_registry:remove_rule(Id),
emqx_rule_registry:remove_resource(ResId),
ok.
test_full_flow(Config) ->
[_|_] = Opts = proplists:get_value(transport_opts, Config),
BasePort = proplists:get_value(base_port, Config),
@ -128,7 +192,7 @@ test_full_flow(Config) ->
after 1000 -> error(timeout)
end,
application:set_env(emqx_web_hook, headers, [{"k1","K1"}, {"k2", "K2"}]),
ClientId = iolist_to_binary(["client-", integer_to_list(erlang:system_time())]),
ClientId = iolist_to_binary(["client-", integer_to_list(erlang:unique_integer([positive]))]),
{ok, C} = emqtt:start_link([ {clientid, ClientId}
, {proto_ver, v5}
, {keepalive, 60}
@ -174,10 +238,10 @@ validate_params_and_headers(ClientState, ClientId) ->
end
after
5000 ->
case ClientState =:= undefined of
true -> error("client_was_never_connected");
false -> error("terminate_action_is_not_received_in_time")
end
case ClientState =:= undefined of
true -> error("client_was_never_connected");
false -> error("terminate_action_is_not_received_in_time")
end
end.
t_check_hooked(_) ->

View File

@ -81,7 +81,7 @@ stop_https() ->
compile_router(Parent) ->
{ok, _} = application:ensure_all_started(cowboy),
cowboy_router:compile([
{'_', [{"/", ?MODULE, #{parent => Parent}}]}
{'_', [{'_', ?MODULE, #{parent => Parent}}]}
]).
init(Req, #{parent := Parent} = State) ->