feat(apply rule test): make option to stop action after render work

This commit makes the apply rule HTTP API option to stop an action work
for the HTTP action, and adds infrastructure that makes it easy to add
this functionality to other actions.
This commit is contained in:
Kjell Winblad 2024-04-06 17:21:12 +02:00
parent ef705c2285
commit 5479932190
5 changed files with 109 additions and 281 deletions

View File

@ -28,7 +28,8 @@
subscribe/3,
unsubscribe/2,
log/3,
log/4
log/4,
rendered_action_template/2
]).
-export([
@ -86,6 +87,25 @@ unsubscribe(<<"$SYS/", _/binary>>, _SubOpts) ->
unsubscribe(Topic, SubOpts) ->
?TRACE("UNSUBSCRIBE", "unsubscribe", #{topic => Topic, sub_opts => SubOpts}).
rendered_action_template(ActionID, RenderResult) ->
Msg = io_lib:format("action_template_rendered(~s)", [ActionID]),
TraceResult = ?TRACE("QUERY_RENDER", Msg, RenderResult),
case logger:get_process_metadata() of
#{stop_action_after_render := true} ->
%% We throw an unrecoverable error to stop action before the
%% resource is called/modified
StopMsg = io_lib:format(
"action_stopped_after_render(~s): "
"Action stopped after template render due to test setting.",
[ActionID]
),
MsgBin = iolist_to_binary(StopMsg),
error({unrecoverable_error, MsgBin});
_ ->
ok
end,
TraceResult.
log(List, Msg, Meta) ->
log(debug, List, Msg, Meta).

View File

@ -371,6 +371,29 @@ on_query(
}
),
NRequest = formalize_request(Method, BasePath, Request),
case NRequest of
{Path, Headers} ->
emqx_trace:rendered_action_template(
InstId,
#{
path => Path,
method => Method,
headers => Headers,
timeout => Timeout
}
);
{Path, Headers, Body} ->
emqx_trace:rendered_action_template(
InstId,
#{
path => Path,
method => Method,
headers => Headers,
timeout => Timeout,
body => Body
}
)
end,
Worker = resolve_pool_worker(State, KeyOrNum),
Result0 = ehttpc:request(
Worker,
@ -480,6 +503,29 @@ on_query_async(
}
),
NRequest = formalize_request(Method, BasePath, Request),
case NRequest of
{Path, Headers} ->
emqx_trace:rendered_action_template(
InstId,
#{
path => Path,
method => Method,
headers => Headers,
timeout => Timeout
}
);
{Path, Headers, Body} ->
emqx_trace:rendered_action_template(
InstId,
#{
path => Path,
method => Method,
headers => Headers,
timeout => Timeout,
body => Body
}
)
end,
MaxAttempts = maps:get(max_attempts, State, 3),
Context = #{
attempt => 1,
@ -661,22 +707,13 @@ process_request_and_action(Request, ActionState, Msg) ->
),
BodyTemplate = maps:get(body, ActionState),
Body = render_request_body(BodyTemplate, RenderTmplFunc, Msg),
RenderResult = #{
#{
method => Method,
path => Path,
body => Body,
headers => Headers,
request_timeout => maps:get(request_timeout, ActionState)
},
?TRACE(
"QUERY_RENDER",
"http_connector_successfully_rendered_request",
#{
request => Request,
render_result => RenderResult
}
),
RenderResult.
}.
merge_proplist(Proplist1, Proplist2) ->
lists:foldl(

View File

@ -471,7 +471,9 @@ do_handle_action(_RuleId, #{mod := Mod, func := Func} = Action, Selected, Envs)
Result.
do_handle_action_get_trace_context(Action) ->
case logger:get_process_metadata() of
Metadata = logger:get_process_metadata(),
StopAfterRender = maps:get(stop_action_after_render, Metadata, false),
case Metadata of
#{
rule_id := RuleID,
clientid := ClientID
@ -479,14 +481,16 @@ do_handle_action_get_trace_context(Action) ->
#{
rule_id => RuleID,
clientid => ClientID,
action_id => Action
action_id => Action,
stop_action_after_render => StopAfterRender
};
#{
rule_id := RuleID
} ->
#{
rule_id => RuleID,
action_id => Action
action_id => Action,
stop_action_after_render => StopAfterRender
}
end.

