fix(rule tracing): clean up error tuple in the action_failed trace

This commit is contained in:
Kjell Winblad 2024-05-07 09:13:55 +02:00
parent ca88f5731b
commit feecc36607
2 changed files with 168 additions and 22 deletions

View File

@ -757,14 +757,18 @@ do_inc_action_metrics(
emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.unknown'); emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.unknown');
do_inc_action_metrics( do_inc_action_metrics(
#{rule_id := RuleId, action_id := ActId} = TraceContext, #{rule_id := RuleId, action_id := ActId} = TraceContext,
{error, {recoverable_error, _}} {error, {recoverable_error, _}} = Reason
) -> ) ->
FormatterRes = #emqx_trace_format_func_data{
function = fun trace_formatted_result/1,
data = {ActId, Reason}
},
TraceContext1 = maps:remove(action_id, TraceContext), TraceContext1 = maps:remove(action_id, TraceContext),
trace_action(ActId, "out_of_service", TraceContext1), trace_action(ActId, "out_of_service", TraceContext1#{reason => FormatterRes}),
emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.out_of_service'); emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.out_of_service');
do_inc_action_metrics( do_inc_action_metrics(
#{rule_id := RuleId, action_id := ActId} = TraceContext, #{rule_id := RuleId, action_id := ActId} = TraceContext,
{error, {unrecoverable_error, _} = Reason} {error, {unrecoverable_error, _}} = Reason
) -> ) ->
TraceContext1 = maps:remove(action_id, TraceContext), TraceContext1 = maps:remove(action_id, TraceContext),
FormatterRes = #emqx_trace_format_func_data{ FormatterRes = #emqx_trace_format_func_data{
@ -801,12 +805,12 @@ do_inc_action_metrics(#{rule_id := RuleId, action_id := ActId} = TraceContext, R
trace_formatted_result({{bridge_v2, Type, _Name}, R}) -> trace_formatted_result({{bridge_v2, Type, _Name}, R}) ->
ConnectorType = emqx_action_info:action_type_to_connector_type(Type), ConnectorType = emqx_action_info:action_type_to_connector_type(Type),
ResourceModule = emqx_connector_info:resource_callback_module(ConnectorType), ResourceModule = emqx_connector_info:resource_callback_module(ConnectorType),
emqx_resource:call_format_query_result(ResourceModule, R); clean_up_error_tuple(emqx_resource:call_format_query_result(ResourceModule, R));
trace_formatted_result({{bridge, BridgeType, _BridgeName, _ResId}, R}) -> trace_formatted_result({{bridge, BridgeType, _BridgeName, _ResId}, R}) ->
BridgeV2Type = emqx_action_info:bridge_v1_type_to_action_type(BridgeType), BridgeV2Type = emqx_action_info:bridge_v1_type_to_action_type(BridgeType),
ConnectorType = emqx_action_info:action_type_to_connector_type(BridgeV2Type), ConnectorType = emqx_action_info:action_type_to_connector_type(BridgeV2Type),
ResourceModule = emqx_connector_info:resource_callback_module(ConnectorType), ResourceModule = emqx_connector_info:resource_callback_module(ConnectorType),
emqx_resource:call_format_query_result(ResourceModule, R); clean_up_error_tuple(emqx_resource:call_format_query_result(ResourceModule, R));
trace_formatted_result({_, R}) -> trace_formatted_result({_, R}) ->
R. R.
@ -819,6 +823,15 @@ is_ok_result(R) when is_tuple(R) ->
is_ok_result(_) -> is_ok_result(_) ->
false. false.
clean_up_error_tuple({error, {unrecoverable_error, Reason}}) ->
Reason;
clean_up_error_tuple({error, {recoverable_error, Reason}}) ->
Reason;
clean_up_error_tuple({error, Reason}) ->
Reason;
clean_up_error_tuple(Result) ->
Result.
parse_module_name(Name) when is_binary(Name) -> parse_module_name(Name) when is_binary(Name) ->
case ?IS_VALID_SQL_FUNC_PROVIDER_MODULE_NAME(Name) of case ?IS_VALID_SQL_FUNC_PROVIDER_MODULE_NAME(Name) of
true -> true ->

View File

@ -292,23 +292,7 @@ create_trace(TraceName, TraceType, TraceValue) ->
{ok, _} = emqx_trace:create(Trace). {ok, _} = emqx_trace:create(Trace).
t_apply_rule_test_batch_separation_stop_after_render(_Config) -> t_apply_rule_test_batch_separation_stop_after_render(_Config) ->
MeckOpts = [passthrough, no_link, no_history, non_strict], meck_in_test_connector(),
catch meck:new(emqx_connector_info, MeckOpts),
meck:expect(
emqx_connector_info,
hard_coded_test_connector_info_modules,
0,
[emqx_rule_engine_test_connector_info]
),
emqx_connector_info:clean_cache(),
catch meck:new(emqx_action_info, MeckOpts),
meck:expect(
emqx_action_info,
hard_coded_test_action_info_modules,
0,
[emqx_rule_engine_test_action_info]
),
emqx_action_info:clean_cache(),
{ok, _} = emqx_connector:create(rule_engine_test, ?FUNCTION_NAME, #{}), {ok, _} = emqx_connector:create(rule_engine_test, ?FUNCTION_NAME, #{}),
Name = atom_to_binary(?FUNCTION_NAME), Name = atom_to_binary(?FUNCTION_NAME),
ActionConf = ActionConf =
@ -413,6 +397,155 @@ t_apply_rule_test_batch_separation_stop_after_render(_Config) ->
[_, _] = meck:unload(), [_, _] = meck:unload(),
ok. ok.
t_apply_rule_test_format_action_failed(_Config) ->
MeckOpts = [passthrough, no_link, no_history, non_strict],
catch meck:new(emqx_connector_info, MeckOpts),
meck:expect(
emqx_rule_engine_test_connector,
on_query,
3,
{error, {unrecoverable_error, <<"MY REASON">>}}
),
CheckFun =
fun(Bin0) ->
?assertNotEqual(nomatch, binary:match(Bin0, [<<"action_failed">>])),
%% The last line in the Bin should be the action_success entry
Bin1 = string:trim(Bin0),
LastEntry = unicode:characters_to_binary(lists:last(string:split(Bin1, <<"\n">>, all))),
LastEntryJSON = emqx_utils_json:decode(LastEntry, [return_maps]),
?assertMatch(
#{
<<"level">> := <<"debug">>,
<<"meta">> := #{
<<"action_info">> := #{
<<"name">> := _,
<<"type">> := <<"rule_engine_test">>
},
<<"client_ids">> := [],
<<"clientid">> := _,
<<"reason">> := <<"MY REASON">>,
<<"rule_id">> := _,
<<"rule_ids">> := [],
<<"rule_trigger_time">> := _,
<<"rule_trigger_times">> := [],
<<"stop_action_after_render">> := false,
<<"trace_tag">> := <<"ACTION">>
},
<<"msg">> := <<"action_failed">>,
<<"time">> := _
},
LastEntryJSON
)
end,
do_apply_rule_test_format_action_failed_test(CheckFun).
t_apply_rule_test_format_action_out_of_service(_Config) ->
MeckOpts = [passthrough, no_link, no_history, non_strict],
catch meck:new(emqx_connector_info, MeckOpts),
meck:expect(
emqx_rule_engine_test_connector,
on_query,
3,
{error, {recoverable_error, <<"MY RECOVERABLE REASON">>}}
),
CheckFun =
fun(Bin0) ->
?assertNotEqual(nomatch, binary:match(Bin0, [<<"action_failed">>])),
%% The last line in the Bin should be the action_success entry
Bin1 = string:trim(Bin0),
LastEntry = unicode:characters_to_binary(lists:last(string:split(Bin1, <<"\n">>, all))),
LastEntryJSON = emqx_utils_json:decode(LastEntry, [return_maps]),
?assertMatch(
#{
<<"level">> := <<"debug">>,
<<"meta">> :=
#{
<<"action_info">> :=
#{
<<"name">> := _,
<<"type">> := <<"rule_engine_test">>
},
<<"clientid">> := _,
<<"reason">> := <<"request_expired">>,
<<"rule_id">> := _,
<<"rule_trigger_time">> := _,
<<"stop_action_after_render">> := false,
<<"trace_tag">> := <<"ACTION">>
},
<<"msg">> := <<"action_failed">>,
<<"time">> := _
},
LastEntryJSON
)
end,
do_apply_rule_test_format_action_failed_test(CheckFun).
do_apply_rule_test_format_action_failed_test(CheckLastTraceEntryFun) ->
meck_in_test_connector(),
{ok, _} = emqx_connector:create(rule_engine_test, ?FUNCTION_NAME, #{}),
Name = atom_to_binary(?FUNCTION_NAME),
ActionConf =
#{
<<"connector">> => Name,
<<"parameters">> => #{<<"values">> => #{}},
<<"resource_opts">> => #{
<<"batch_size">> => 1,
<<"batch_time">> => 0,
<<"request_ttl">> => 200
}
},
{ok, _} = emqx_bridge_v2:create(
rule_engine_test,
?FUNCTION_NAME,
ActionConf
),
SQL = <<"SELECT payload.is_stop_after_render as stop_after_render FROM \"", Name/binary, "\"">>,
{ok, RuleID} = create_rule_with_action(
rule_engine_test,
?FUNCTION_NAME,
SQL
),
create_trace(Name, ruleid, RuleID),
Now = erlang:system_time(second) - 10,
%% Stop
ParmsNoStopAfterRender = apply_rule_parms(false, Name),
{ok, _} = call_apply_rule_api(RuleID, ParmsNoStopAfterRender),
%% Just check that the log file is created as expected
?retry(
_Interval0 = 200,
_NAttempts0 = 100,
begin
Bin = read_rule_trace_file(Name, ruleid, Now),
CheckLastTraceEntryFun(Bin)
end
),
%% Cleanup
ok = emqx_trace:delete(Name),
ok = emqx_rule_engine:delete_rule(RuleID),
ok = emqx_bridge_v2:remove(rule_engine_test, ?FUNCTION_NAME),
ok = emqx_connector:remove(rule_engine_test, ?FUNCTION_NAME),
[_, _, _] = meck:unload(),
ok.
meck_in_test_connector() ->
MeckOpts = [passthrough, no_link, no_history, non_strict],
catch meck:new(emqx_connector_info, MeckOpts),
meck:expect(
emqx_connector_info,
hard_coded_test_connector_info_modules,
0,
[emqx_rule_engine_test_connector_info]
),
emqx_connector_info:clean_cache(),
catch meck:new(emqx_action_info, MeckOpts),
meck:expect(
emqx_action_info,
hard_coded_test_action_info_modules,
0,
[emqx_rule_engine_test_action_info]
),
emqx_action_info:clean_cache().
apply_rule_parms(StopAfterRender, Name) -> apply_rule_parms(StopAfterRender, Name) ->
Payload = #{<<"is_stop_after_render">> => StopAfterRender}, Payload = #{<<"is_stop_after_render">> => StopAfterRender},
Context = #{ Context = #{