feat: add apply rule API, clientid/ruleid tracing for rule and connector
This commit adds: * Support for forwarding the rule id and client id to the connector so that events such as template rendered successfully can be traced. * HTTP API for for applying/activating a rule with the given context
This commit is contained in:
parent
59a442cdb5
commit
ef705c2285
|
@ -135,14 +135,22 @@ running() ->
|
|||
lists:foldl(fun filter_traces/2, [], emqx_logger:get_log_handlers(started)).
|
||||
|
||||
-spec filter_ruleid(logger:log_event(), {binary(), atom()}) -> logger:log_event() | stop.
|
||||
filter_ruleid(#{meta := Meta = #{ruleid := RuleId}} = Log, {MatchId, _Name}) ->
|
||||
filter_ret(RuleId =:= MatchId andalso is_trace(Meta), Log);
|
||||
filter_ruleid(#{meta := Meta = #{rule_id := RuleId}} = Log, {MatchId, _Name}) ->
|
||||
RuleIDs = maps:get(rule_ids, Meta, #{}),
|
||||
IsMatch = (RuleId =:= MatchId) orelse maps:get(MatchId, RuleIDs, false),
|
||||
filter_ret(IsMatch andalso is_trace(Meta), Log);
|
||||
filter_ruleid(#{meta := Meta = #{rule_ids := RuleIDs}} = Log, {MatchId, _Name}) ->
|
||||
filter_ret(maps:get(MatchId, RuleIDs, false) andalso is_trace(Meta), Log);
|
||||
filter_ruleid(_Log, _ExpectId) ->
|
||||
stop.
|
||||
|
||||
-spec filter_clientid(logger:log_event(), {binary(), atom()}) -> logger:log_event() | stop.
|
||||
filter_clientid(#{meta := Meta = #{clientid := ClientId}} = Log, {MatchId, _Name}) ->
|
||||
filter_ret(ClientId =:= MatchId andalso is_trace(Meta), Log);
|
||||
ClientIDs = maps:get(client_ids, Meta, #{}),
|
||||
IsMatch = (ClientId =:= MatchId) orelse maps:get(MatchId, ClientIDs, false),
|
||||
filter_ret(IsMatch andalso is_trace(Meta), Log);
|
||||
filter_clientid(#{meta := Meta = #{client_ids := ClientIDs}} = Log, {MatchId, _Name}) ->
|
||||
filter_ret(maps:get(MatchId, ClientIDs, false) andalso is_trace(Meta), Log);
|
||||
filter_clientid(_Log, _ExpectId) ->
|
||||
stop.
|
||||
|
||||
|
|
|
@ -661,13 +661,22 @@ process_request_and_action(Request, ActionState, Msg) ->
|
|||
),
|
||||
BodyTemplate = maps:get(body, ActionState),
|
||||
Body = render_request_body(BodyTemplate, RenderTmplFunc, Msg),
|
||||
#{
|
||||
RenderResult = #{
|
||||
method => Method,
|
||||
path => Path,
|
||||
body => Body,
|
||||
headers => Headers,
|
||||
request_timeout => maps:get(request_timeout, ActionState)
|
||||
}.
|
||||
},
|
||||
?TRACE(
|
||||
"QUERY_RENDER",
|
||||
"http_connector_successfully_rendered_request",
|
||||
#{
|
||||
request => Request,
|
||||
render_result => RenderResult
|
||||
}
|
||||
),
|
||||
RenderResult.
|
||||
|
||||
merge_proplist(Proplist1, Proplist2) ->
|
||||
lists:foldl(
|
||||
|
|
|
@ -30,8 +30,8 @@
|
|||
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||
-include_lib("emqx/include/asserts.hrl").
|
||||
|
||||
-define(BRIDGE_TYPE, <<"webhook">>).
|
||||
-define(BRIDGE_NAME, atom_to_binary(?MODULE)).
|
||||
-define(BRIDGE_TYPE, emqx_bridge_http_test_lib:bridge_type()).
|
||||
-define(BRIDGE_NAME, emqx_bridge_http_test_lib:bridge_name()).
|
||||
|
||||
all() ->
|
||||
emqx_common_test_helpers:all(?MODULE).
|
||||
|
@ -73,21 +73,10 @@ suite() ->
|
|||
|
||||
init_per_testcase(t_bad_bridge_config, Config) ->
|
||||
Config;
|
||||
init_per_testcase(t_send_async_connection_timeout, Config) ->
|
||||
HTTPPath = <<"/path">>,
|
||||
ServerSSLOpts = false,
|
||||
{ok, {HTTPPort, _Pid}} = emqx_bridge_http_connector_test_server:start_link(
|
||||
_Port = random, HTTPPath, ServerSSLOpts
|
||||
),
|
||||
ResponseDelayMS = 500,
|
||||
ok = emqx_bridge_http_connector_test_server:set_handler(
|
||||
success_http_handler(#{response_delay => ResponseDelayMS})
|
||||
),
|
||||
[
|
||||
{http_server, #{port => HTTPPort, path => HTTPPath}},
|
||||
{response_delay_ms, ResponseDelayMS}
|
||||
| Config
|
||||
];
|
||||
init_per_testcase(Case, Config) when
|
||||
Case =:= t_send_async_connection_timeout orelse Case =:= t_send_get_trace_messages
|
||||
->
|
||||
emqx_bridge_http_test_lib:init_http_success_server(Config);
|
||||
init_per_testcase(t_path_not_found, Config) ->
|
||||
HTTPPath = <<"/nonexisting/path">>,
|
||||
ServerSSLOpts = false,
|
||||
|
@ -115,7 +104,9 @@ init_per_testcase(t_bridge_probes_header_atoms, Config) ->
|
|||
{ok, {HTTPPort, _Pid}} = emqx_bridge_http_connector_test_server:start_link(
|
||||
_Port = random, HTTPPath, ServerSSLOpts
|
||||
),
|
||||
ok = emqx_bridge_http_connector_test_server:set_handler(success_http_handler()),
|
||||
ok = emqx_bridge_http_connector_test_server:set_handler(
|
||||
emqx_bridge_http_test_lib:success_http_handler()
|
||||
),
|
||||
[{http_server, #{port => HTTPPort, path => HTTPPath}} | Config];
|
||||
init_per_testcase(_TestCase, Config) ->
|
||||
Server = start_http_server(#{response_delay_ms => 0}),
|
||||
|
@ -126,7 +117,8 @@ end_per_testcase(TestCase, _Config) when
|
|||
TestCase =:= t_too_many_requests;
|
||||
TestCase =:= t_rule_action_expired;
|
||||
TestCase =:= t_bridge_probes_header_atoms;
|
||||
TestCase =:= t_send_async_connection_timeout
|
||||
TestCase =:= t_send_async_connection_timeout;
|
||||
TestCase =:= t_send_get_trace_messages
|
||||
->
|
||||
ok = emqx_bridge_http_connector_test_server:stop(),
|
||||
persistent_term:erase({?MODULE, times_called}),
|
||||
|
@ -250,115 +242,8 @@ get_metrics(Name) ->
|
|||
Type = <<"http">>,
|
||||
emqx_bridge:get_metrics(Type, Name).
|
||||
|
||||
bridge_async_config(#{port := Port} = Config) ->
|
||||
Type = maps:get(type, Config, ?BRIDGE_TYPE),
|
||||
Name = maps:get(name, Config, ?BRIDGE_NAME),
|
||||
Host = maps:get(host, Config, "localhost"),
|
||||
Path = maps:get(path, Config, ""),
|
||||
PoolSize = maps:get(pool_size, Config, 1),
|
||||
QueryMode = maps:get(query_mode, Config, "async"),
|
||||
ConnectTimeout = maps:get(connect_timeout, Config, "1s"),
|
||||
RequestTimeout = maps:get(request_timeout, Config, "10s"),
|
||||
ResumeInterval = maps:get(resume_interval, Config, "1s"),
|
||||
HealthCheckInterval = maps:get(health_check_interval, Config, "200ms"),
|
||||
ResourceRequestTTL = maps:get(resource_request_ttl, Config, "infinity"),
|
||||
LocalTopic =
|
||||
case maps:find(local_topic, Config) of
|
||||
{ok, LT} ->
|
||||
lists:flatten(["local_topic = \"", LT, "\""]);
|
||||
error ->
|
||||
""
|
||||
end,
|
||||
ConfigString = io_lib:format(
|
||||
"bridges.~s.~s {\n"
|
||||
" url = \"http://~s:~p~s\"\n"
|
||||
" connect_timeout = \"~p\"\n"
|
||||
" enable = true\n"
|
||||
%% local_topic
|
||||
" ~s\n"
|
||||
" enable_pipelining = 100\n"
|
||||
" max_retries = 2\n"
|
||||
" method = \"post\"\n"
|
||||
" pool_size = ~p\n"
|
||||
" pool_type = \"random\"\n"
|
||||
" request_timeout = \"~s\"\n"
|
||||
" body = \"${id}\"\n"
|
||||
" resource_opts {\n"
|
||||
" inflight_window = 100\n"
|
||||
" health_check_interval = \"~s\"\n"
|
||||
" max_buffer_bytes = \"1GB\"\n"
|
||||
" query_mode = \"~s\"\n"
|
||||
" request_ttl = \"~p\"\n"
|
||||
" resume_interval = \"~s\"\n"
|
||||
" start_after_created = \"true\"\n"
|
||||
" start_timeout = \"5s\"\n"
|
||||
" worker_pool_size = \"1\"\n"
|
||||
" }\n"
|
||||
" ssl {\n"
|
||||
" enable = false\n"
|
||||
" }\n"
|
||||
"}\n",
|
||||
[
|
||||
Type,
|
||||
Name,
|
||||
Host,
|
||||
Port,
|
||||
Path,
|
||||
ConnectTimeout,
|
||||
LocalTopic,
|
||||
PoolSize,
|
||||
RequestTimeout,
|
||||
HealthCheckInterval,
|
||||
QueryMode,
|
||||
ResourceRequestTTL,
|
||||
ResumeInterval
|
||||
]
|
||||
),
|
||||
ct:pal(ConfigString),
|
||||
parse_and_check(ConfigString, Type, Name).
|
||||
|
||||
parse_and_check(ConfigString, BridgeType, Name) ->
|
||||
{ok, RawConf} = hocon:binary(ConfigString, #{format => map}),
|
||||
hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}),
|
||||
#{<<"bridges">> := #{BridgeType := #{Name := RetConfig}}} = RawConf,
|
||||
RetConfig.
|
||||
|
||||
make_bridge(Config) ->
|
||||
Type = ?BRIDGE_TYPE,
|
||||
Name = ?BRIDGE_NAME,
|
||||
BridgeConfig = bridge_async_config(Config#{
|
||||
name => Name,
|
||||
type => Type
|
||||
}),
|
||||
{ok, _} = emqx_bridge:create(
|
||||
Type,
|
||||
Name,
|
||||
BridgeConfig
|
||||
),
|
||||
emqx_bridge_resource:bridge_id(Type, Name).
|
||||
|
||||
success_http_handler() ->
|
||||
success_http_handler(#{response_delay => 0}).
|
||||
|
||||
success_http_handler(Opts) ->
|
||||
ResponseDelay = maps:get(response_delay, Opts, 0),
|
||||
TestPid = self(),
|
||||
fun(Req0, State) ->
|
||||
{ok, Body, Req} = cowboy_req:read_body(Req0),
|
||||
Headers = cowboy_req:headers(Req),
|
||||
ct:pal("http request received: ~p", [
|
||||
#{body => Body, headers => Headers, response_delay => ResponseDelay}
|
||||
]),
|
||||
ResponseDelay > 0 andalso timer:sleep(ResponseDelay),
|
||||
TestPid ! {http, Headers, Body},
|
||||
Rep = cowboy_req:reply(
|
||||
200,
|
||||
#{<<"content-type">> => <<"text/plain">>},
|
||||
<<"hello">>,
|
||||
Req
|
||||
),
|
||||
{ok, Rep, State}
|
||||
end.
|
||||
emqx_bridge_http_test_lib:make_bridge(Config).
|
||||
|
||||
not_found_http_handler() ->
|
||||
TestPid = self(),
|
||||
|
@ -452,6 +337,103 @@ t_send_async_connection_timeout(Config) ->
|
|||
receive_request_notifications(MessageIDs, ResponseDelayMS, []),
|
||||
ok.
|
||||
|
||||
t_send_get_trace_messages(Config) ->
|
||||
ResponseDelayMS = ?config(response_delay_ms, Config),
|
||||
#{port := Port, path := Path} = ?config(http_server, Config),
|
||||
BridgeID = make_bridge(#{
|
||||
port => Port,
|
||||
path => Path,
|
||||
pool_size => 1,
|
||||
query_mode => "async",
|
||||
connect_timeout => integer_to_list(ResponseDelayMS * 2) ++ "ms",
|
||||
request_timeout => "10s",
|
||||
resume_interval => "200ms",
|
||||
health_check_interval => "200ms",
|
||||
resource_request_ttl => "infinity"
|
||||
}),
|
||||
RuleTopic = iolist_to_binary([<<"my_rule_topic/">>, atom_to_binary(?FUNCTION_NAME)]),
|
||||
SQL = <<"SELECT payload.id as id FROM \"", RuleTopic/binary, "\"">>,
|
||||
{ok, #{<<"id">> := RuleId}} =
|
||||
emqx_bridge_testlib:create_rule_and_action_http(
|
||||
?BRIDGE_TYPE,
|
||||
RuleTopic,
|
||||
Config,
|
||||
#{sql => SQL}
|
||||
),
|
||||
%% ===================================
|
||||
%% Create trace for RuleId
|
||||
%% ===================================
|
||||
Now = erlang:system_time(second) - 10,
|
||||
Start = Now,
|
||||
End = Now + 60,
|
||||
TraceName = atom_to_binary(?FUNCTION_NAME),
|
||||
Trace = #{
|
||||
name => TraceName,
|
||||
type => ruleid,
|
||||
ruleid => RuleId,
|
||||
start_at => Start,
|
||||
end_at => End
|
||||
},
|
||||
emqx_trace_SUITE:reload(),
|
||||
ok = emqx_trace:clear(),
|
||||
{ok, _} = emqx_trace:create(Trace),
|
||||
%% ===================================
|
||||
|
||||
ResourceId = emqx_bridge_resource:resource_id(BridgeID),
|
||||
?retry(
|
||||
_Interval0 = 200,
|
||||
_NAttempts0 = 20,
|
||||
?assertMatch({ok, connected}, emqx_resource_manager:health_check(ResourceId))
|
||||
),
|
||||
?retry(
|
||||
_Interval0 = 200,
|
||||
_NAttempts0 = 20,
|
||||
?assertEqual(<<>>, read_rule_trace_file(TraceName, Now))
|
||||
),
|
||||
Msg = emqx_message:make(RuleTopic, <<"{\"id\": 1}">>),
|
||||
emqx:publish(Msg),
|
||||
?retry(
|
||||
_Interval = 500,
|
||||
_NAttempts = 20,
|
||||
?assertMatch(
|
||||
#{
|
||||
counters := #{
|
||||
'matched' := 1,
|
||||
'actions.failed' := 0,
|
||||
'actions.failed.unknown' := 0,
|
||||
'actions.success' := 1,
|
||||
'actions.total' := 1
|
||||
}
|
||||
},
|
||||
emqx_metrics_worker:get_metrics(rule_metrics, RuleId)
|
||||
)
|
||||
),
|
||||
|
||||
ok = emqx_trace_handler_SUITE:filesync(TraceName, ruleid),
|
||||
{ok, Bin} = file:read_file(emqx_trace:log_file(TraceName, Now)),
|
||||
|
||||
?retry(
|
||||
_Interval0 = 200,
|
||||
_NAttempts0 = 20,
|
||||
begin
|
||||
Bin = read_rule_trace_file(TraceName, Now),
|
||||
?assertNotEqual(nomatch, binary:match(Bin, [<<"rule_activated">>])),
|
||||
?assertNotEqual(nomatch, binary:match(Bin, [<<"SELECT_yielded_result">>])),
|
||||
?assertNotEqual(nomatch, binary:match(Bin, [<<"bridge_action">>])),
|
||||
?assertNotEqual(nomatch, binary:match(Bin, [<<"action_activated">>])),
|
||||
?assertNotEqual(nomatch, binary:match(Bin, [<<"successfully_rendered_request">>])),
|
||||
?assertNotEqual(nomatch, binary:match(Bin, [<<"QUERY_ASYNC">>]))
|
||||
end
|
||||
),
|
||||
emqx_trace:delete(TraceName),
|
||||
ok.
|
||||
|
||||
read_rule_trace_file(TraceName, From) ->
|
||||
emqx_trace:check(),
|
||||
ok = emqx_trace_handler_SUITE:filesync(TraceName, ruleid),
|
||||
{ok, Bin} = file:read_file(emqx_trace:log_file(TraceName, From)),
|
||||
Bin.
|
||||
|
||||
t_async_free_retries(Config) ->
|
||||
#{port := Port} = ?config(http_server, Config),
|
||||
_BridgeID = make_bridge(#{
|
||||
|
@ -518,7 +500,7 @@ t_async_common_retries(Config) ->
|
|||
ok.
|
||||
|
||||
t_bad_bridge_config(_Config) ->
|
||||
BridgeConfig = bridge_async_config(#{port => 12345}),
|
||||
BridgeConfig = emqx_bridge_http_test_lib:bridge_async_config(#{port => 12345}),
|
||||
?assertMatch(
|
||||
{ok,
|
||||
{{_, 201, _}, _Headers, #{
|
||||
|
@ -540,7 +522,7 @@ t_bad_bridge_config(_Config) ->
|
|||
|
||||
t_start_stop(Config) ->
|
||||
#{port := Port} = ?config(http_server, Config),
|
||||
BridgeConfig = bridge_async_config(#{
|
||||
BridgeConfig = emqx_bridge_http_test_lib:bridge_async_config(#{
|
||||
type => ?BRIDGE_TYPE,
|
||||
name => ?BRIDGE_NAME,
|
||||
port => Port
|
||||
|
@ -554,7 +536,7 @@ t_path_not_found(Config) ->
|
|||
begin
|
||||
#{port := Port, path := Path} = ?config(http_server, Config),
|
||||
MQTTTopic = <<"t/webhook">>,
|
||||
BridgeConfig = bridge_async_config(#{
|
||||
BridgeConfig = emqx_bridge_http_test_lib:bridge_async_config(#{
|
||||
type => ?BRIDGE_TYPE,
|
||||
name => ?BRIDGE_NAME,
|
||||
local_topic => MQTTTopic,
|
||||
|
@ -593,7 +575,7 @@ t_too_many_requests(Config) ->
|
|||
begin
|
||||
#{port := Port, path := Path} = ?config(http_server, Config),
|
||||
MQTTTopic = <<"t/webhook">>,
|
||||
BridgeConfig = bridge_async_config(#{
|
||||
BridgeConfig = emqx_bridge_http_test_lib:bridge_async_config(#{
|
||||
type => ?BRIDGE_TYPE,
|
||||
name => ?BRIDGE_NAME,
|
||||
local_topic => MQTTTopic,
|
||||
|
@ -633,7 +615,7 @@ t_rule_action_expired(Config) ->
|
|||
?check_trace(
|
||||
begin
|
||||
RuleTopic = <<"t/webhook/rule">>,
|
||||
BridgeConfig = bridge_async_config(#{
|
||||
BridgeConfig = emqx_bridge_http_test_lib:bridge_async_config(#{
|
||||
type => ?BRIDGE_TYPE,
|
||||
name => ?BRIDGE_NAME,
|
||||
host => "non.existent.host",
|
||||
|
@ -689,7 +671,7 @@ t_bridge_probes_header_atoms(Config) ->
|
|||
?check_trace(
|
||||
begin
|
||||
LocalTopic = <<"t/local/topic">>,
|
||||
BridgeConfig0 = bridge_async_config(#{
|
||||
BridgeConfig0 = emqx_bridge_http_test_lib:bridge_async_config(#{
|
||||
type => ?BRIDGE_TYPE,
|
||||
name => ?BRIDGE_NAME,
|
||||
port => Port,
|
||||
|
|
|
@ -0,0 +1,161 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2020-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_bridge_http_test_lib).
|
||||
|
||||
-export([
|
||||
bridge_type/0,
|
||||
bridge_name/0,
|
||||
make_bridge/1,
|
||||
bridge_async_config/1,
|
||||
init_http_success_server/1,
|
||||
success_http_handler/0
|
||||
]).
|
||||
|
||||
-define(BRIDGE_TYPE, bridge_type()).
|
||||
-define(BRIDGE_NAME, bridge_name()).
|
||||
|
||||
bridge_type() ->
|
||||
<<"webhook">>.
|
||||
|
||||
bridge_name() ->
|
||||
atom_to_binary(?MODULE).
|
||||
|
||||
make_bridge(Config) ->
|
||||
Type = ?BRIDGE_TYPE,
|
||||
Name = ?BRIDGE_NAME,
|
||||
BridgeConfig = bridge_async_config(Config#{
|
||||
name => Name,
|
||||
type => Type
|
||||
}),
|
||||
{ok, _} = emqx_bridge:create(
|
||||
Type,
|
||||
Name,
|
||||
BridgeConfig
|
||||
),
|
||||
emqx_bridge_resource:bridge_id(Type, Name).
|
||||
|
||||
bridge_async_config(#{port := Port} = Config) ->
|
||||
Type = maps:get(type, Config, ?BRIDGE_TYPE),
|
||||
Name = maps:get(name, Config, ?BRIDGE_NAME),
|
||||
Host = maps:get(host, Config, "localhost"),
|
||||
Path = maps:get(path, Config, ""),
|
||||
PoolSize = maps:get(pool_size, Config, 1),
|
||||
QueryMode = maps:get(query_mode, Config, "async"),
|
||||
ConnectTimeout = maps:get(connect_timeout, Config, "1s"),
|
||||
RequestTimeout = maps:get(request_timeout, Config, "10s"),
|
||||
ResumeInterval = maps:get(resume_interval, Config, "1s"),
|
||||
HealthCheckInterval = maps:get(health_check_interval, Config, "200ms"),
|
||||
ResourceRequestTTL = maps:get(resource_request_ttl, Config, "infinity"),
|
||||
LocalTopic =
|
||||
case maps:find(local_topic, Config) of
|
||||
{ok, LT} ->
|
||||
lists:flatten(["local_topic = \"", LT, "\""]);
|
||||
error ->
|
||||
""
|
||||
end,
|
||||
ConfigString = io_lib:format(
|
||||
"bridges.~s.~s {\n"
|
||||
" url = \"http://~s:~p~s\"\n"
|
||||
" connect_timeout = \"~p\"\n"
|
||||
" enable = true\n"
|
||||
%% local_topic
|
||||
" ~s\n"
|
||||
" enable_pipelining = 100\n"
|
||||
" max_retries = 2\n"
|
||||
" method = \"post\"\n"
|
||||
" pool_size = ~p\n"
|
||||
" pool_type = \"random\"\n"
|
||||
" request_timeout = \"~s\"\n"
|
||||
" body = \"${id}\"\n"
|
||||
" resource_opts {\n"
|
||||
" inflight_window = 100\n"
|
||||
" health_check_interval = \"~s\"\n"
|
||||
" max_buffer_bytes = \"1GB\"\n"
|
||||
" query_mode = \"~s\"\n"
|
||||
" request_ttl = \"~p\"\n"
|
||||
" resume_interval = \"~s\"\n"
|
||||
" start_after_created = \"true\"\n"
|
||||
" start_timeout = \"5s\"\n"
|
||||
" worker_pool_size = \"1\"\n"
|
||||
" }\n"
|
||||
" ssl {\n"
|
||||
" enable = false\n"
|
||||
" }\n"
|
||||
"}\n",
|
||||
[
|
||||
Type,
|
||||
Name,
|
||||
Host,
|
||||
Port,
|
||||
Path,
|
||||
ConnectTimeout,
|
||||
LocalTopic,
|
||||
PoolSize,
|
||||
RequestTimeout,
|
||||
HealthCheckInterval,
|
||||
QueryMode,
|
||||
ResourceRequestTTL,
|
||||
ResumeInterval
|
||||
]
|
||||
),
|
||||
ct:pal(ConfigString),
|
||||
parse_and_check(ConfigString, Type, Name).
|
||||
|
||||
parse_and_check(ConfigString, BridgeType, Name) ->
|
||||
{ok, RawConf} = hocon:binary(ConfigString, #{format => map}),
|
||||
hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}),
|
||||
#{<<"bridges">> := #{BridgeType := #{Name := RetConfig}}} = RawConf,
|
||||
RetConfig.
|
||||
|
||||
success_http_handler() ->
|
||||
success_http_handler(#{response_delay => 0}).
|
||||
|
||||
success_http_handler(Opts) ->
|
||||
ResponseDelay = maps:get(response_delay, Opts, 0),
|
||||
TestPid = self(),
|
||||
fun(Req0, State) ->
|
||||
{ok, Body, Req} = cowboy_req:read_body(Req0),
|
||||
Headers = cowboy_req:headers(Req),
|
||||
ct:pal("http request received: ~p", [
|
||||
#{body => Body, headers => Headers, response_delay => ResponseDelay}
|
||||
]),
|
||||
ResponseDelay > 0 andalso timer:sleep(ResponseDelay),
|
||||
TestPid ! {http, Headers, Body},
|
||||
Rep = cowboy_req:reply(
|
||||
200,
|
||||
#{<<"content-type">> => <<"text/plain">>},
|
||||
<<"hello">>,
|
||||
Req
|
||||
),
|
||||
{ok, Rep, State}
|
||||
end.
|
||||
|
||||
init_http_success_server(Config) ->
|
||||
HTTPPath = <<"/path">>,
|
||||
ServerSSLOpts = false,
|
||||
{ok, {HTTPPort, _Pid}} = emqx_bridge_http_connector_test_server:start_link(
|
||||
_Port = random, HTTPPath, ServerSSLOpts
|
||||
),
|
||||
ResponseDelayMS = 500,
|
||||
ok = emqx_bridge_http_connector_test_server:set_handler(
|
||||
success_http_handler(#{response_delay => ResponseDelayMS})
|
||||
),
|
||||
[
|
||||
{http_server, #{port => HTTPPort, path => HTTPPath}},
|
||||
{response_delay_ms, ResponseDelayMS},
|
||||
{bridge_name, ?BRIDGE_NAME}
|
||||
| Config
|
||||
].
|
|
@ -64,8 +64,10 @@
|
|||
|
||||
-define(COLLECT_REQ_LIMIT, 1000).
|
||||
-define(SEND_REQ(FROM, REQUEST), {'$send_req', FROM, REQUEST}).
|
||||
-define(QUERY(FROM, REQUEST, SENT, EXPIRE_AT), {query, FROM, REQUEST, SENT, EXPIRE_AT}).
|
||||
-define(SIMPLE_QUERY(FROM, REQUEST), ?QUERY(FROM, REQUEST, false, infinity)).
|
||||
-define(QUERY(FROM, REQUEST, SENT, EXPIRE_AT, TRACE_CTX),
|
||||
{query, FROM, REQUEST, SENT, EXPIRE_AT, TRACE_CTX}
|
||||
).
|
||||
-define(SIMPLE_QUERY(FROM, REQUEST, TRACE_CTX), ?QUERY(FROM, REQUEST, false, infinity, TRACE_CTX)).
|
||||
-define(REPLY(FROM, SENT, RESULT), {reply, FROM, SENT, RESULT}).
|
||||
-define(INFLIGHT_ITEM(Ref, BatchOrQuery, IsRetriable, AsyncWorkerMRef),
|
||||
{Ref, BatchOrQuery, IsRetriable, AsyncWorkerMRef}
|
||||
|
@ -77,7 +79,10 @@
|
|||
-type id() :: binary().
|
||||
-type index() :: pos_integer().
|
||||
-type expire_at() :: infinity | integer().
|
||||
-type queue_query() :: ?QUERY(reply_fun(), request(), HasBeenSent :: boolean(), expire_at()).
|
||||
-type trace_context() :: map() | undefined.
|
||||
-type queue_query() :: ?QUERY(
|
||||
reply_fun(), request(), HasBeenSent :: boolean(), expire_at(), TraceCtx :: trace_context()
|
||||
).
|
||||
-type request() :: term().
|
||||
-type request_from() :: undefined | gen_statem:from().
|
||||
-type timeout_ms() :: emqx_schema:timeout_duration_ms().
|
||||
|
@ -154,7 +159,10 @@ simple_sync_query(Id, Request, QueryOpts0) ->
|
|||
emqx_resource_metrics:matched_inc(Id),
|
||||
Ref = make_request_ref(),
|
||||
ReplyTo = maps:get(reply_to, QueryOpts0, undefined),
|
||||
Result = call_query(force_sync, Id, Index, Ref, ?SIMPLE_QUERY(ReplyTo, Request), QueryOpts),
|
||||
TraceCtx = maps:get(trace_ctx, QueryOpts0, undefined),
|
||||
Result = call_query(
|
||||
force_sync, Id, Index, Ref, ?SIMPLE_QUERY(ReplyTo, Request, TraceCtx), QueryOpts
|
||||
),
|
||||
_ = handle_query_result(Id, Result, _HasBeenSent = false),
|
||||
Result.
|
||||
|
||||
|
@ -167,8 +175,9 @@ simple_async_query(Id, Request, QueryOpts0) ->
|
|||
emqx_resource_metrics:matched_inc(Id),
|
||||
Ref = make_request_ref(),
|
||||
ReplyTo = maps:get(reply_to, QueryOpts0, undefined),
|
||||
TraceCtx = maps:get(trace_ctx, QueryOpts0, undefined),
|
||||
Result = call_query(
|
||||
async_if_possible, Id, Index, Ref, ?SIMPLE_QUERY(ReplyTo, Request), QueryOpts
|
||||
async_if_possible, Id, Index, Ref, ?SIMPLE_QUERY(ReplyTo, Request, TraceCtx), QueryOpts
|
||||
),
|
||||
_ = handle_query_result(Id, Result, _HasBeenSent = false),
|
||||
Result.
|
||||
|
@ -439,10 +448,10 @@ retry_inflight_sync(Ref, QueryOrBatch, Data0) ->
|
|||
Result = call_query(force_sync, Id, Index, Ref, QueryOrBatch, QueryOpts),
|
||||
{ShouldAck, PostFn, DeltaCounters} =
|
||||
case QueryOrBatch of
|
||||
?QUERY(ReplyTo, _, HasBeenSent, _ExpireAt) ->
|
||||
?QUERY(ReplyTo, _, HasBeenSent, _ExpireAt, _TraceCtx) ->
|
||||
Reply = ?REPLY(ReplyTo, HasBeenSent, Result),
|
||||
reply_caller_defer_metrics(Id, Reply, QueryOpts);
|
||||
[?QUERY(_, _, _, _) | _] = Batch ->
|
||||
[?QUERY(_, _, _, _, _) | _] = Batch ->
|
||||
batch_reply_caller_defer_metrics(Id, Result, Batch, QueryOpts)
|
||||
end,
|
||||
Data1 = aggregate_counters(Data0, DeltaCounters),
|
||||
|
@ -501,11 +510,13 @@ collect_and_enqueue_query_requests(Request0, Data0) ->
|
|||
ReplyFun = maps:get(async_reply_fun, Opts, undefined),
|
||||
HasBeenSent = false,
|
||||
ExpireAt = maps:get(expire_at, Opts),
|
||||
?QUERY(ReplyFun, Req, HasBeenSent, ExpireAt);
|
||||
TraceCtx = maps:get(trace_ctx, Opts, undefined),
|
||||
?QUERY(ReplyFun, Req, HasBeenSent, ExpireAt, TraceCtx);
|
||||
(?SEND_REQ(ReplyTo, {query, Req, Opts})) ->
|
||||
HasBeenSent = false,
|
||||
ExpireAt = maps:get(expire_at, Opts),
|
||||
?QUERY(ReplyTo, Req, HasBeenSent, ExpireAt)
|
||||
TraceCtx = maps:get(trace_ctx, Opts, undefined),
|
||||
?QUERY(ReplyTo, Req, HasBeenSent, ExpireAt, TraceCtx)
|
||||
end,
|
||||
Requests
|
||||
),
|
||||
|
@ -515,7 +526,7 @@ collect_and_enqueue_query_requests(Request0, Data0) ->
|
|||
|
||||
reply_overflown([]) ->
|
||||
ok;
|
||||
reply_overflown([?QUERY(ReplyTo, _Req, _HasBeenSent, _ExpireAt) | More]) ->
|
||||
reply_overflown([?QUERY(ReplyTo, _Req, _HasBeenSent, _ExpireAt, _TraceCtx) | More]) ->
|
||||
do_reply_caller(ReplyTo, {error, buffer_overflow}),
|
||||
reply_overflown(More).
|
||||
|
||||
|
@ -630,7 +641,7 @@ do_flush(
|
|||
inflight_tid := InflightTID
|
||||
} = Data0,
|
||||
%% unwrap when not batching (i.e., batch size == 1)
|
||||
[?QUERY(ReplyTo, _, HasBeenSent, _ExpireAt) = Request] = Batch,
|
||||
[?QUERY(ReplyTo, _, HasBeenSent, _ExpireAt, _TraceCtx) = Request] = Batch,
|
||||
QueryOpts = #{inflight_tid => InflightTID, simple_query => false},
|
||||
Result = call_query(async_if_possible, Id, Index, Ref, Request, QueryOpts),
|
||||
Reply = ?REPLY(ReplyTo, HasBeenSent, Result),
|
||||
|
@ -824,14 +835,14 @@ batch_reply_caller_defer_metrics(Id, BatchResult, Batch, QueryOpts) ->
|
|||
|
||||
expand_batch_reply(BatchResults, Batch) when is_list(BatchResults) ->
|
||||
lists:map(
|
||||
fun({?QUERY(FROM, _REQUEST, SENT, _EXPIRE_AT), Result}) ->
|
||||
fun({?QUERY(FROM, _REQUEST, SENT, _EXPIRE_AT, _TraceCtx), Result}) ->
|
||||
?REPLY(FROM, SENT, Result)
|
||||
end,
|
||||
lists:zip(Batch, BatchResults)
|
||||
);
|
||||
expand_batch_reply(BatchResult, Batch) ->
|
||||
lists:map(
|
||||
fun(?QUERY(FROM, _REQUEST, SENT, _EXPIRE_AT)) ->
|
||||
fun(?QUERY(FROM, _REQUEST, SENT, _EXPIRE_AT, _TraceCtx)) ->
|
||||
?REPLY(FROM, SENT, BatchResult)
|
||||
end,
|
||||
Batch
|
||||
|
@ -880,7 +891,7 @@ reply_dropped(_ReplyTo, _Result) ->
|
|||
-spec batch_reply_dropped([queue_query()], {error, late_reply | request_expired}) -> ok.
|
||||
batch_reply_dropped(Batch, Result) ->
|
||||
lists:foreach(
|
||||
fun(?QUERY(ReplyTo, _CoreReq, _HasBeenSent, _ExpireAt)) ->
|
||||
fun(?QUERY(ReplyTo, _CoreReq, _HasBeenSent, _ExpireAt, _TraceCtx)) ->
|
||||
reply_dropped(ReplyTo, Result)
|
||||
end,
|
||||
Batch
|
||||
|
@ -1093,11 +1104,80 @@ call_query(QM, Id, Index, Ref, Query, QueryOpts) ->
|
|||
{ok, _Group, #{status := ?status_connecting, error := unhealthy_target}} ->
|
||||
{error, {unrecoverable_error, unhealthy_target}};
|
||||
{ok, _Group, Resource} ->
|
||||
do_call_query(QM, Id, Index, Ref, Query, QueryOpts, Resource);
|
||||
set_rule_id_trace_meta_data(Query),
|
||||
QueryResult = do_call_query(QM, Id, Index, Ref, Query, QueryOpts, Resource),
|
||||
%% do_call_query does not throw an exception as the call to the
|
||||
%% resource is wrapped in a try catch expression so we will always
|
||||
%% unset the trace meta data
|
||||
unset_rule_id_trace_meta_data(),
|
||||
QueryResult;
|
||||
{error, not_found} ->
|
||||
?RESOURCE_ERROR(not_found, "resource not found")
|
||||
end.
|
||||
|
||||
set_rule_id_trace_meta_data(Requests) when is_list(Requests) ->
|
||||
%% Get the rule ids from requests
|
||||
RuleIDs = lists:foldl(fun collect_rule_id/2, #{}, Requests),
|
||||
ClientIDs = lists:foldl(fun collect_client_id/2, #{}, Requests),
|
||||
StopAfterRender = lists:foldl(fun collect_stop_after_render/2, no_info, Requests),
|
||||
StopAfterRenderVal =
|
||||
case StopAfterRender of
|
||||
only_true ->
|
||||
logger:update_process_metadata(#{stop_action_after_render => false}),
|
||||
true;
|
||||
only_false ->
|
||||
false;
|
||||
mixed ->
|
||||
?TRACE(
|
||||
warning,
|
||||
"ACTION",
|
||||
"mixed_stop_action_after_render_batch "
|
||||
"(A batch will be sent to connector where some but "
|
||||
"not all requests has stop_action_after_render set. "
|
||||
"The batch will get assigned "
|
||||
"stop_action_after_render = false)",
|
||||
#{rule_ids => RuleIDs, client_ids => ClientIDs}
|
||||
),
|
||||
false
|
||||
end,
|
||||
logger:update_process_metadata(#{
|
||||
rule_ids => RuleIDs, client_ids => ClientIDs, stop_action_after_render => StopAfterRenderVal
|
||||
}),
|
||||
ok;
|
||||
set_rule_id_trace_meta_data(Request) ->
|
||||
set_rule_id_trace_meta_data([Request]),
|
||||
ok.
|
||||
|
||||
collect_rule_id(?QUERY(_, _, _, _, #{rule_id := RuleId}), Acc) ->
|
||||
Acc#{RuleId => true};
|
||||
collect_rule_id(?QUERY(_, _, _, _, _), Acc) ->
|
||||
Acc.
|
||||
|
||||
collect_client_id(?QUERY(_, _, _, _, #{clientid := ClientId}), Acc) ->
|
||||
Acc#{ClientId => true};
|
||||
collect_client_id(?QUERY(_, _, _, _, _), Acc) ->
|
||||
Acc.
|
||||
|
||||
collect_stop_after_render(?QUERY(_, _, _, _, #{stop_action_after_render := true}), no_info) ->
|
||||
only_true;
|
||||
collect_stop_after_render(?QUERY(_, _, _, _, #{stop_action_after_render := true}), only_true) ->
|
||||
only_true;
|
||||
collect_stop_after_render(?QUERY(_, _, _, _, #{stop_action_after_render := true}), only_false) ->
|
||||
mixed;
|
||||
collect_stop_after_render(?QUERY(_, _, _, _, _), no_info) ->
|
||||
only_false;
|
||||
collect_stop_after_render(?QUERY(_, _, _, _, _), only_true) ->
|
||||
mixed;
|
||||
collect_stop_after_render(?QUERY(_, _, _, _, _), only_false) ->
|
||||
only_false;
|
||||
collect_stop_after_render(?QUERY(_, _, _, _, _), mixed) ->
|
||||
mixed.
|
||||
|
||||
unset_rule_id_trace_meta_data() ->
|
||||
logger:update_process_metadata(#{
|
||||
rule_ids => #{}, client_ids => #{}, stop_action_after_render => false
|
||||
}).
|
||||
|
||||
%% action:kafka_producer:myproducer1:connector:kafka_producer:mykakfaclient1
|
||||
extract_connector_id(Id) when is_binary(Id) ->
|
||||
case binary:split(Id, <<":">>, [global]) of
|
||||
|
@ -1208,7 +1288,15 @@ do_call_query(_QM, _Id, _Index, _Ref, _Query, _QueryOpts, _Data) ->
|
|||
).
|
||||
|
||||
apply_query_fun(
|
||||
sync, Mod, Id, _Index, _Ref, ?QUERY(_, Request, _, _) = _Query, ResSt, Channels, QueryOpts
|
||||
sync,
|
||||
Mod,
|
||||
Id,
|
||||
_Index,
|
||||
_Ref,
|
||||
?QUERY(_, Request, _, _, _TraceCtx) = _Query,
|
||||
ResSt,
|
||||
Channels,
|
||||
QueryOpts
|
||||
) ->
|
||||
?tp(call_query, #{id => Id, mod => Mod, query => _Query, res_st => ResSt, call_mode => sync}),
|
||||
maybe_reply_to(
|
||||
|
@ -1227,7 +1315,15 @@ apply_query_fun(
|
|||
QueryOpts
|
||||
);
|
||||
apply_query_fun(
|
||||
async, Mod, Id, Index, Ref, ?QUERY(_, Request, _, _) = Query, ResSt, Channels, QueryOpts
|
||||
async,
|
||||
Mod,
|
||||
Id,
|
||||
Index,
|
||||
Ref,
|
||||
?QUERY(_, Request, _, _, _TraceCtx) = Query,
|
||||
ResSt,
|
||||
Channels,
|
||||
QueryOpts
|
||||
) ->
|
||||
?tp(call_query_async, #{
|
||||
id => Id, mod => Mod, query => Query, res_st => ResSt, call_mode => async
|
||||
|
@ -1268,7 +1364,7 @@ apply_query_fun(
|
|||
Id,
|
||||
_Index,
|
||||
_Ref,
|
||||
[?QUERY(_, FirstRequest, _, _) | _] = Batch,
|
||||
[?QUERY(_, FirstRequest, _, _, _) | _] = Batch,
|
||||
ResSt,
|
||||
Channels,
|
||||
QueryOpts
|
||||
|
@ -1276,7 +1372,9 @@ apply_query_fun(
|
|||
?tp(call_batch_query, #{
|
||||
id => Id, mod => Mod, batch => Batch, res_st => ResSt, call_mode => sync
|
||||
}),
|
||||
Requests = lists:map(fun(?QUERY(_ReplyTo, Request, _, _ExpireAt)) -> Request end, Batch),
|
||||
Requests = lists:map(
|
||||
fun(?QUERY(_ReplyTo, Request, _, _ExpireAt, _TraceCtx)) -> Request end, Batch
|
||||
),
|
||||
maybe_reply_to(
|
||||
?APPLY_RESOURCE(
|
||||
call_batch_query,
|
||||
|
@ -1298,7 +1396,7 @@ apply_query_fun(
|
|||
Id,
|
||||
Index,
|
||||
Ref,
|
||||
[?QUERY(_, FirstRequest, _, _) | _] = Batch,
|
||||
[?QUERY(_, FirstRequest, _, _, _) | _] = Batch,
|
||||
ResSt,
|
||||
Channels,
|
||||
QueryOpts
|
||||
|
@ -1321,7 +1419,7 @@ apply_query_fun(
|
|||
min_batch => minimize(Batch)
|
||||
},
|
||||
Requests = lists:map(
|
||||
fun(?QUERY(_ReplyTo, Request, _, _ExpireAt)) -> Request end, Batch
|
||||
fun(?QUERY(_ReplyTo, Request, _, _ExpireAt, _TraceCtx)) -> Request end, Batch
|
||||
),
|
||||
IsRetriable = false,
|
||||
AsyncWorkerMRef = undefined,
|
||||
|
@ -1367,7 +1465,7 @@ handle_async_reply1(
|
|||
inflight_tid := InflightTID,
|
||||
resource_id := Id,
|
||||
buffer_worker := BufferWorkerPid,
|
||||
min_query := ?QUERY(ReplyTo, _, _, ExpireAt) = _Query
|
||||
min_query := ?QUERY(ReplyTo, _, _, ExpireAt, _TraceCtx) = _Query
|
||||
} = ReplyContext,
|
||||
Result
|
||||
) ->
|
||||
|
@ -1399,7 +1497,7 @@ do_handle_async_reply(
|
|||
request_ref := Ref,
|
||||
buffer_worker := BufferWorkerPid,
|
||||
inflight_tid := InflightTID,
|
||||
min_query := ?QUERY(ReplyTo, _, Sent, _ExpireAt) = _Query
|
||||
min_query := ?QUERY(ReplyTo, _, Sent, _ExpireAt, _TraceCtx) = _Query
|
||||
},
|
||||
Result
|
||||
) ->
|
||||
|
@ -1486,13 +1584,13 @@ handle_async_batch_reply2([Inflight], ReplyContext, Results0, Now) ->
|
|||
%% So we just take the original flag from the ReplyContext batch
|
||||
%% and put it back to the batch found in inflight table
|
||||
%% which must have already been set to `false`
|
||||
[?QUERY(_ReplyTo, _, HasBeenSent, _ExpireAt) | _] = Batch,
|
||||
[?QUERY(_ReplyTo, _, HasBeenSent, _ExpireAt, _TraceCtx) | _] = Batch,
|
||||
{RealNotExpired0, RealExpired, Results} =
|
||||
sieve_expired_requests_with_results(RealBatch, Now, Results0),
|
||||
RealNotExpired =
|
||||
lists:map(
|
||||
fun(?QUERY(ReplyTo, CoreReq, _HasBeenSent, ExpireAt)) ->
|
||||
?QUERY(ReplyTo, CoreReq, HasBeenSent, ExpireAt)
|
||||
fun(?QUERY(ReplyTo, CoreReq, _HasBeenSent, ExpireAt, TraceCtx)) ->
|
||||
?QUERY(ReplyTo, CoreReq, HasBeenSent, ExpireAt, TraceCtx)
|
||||
end,
|
||||
RealNotExpired0
|
||||
),
|
||||
|
@ -1678,7 +1776,10 @@ inflight_get_first_retriable(InflightTID, Now) ->
|
|||
case ets:select(InflightTID, MatchSpec, _Limit = 1) of
|
||||
'$end_of_table' ->
|
||||
none;
|
||||
{[{Ref, Query = ?QUERY(_ReplyTo, _CoreReq, _HasBeenSent, ExpireAt)}], _Continuation} ->
|
||||
{
|
||||
[{Ref, Query = ?QUERY(_ReplyTo, _CoreReq, _HasBeenSent, ExpireAt, _TraceCtx)}],
|
||||
_Continuation
|
||||
} ->
|
||||
case is_expired(ExpireAt, Now) of
|
||||
true ->
|
||||
{expired, Ref, [Query]};
|
||||
|
@ -1714,7 +1815,7 @@ inflight_append(undefined, _InflightItem) ->
|
|||
ok;
|
||||
inflight_append(
|
||||
InflightTID,
|
||||
?INFLIGHT_ITEM(Ref, [?QUERY(_, _, _, _) | _] = Batch0, IsRetriable, AsyncWorkerMRef)
|
||||
?INFLIGHT_ITEM(Ref, [?QUERY(_, _, _, _, _) | _] = Batch0, IsRetriable, AsyncWorkerMRef)
|
||||
) ->
|
||||
Batch = mark_as_sent(Batch0),
|
||||
InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, AsyncWorkerMRef),
|
||||
|
@ -1726,7 +1827,10 @@ inflight_append(
|
|||
inflight_append(
|
||||
InflightTID,
|
||||
?INFLIGHT_ITEM(
|
||||
Ref, ?QUERY(_ReplyTo, _Req, _HasBeenSent, _ExpireAt) = Query0, IsRetriable, AsyncWorkerMRef
|
||||
Ref,
|
||||
?QUERY(_ReplyTo, _Req, _HasBeenSent, _ExpireAt, _TraceCtx) = Query0,
|
||||
IsRetriable,
|
||||
AsyncWorkerMRef
|
||||
)
|
||||
) ->
|
||||
Query = mark_as_sent(Query0),
|
||||
|
@ -1790,9 +1894,13 @@ ack_inflight(undefined, _Ref, _BufferWorkerPid) ->
|
|||
ack_inflight(InflightTID, Ref, BufferWorkerPid) ->
|
||||
{Count, Removed} =
|
||||
case ets:take(InflightTID, Ref) of
|
||||
[?INFLIGHT_ITEM(Ref, ?QUERY(_, _, _, _), _IsRetriable, _AsyncWorkerMRef)] ->
|
||||
[?INFLIGHT_ITEM(Ref, ?QUERY(_, _, _, _, _), _IsRetriable, _AsyncWorkerMRef)] ->
|
||||
{1, true};
|
||||
[?INFLIGHT_ITEM(Ref, [?QUERY(_, _, _, _) | _] = Batch, _IsRetriable, _AsyncWorkerMRef)] ->
|
||||
[
|
||||
?INFLIGHT_ITEM(
|
||||
Ref, [?QUERY(_, _, _, _, _) | _] = Batch, _IsRetriable, _AsyncWorkerMRef
|
||||
)
|
||||
] ->
|
||||
{length(Batch), true};
|
||||
[] ->
|
||||
{0, false}
|
||||
|
@ -1942,9 +2050,9 @@ do_collect_requests(Acc, Count, Limit) ->
|
|||
|
||||
mark_as_sent(Batch) when is_list(Batch) ->
|
||||
lists:map(fun mark_as_sent/1, Batch);
|
||||
mark_as_sent(?QUERY(ReplyTo, Req, _HasBeenSent, ExpireAt)) ->
|
||||
mark_as_sent(?QUERY(ReplyTo, Req, _HasBeenSent, ExpireAt, TraceCtx)) ->
|
||||
HasBeenSent = true,
|
||||
?QUERY(ReplyTo, Req, HasBeenSent, ExpireAt).
|
||||
?QUERY(ReplyTo, Req, HasBeenSent, ExpireAt, TraceCtx).
|
||||
|
||||
is_unrecoverable_error({error, {unrecoverable_error, _}}) ->
|
||||
true;
|
||||
|
@ -1967,7 +2075,7 @@ is_async_return(_) ->
|
|||
|
||||
sieve_expired_requests(Batch, Now) ->
|
||||
lists:partition(
|
||||
fun(?QUERY(_ReplyTo, _CoreReq, _HasBeenSent, ExpireAt)) ->
|
||||
fun(?QUERY(_ReplyTo, _CoreReq, _HasBeenSent, ExpireAt, _TraceCtx)) ->
|
||||
not is_expired(ExpireAt, Now)
|
||||
end,
|
||||
Batch
|
||||
|
@ -1978,7 +2086,7 @@ sieve_expired_requests_with_results(Batch, Now, Results) when is_list(Results) -
|
|||
{RevNotExpiredBatch, RevNotExpiredResults, ExpiredBatch} =
|
||||
lists:foldl(
|
||||
fun(
|
||||
{?QUERY(_ReplyTo, _CoreReq, _HasBeenSent, ExpireAt) = Query, Result},
|
||||
{?QUERY(_ReplyTo, _CoreReq, _HasBeenSent, ExpireAt, _TraceCtx) = Query, Result},
|
||||
{NotExpAcc, ResAcc, ExpAcc}
|
||||
) ->
|
||||
case not is_expired(ExpireAt, Now) of
|
||||
|
@ -2026,15 +2134,16 @@ ensure_expire_at(#{timeout := TimeoutMS} = Opts) ->
|
|||
Opts#{expire_at => ExpireAt}.
|
||||
|
||||
%% no need to keep the request for async reply handler
|
||||
minimize(?QUERY(_, _, _, _) = Q) ->
|
||||
minimize(?QUERY(_, _, _, _, _) = Q) ->
|
||||
do_minimize(Q);
|
||||
minimize(L) when is_list(L) ->
|
||||
lists:map(fun do_minimize/1, L).
|
||||
|
||||
-ifdef(TEST).
|
||||
do_minimize(?QUERY(_ReplyTo, _Req, _Sent, _ExpireAt) = Query) -> Query.
|
||||
do_minimize(?QUERY(_ReplyTo, _Req, _Sent, _ExpireAt, _TraceCtx) = Query) -> Query.
|
||||
-else.
|
||||
do_minimize(?QUERY(ReplyTo, _Req, Sent, ExpireAt)) -> ?QUERY(ReplyTo, [], Sent, ExpireAt).
|
||||
do_minimize(?QUERY(ReplyTo, _Req, Sent, ExpireAt, TraceCtx)) ->
|
||||
?QUERY(ReplyTo, [], Sent, ExpireAt, TraceCtx).
|
||||
-endif.
|
||||
|
||||
%% To avoid message loss due to misconfigurations, we adjust
|
||||
|
|
|
@ -54,7 +54,8 @@ roots() ->
|
|||
{"rule_creation", sc(ref("rule_creation"), #{desc => ?DESC("root_rule_creation")})},
|
||||
{"rule_info", sc(ref("rule_info"), #{desc => ?DESC("root_rule_info")})},
|
||||
{"rule_events", sc(ref("rule_events"), #{desc => ?DESC("root_rule_events")})},
|
||||
{"rule_test", sc(ref("rule_test"), #{desc => ?DESC("root_rule_test")})}
|
||||
{"rule_test", sc(ref("rule_test"), #{desc => ?DESC("root_rule_test")})},
|
||||
{"rule_apply_test", sc(ref("rule_apply_test"), #{desc => ?DESC("root_apply_rule_test")})}
|
||||
].
|
||||
|
||||
fields("rule_engine") ->
|
||||
|
@ -124,6 +125,48 @@ fields("rule_test") ->
|
|||
)},
|
||||
{"sql", sc(binary(), #{desc => ?DESC("test_sql"), required => true})}
|
||||
];
|
||||
fields("rule_apply_test") ->
|
||||
[
|
||||
{"context",
|
||||
sc(
|
||||
hoconsc:union([
|
||||
ref("ctx_pub"),
|
||||
ref("ctx_sub"),
|
||||
ref("ctx_unsub"),
|
||||
ref("ctx_delivered"),
|
||||
ref("ctx_acked"),
|
||||
ref("ctx_dropped"),
|
||||
ref("ctx_connected"),
|
||||
ref("ctx_disconnected"),
|
||||
ref("ctx_connack"),
|
||||
ref("ctx_check_authz_complete"),
|
||||
ref("ctx_bridge_mqtt"),
|
||||
ref("ctx_delivery_dropped")
|
||||
]),
|
||||
#{
|
||||
desc => ?DESC("test_context"),
|
||||
default => #{}
|
||||
}
|
||||
)},
|
||||
{"environment",
|
||||
sc(
|
||||
typerefl:map(),
|
||||
#{
|
||||
desc =>
|
||||
?DESC("test_rule_environment"),
|
||||
default => #{}
|
||||
}
|
||||
)},
|
||||
{"stop_action_after_template_render",
|
||||
sc(
|
||||
typerefl:boolean(),
|
||||
#{
|
||||
desc =>
|
||||
?DESC("stop_action_after_template_render"),
|
||||
default => false
|
||||
}
|
||||
)}
|
||||
];
|
||||
fields("metrics") ->
|
||||
[
|
||||
{"matched",
|
||||
|
|
|
@ -37,6 +37,7 @@
|
|||
'/rule_test'/2,
|
||||
'/rules'/2,
|
||||
'/rules/:id'/2,
|
||||
'/rules/:id/test'/2,
|
||||
'/rules/:id/metrics'/2,
|
||||
'/rules/:id/metrics/reset'/2
|
||||
]).
|
||||
|
@ -145,6 +146,7 @@ paths() ->
|
|||
"/rule_test",
|
||||
"/rules",
|
||||
"/rules/:id",
|
||||
"/rules/:id/test",
|
||||
"/rules/:id/metrics",
|
||||
"/rules/:id/metrics/reset"
|
||||
].
|
||||
|
@ -161,6 +163,9 @@ rule_creation_schema() ->
|
|||
rule_test_schema() ->
|
||||
ref(emqx_rule_api_schema, "rule_test").
|
||||
|
||||
rule_apply_test_schema() ->
|
||||
ref(emqx_rule_api_schema, "rule_apply_test").
|
||||
|
||||
rule_info_schema() ->
|
||||
ref(emqx_rule_api_schema, "rule_info").
|
||||
|
||||
|
@ -258,6 +263,21 @@ schema("/rules/:id") ->
|
|||
}
|
||||
}
|
||||
};
|
||||
schema("/rules/:id/test") ->
|
||||
#{
|
||||
'operationId' => '/rules/:id/test',
|
||||
post => #{
|
||||
tags => [<<"rules">>],
|
||||
description => ?DESC("api8"),
|
||||
summary => <<"Apply a rule with the given message and environment">>,
|
||||
'requestBody' => rule_apply_test_schema(),
|
||||
responses => #{
|
||||
400 => error_schema('BAD_REQUEST', "Invalid Parameters"),
|
||||
412 => error_schema('NOT_MATCH', "SQL Not Match"),
|
||||
200 => <<"Rule Applied">>
|
||||
}
|
||||
}
|
||||
};
|
||||
schema("/rules/:id/metrics") ->
|
||||
#{
|
||||
'operationId' => '/rules/:id/metrics',
|
||||
|
@ -392,6 +412,24 @@ param_path_id() ->
|
|||
end
|
||||
).
|
||||
|
||||
'/rules/:id/test'(post, #{body := Params, bindings := #{id := RuleId}}) ->
|
||||
?CHECK_PARAMS(
|
||||
Params,
|
||||
rule_apply_test,
|
||||
begin
|
||||
case emqx_rule_sqltester:apply_rule(RuleId, CheckedParams) of
|
||||
{ok, Result} ->
|
||||
{200, Result};
|
||||
{error, {parse_error, Reason}} ->
|
||||
{400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}};
|
||||
{error, nomatch} ->
|
||||
{412, #{code => 'NOT_MATCH', message => <<"SQL Not Match">>}};
|
||||
{error, Reason} ->
|
||||
{400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}
|
||||
end
|
||||
end
|
||||
).
|
||||
|
||||
'/rules/:id'(get, #{bindings := #{id := Id}}) ->
|
||||
case emqx_rule_engine:get_rule(Id) of
|
||||
{ok, Rule} ->
|
||||
|
|
|
@ -69,9 +69,14 @@ apply_rule_discard_result(Rule, Columns, Envs) ->
|
|||
ok.
|
||||
|
||||
apply_rule(Rule = #{id := RuleID}, Columns, Envs) ->
|
||||
?TRACE("APPLY_RULE", "rule_activated", #{
|
||||
ruleid => RuleID, input => Columns, environment => Envs
|
||||
}),
|
||||
set_process_trace_metadata(RuleID, Columns),
|
||||
trace_rule_sql(
|
||||
"rule_activated",
|
||||
#{
|
||||
input => Columns, environment => Envs
|
||||
},
|
||||
debug
|
||||
),
|
||||
ok = emqx_metrics_worker:inc(rule_metrics, RuleID, 'matched'),
|
||||
clear_rule_payload(),
|
||||
try
|
||||
|
@ -80,48 +85,80 @@ apply_rule(Rule = #{id := RuleID}, Columns, Envs) ->
|
|||
%% ignore the errors if select or match failed
|
||||
_:Reason = {select_and_transform_error, Error} ->
|
||||
ok = emqx_metrics_worker:inc(rule_metrics, RuleID, 'failed.exception'),
|
||||
?SLOG(warning, #{
|
||||
msg => "SELECT_clause_exception",
|
||||
rule_id => RuleID,
|
||||
reason => Error
|
||||
}),
|
||||
trace_rule_sql(
|
||||
"SELECT_clause_exception",
|
||||
#{
|
||||
reason => Error
|
||||
},
|
||||
warning
|
||||
),
|
||||
{error, Reason};
|
||||
_:Reason = {match_conditions_error, Error} ->
|
||||
ok = emqx_metrics_worker:inc(rule_metrics, RuleID, 'failed.exception'),
|
||||
?SLOG(warning, #{
|
||||
msg => "WHERE_clause_exception",
|
||||
rule_id => RuleID,
|
||||
reason => Error
|
||||
}),
|
||||
trace_rule_sql(
|
||||
"WHERE_clause_exception",
|
||||
#{
|
||||
reason => Error
|
||||
},
|
||||
warning
|
||||
),
|
||||
{error, Reason};
|
||||
_:Reason = {select_and_collect_error, Error} ->
|
||||
ok = emqx_metrics_worker:inc(rule_metrics, RuleID, 'failed.exception'),
|
||||
?SLOG(warning, #{
|
||||
msg => "FOREACH_clause_exception",
|
||||
rule_id => RuleID,
|
||||
reason => Error
|
||||
}),
|
||||
trace_rule_sql(
|
||||
"FOREACH_clause_exception",
|
||||
#{
|
||||
reason => Error
|
||||
},
|
||||
warning
|
||||
),
|
||||
{error, Reason};
|
||||
_:Reason = {match_incase_error, Error} ->
|
||||
ok = emqx_metrics_worker:inc(rule_metrics, RuleID, 'failed.exception'),
|
||||
?SLOG(warning, #{
|
||||
msg => "INCASE_clause_exception",
|
||||
rule_id => RuleID,
|
||||
reason => Error
|
||||
}),
|
||||
trace_rule_sql(
|
||||
"INCASE_clause_exception",
|
||||
#{
|
||||
reason => Error
|
||||
},
|
||||
warning
|
||||
),
|
||||
{error, Reason};
|
||||
Class:Error:StkTrace ->
|
||||
ok = emqx_metrics_worker:inc(rule_metrics, RuleID, 'failed.exception'),
|
||||
?SLOG(error, #{
|
||||
msg => "apply_rule_failed",
|
||||
rule_id => RuleID,
|
||||
exception => Class,
|
||||
reason => Error,
|
||||
stacktrace => StkTrace
|
||||
}),
|
||||
trace_rule_sql(
|
||||
"apply_rule_failed",
|
||||
#{
|
||||
exception => Class,
|
||||
reason => Error,
|
||||
stacktrace => StkTrace
|
||||
},
|
||||
warning
|
||||
),
|
||||
{error, {Error, StkTrace}}
|
||||
after
|
||||
reset_process_trace_metadata(Columns)
|
||||
end.
|
||||
|
||||
set_process_trace_metadata(RuleID, #{clientid := ClientID}) ->
|
||||
logger:update_process_metadata(#{
|
||||
rule_id => RuleID,
|
||||
clientid => ClientID
|
||||
});
|
||||
set_process_trace_metadata(RuleID, _) ->
|
||||
logger:update_process_metadata(#{
|
||||
rule_id => RuleID
|
||||
}).
|
||||
|
||||
reset_process_trace_metadata(#{clientid := _ClientID}) ->
|
||||
Meta = logger:get_process_metadata(),
|
||||
Meta1 = maps:remove(clientid, Meta),
|
||||
Meta2 = maps:remove(rule_id, Meta1),
|
||||
logger:set_process_metadata(Meta2);
|
||||
reset_process_trace_metadata(_) ->
|
||||
Meta = logger:get_process_metadata(),
|
||||
Meta1 = maps:remove(rule_id, Meta),
|
||||
logger:set_process_metadata(Meta1).
|
||||
|
||||
do_apply_rule(
|
||||
#{
|
||||
id := RuleId,
|
||||
|
@ -139,13 +176,18 @@ do_apply_rule(
|
|||
{ok, ColumnsAndSelected, FinalCollection} ->
|
||||
case FinalCollection of
|
||||
[] ->
|
||||
trace_rule_sql("FOREACH_yielded_no_result"),
|
||||
ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'failed.no_result');
|
||||
_ ->
|
||||
trace_rule_sql(
|
||||
"FOREACH_yielded_result", #{result => FinalCollection}, debug
|
||||
),
|
||||
ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'passed')
|
||||
end,
|
||||
NewEnvs = maps:merge(ColumnsAndSelected, Envs),
|
||||
{ok, [handle_action_list(RuleId, Actions, Coll, NewEnvs) || Coll <- FinalCollection]};
|
||||
false ->
|
||||
trace_rule_sql("FOREACH_yielded_no_result_no_match"),
|
||||
ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'failed.no_result'),
|
||||
{error, nomatch}
|
||||
end;
|
||||
|
@ -162,9 +204,11 @@ do_apply_rule(
|
|||
) ->
|
||||
case evaluate_select(Fields, Columns, Conditions) of
|
||||
{ok, Selected} ->
|
||||
trace_rule_sql("SELECT_yielded_result", #{result => Selected}, debug),
|
||||
ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'passed'),
|
||||
{ok, handle_action_list(RuleId, Actions, Selected, maps:merge(Columns, Envs))};
|
||||
false ->
|
||||
trace_rule_sql("SELECT_yielded_no_result_no_match"),
|
||||
ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'failed.no_result'),
|
||||
{error, nomatch}
|
||||
end.
|
||||
|
@ -348,37 +392,42 @@ handle_action_list(RuleId, Actions, Selected, Envs) ->
|
|||
|
||||
handle_action(RuleId, ActId, Selected, Envs) ->
|
||||
ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.total'),
|
||||
trace_action(ActId, "activating_action"),
|
||||
try
|
||||
do_handle_action(RuleId, ActId, Selected, Envs)
|
||||
Result = do_handle_action(RuleId, ActId, Selected, Envs),
|
||||
trace_action(ActId, "action_activated", #{result => Result}),
|
||||
Result
|
||||
catch
|
||||
throw:out_of_service ->
|
||||
ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed'),
|
||||
ok = emqx_metrics_worker:inc(
|
||||
rule_metrics, RuleId, 'actions.failed.out_of_service'
|
||||
),
|
||||
?SLOG(warning, #{msg => "out_of_service", action => ActId});
|
||||
trace_action(ActId, "out_of_service", #{}, warning);
|
||||
Err:Reason:ST ->
|
||||
ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed'),
|
||||
ok = emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.unknown'),
|
||||
?SLOG(error, #{
|
||||
msg => "action_failed",
|
||||
action => ActId,
|
||||
exception => Err,
|
||||
reason => Reason,
|
||||
stacktrace => ST
|
||||
})
|
||||
trace_action(
|
||||
ActId,
|
||||
"action_failed",
|
||||
#{
|
||||
exception => Err,
|
||||
reason => Reason,
|
||||
stacktrace => ST
|
||||
},
|
||||
error
|
||||
)
|
||||
end.
|
||||
|
||||
-define(IS_RES_DOWN(R), R == stopped; R == not_connected; R == not_found; R == unhealthy_target).
|
||||
do_handle_action(RuleId, {bridge, BridgeType, BridgeName, ResId}, Selected, _Envs) ->
|
||||
?TRACE(
|
||||
"BRIDGE",
|
||||
"bridge_action",
|
||||
#{bridge_id => emqx_bridge_resource:bridge_id(BridgeType, BridgeName)}
|
||||
),
|
||||
ReplyTo = {fun ?MODULE:inc_action_metrics/2, [RuleId], #{reply_dropped => true}},
|
||||
do_handle_action(RuleId, {bridge, BridgeType, BridgeName, ResId} = Action, Selected, _Envs) ->
|
||||
trace_action_bridge("BRIDGE", Action, "bridge_action", #{}, debug),
|
||||
TraceCtx = do_handle_action_get_trace_context(Action),
|
||||
ReplyTo = {fun ?MODULE:inc_action_metrics/2, [TraceCtx], #{reply_dropped => true}},
|
||||
case
|
||||
emqx_bridge:send_message(BridgeType, BridgeName, ResId, Selected, #{reply_to => ReplyTo})
|
||||
emqx_bridge:send_message(BridgeType, BridgeName, ResId, Selected, #{
|
||||
reply_to => ReplyTo, trace_ctx => maps:remove(action_id, TraceCtx)
|
||||
})
|
||||
of
|
||||
{error, Reason} when Reason == bridge_not_found; Reason == bridge_stopped ->
|
||||
throw(out_of_service);
|
||||
|
@ -388,23 +437,20 @@ do_handle_action(RuleId, {bridge, BridgeType, BridgeName, ResId}, Selected, _Env
|
|||
Result
|
||||
end;
|
||||
do_handle_action(
|
||||
RuleId,
|
||||
{bridge_v2, BridgeType, BridgeName},
|
||||
_RuleId,
|
||||
{bridge_v2, BridgeType, BridgeName} = Action,
|
||||
Selected,
|
||||
_Envs
|
||||
) ->
|
||||
?TRACE(
|
||||
"BRIDGE",
|
||||
"bridge_action",
|
||||
#{bridge_id => {bridge_v2, BridgeType, BridgeName}}
|
||||
),
|
||||
ReplyTo = {fun ?MODULE:inc_action_metrics/2, [RuleId], #{reply_dropped => true}},
|
||||
trace_action_bridge("BRIDGE", Action, "bridge_action", #{}, debug),
|
||||
TraceCtx = do_handle_action_get_trace_context(Action),
|
||||
ReplyTo = {fun ?MODULE:inc_action_metrics/2, [TraceCtx], #{reply_dropped => true}},
|
||||
case
|
||||
emqx_bridge_v2:send_message(
|
||||
BridgeType,
|
||||
BridgeName,
|
||||
Selected,
|
||||
#{reply_to => ReplyTo}
|
||||
#{reply_to => ReplyTo, trace_ctx => maps:remove(action_id, TraceCtx)}
|
||||
)
|
||||
of
|
||||
{error, Reason} when Reason == bridge_not_found; Reason == bridge_stopped ->
|
||||
|
@ -414,13 +460,43 @@ do_handle_action(
|
|||
Result ->
|
||||
Result
|
||||
end;
|
||||
do_handle_action(RuleId, #{mod := Mod, func := Func} = Action, Selected, Envs) ->
|
||||
do_handle_action(_RuleId, #{mod := Mod, func := Func} = Action, Selected, Envs) ->
|
||||
trace_action(Action, "call_action_function"),
|
||||
%% the function can also throw 'out_of_service'
|
||||
Args = maps:get(args, Action, []),
|
||||
Result = Mod:Func(Selected, Envs, Args),
|
||||
inc_action_metrics(RuleId, Result),
|
||||
TraceCtx = do_handle_action_get_trace_context(Action),
|
||||
inc_action_metrics(TraceCtx, Result),
|
||||
trace_action(Action, "call_action_function_result", #{result => Result}, debug),
|
||||
Result.
|
||||
|
||||
do_handle_action_get_trace_context(Action) ->
|
||||
case logger:get_process_metadata() of
|
||||
#{
|
||||
rule_id := RuleID,
|
||||
clientid := ClientID
|
||||
} ->
|
||||
#{
|
||||
rule_id => RuleID,
|
||||
clientid => ClientID,
|
||||
action_id => Action
|
||||
};
|
||||
#{
|
||||
rule_id := RuleID
|
||||
} ->
|
||||
#{
|
||||
rule_id => RuleID,
|
||||
action_id => Action
|
||||
}
|
||||
end.
|
||||
|
||||
action_info({bridge, BridgeType, BridgeName, _ResId}) ->
|
||||
#{type => BridgeType, name => BridgeName};
|
||||
action_info({bridge_v2, BridgeType, BridgeName}) ->
|
||||
#{type => BridgeType, name => BridgeName};
|
||||
action_info(FuncInfoMap) ->
|
||||
FuncInfoMap.
|
||||
|
||||
eval({Op, _} = Exp, Context) when is_list(Context) andalso (Op == path orelse Op == var) ->
|
||||
case Context of
|
||||
[Columns] ->
|
||||
|
@ -599,21 +675,31 @@ nested_put(Alias, Val, Columns0) ->
|
|||
Columns = ensure_decoded_payload(Alias, Columns0),
|
||||
emqx_rule_maps:nested_put(Alias, Val, Columns).
|
||||
|
||||
inc_action_metrics(RuleId, Result) ->
|
||||
_ = do_inc_action_metrics(RuleId, Result),
|
||||
inc_action_metrics(TraceCtx, Result) ->
|
||||
_ = do_inc_action_metrics(TraceCtx, Result),
|
||||
Result.
|
||||
|
||||
do_inc_action_metrics(RuleId, {error, {recoverable_error, _}}) ->
|
||||
do_inc_action_metrics(
|
||||
#{rule_id := RuleId, action_id := ActId} = TraceContext,
|
||||
{error, {recoverable_error, _}}
|
||||
) ->
|
||||
trace_action(ActId, "out_of_service", TraceContext),
|
||||
emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.out_of_service');
|
||||
do_inc_action_metrics(RuleId, {error, {unrecoverable_error, _}}) ->
|
||||
do_inc_action_metrics(
|
||||
#{rule_id := RuleId, action_id := ActId} = TraceContext,
|
||||
{error, {unrecoverable_error, _} = Reason}
|
||||
) ->
|
||||
trace_action(ActId, "action_failed", maps:merge(#{reason => Reason}, TraceContext)),
|
||||
emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed'),
|
||||
emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.unknown');
|
||||
do_inc_action_metrics(RuleId, R) ->
|
||||
do_inc_action_metrics(#{rule_id := RuleId, action_id := ActId} = TraceContext, R) ->
|
||||
case is_ok_result(R) of
|
||||
false ->
|
||||
trace_action(ActId, "action_failed", maps:merge(#{reason => R}, TraceContext)),
|
||||
emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed'),
|
||||
emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.unknown');
|
||||
true ->
|
||||
trace_action(ActId, "action_success", maps:merge(#{result => R}, TraceContext)),
|
||||
emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.success')
|
||||
end.
|
||||
|
||||
|
@ -661,3 +747,39 @@ parse_function_name(Module, Name) when is_binary(Name) ->
|
|||
end;
|
||||
parse_function_name(_Module, Name) when is_atom(Name) ->
|
||||
Name.
|
||||
|
||||
trace_action(ActId, Message) ->
|
||||
trace_action_bridge("ACTION", ActId, Message).
|
||||
|
||||
trace_action(ActId, Message, Extra) ->
|
||||
trace_action_bridge("ACTION", ActId, Message, Extra, debug).
|
||||
|
||||
trace_action(ActId, Message, Extra, Level) ->
|
||||
trace_action_bridge("ACTION", ActId, Message, Extra, Level).
|
||||
|
||||
trace_action_bridge(Tag, ActId, Message) ->
|
||||
trace_action_bridge(Tag, ActId, Message, #{}, debug).
|
||||
|
||||
trace_action_bridge(Tag, ActId, Message, Extra, Level) ->
|
||||
?TRACE(
|
||||
Level,
|
||||
Tag,
|
||||
Message,
|
||||
maps:merge(
|
||||
#{
|
||||
action_info => action_info(ActId)
|
||||
},
|
||||
Extra
|
||||
)
|
||||
).
|
||||
|
||||
trace_rule_sql(Message) ->
|
||||
trace_rule_sql(Message, #{}, debug).
|
||||
|
||||
trace_rule_sql(Message, Extra, Level) ->
|
||||
?TRACE(
|
||||
Level,
|
||||
"RULE_SQL_EXEC",
|
||||
Message,
|
||||
Extra
|
||||
).
|
||||
|
|
|
@ -20,9 +20,77 @@
|
|||
test/1,
|
||||
get_selected_data/3,
|
||||
%% Some SQL functions return different results in the test environment
|
||||
is_test_runtime_env/0
|
||||
is_test_runtime_env/0,
|
||||
apply_rule/2
|
||||
]).
|
||||
|
||||
apply_rule(
|
||||
RuleId,
|
||||
#{
|
||||
context := Context,
|
||||
environment := Env,
|
||||
stop_action_after_template_render := StopAfterRender
|
||||
}
|
||||
) ->
|
||||
{ok, Rule} = emqx_rule_engine:get_rule(RuleId),
|
||||
InTopic = get_in_topic(Context),
|
||||
EventTopics = maps:get(from, Rule, []),
|
||||
case lists:all(fun is_publish_topic/1, EventTopics) of
|
||||
true ->
|
||||
%% test if the topic matches the topic filters in the rule
|
||||
case emqx_topic:match_any(InTopic, EventTopics) of
|
||||
true -> do_apply_matched_rule(Rule, Context, Env, StopAfterRender);
|
||||
false -> {error, nomatch}
|
||||
end;
|
||||
false ->
|
||||
case lists:member(InTopic, EventTopics) of
|
||||
true ->
|
||||
%% the rule is for both publish and events, test it directly
|
||||
do_apply_matched_rule(Rule, Context, Env, StopAfterRender);
|
||||
false ->
|
||||
{error, nomatch}
|
||||
end
|
||||
end.
|
||||
|
||||
do_apply_matched_rule(Rule, Context, Env, StopAfterRender) ->
|
||||
update_process_trace_metadata(StopAfterRender),
|
||||
Env1 =
|
||||
case Env of
|
||||
M when map_size(M) =:= 0 ->
|
||||
%% Use the default environment if no environment is provided
|
||||
default_apply_rule_environment();
|
||||
_ ->
|
||||
Env
|
||||
end,
|
||||
ApplyRuleRes = emqx_rule_runtime:apply_rule(Rule, Context, Env1),
|
||||
reset_trace_process_metadata(StopAfterRender),
|
||||
ApplyRuleRes.
|
||||
|
||||
update_process_trace_metadata(true = _StopAfterRender) ->
|
||||
logger:update_process_trace_metadata(#{
|
||||
stop_action_after_render => true
|
||||
});
|
||||
update_process_trace_metadata(false = _StopAfterRender) ->
|
||||
ok.
|
||||
|
||||
reset_trace_process_metadata(true = _StopAfterRender) ->
|
||||
Meta = logger:get_process_metadata(),
|
||||
NewMeta = maps:remove(stop_action_after_render, Meta),
|
||||
logger:set_process_metadata(NewMeta);
|
||||
reset_trace_process_metadata(false = _StopAfterRender) ->
|
||||
ok.
|
||||
|
||||
default_apply_rule_environment() ->
|
||||
#{
|
||||
headers => #{
|
||||
protocol => mqtt,
|
||||
username => undefined,
|
||||
peerhost => {127, 0, 0, 1},
|
||||
proto_ver => 5,
|
||||
properties => #{}
|
||||
}
|
||||
}.
|
||||
|
||||
-spec test(#{sql := binary(), context := map()}) -> {ok, map() | list()} | {error, term()}.
|
||||
test(#{sql := Sql, context := Context}) ->
|
||||
case emqx_rule_sqlparser:parse(Sql) of
|
||||
|
|
|
@ -0,0 +1,451 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_rule_engine_api_rule_apply_SUITE).
|
||||
|
||||
-compile(nowarn_export_all).
|
||||
-compile(export_all).
|
||||
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
-include_lib("common_test/include/ct.hrl").
|
||||
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||
|
||||
-define(CONF_DEFAULT, <<"rule_engine {rules {}}">>).
|
||||
|
||||
all() ->
|
||||
emqx_common_test_helpers:all(?MODULE).
|
||||
|
||||
init_per_suite(Config) ->
|
||||
application:load(emqx_conf),
|
||||
% ok = emqx_common_test_helpers:load_config(emqx_rule_engine_schema, ?CONF_DEFAULT),
|
||||
% ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx, emqx_rule_engine, emqx_bridge, emqx_bridge_http]),
|
||||
|
||||
Apps = emqx_cth_suite:start(
|
||||
[
|
||||
emqx,
|
||||
emqx_conf,
|
||||
emqx_connector,
|
||||
emqx_bridge_http,
|
||||
emqx_bridge,
|
||||
emqx_rule_engine
|
||||
],
|
||||
#{work_dir => emqx_cth_suite:work_dir(Config)}
|
||||
),
|
||||
emqx_mgmt_api_test_util:init_suite(),
|
||||
[{apps, Apps} | Config].
|
||||
|
||||
end_per_suite(Config) ->
|
||||
Apps = ?config(apps, Config),
|
||||
emqx_mgmt_api_test_util:end_suite(),
|
||||
ok = emqx_cth_suite:stop(Apps),
|
||||
ok.
|
||||
|
||||
init_per_testcase(_Case, Config) ->
|
||||
emqx_bridge_http_test_lib:init_http_success_server(Config).
|
||||
|
||||
end_per_testcase(_TestCase, _Config) ->
|
||||
ok = emqx_bridge_http_connector_test_server:stop(),
|
||||
emqx_bridge_v2_testlib:delete_all_bridges(),
|
||||
emqx_bridge_v2_testlib:delete_all_connectors(),
|
||||
emqx_common_test_helpers:call_janitor(),
|
||||
ok.
|
||||
|
||||
t_basic_apply_rule_trace_ruleid(Config) ->
|
||||
basic_apply_rule_test_helper(Config, ruleid).
|
||||
|
||||
t_basic_apply_rule_trace_clientid(Config) ->
|
||||
basic_apply_rule_test_helper(Config, clientid).
|
||||
|
||||
basic_apply_rule_test_helper(Config, TraceType) ->
|
||||
HTTPServerConfig = ?config(http_server, Config),
|
||||
emqx_bridge_http_test_lib:make_bridge(HTTPServerConfig),
|
||||
#{status := connected} = emqx_bridge_v2:health_check(
|
||||
http, emqx_bridge_http_test_lib:bridge_name()
|
||||
),
|
||||
%% Create Rule
|
||||
RuleTopic = iolist_to_binary([<<"my_rule_topic/">>, atom_to_binary(?FUNCTION_NAME)]),
|
||||
SQL = <<"SELECT payload.id as id FROM \"", RuleTopic/binary, "\"">>,
|
||||
{ok, #{<<"id">> := RuleId}} =
|
||||
emqx_bridge_testlib:create_rule_and_action_http(
|
||||
http,
|
||||
RuleTopic,
|
||||
Config,
|
||||
#{sql => SQL}
|
||||
),
|
||||
ClientId = <<"c_emqx">>,
|
||||
%% ===================================
|
||||
%% Create trace for RuleId
|
||||
%% ===================================
|
||||
Now = erlang:system_time(second) - 10,
|
||||
Start = Now,
|
||||
End = Now + 60,
|
||||
TraceName = atom_to_binary(?FUNCTION_NAME),
|
||||
TraceValue =
|
||||
case TraceType of
|
||||
ruleid ->
|
||||
RuleId;
|
||||
clientid ->
|
||||
ClientId
|
||||
end,
|
||||
Trace = #{
|
||||
name => TraceName,
|
||||
type => TraceType,
|
||||
TraceType => TraceValue,
|
||||
start_at => Start,
|
||||
end_at => End
|
||||
},
|
||||
emqx_trace_SUITE:reload(),
|
||||
ok = emqx_trace:clear(),
|
||||
{ok, _} = emqx_trace:create(Trace),
|
||||
%% ===================================
|
||||
Context = #{
|
||||
clientid => ClientId,
|
||||
event_type => message_publish,
|
||||
payload => <<"{\"msg\": \"hello\"}">>,
|
||||
qos => 1,
|
||||
topic => RuleTopic,
|
||||
username => <<"u_emqx">>
|
||||
},
|
||||
Params = #{
|
||||
% body => #{
|
||||
<<"context">> => Context
|
||||
% }
|
||||
},
|
||||
emqx_trace:check(),
|
||||
ok = emqx_trace_handler_SUITE:filesync(TraceName, TraceType),
|
||||
{ok, _} = file:read_file(emqx_trace:log_file(TraceName, Now)),
|
||||
?assertMatch({ok, _}, call_apply_rule_api(RuleId, Params)),
|
||||
?retry(
|
||||
_Interval0 = 200,
|
||||
_NAttempts0 = 20,
|
||||
begin
|
||||
Bin = read_rule_trace_file(TraceName, TraceType, Now),
|
||||
?assertNotEqual(nomatch, binary:match(Bin, [<<"rule_activated">>])),
|
||||
?assertNotEqual(nomatch, binary:match(Bin, [<<"SELECT_yielded_result">>])),
|
||||
?assertNotEqual(nomatch, binary:match(Bin, [<<"bridge_action">>])),
|
||||
?assertNotEqual(nomatch, binary:match(Bin, [<<"action_activated">>])),
|
||||
?assertNotEqual(nomatch, binary:match(Bin, [<<"successfully_rendered_request">>])),
|
||||
?assertNotEqual(nomatch, binary:match(Bin, [<<"QUERY_ASYNC">>]))
|
||||
end
|
||||
),
|
||||
emqx_trace:delete(TraceName),
|
||||
ok.
|
||||
|
||||
%% Helper Functions
|
||||
|
||||
% t_ctx_pub(_) ->
|
||||
% SQL = <<"SELECT payload.msg as msg, clientid, username, payload, topic, qos FROM \"t/#\"">>,
|
||||
% Context = #{
|
||||
% clientid => <<"c_emqx">>,
|
||||
% event_type => message_publish,
|
||||
% payload => <<"{\"msg\": \"hello\"}">>,
|
||||
% qos => 1,
|
||||
% topic => <<"t/a">>,
|
||||
% username => <<"u_emqx">>
|
||||
% },
|
||||
% Expected = Context#{msg => <<"hello">>},
|
||||
% do_test(SQL, Context, Expected).
|
||||
|
||||
% t_ctx_sub(_) ->
|
||||
% SQL = <<"SELECT clientid, username, topic, qos FROM \"$events/session_subscribed\"">>,
|
||||
% Context = #{
|
||||
% clientid => <<"c_emqx">>,
|
||||
% event_type => session_subscribed,
|
||||
% qos => 1,
|
||||
% topic => <<"t/a">>,
|
||||
% username => <<"u_emqx">>
|
||||
% },
|
||||
|
||||
% do_test(SQL, Context, Context).
|
||||
|
||||
% t_ctx_unsub(_) ->
|
||||
% SQL = <<"SELECT clientid, username, topic, qos FROM \"$events/session_unsubscribed\"">>,
|
||||
% Context = #{
|
||||
% clientid => <<"c_emqx">>,
|
||||
% event_type => session_unsubscribed,
|
||||
% qos => 1,
|
||||
% topic => <<"t/a">>,
|
||||
% username => <<"u_emqx">>
|
||||
% },
|
||||
% do_test(SQL, Context, Context).
|
||||
|
||||
% t_ctx_delivered(_) ->
|
||||
% SQL =
|
||||
% <<"SELECT from_clientid, from_username, topic, qos, node, timestamp FROM \"$events/message_delivered\"">>,
|
||||
% Context = #{
|
||||
% clientid => <<"c_emqx_2">>,
|
||||
% event_type => message_delivered,
|
||||
% from_clientid => <<"c_emqx_1">>,
|
||||
% from_username => <<"u_emqx_1">>,
|
||||
% payload => <<"{\"msg\": \"hello\"}">>,
|
||||
% qos => 1,
|
||||
% topic => <<"t/a">>,
|
||||
% username => <<"u_emqx_2">>
|
||||
% },
|
||||
% Expected = check_result([from_clientid, from_username, topic, qos], [node, timestamp], Context),
|
||||
% do_test(SQL, Context, Expected).
|
||||
|
||||
% t_ctx_acked(_) ->
|
||||
% SQL =
|
||||
% <<"SELECT from_clientid, from_username, topic, qos, node, timestamp FROM \"$events/message_acked\"">>,
|
||||
|
||||
% Context = #{
|
||||
% clientid => <<"c_emqx_2">>,
|
||||
% event_type => message_acked,
|
||||
% from_clientid => <<"c_emqx_1">>,
|
||||
% from_username => <<"u_emqx_1">>,
|
||||
% payload => <<"{\"msg\": \"hello\"}">>,
|
||||
% qos => 1,
|
||||
% topic => <<"t/a">>,
|
||||
% username => <<"u_emqx_2">>
|
||||
% },
|
||||
|
||||
% Expected = with_node_timestampe([from_clientid, from_username, topic, qos], Context),
|
||||
|
||||
% do_test(SQL, Context, Expected).
|
||||
|
||||
% t_ctx_droped(_) ->
|
||||
% SQL = <<"SELECT reason, topic, qos, node, timestamp FROM \"$events/message_dropped\"">>,
|
||||
% Topic = <<"t/a">>,
|
||||
% QoS = 1,
|
||||
% Reason = <<"no_subscribers">>,
|
||||
% Context = #{
|
||||
% clientid => <<"c_emqx">>,
|
||||
% event_type => message_dropped,
|
||||
% payload => <<"{\"msg\": \"hello\"}">>,
|
||||
% qos => QoS,
|
||||
% reason => Reason,
|
||||
% topic => Topic,
|
||||
% username => <<"u_emqx">>
|
||||
% },
|
||||
|
||||
% Expected = with_node_timestampe([reason, topic, qos], Context),
|
||||
% do_test(SQL, Context, Expected).
|
||||
|
||||
% t_ctx_connected(_) ->
|
||||
% SQL =
|
||||
% <<"SELECT clientid, username, keepalive, is_bridge FROM \"$events/client_connected\"">>,
|
||||
|
||||
% Context =
|
||||
% #{
|
||||
% clean_start => true,
|
||||
% clientid => <<"c_emqx">>,
|
||||
% event_type => client_connected,
|
||||
% is_bridge => false,
|
||||
% peername => <<"127.0.0.1:52918">>,
|
||||
% username => <<"u_emqx">>
|
||||
% },
|
||||
% Expected = check_result([clientid, username, keepalive, is_bridge], [], Context),
|
||||
% do_test(SQL, Context, Expected).
|
||||
|
||||
% t_ctx_disconnected(_) ->
|
||||
% SQL =
|
||||
% <<"SELECT clientid, username, reason, disconnected_at, node FROM \"$events/client_disconnected\"">>,
|
||||
|
||||
% Context =
|
||||
% #{
|
||||
% clientid => <<"c_emqx">>,
|
||||
% event_type => client_disconnected,
|
||||
% reason => <<"normal">>,
|
||||
% username => <<"u_emqx">>
|
||||
% },
|
||||
% Expected = check_result([clientid, username, reason], [disconnected_at, node], Context),
|
||||
% do_test(SQL, Context, Expected).
|
||||
|
||||
% t_ctx_connack(_) ->
|
||||
% SQL =
|
||||
% <<"SELECT clientid, username, reason_code, node FROM \"$events/client_connack\"">>,
|
||||
|
||||
% Context =
|
||||
% #{
|
||||
% clean_start => true,
|
||||
% clientid => <<"c_emqx">>,
|
||||
% event_type => client_connack,
|
||||
% reason_code => <<"sucess">>,
|
||||
% username => <<"u_emqx">>
|
||||
% },
|
||||
% Expected = check_result([clientid, username, reason_code], [node], Context),
|
||||
% do_test(SQL, Context, Expected).
|
||||
|
||||
% t_ctx_check_authz_complete(_) ->
|
||||
% SQL =
|
||||
% <<
|
||||
% "SELECT clientid, username, topic, action, result,\n"
|
||||
% "authz_source, node FROM \"$events/client_check_authz_complete\""
|
||||
% >>,
|
||||
|
||||
% Context =
|
||||
% #{
|
||||
% action => <<"publish">>,
|
||||
% clientid => <<"c_emqx">>,
|
||||
% event_type => client_check_authz_complete,
|
||||
% result => <<"allow">>,
|
||||
% topic => <<"t/1">>,
|
||||
% username => <<"u_emqx">>
|
||||
% },
|
||||
% Expected = check_result(
|
||||
% [clientid, username, topic, action],
|
||||
% [authz_source, node, result],
|
||||
% Context
|
||||
% ),
|
||||
|
||||
% do_test(SQL, Context, Expected).
|
||||
|
||||
% t_ctx_delivery_dropped(_) ->
|
||||
% SQL =
|
||||
% <<"SELECT from_clientid, from_username, reason, topic, qos FROM \"$events/delivery_dropped\"">>,
|
||||
|
||||
% Context =
|
||||
% #{
|
||||
% clientid => <<"c_emqx_2">>,
|
||||
% event_type => delivery_dropped,
|
||||
% from_clientid => <<"c_emqx_1">>,
|
||||
% from_username => <<"u_emqx_1">>,
|
||||
% payload => <<"{\"msg\": \"hello\"}">>,
|
||||
% qos => 1,
|
||||
% reason => <<"queue_full">>,
|
||||
% topic => <<"t/a">>,
|
||||
% username => <<"u_emqx_2">>
|
||||
% },
|
||||
% Expected = check_result([from_clientid, from_username, reason, qos, topic], [], Context),
|
||||
% do_test(SQL, Context, Expected).
|
||||
|
||||
% t_mongo_date_function_should_return_string_in_test_env(_) ->
|
||||
% SQL =
|
||||
% <<"SELECT mongo_date() as mongo_date FROM \"$events/client_check_authz_complete\"">>,
|
||||
% Context =
|
||||
% #{
|
||||
% action => <<"publish">>,
|
||||
% clientid => <<"c_emqx">>,
|
||||
% event_type => client_check_authz_complete,
|
||||
% result => <<"allow">>,
|
||||
% topic => <<"t/1">>,
|
||||
% username => <<"u_emqx">>
|
||||
% },
|
||||
% CheckFunction = fun(Result) ->
|
||||
% MongoDate = maps:get(mongo_date, Result),
|
||||
% %% Use regex to match the expected string
|
||||
% MatchResult = re:run(MongoDate, <<"ISODate\\([0-9]{4}-[0-9]{2}-[0-9]{2}T.*\\)">>),
|
||||
% ?assertMatch({match, _}, MatchResult),
|
||||
% ok
|
||||
% end,
|
||||
% do_test(SQL, Context, CheckFunction).
|
||||
|
||||
% do_test(SQL, Context, Expected0) ->
|
||||
% Res = emqx_rule_engine_api:'/rule_test'(
|
||||
% post,
|
||||
% test_rule_params(SQL, Context)
|
||||
% ),
|
||||
% ?assertMatch({200, _}, Res),
|
||||
% {200, Result0} = Res,
|
||||
% Result = emqx_utils_maps:unsafe_atom_key_map(Result0),
|
||||
% case is_function(Expected0) of
|
||||
% false ->
|
||||
% Expected = maps:without([event_type], Expected0),
|
||||
% ?assertMatch(Expected, Result, Expected);
|
||||
% _ ->
|
||||
% Expected0(Result)
|
||||
% end,
|
||||
% ok.
|
||||
|
||||
% test_rule_params(Sql, Context) ->
|
||||
% #{
|
||||
% body => #{
|
||||
% <<"context">> => Context,
|
||||
% <<"sql">> => Sql
|
||||
% }
|
||||
% }.
|
||||
|
||||
% with_node_timestampe(Keys, Context) ->
|
||||
% check_result(Keys, [node, timestamp], Context).
|
||||
|
||||
% check_result(Keys, Exists, Context) ->
|
||||
% Log = fun(Format, Args) ->
|
||||
% lists:flatten(io_lib:format(Format, Args))
|
||||
% end,
|
||||
|
||||
% Base = maps:with(Keys, Context),
|
||||
|
||||
% fun(Result) ->
|
||||
% maps:foreach(
|
||||
% fun(Key, Value) ->
|
||||
% ?assertEqual(
|
||||
% Value,
|
||||
% maps:get(Key, Result, undefined),
|
||||
% Log("Key:~p value error~nResult:~p~n", [Key, Result])
|
||||
% )
|
||||
% end,
|
||||
% Base
|
||||
% ),
|
||||
|
||||
% NotExists = fun(Key) -> Log("Key:~p not exists in result:~p~n", [Key, Result]) end,
|
||||
% lists:foreach(
|
||||
% fun(Key) ->
|
||||
% Find = maps:find(Key, Result),
|
||||
% Formatter = NotExists(Key),
|
||||
% ?assertMatch({ok, _}, Find, Formatter),
|
||||
% ?assertNotMatch({ok, undefined}, Find, Formatter),
|
||||
% ?assertNotMatch({ok, <<"undefined">>}, Find, Formatter)
|
||||
% end,
|
||||
% Exists
|
||||
% ),
|
||||
|
||||
% ?assertEqual(erlang:length(Keys) + erlang:length(Exists), maps:size(Result), Result)
|
||||
% end.
|
||||
|
||||
call_apply_rule_api(RuleId, Params) ->
|
||||
Method = post,
|
||||
Path = emqx_mgmt_api_test_util:api_path(["rules", RuleId, "test"]),
|
||||
ct:pal("sql test (http):\n ~p", [Params]),
|
||||
Res = request(Method, Path, Params),
|
||||
ct:pal("sql test (http) result:\n ~p", [Res]),
|
||||
Res.
|
||||
|
||||
request(Method, Path, Params) ->
|
||||
AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
|
||||
Opts = #{return_all => true},
|
||||
case emqx_mgmt_api_test_util:request_api(Method, Path, "", AuthHeader, Params, Opts) of
|
||||
{ok, {Status, Headers, Body0}} ->
|
||||
Body = maybe_json_decode(Body0),
|
||||
{ok, {Status, Headers, Body}};
|
||||
{error, {Status, Headers, Body0}} ->
|
||||
Body =
|
||||
case emqx_utils_json:safe_decode(Body0, [return_maps]) of
|
||||
{ok, Decoded0 = #{<<"message">> := Msg0}} ->
|
||||
Msg = maybe_json_decode(Msg0),
|
||||
Decoded0#{<<"message">> := Msg};
|
||||
{ok, Decoded0} ->
|
||||
Decoded0;
|
||||
{error, _} ->
|
||||
Body0
|
||||
end,
|
||||
{error, {Status, Headers, Body}};
|
||||
Error ->
|
||||
Error
|
||||
end.
|
||||
|
||||
maybe_json_decode(X) ->
|
||||
case emqx_utils_json:safe_decode(X, [return_maps]) of
|
||||
{ok, Decoded} -> Decoded;
|
||||
{error, _} -> X
|
||||
end.
|
||||
|
||||
read_rule_trace_file(TraceName, TraceType, From) ->
|
||||
emqx_trace:check(),
|
||||
ok = emqx_trace_handler_SUITE:filesync(TraceName, TraceType),
|
||||
{ok, Bin} = file:read_file(emqx_trace:log_file(TraceName, From)),
|
||||
io_lib:format("MYTRACE:~n~s", [Bin]),
|
||||
Bin.
|
Loading…
Reference in New Issue