View File

@ -67,7 +67,7 @@ do_apply_matched_rule(Rule, Context, Env, StopAfterRender) ->
ApplyRuleRes.
update_process_trace_metadata(true = _StopAfterRender) ->
logger:update_process_trace_metadata(#{
logger:update_process_metadata(#{
stop_action_after_render => true
});
update_process_trace_metadata(false = _StopAfterRender) ->

View File

@ -64,12 +64,15 @@ end_per_testcase(_TestCase, _Config) ->
ok.
t_basic_apply_rule_trace_ruleid(Config) ->
basic_apply_rule_test_helper(Config, ruleid).
basic_apply_rule_test_helper(Config, ruleid, false).
t_basic_apply_rule_trace_clientid(Config) ->
basic_apply_rule_test_helper(Config, clientid).
basic_apply_rule_test_helper(Config, clientid, false).
basic_apply_rule_test_helper(Config, TraceType) ->
t_basic_apply_rule_trace_ruleid_stop_after_render(Config) ->
basic_apply_rule_test_helper(Config, ruleid, true).
basic_apply_rule_test_helper(Config, TraceType, StopAfterRender) ->
HTTPServerConfig = ?config(http_server, Config),
emqx_bridge_http_test_lib:make_bridge(HTTPServerConfig),
#{status := connected} = emqx_bridge_v2:health_check(
@ -121,7 +124,8 @@ basic_apply_rule_test_helper(Config, TraceType) ->
},
Params = #{
% body => #{
<<"context">> => Context
<<"context">> => Context,
<<"stop_action_after_template_render">> => StopAfterRender
% }
},
emqx_trace:check(),
@ -133,279 +137,42 @@ basic_apply_rule_test_helper(Config, TraceType) ->
_NAttempts0 = 20,
begin
Bin = read_rule_trace_file(TraceName, TraceType, Now),
io:format("THELOG:~n~s", [Bin]),
?assertNotEqual(nomatch, binary:match(Bin, [<<"rule_activated">>])),
?assertNotEqual(nomatch, binary:match(Bin, [<<"SELECT_yielded_result">>])),
?assertNotEqual(nomatch, binary:match(Bin, [<<"bridge_action">>])),
?assertNotEqual(nomatch, binary:match(Bin, [<<"action_activated">>])),
?assertNotEqual(nomatch, binary:match(Bin, [<<"successfully_rendered_request">>])),
?assertNotEqual(nomatch, binary:match(Bin, [<<"action_template_rendered">>])),
?assertNotEqual(nomatch, binary:match(Bin, [<<"QUERY_ASYNC">>]))
end
),
case StopAfterRender of
true ->
?retry(
_Interval0 = 200,
_NAttempts0 = 20,
begin
Bin = read_rule_trace_file(TraceName, TraceType, Now),
io:format("THELOG2:~n~s", [Bin]),
?assertNotEqual(nomatch, binary:match(Bin, [<<"action_failed">>]))
end
);
false ->
?retry(
_Interval0 = 200,
_NAttempts0 = 20,
begin
Bin = read_rule_trace_file(TraceName, TraceType, Now),
io:format("THELOG3:~n~s", [Bin]),
?assertNotEqual(nomatch, binary:match(Bin, [<<"action_success">>]))
end
)
end,
emqx_trace:delete(TraceName),
ok.
%% Helper Functions
% t_ctx_pub(_) ->
% SQL = <<"SELECT payload.msg as msg, clientid, username, payload, topic, qos FROM \"t/#\"">>,
% Context = #{
% clientid => <<"c_emqx">>,
% event_type => message_publish,
% payload => <<"{\"msg\": \"hello\"}">>,
% qos => 1,
% topic => <<"t/a">>,
% username => <<"u_emqx">>
% },
% Expected = Context#{msg => <<"hello">>},
% do_test(SQL, Context, Expected).
% t_ctx_sub(_) ->
% SQL = <<"SELECT clientid, username, topic, qos FROM \"$events/session_subscribed\"">>,
% Context = #{
% clientid => <<"c_emqx">>,
% event_type => session_subscribed,
% qos => 1,
% topic => <<"t/a">>,
% username => <<"u_emqx">>
% },
% do_test(SQL, Context, Context).
% t_ctx_unsub(_) ->
% SQL = <<"SELECT clientid, username, topic, qos FROM \"$events/session_unsubscribed\"">>,
% Context = #{
% clientid => <<"c_emqx">>,
% event_type => session_unsubscribed,
% qos => 1,
% topic => <<"t/a">>,
% username => <<"u_emqx">>
% },
% do_test(SQL, Context, Context).
% t_ctx_delivered(_) ->
% SQL =
% <<"SELECT from_clientid, from_username, topic, qos, node, timestamp FROM \"$events/message_delivered\"">>,
% Context = #{
% clientid => <<"c_emqx_2">>,
% event_type => message_delivered,
% from_clientid => <<"c_emqx_1">>,
% from_username => <<"u_emqx_1">>,
% payload => <<"{\"msg\": \"hello\"}">>,
% qos => 1,
% topic => <<"t/a">>,
% username => <<"u_emqx_2">>
% },
% Expected = check_result([from_clientid, from_username, topic, qos], [node, timestamp], Context),
% do_test(SQL, Context, Expected).
% t_ctx_acked(_) ->
% SQL =
% <<"SELECT from_clientid, from_username, topic, qos, node, timestamp FROM \"$events/message_acked\"">>,
% Context = #{
% clientid => <<"c_emqx_2">>,
% event_type => message_acked,
% from_clientid => <<"c_emqx_1">>,
% from_username => <<"u_emqx_1">>,
% payload => <<"{\"msg\": \"hello\"}">>,
% qos => 1,
% topic => <<"t/a">>,
% username => <<"u_emqx_2">>
% },
% Expected = with_node_timestampe([from_clientid, from_username, topic, qos], Context),
% do_test(SQL, Context, Expected).
% t_ctx_droped(_) ->
% SQL = <<"SELECT reason, topic, qos, node, timestamp FROM \"$events/message_dropped\"">>,
% Topic = <<"t/a">>,
% QoS = 1,
% Reason = <<"no_subscribers">>,
% Context = #{
% clientid => <<"c_emqx">>,
% event_type => message_dropped,
% payload => <<"{\"msg\": \"hello\"}">>,
% qos => QoS,
% reason => Reason,
% topic => Topic,
% username => <<"u_emqx">>
% },
% Expected = with_node_timestampe([reason, topic, qos], Context),
% do_test(SQL, Context, Expected).
% t_ctx_connected(_) ->
% SQL =
% <<"SELECT clientid, username, keepalive, is_bridge FROM \"$events/client_connected\"">>,
% Context =
% #{
% clean_start => true,
% clientid => <<"c_emqx">>,
% event_type => client_connected,
% is_bridge => false,
% peername => <<"127.0.0.1:52918">>,
% username => <<"u_emqx">>
% },
% Expected = check_result([clientid, username, keepalive, is_bridge], [], Context),
% do_test(SQL, Context, Expected).
% t_ctx_disconnected(_) ->
% SQL =
% <<"SELECT clientid, username, reason, disconnected_at, node FROM \"$events/client_disconnected\"">>,
% Context =
% #{
% clientid => <<"c_emqx">>,
% event_type => client_disconnected,
% reason => <<"normal">>,
% username => <<"u_emqx">>
% },
% Expected = check_result([clientid, username, reason], [disconnected_at, node], Context),
% do_test(SQL, Context, Expected).
% t_ctx_connack(_) ->
% SQL =
% <<"SELECT clientid, username, reason_code, node FROM \"$events/client_connack\"">>,
% Context =
% #{
% clean_start => true,
% clientid => <<"c_emqx">>,
% event_type => client_connack,
% reason_code => <<"sucess">>,
% username => <<"u_emqx">>
% },
% Expected = check_result([clientid, username, reason_code], [node], Context),
% do_test(SQL, Context, Expected).
% t_ctx_check_authz_complete(_) ->
% SQL =
% <<
% "SELECT clientid, username, topic, action, result,\n"
% "authz_source, node FROM \"$events/client_check_authz_complete\""
% >>,
% Context =
% #{
% action => <<"publish">>,
% clientid => <<"c_emqx">>,
% event_type => client_check_authz_complete,
% result => <<"allow">>,
% topic => <<"t/1">>,
% username => <<"u_emqx">>
% },
% Expected = check_result(
% [clientid, username, topic, action],
% [authz_source, node, result],
% Context
% ),
% do_test(SQL, Context, Expected).
% t_ctx_delivery_dropped(_) ->
% SQL =
% <<"SELECT from_clientid, from_username, reason, topic, qos FROM \"$events/delivery_dropped\"">>,
% Context =
% #{
% clientid => <<"c_emqx_2">>,
% event_type => delivery_dropped,
% from_clientid => <<"c_emqx_1">>,
% from_username => <<"u_emqx_1">>,
% payload => <<"{\"msg\": \"hello\"}">>,
% qos => 1,
% reason => <<"queue_full">>,
% topic => <<"t/a">>,
% username => <<"u_emqx_2">>
% },
% Expected = check_result([from_clientid, from_username, reason, qos, topic], [], Context),
% do_test(SQL, Context, Expected).
% t_mongo_date_function_should_return_string_in_test_env(_) ->
% SQL =
% <<"SELECT mongo_date() as mongo_date FROM \"$events/client_check_authz_complete\"">>,
% Context =
% #{
% action => <<"publish">>,
% clientid => <<"c_emqx">>,
% event_type => client_check_authz_complete,
% result => <<"allow">>,
% topic => <<"t/1">>,
% username => <<"u_emqx">>
% },
% CheckFunction = fun(Result) ->
% MongoDate = maps:get(mongo_date, Result),
% %% Use regex to match the expected string
% MatchResult = re:run(MongoDate, <<"ISODate\\([0-9]{4}-[0-9]{2}-[0-9]{2}T.*\\)">>),
% ?assertMatch({match, _}, MatchResult),
% ok
% end,
% do_test(SQL, Context, CheckFunction).
% do_test(SQL, Context, Expected0) ->
% Res = emqx_rule_engine_api:'/rule_test'(
% post,
% test_rule_params(SQL, Context)
% ),
% ?assertMatch({200, _}, Res),
% {200, Result0} = Res,
% Result = emqx_utils_maps:unsafe_atom_key_map(Result0),
% case is_function(Expected0) of
% false ->
% Expected = maps:without([event_type], Expected0),
% ?assertMatch(Expected, Result, Expected);
% _ ->
% Expected0(Result)
% end,
% ok.
% test_rule_params(Sql, Context) ->
% #{
% body => #{
% <<"context">> => Context,
% <<"sql">> => Sql
% }
% }.
% with_node_timestampe(Keys, Context) ->
% check_result(Keys, [node, timestamp], Context).
% check_result(Keys, Exists, Context) ->
% Log = fun(Format, Args) ->
% lists:flatten(io_lib:format(Format, Args))
% end,
% Base = maps:with(Keys, Context),
% fun(Result) ->
% maps:foreach(
% fun(Key, Value) ->
% ?assertEqual(
% Value,
% maps:get(Key, Result, undefined),
% Log("Key:~p value error~nResult:~p~n", [Key, Result])
% )
% end,
% Base
% ),
% NotExists = fun(Key) -> Log("Key:~p not exists in result:~p~n", [Key, Result]) end,
% lists:foreach(
% fun(Key) ->
% Find = maps:find(Key, Result),
% Formatter = NotExists(Key),
% ?assertMatch({ok, _}, Find, Formatter),
% ?assertNotMatch({ok, undefined}, Find, Formatter),
% ?assertNotMatch({ok, <<"undefined">>}, Find, Formatter)
% end,
% Exists
% ),
% ?assertEqual(erlang:length(Keys) + erlang:length(Exists), maps:size(Result), Result)
% end.
call_apply_rule_api(RuleId, Params) ->
Method = post,
Path = emqx_mgmt_api_test_util:api_path(["rules", RuleId, "test"]),