diff --git a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl index 41be864c5..9a5de7871 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl @@ -757,14 +757,18 @@ do_inc_action_metrics( emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.unknown'); do_inc_action_metrics( #{rule_id := RuleId, action_id := ActId} = TraceContext, - {error, {recoverable_error, _}} + {error, {recoverable_error, _}} = Reason ) -> + FormatterRes = #emqx_trace_format_func_data{ + function = fun trace_formatted_result/1, + data = {ActId, Reason} + }, TraceContext1 = maps:remove(action_id, TraceContext), - trace_action(ActId, "out_of_service", TraceContext1), + trace_action(ActId, "out_of_service", TraceContext1#{reason => FormatterRes}), emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.out_of_service'); do_inc_action_metrics( #{rule_id := RuleId, action_id := ActId} = TraceContext, - {error, {unrecoverable_error, _} = Reason} + {error, {unrecoverable_error, _}} = Reason ) -> TraceContext1 = maps:remove(action_id, TraceContext), FormatterRes = #emqx_trace_format_func_data{ @@ -801,12 +805,12 @@ do_inc_action_metrics(#{rule_id := RuleId, action_id := ActId} = TraceContext, R trace_formatted_result({{bridge_v2, Type, _Name}, R}) -> ConnectorType = emqx_action_info:action_type_to_connector_type(Type), ResourceModule = emqx_connector_info:resource_callback_module(ConnectorType), - emqx_resource:call_format_query_result(ResourceModule, R); + clean_up_error_tuple(emqx_resource:call_format_query_result(ResourceModule, R)); trace_formatted_result({{bridge, BridgeType, _BridgeName, _ResId}, R}) -> BridgeV2Type = emqx_action_info:bridge_v1_type_to_action_type(BridgeType), ConnectorType = emqx_action_info:action_type_to_connector_type(BridgeV2Type), ResourceModule = emqx_connector_info:resource_callback_module(ConnectorType), - emqx_resource:call_format_query_result(ResourceModule, R); + clean_up_error_tuple(emqx_resource:call_format_query_result(ResourceModule, R)); trace_formatted_result({_, R}) -> R. @@ -819,6 +823,15 @@ is_ok_result(R) when is_tuple(R) -> is_ok_result(_) -> false. +clean_up_error_tuple({error, {unrecoverable_error, Reason}}) -> + Reason; +clean_up_error_tuple({error, {recoverable_error, Reason}}) -> + Reason; +clean_up_error_tuple({error, Reason}) -> + Reason; +clean_up_error_tuple(Result) -> + Result. + parse_module_name(Name) when is_binary(Name) -> case ?IS_VALID_SQL_FUNC_PROVIDER_MODULE_NAME(Name) of true -> diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_apply_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_apply_SUITE.erl index c11b40b23..3d3e063d5 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_apply_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_apply_SUITE.erl @@ -292,23 +292,7 @@ create_trace(TraceName, TraceType, TraceValue) -> {ok, _} = emqx_trace:create(Trace). t_apply_rule_test_batch_separation_stop_after_render(_Config) -> - MeckOpts = [passthrough, no_link, no_history, non_strict], - catch meck:new(emqx_connector_info, MeckOpts), - meck:expect( - emqx_connector_info, - hard_coded_test_connector_info_modules, - 0, - [emqx_rule_engine_test_connector_info] - ), - emqx_connector_info:clean_cache(), - catch meck:new(emqx_action_info, MeckOpts), - meck:expect( - emqx_action_info, - hard_coded_test_action_info_modules, - 0, - [emqx_rule_engine_test_action_info] - ), - emqx_action_info:clean_cache(), + meck_in_test_connector(), {ok, _} = emqx_connector:create(rule_engine_test, ?FUNCTION_NAME, #{}), Name = atom_to_binary(?FUNCTION_NAME), ActionConf = @@ -413,6 +397,155 @@ t_apply_rule_test_batch_separation_stop_after_render(_Config) -> [_, _] = meck:unload(), ok. +t_apply_rule_test_format_action_failed(_Config) -> + MeckOpts = [passthrough, no_link, no_history, non_strict], + catch meck:new(emqx_connector_info, MeckOpts), + meck:expect( + emqx_rule_engine_test_connector, + on_query, + 3, + {error, {unrecoverable_error, <<"MY REASON">>}} + ), + CheckFun = + fun(Bin0) -> + ?assertNotEqual(nomatch, binary:match(Bin0, [<<"action_failed">>])), + %% The last line in the Bin should be the action_success entry + Bin1 = string:trim(Bin0), + LastEntry = unicode:characters_to_binary(lists:last(string:split(Bin1, <<"\n">>, all))), + LastEntryJSON = emqx_utils_json:decode(LastEntry, [return_maps]), + ?assertMatch( + #{ + <<"level">> := <<"debug">>, + <<"meta">> := #{ + <<"action_info">> := #{ + <<"name">> := _, + <<"type">> := <<"rule_engine_test">> + }, + <<"client_ids">> := [], + <<"clientid">> := _, + <<"reason">> := <<"MY REASON">>, + <<"rule_id">> := _, + <<"rule_ids">> := [], + <<"rule_trigger_time">> := _, + <<"rule_trigger_times">> := [], + <<"stop_action_after_render">> := false, + <<"trace_tag">> := <<"ACTION">> + }, + <<"msg">> := <<"action_failed">>, + <<"time">> := _ + }, + LastEntryJSON + ) + end, + do_apply_rule_test_format_action_failed_test(CheckFun). + +t_apply_rule_test_format_action_out_of_service(_Config) -> + MeckOpts = [passthrough, no_link, no_history, non_strict], + catch meck:new(emqx_connector_info, MeckOpts), + meck:expect( + emqx_rule_engine_test_connector, + on_query, + 3, + {error, {recoverable_error, <<"MY RECOVERABLE REASON">>}} + ), + CheckFun = + fun(Bin0) -> + ?assertNotEqual(nomatch, binary:match(Bin0, [<<"action_failed">>])), + %% The last line in the Bin should be the action_success entry + Bin1 = string:trim(Bin0), + LastEntry = unicode:characters_to_binary(lists:last(string:split(Bin1, <<"\n">>, all))), + LastEntryJSON = emqx_utils_json:decode(LastEntry, [return_maps]), + ?assertMatch( + #{ + <<"level">> := <<"debug">>, + <<"meta">> := + #{ + <<"action_info">> := + #{ + <<"name">> := _, + <<"type">> := <<"rule_engine_test">> + }, + <<"clientid">> := _, + <<"reason">> := <<"request_expired">>, + <<"rule_id">> := _, + <<"rule_trigger_time">> := _, + <<"stop_action_after_render">> := false, + <<"trace_tag">> := <<"ACTION">> + }, + <<"msg">> := <<"action_failed">>, + <<"time">> := _ + }, + LastEntryJSON + ) + end, + do_apply_rule_test_format_action_failed_test(CheckFun). + +do_apply_rule_test_format_action_failed_test(CheckLastTraceEntryFun) -> + meck_in_test_connector(), + {ok, _} = emqx_connector:create(rule_engine_test, ?FUNCTION_NAME, #{}), + Name = atom_to_binary(?FUNCTION_NAME), + ActionConf = + #{ + <<"connector">> => Name, + <<"parameters">> => #{<<"values">> => #{}}, + <<"resource_opts">> => #{ + <<"batch_size">> => 1, + <<"batch_time">> => 0, + <<"request_ttl">> => 200 + } + }, + {ok, _} = emqx_bridge_v2:create( + rule_engine_test, + ?FUNCTION_NAME, + ActionConf + ), + SQL = <<"SELECT payload.is_stop_after_render as stop_after_render FROM \"", Name/binary, "\"">>, + {ok, RuleID} = create_rule_with_action( + rule_engine_test, + ?FUNCTION_NAME, + SQL + ), + create_trace(Name, ruleid, RuleID), + Now = erlang:system_time(second) - 10, + %% Stop + ParmsNoStopAfterRender = apply_rule_parms(false, Name), + {ok, _} = call_apply_rule_api(RuleID, ParmsNoStopAfterRender), + %% Just check that the log file is created as expected + ?retry( + _Interval0 = 200, + _NAttempts0 = 100, + begin + Bin = read_rule_trace_file(Name, ruleid, Now), + CheckLastTraceEntryFun(Bin) + end + ), + %% Cleanup + ok = emqx_trace:delete(Name), + ok = emqx_rule_engine:delete_rule(RuleID), + ok = emqx_bridge_v2:remove(rule_engine_test, ?FUNCTION_NAME), + ok = emqx_connector:remove(rule_engine_test, ?FUNCTION_NAME), + [_, _, _] = meck:unload(), + ok. + +meck_in_test_connector() -> + MeckOpts = [passthrough, no_link, no_history, non_strict], + catch meck:new(emqx_connector_info, MeckOpts), + meck:expect( + emqx_connector_info, + hard_coded_test_connector_info_modules, + 0, + [emqx_rule_engine_test_connector_info] + ), + emqx_connector_info:clean_cache(), + catch meck:new(emqx_action_info, MeckOpts), + meck:expect( + emqx_action_info, + hard_coded_test_action_info_modules, + 0, + [emqx_rule_engine_test_action_info] + ), + emqx_action_info:clean_cache(). + apply_rule_parms(StopAfterRender, Name) -> Payload = #{<<"is_stop_after_render">> => StopAfterRender}, Context = #{