From ca88f5731ba2c4b1b3bbe6edc65ed852eae209fd Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Mon, 6 May 2024 17:33:59 +0200 Subject: [PATCH 1/4] fix(rule tracing): format result traces in a more structured way --- .../src/emqx_bridge_cassandra_connector.erl | 8 +++++++- .../src/emqx_bridge_clickhouse_connector.erl | 10 +++++++++- .../src/emqx_bridge_dynamo_connector.erl | 8 +++++++- apps/emqx_bridge_es/src/emqx_bridge_es_connector.erl | 6 +++++- .../src/emqx_bridge_gcp_pubsub_impl_producer.erl | 8 +++++++- .../src/emqx_bridge_greptimedb_connector.erl | 8 +++++++- .../src/emqx_bridge_influxdb_connector.erl | 6 +++++- .../src/emqx_bridge_iotdb_connector.erl | 6 +++++- .../src/emqx_bridge_kinesis_impl_producer.erl | 8 +++++++- .../src/emqx_bridge_mongodb_connector.erl | 8 +++++++- .../src/emqx_bridge_opents_connector.erl | 8 +++++++- .../src/emqx_bridge_pulsar_connector.erl | 8 +++++++- .../src/emqx_bridge_sqlserver_connector.erl | 8 +++++++- .../src/emqx_bridge_tdengine_connector.erl | 8 +++++++- apps/emqx_mysql/src/emqx_mysql.erl | 10 +++++++++- 15 files changed, 103 insertions(+), 15 deletions(-) diff --git a/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl b/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl index ef79f78fe..87da71449 100644 --- a/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl +++ b/apps/emqx_bridge_cassandra/src/emqx_bridge_cassandra_connector.erl @@ -29,7 +29,8 @@ on_query_async/4, on_batch_query/3, on_batch_query_async/4, - on_get_status/2 + on_get_status/2, + on_format_query_result/1 ]). %% callbacks of ecpool @@ -459,6 +460,11 @@ handle_result({error, Error}) -> handle_result(Res) -> Res. +on_format_query_result({ok, Result}) -> + #{result => ok, info => Result}; +on_format_query_result(Result) -> + Result. + %%-------------------------------------------------------------------- %% utils diff --git a/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl index 2c824aa95..f6888cad5 100644 --- a/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl +++ b/apps/emqx_bridge_clickhouse/src/emqx_bridge_clickhouse_connector.erl @@ -38,7 +38,8 @@ on_get_channels/1, on_query/3, on_batch_query/3, - on_get_status/2 + on_get_status/2, + on_format_query_result/1 ]). %% callbacks for ecpool @@ -519,6 +520,13 @@ transform_and_log_clickhouse_result(ClickhouseErrorResult, ResourceID, SQL) -> to_error_tuple(ClickhouseErrorResult) end. +on_format_query_result(ok) -> + #{result => ok, message => <<"">>}; +on_format_query_result({ok, Message}) -> + #{result => ok, message => Message}; +on_format_query_result(Result) -> + Result. + to_recoverable_error({error, Reason}) -> {error, {recoverable_error, Reason}}; to_recoverable_error(Error) -> diff --git a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl index f89786929..4e974a8a9 100644 --- a/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl +++ b/apps/emqx_bridge_dynamo/src/emqx_bridge_dynamo_connector.erl @@ -26,7 +26,8 @@ on_add_channel/4, on_remove_channel/3, on_get_channels/1, - on_get_channel_status/3 + on_get_channel_status/3, + on_format_query_result/1 ]). -export([ @@ -184,6 +185,11 @@ on_batch_query(InstanceId, [{_ChannelId, _} | _] = Query, State) -> on_batch_query(_InstanceId, Query, _State) -> {error, {unrecoverable_error, {invalid_request, Query}}}. +on_format_query_result({ok, Result}) -> + #{result => ok, info => Result}; +on_format_query_result(Result) -> + Result. + health_check_timeout() -> 2500. diff --git a/apps/emqx_bridge_es/src/emqx_bridge_es_connector.erl b/apps/emqx_bridge_es/src/emqx_bridge_es_connector.erl index fc3aa6d3b..d94ce8e15 100644 --- a/apps/emqx_bridge_es/src/emqx_bridge_es_connector.erl +++ b/apps/emqx_bridge_es/src/emqx_bridge_es_connector.erl @@ -23,7 +23,8 @@ on_add_channel/4, on_remove_channel/3, on_get_channels/1, - on_get_channel_status/3 + on_get_channel_status/3, + on_format_query_result/1 ]). -export([ @@ -288,6 +289,9 @@ on_query_async( InstanceId, {ChannelId, Msg}, ReplyFunAndArgs, State ). +on_format_query_result(Result) -> + emqx_bridge_http_connector:on_format_query_result(Result). + on_add_channel( InstanceId, #{channels := Channels} = State0, diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl index 12d5d1f2f..48e50c416 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl @@ -53,7 +53,8 @@ on_add_channel/4, on_remove_channel/3, on_get_channels/1, - on_get_channel_status/3 + on_get_channel_status/3, + on_format_query_result/1 ]). -export([reply_delegator/2]). @@ -489,6 +490,11 @@ handle_result({error, Reason} = Result, _Request, QueryMode, ResourceId) -> handle_result({ok, _} = Result, _Request, _QueryMode, _ResourceId) -> Result. +on_format_query_result({ok, Info}) -> + #{result => ok, info => Info}; +on_format_query_result(Result) -> + Result. + reply_delegator(ReplyFunAndArgs, Response) -> case Response of {error, Reason} when diff --git a/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl b/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl index 97eedf3f6..963f0efd0 100644 --- a/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl +++ b/apps/emqx_bridge_greptimedb/src/emqx_bridge_greptimedb_connector.erl @@ -27,7 +27,8 @@ on_batch_query/3, on_query_async/4, on_batch_query_async/4, - on_get_status/2 + on_get_status/2, + on_format_query_result/1 ]). -export([reply_callback/2]). @@ -453,6 +454,11 @@ do_query(InstId, Channel, Client, Points) -> end end. +on_format_query_result({ok, {affected_rows, Rows}}) -> + #{result => ok, affected_rows => Rows}; +on_format_query_result(Result) -> + Result. + do_async_query(InstId, Channel, Client, Points, ReplyFunAndArgs) -> ?SLOG(info, #{ msg => "greptimedb_write_point_async", diff --git a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl index f239d3735..88065b7b3 100644 --- a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl +++ b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl @@ -27,7 +27,8 @@ on_batch_query/3, on_query_async/4, on_batch_query_async/4, - on_get_status/2 + on_get_status/2, + on_format_query_result/1 ]). -export([reply_callback/2]). @@ -209,6 +210,9 @@ on_batch_query_async( {error, {unrecoverable_error, Reason}} end. +on_format_query_result(Result) -> + emqx_bridge_http_connector:on_format_query_result(Result). + on_get_status(_InstId, #{client := Client}) -> case influxdb:is_alive(Client) andalso ok =:= influxdb:check_auth(Client) of true -> diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl index d26b47f73..65fbda936 100644 --- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl +++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl @@ -26,7 +26,8 @@ on_add_channel/4, on_remove_channel/3, on_get_channels/1, - on_get_channel_status/3 + on_get_channel_status/3, + on_format_query_result/1 ]). -export([ @@ -390,6 +391,9 @@ on_batch_query( Error end. +on_format_query_result(Result) -> + emqx_bridge_http_connector:on_format_query_result(Result). + on_add_channel( InstanceId, #{iotdb_version := Version, channels := Channels} = OldState0, diff --git a/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis_impl_producer.erl b/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis_impl_producer.erl index 8744dfd71..95d193d92 100644 --- a/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis_impl_producer.erl +++ b/apps/emqx_bridge_kinesis/src/emqx_bridge_kinesis_impl_producer.erl @@ -39,7 +39,8 @@ on_add_channel/4, on_remove_channel/3, on_get_channels/1, - on_get_channel_status/3 + on_get_channel_status/3, + on_format_query_result/1 ]). -export([ @@ -318,6 +319,11 @@ handle_result({error, Reason} = Error, Requests, InstanceId) -> }), Error. +on_format_query_result({ok, Result}) -> + #{result => ok, info => Result}; +on_format_query_result(Result) -> + Result. + parse_template(Config) -> #{payload_template := PayloadTemplate, partition_key := PartitionKeyTemplate} = Config, Templates = #{send_message => PayloadTemplate, partition_key => PartitionKeyTemplate}, diff --git a/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb_connector.erl b/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb_connector.erl index 69c2242e4..6b6db358a 100644 --- a/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb_connector.erl +++ b/apps/emqx_bridge_mongodb/src/emqx_bridge_mongodb_connector.erl @@ -18,7 +18,8 @@ on_get_status/2, on_query/3, on_start/2, - on_stop/2 + on_stop/2, + on_format_query_result/1 ]). %%======================================================================================== @@ -85,6 +86,11 @@ on_query(InstanceId, {Channel, Message0}, #{channels := Channels, connector_stat on_query(InstanceId, Request, _State = #{connector_state := ConnectorState}) -> emqx_mongodb:on_query(InstanceId, Request, ConnectorState). +on_format_query_result({{Result, Info}, Documents}) -> + #{result => Result, info => Info, documents => Documents}; +on_format_query_result(Result) -> + Result. + on_remove_channel(_InstanceId, #{channels := Channels} = State, ChannelId) -> NewState = State#{channels => maps:remove(ChannelId, Channels)}, {ok, NewState}. diff --git a/apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl b/apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl index 509d53284..19e117a0d 100644 --- a/apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl +++ b/apps/emqx_bridge_opents/src/emqx_bridge_opents_connector.erl @@ -27,7 +27,8 @@ on_add_channel/4, on_remove_channel/3, on_get_channels/1, - on_get_channel_status/3 + on_get_channel_status/3, + on_format_query_result/1 ]). -export([connector_examples/1]). @@ -175,6 +176,11 @@ on_batch_query( Error end. +on_format_query_result({ok, StatusCode, BodyMap}) -> + #{result => ok, status_code => StatusCode, body => BodyMap}; +on_format_query_result(Result) -> + Result. + on_get_status(_InstanceId, #{server := Server}) -> Result = case opentsdb_connectivity(Server) of diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector.erl b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector.erl index 0cddfab66..9d269493d 100644 --- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector.erl +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar_connector.erl @@ -20,7 +20,8 @@ on_get_status/2, on_get_channel_status/3, on_query/3, - on_query_async/4 + on_query_async/4, + on_format_query_result/1 ]). -type pulsar_client_id() :: atom(). @@ -234,6 +235,11 @@ on_query_async2(ChannelId, Producers, Message, MessageTmpl, AsyncReplyFn) -> }), pulsar:send(Producers, [PulsarMessage], #{callback_fn => AsyncReplyFn}). +on_format_query_result({ok, Info}) -> + #{result => ok, info => Info}; +on_format_query_result(Result) -> + Result. + %%------------------------------------------------------------------------------------- %% Internal fns %%------------------------------------------------------------------------------------- diff --git a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl index 683551316..726d2656a 100644 --- a/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl +++ b/apps/emqx_bridge_sqlserver/src/emqx_bridge_sqlserver_connector.erl @@ -39,7 +39,8 @@ on_add_channel/4, on_remove_channel/3, on_get_channels/1, - on_get_channel_status/3 + on_get_channel_status/3, + on_format_query_result/1 ]). %% callbacks for ecpool @@ -320,6 +321,11 @@ on_batch_query(ResourceId, BatchRequests, State) -> ), do_query(ResourceId, BatchRequests, ?SYNC_QUERY_MODE, State). +on_format_query_result({ok, Rows}) -> + #{result => ok, rows => Rows}; +on_format_query_result(Result) -> + Result. + on_get_status(_InstanceId, #{pool_name := PoolName} = _State) -> Health = emqx_resource_pool:health_check_workers( PoolName, diff --git a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl index 67b0e77bc..324694edc 100644 --- a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl +++ b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl @@ -28,7 +28,8 @@ on_add_channel/4, on_remove_channel/3, on_get_channels/1, - on_get_channel_status/3 + on_get_channel_status/3, + on_format_query_result/1 ]). -export([connector_examples/1]). @@ -215,6 +216,11 @@ on_batch_query(InstanceId, BatchReq, State) -> ?SLOG(error, LogMeta#{msg => "invalid_request"}), {error, {unrecoverable_error, invalid_request}}. +on_format_query_result({ok, ResultMap}) -> + #{result => ok, info => ResultMap}; +on_format_query_result(Result) -> + Result. + on_get_status(_InstanceId, #{pool_name := PoolName} = State) -> case emqx_resource_pool:health_check_workers( diff --git a/apps/emqx_mysql/src/emqx_mysql.erl b/apps/emqx_mysql/src/emqx_mysql.erl index ff851558a..3ad2fb564 100644 --- a/apps/emqx_mysql/src/emqx_mysql.erl +++ b/apps/emqx_mysql/src/emqx_mysql.erl @@ -30,7 +30,8 @@ on_stop/2, on_query/3, on_batch_query/3, - on_get_status/2 + on_get_status/2, + on_format_query_result/1 ]). %% ecpool connect & reconnect @@ -214,6 +215,13 @@ on_batch_query( }), {error, {unrecoverable_error, invalid_request}}. +on_format_query_result({ok, ColumnNames, Rows}) -> + #{result => ok, column_names => ColumnNames, rows => Rows}; +on_format_query_result({ok, DataList}) -> + #{result => ok, column_names_rows_list => DataList}; +on_format_query_result(Result) -> + Result. + mysql_function(sql) -> query; mysql_function(prepared_query) -> From feecc36607e8b215188898d72de9e2b621e215ac Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Tue, 7 May 2024 09:13:55 +0200 Subject: [PATCH 2/4] fix(rule tracing): clean up error tuple in the action_failed trace --- .../src/emqx_rule_runtime.erl | 23 ++- .../emqx_rule_engine_api_rule_apply_SUITE.erl | 167 ++++++++++++++++-- 2 files changed, 168 insertions(+), 22 deletions(-) diff --git a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl index 41be864c5..9a5de7871 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl @@ -757,14 +757,18 @@ do_inc_action_metrics( emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.unknown'); do_inc_action_metrics( #{rule_id := RuleId, action_id := ActId} = TraceContext, - {error, {recoverable_error, _}} + {error, {recoverable_error, _}} = Reason ) -> + FormatterRes = #emqx_trace_format_func_data{ + function = fun trace_formatted_result/1, + data = {ActId, Reason} + }, TraceContext1 = maps:remove(action_id, TraceContext), - trace_action(ActId, "out_of_service", TraceContext1), + trace_action(ActId, "out_of_service", TraceContext1#{reason => FormatterRes}), emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.out_of_service'); do_inc_action_metrics( #{rule_id := RuleId, action_id := ActId} = TraceContext, - {error, {unrecoverable_error, _} = Reason} + {error, {unrecoverable_error, _}} = Reason ) -> TraceContext1 = maps:remove(action_id, TraceContext), FormatterRes = #emqx_trace_format_func_data{ @@ -801,12 +805,12 @@ do_inc_action_metrics(#{rule_id := RuleId, action_id := ActId} = TraceContext, R trace_formatted_result({{bridge_v2, Type, _Name}, R}) -> ConnectorType = emqx_action_info:action_type_to_connector_type(Type), ResourceModule = emqx_connector_info:resource_callback_module(ConnectorType), - emqx_resource:call_format_query_result(ResourceModule, R); + clean_up_error_tuple(emqx_resource:call_format_query_result(ResourceModule, R)); trace_formatted_result({{bridge, BridgeType, _BridgeName, _ResId}, R}) -> BridgeV2Type = emqx_action_info:bridge_v1_type_to_action_type(BridgeType), ConnectorType = emqx_action_info:action_type_to_connector_type(BridgeV2Type), ResourceModule = emqx_connector_info:resource_callback_module(ConnectorType), - emqx_resource:call_format_query_result(ResourceModule, R); + clean_up_error_tuple(emqx_resource:call_format_query_result(ResourceModule, R)); trace_formatted_result({_, R}) -> R. @@ -819,6 +823,15 @@ is_ok_result(R) when is_tuple(R) -> is_ok_result(_) -> false. +clean_up_error_tuple({error, {unrecoverable_error, Reason}}) -> + Reason; +clean_up_error_tuple({error, {recoverable_error, Reason}}) -> + Reason; +clean_up_error_tuple({error, Reason}) -> + Reason; +clean_up_error_tuple(Result) -> + Result. + parse_module_name(Name) when is_binary(Name) -> case ?IS_VALID_SQL_FUNC_PROVIDER_MODULE_NAME(Name) of true -> diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_apply_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_apply_SUITE.erl index c11b40b23..3d3e063d5 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_apply_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_apply_SUITE.erl @@ -292,23 +292,7 @@ create_trace(TraceName, TraceType, TraceValue) -> {ok, _} = emqx_trace:create(Trace). t_apply_rule_test_batch_separation_stop_after_render(_Config) -> - MeckOpts = [passthrough, no_link, no_history, non_strict], - catch meck:new(emqx_connector_info, MeckOpts), - meck:expect( - emqx_connector_info, - hard_coded_test_connector_info_modules, - 0, - [emqx_rule_engine_test_connector_info] - ), - emqx_connector_info:clean_cache(), - catch meck:new(emqx_action_info, MeckOpts), - meck:expect( - emqx_action_info, - hard_coded_test_action_info_modules, - 0, - [emqx_rule_engine_test_action_info] - ), - emqx_action_info:clean_cache(), + meck_in_test_connector(), {ok, _} = emqx_connector:create(rule_engine_test, ?FUNCTION_NAME, #{}), Name = atom_to_binary(?FUNCTION_NAME), ActionConf = @@ -413,6 +397,155 @@ t_apply_rule_test_batch_separation_stop_after_render(_Config) -> [_, _] = meck:unload(), ok. +t_apply_rule_test_format_action_failed(_Config) -> + MeckOpts = [passthrough, no_link, no_history, non_strict], + catch meck:new(emqx_connector_info, MeckOpts), + meck:expect( + emqx_rule_engine_test_connector, + on_query, + 3, + {error, {unrecoverable_error, <<"MY REASON">>}} + ), + CheckFun = + fun(Bin0) -> + ?assertNotEqual(nomatch, binary:match(Bin0, [<<"action_failed">>])), + %% The last line in the Bin should be the action_success entry + Bin1 = string:trim(Bin0), + LastEntry = unicode:characters_to_binary(lists:last(string:split(Bin1, <<"\n">>, all))), + LastEntryJSON = emqx_utils_json:decode(LastEntry, [return_maps]), + ?assertMatch( + #{ + <<"level">> := <<"debug">>, + <<"meta">> := #{ + <<"action_info">> := #{ + <<"name">> := _, + <<"type">> := <<"rule_engine_test">> + }, + <<"client_ids">> := [], + <<"clientid">> := _, + <<"reason">> := <<"MY REASON">>, + <<"rule_id">> := _, + <<"rule_ids">> := [], + <<"rule_trigger_time">> := _, + <<"rule_trigger_times">> := [], + <<"stop_action_after_render">> := false, + <<"trace_tag">> := <<"ACTION">> + }, + <<"msg">> := <<"action_failed">>, + <<"time">> := _ + }, + LastEntryJSON + ) + end, + do_apply_rule_test_format_action_failed_test(CheckFun). + +t_apply_rule_test_format_action_out_of_service(_Config) -> + MeckOpts = [passthrough, no_link, no_history, non_strict], + catch meck:new(emqx_connector_info, MeckOpts), + meck:expect( + emqx_rule_engine_test_connector, + on_query, + 3, + {error, {recoverable_error, <<"MY RECOVERABLE REASON">>}} + ), + CheckFun = + fun(Bin0) -> + ?assertNotEqual(nomatch, binary:match(Bin0, [<<"action_failed">>])), + %% The last line in the Bin should be the action_success entry + Bin1 = string:trim(Bin0), + LastEntry = unicode:characters_to_binary(lists:last(string:split(Bin1, <<"\n">>, all))), + LastEntryJSON = emqx_utils_json:decode(LastEntry, [return_maps]), + ?assertMatch( + #{ + <<"level">> := <<"debug">>, + <<"meta">> := + #{ + <<"action_info">> := + #{ + <<"name">> := _, + <<"type">> := <<"rule_engine_test">> + }, + <<"clientid">> := _, + <<"reason">> := <<"request_expired">>, + <<"rule_id">> := _, + <<"rule_trigger_time">> := _, + <<"stop_action_after_render">> := false, + <<"trace_tag">> := <<"ACTION">> + }, + <<"msg">> := <<"action_failed">>, + <<"time">> := _ + }, + LastEntryJSON + ) + end, + do_apply_rule_test_format_action_failed_test(CheckFun). + +do_apply_rule_test_format_action_failed_test(CheckLastTraceEntryFun) -> + meck_in_test_connector(), + {ok, _} = emqx_connector:create(rule_engine_test, ?FUNCTION_NAME, #{}), + Name = atom_to_binary(?FUNCTION_NAME), + ActionConf = + #{ + <<"connector">> => Name, + <<"parameters">> => #{<<"values">> => #{}}, + <<"resource_opts">> => #{ + <<"batch_size">> => 1, + <<"batch_time">> => 0, + <<"request_ttl">> => 200 + } + }, + {ok, _} = emqx_bridge_v2:create( + rule_engine_test, + ?FUNCTION_NAME, + ActionConf + ), + SQL = <<"SELECT payload.is_stop_after_render as stop_after_render FROM \"", Name/binary, "\"">>, + {ok, RuleID} = create_rule_with_action( + rule_engine_test, + ?FUNCTION_NAME, + SQL + ), + create_trace(Name, ruleid, RuleID), + Now = erlang:system_time(second) - 10, + %% Stop + ParmsNoStopAfterRender = apply_rule_parms(false, Name), + {ok, _} = call_apply_rule_api(RuleID, ParmsNoStopAfterRender), + %% Just check that the log file is created as expected + ?retry( + _Interval0 = 200, + _NAttempts0 = 100, + begin + Bin = read_rule_trace_file(Name, ruleid, Now), + CheckLastTraceEntryFun(Bin) + end + ), + %% Cleanup + ok = emqx_trace:delete(Name), + ok = emqx_rule_engine:delete_rule(RuleID), + ok = emqx_bridge_v2:remove(rule_engine_test, ?FUNCTION_NAME), + ok = emqx_connector:remove(rule_engine_test, ?FUNCTION_NAME), + [_, _, _] = meck:unload(), + ok. + +meck_in_test_connector() -> + MeckOpts = [passthrough, no_link, no_history, non_strict], + catch meck:new(emqx_connector_info, MeckOpts), + meck:expect( + emqx_connector_info, + hard_coded_test_connector_info_modules, + 0, + [emqx_rule_engine_test_connector_info] + ), + emqx_connector_info:clean_cache(), + catch meck:new(emqx_action_info, MeckOpts), + meck:expect( + emqx_action_info, + hard_coded_test_action_info_modules, + 0, + [emqx_rule_engine_test_action_info] + ), + emqx_action_info:clean_cache(). + apply_rule_parms(StopAfterRender, Name) -> Payload = #{<<"is_stop_after_render">> => StopAfterRender}, Context = #{ From 09ee7ec0e22341ebfdc83c89f87e37c2b3e51d49 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Tue, 7 May 2024 15:44:54 +0200 Subject: [PATCH 3/4] fix(rule tracing): make sure that recoverable errors are traced --- .../emqx_trace/emqx_trace_json_formatter.erl | 15 ++ .../src/emqx_resource_buffer_worker.erl | 113 +++++++----- .../emqx_rule_engine_api_rule_apply_SUITE.erl | 171 ++++++++++++++---- .../test/emqx_rule_engine_test_connector.erl | 18 ++ 4 files changed, 234 insertions(+), 83 deletions(-) diff --git a/apps/emqx/src/emqx_trace/emqx_trace_json_formatter.erl b/apps/emqx/src/emqx_trace/emqx_trace_json_formatter.erl index f07dc8c83..82c5a31ee 100644 --- a/apps/emqx/src/emqx_trace/emqx_trace_json_formatter.erl +++ b/apps/emqx/src/emqx_trace/emqx_trace_json_formatter.erl @@ -88,6 +88,21 @@ prepare_key_value(packet = K, V, PEncode) -> V end, {K, NewV}; +prepare_key_value(K, {recoverable_error, Msg} = OrgV, PEncode) -> + try + prepare_key_value( + K, + #{ + error_type => recoverable_error, + msg => Msg, + additional_info => <<"The operation may be retried.">> + }, + PEncode + ) + catch + _:_ -> + {K, OrgV} + end; prepare_key_value(rule_ids = K, V, _PEncode) -> NewV = try diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index e35453c94..c610df76c 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -68,7 +68,7 @@ {query, FROM, REQUEST, SENT, EXPIRE_AT, TRACE_CTX} ). -define(SIMPLE_QUERY(FROM, REQUEST, TRACE_CTX), ?QUERY(FROM, REQUEST, false, infinity, TRACE_CTX)). --define(REPLY(FROM, SENT, RESULT), {reply, FROM, SENT, RESULT}). +-define(REPLY(FROM, SENT, RESULT, TRACE_CTX), {reply, FROM, SENT, RESULT, TRACE_CTX}). -define(INFLIGHT_ITEM(Ref, BatchOrQuery, IsRetriable, AsyncWorkerMRef), {Ref, BatchOrQuery, IsRetriable, AsyncWorkerMRef} ). @@ -448,8 +448,8 @@ retry_inflight_sync(Ref, QueryOrBatch, Data0) -> Result = call_query(force_sync, Id, Index, Ref, QueryOrBatch, QueryOpts), {ShouldAck, PostFn, DeltaCounters} = case QueryOrBatch of - ?QUERY(ReplyTo, _, HasBeenSent, _ExpireAt, _TraceCtx) -> - Reply = ?REPLY(ReplyTo, HasBeenSent, Result), + ?QUERY(ReplyTo, _, HasBeenSent, _ExpireAt, TraceCtx) -> + Reply = ?REPLY(ReplyTo, HasBeenSent, Result, TraceCtx), reply_caller_defer_metrics(Id, Reply, QueryOpts); [?QUERY(_, _, _, _, _) | _] = Batch -> batch_reply_caller_defer_metrics(Id, Result, Batch, QueryOpts) @@ -662,10 +662,10 @@ do_flush( inflight_tid := InflightTID } = Data0, %% unwrap when not batching (i.e., batch size == 1) - [?QUERY(ReplyTo, _, HasBeenSent, _ExpireAt, _TraceCtx) = Request] = Batch, + [?QUERY(ReplyTo, _, HasBeenSent, _ExpireAt, TraceCtx) = Request] = Batch, QueryOpts = #{inflight_tid => InflightTID, simple_query => false}, Result = call_query(async_if_possible, Id, Index, Ref, Request, QueryOpts), - Reply = ?REPLY(ReplyTo, HasBeenSent, Result), + Reply = ?REPLY(ReplyTo, HasBeenSent, Result, TraceCtx), {ShouldAck, DeltaCounters} = reply_caller(Id, Reply, QueryOpts), Data1 = aggregate_counters(Data0, DeltaCounters), case ShouldAck of @@ -856,15 +856,15 @@ batch_reply_caller_defer_metrics(Id, BatchResult, Batch, QueryOpts) -> expand_batch_reply(BatchResults, Batch) when is_list(BatchResults) -> lists:map( - fun({?QUERY(FROM, _REQUEST, SENT, _EXPIRE_AT, _TraceCtx), Result}) -> - ?REPLY(FROM, SENT, Result) + fun({?QUERY(FROM, _REQUEST, SENT, _EXPIRE_AT, TraceCtx), Result}) -> + ?REPLY(FROM, SENT, Result, TraceCtx) end, lists:zip(Batch, BatchResults) ); expand_batch_reply(BatchResult, Batch) -> lists:map( - fun(?QUERY(FROM, _REQUEST, SENT, _EXPIRE_AT, _TraceCtx)) -> - ?REPLY(FROM, SENT, BatchResult) + fun(?QUERY(FROM, _REQUEST, SENT, _EXPIRE_AT, TraceCtx)) -> + ?REPLY(FROM, SENT, BatchResult, TraceCtx) end, Batch ). @@ -876,12 +876,14 @@ reply_caller(Id, Reply, QueryOpts) -> %% Should only reply to the caller when the decision is final (not %% retriable). See comment on `handle_query_result_pure'. -reply_caller_defer_metrics(Id, ?REPLY(undefined, HasBeenSent, Result), _QueryOpts) -> - handle_query_result_pure(Id, Result, HasBeenSent); -reply_caller_defer_metrics(Id, ?REPLY(ReplyTo, HasBeenSent, Result), QueryOpts) -> +reply_caller_defer_metrics(Id, ?REPLY(undefined, HasBeenSent, Result, TraceCtx), _QueryOpts) -> + handle_query_result_pure(Id, Result, HasBeenSent, TraceCtx); +reply_caller_defer_metrics(Id, ?REPLY(ReplyTo, HasBeenSent, Result, TraceCtx), QueryOpts) -> IsSimpleQuery = maps:get(simple_query, QueryOpts, false), IsUnrecoverableError = is_unrecoverable_error(Result), - {ShouldAck, PostFn, DeltaCounters} = handle_query_result_pure(Id, Result, HasBeenSent), + {ShouldAck, PostFn, DeltaCounters} = handle_query_result_pure( + Id, Result, HasBeenSent, TraceCtx + ), case {ShouldAck, Result, IsUnrecoverableError, IsSimpleQuery} of {ack, {async_return, _}, true, _} -> ok = do_reply_caller(ReplyTo, Result); @@ -921,7 +923,7 @@ batch_reply_dropped(Batch, Result) -> %% This is only called by `simple_{,a}sync_query', so we can bump the %% counters here. handle_query_result(Id, Result, HasBeenSent) -> - {ShouldBlock, PostFn, DeltaCounters} = handle_query_result_pure(Id, Result, HasBeenSent), + {ShouldBlock, PostFn, DeltaCounters} = handle_query_result_pure(Id, Result, HasBeenSent, #{}), PostFn(), bump_counters(Id, DeltaCounters), ShouldBlock. @@ -932,37 +934,49 @@ handle_query_result(Id, Result, HasBeenSent) -> %% * the result is a success (or at least a delayed result) %% We also retry even sync requests. In that case, we shouldn't reply %% the caller until one of those final results above happen. --spec handle_query_result_pure(id(), term(), HasBeenSent :: boolean()) -> +-spec handle_query_result_pure(id(), term(), HasBeenSent :: boolean(), TraceCTX :: map()) -> {ack | nack, function(), counters()}. -handle_query_result_pure(_Id, ?RESOURCE_ERROR_M(exception, Msg), _HasBeenSent) -> +handle_query_result_pure(_Id, ?RESOURCE_ERROR_M(exception, Msg), _HasBeenSent, TraceCTX) -> PostFn = fun() -> - ?SLOG(error, #{msg => "resource_exception", info => emqx_utils:redact(Msg)}), + ?TRACE( + error, + "ERROR", + "resource_exception", + (trace_ctx_map(TraceCTX))#{info => emqx_utils:redact(Msg)} + ), ok end, {nack, PostFn, #{}}; -handle_query_result_pure(_Id, ?RESOURCE_ERROR_M(NotWorking, _), _HasBeenSent) when +handle_query_result_pure(_Id, ?RESOURCE_ERROR_M(NotWorking, _), _HasBeenSent, _TraceCTX) when NotWorking == not_connected; NotWorking == blocked -> {nack, fun() -> ok end, #{}}; -handle_query_result_pure(Id, ?RESOURCE_ERROR_M(not_found, Msg), _HasBeenSent) -> +handle_query_result_pure(Id, ?RESOURCE_ERROR_M(not_found, Msg), _HasBeenSent, TraceCTX) -> PostFn = fun() -> - ?SLOG(error, #{id => Id, msg => "resource_not_found", info => Msg}), + ?TRACE( + error, + "ERROR", + "resource_not_found", + (trace_ctx_map(TraceCTX))#{id => Id, info => Msg} + ), ok end, {ack, PostFn, #{dropped_resource_not_found => 1}}; -handle_query_result_pure(Id, ?RESOURCE_ERROR_M(stopped, Msg), _HasBeenSent) -> +handle_query_result_pure(Id, ?RESOURCE_ERROR_M(stopped, Msg), _HasBeenSent, TraceCTX) -> PostFn = fun() -> - ?SLOG(error, #{id => Id, msg => "resource_stopped", info => Msg}), + ?TRACE(error, "ERROR", "resource_stopped", (trace_ctx_map(TraceCTX))#{id => Id, info => Msg}), ok end, {ack, PostFn, #{dropped_resource_stopped => 1}}; -handle_query_result_pure(Id, ?RESOURCE_ERROR_M(Reason, _), _HasBeenSent) -> +handle_query_result_pure(Id, ?RESOURCE_ERROR_M(Reason, _), _HasBeenSent, TraceCTX) -> PostFn = fun() -> - ?SLOG(error, #{id => Id, msg => "other_resource_error", reason => Reason}), + ?TRACE(error, "ERROR", "other_resource_error", (trace_ctx_map(TraceCTX))#{ + id => Id, reason => Reason + }), ok end, {nack, PostFn, #{}}; -handle_query_result_pure(Id, {error, Reason} = Error, HasBeenSent) -> +handle_query_result_pure(Id, {error, Reason} = Error, HasBeenSent, TraceCTX) -> case is_unrecoverable_error(Error) of true -> PostFn = @@ -979,14 +993,16 @@ handle_query_result_pure(Id, {error, Reason} = Error, HasBeenSent) -> false -> PostFn = fun() -> - ?SLOG(error, #{id => Id, msg => "send_error", reason => Reason}), + ?TRACE(error, "ERROR", "send_error", (trace_ctx_map(TraceCTX))#{ + id => Id, reason => Reason + }), ok end, {nack, PostFn, #{}} end; -handle_query_result_pure(Id, {async_return, Result}, HasBeenSent) -> - handle_query_async_result_pure(Id, Result, HasBeenSent); -handle_query_result_pure(_Id, Result, HasBeenSent) -> +handle_query_result_pure(Id, {async_return, Result}, HasBeenSent, TraceCTX) -> + handle_query_async_result_pure(Id, Result, HasBeenSent, TraceCTX); +handle_query_result_pure(_Id, Result, HasBeenSent, _TraceCTX) -> PostFn = fun() -> assert_ok_result(Result), ok @@ -998,9 +1014,9 @@ handle_query_result_pure(_Id, Result, HasBeenSent) -> end, {ack, PostFn, Counters}. --spec handle_query_async_result_pure(id(), term(), HasBeenSent :: boolean()) -> +-spec handle_query_async_result_pure(id(), term(), HasBeenSent :: boolean(), map()) -> {ack | nack, function(), counters()}. -handle_query_async_result_pure(Id, {error, Reason} = Error, HasBeenSent) -> +handle_query_async_result_pure(Id, {error, Reason} = Error, HasBeenSent, TraceCTX) -> case is_unrecoverable_error(Error) of true -> PostFn = @@ -1016,16 +1032,18 @@ handle_query_async_result_pure(Id, {error, Reason} = Error, HasBeenSent) -> {ack, PostFn, Counters}; false -> PostFn = fun() -> - ?SLOG(error, #{id => Id, msg => "async_send_error", reason => Reason}), + ?TRACE(error, "ERROR", "async_send_error", (trace_ctx_map(TraceCTX))#{ + id => Id, reason => Reason + }), ok end, {nack, PostFn, #{}} end; -handle_query_async_result_pure(_Id, {ok, Pid}, _HasBeenSent) when is_pid(Pid) -> +handle_query_async_result_pure(_Id, {ok, Pid}, _HasBeenSent, _TraceCTX) when is_pid(Pid) -> {ack, fun() -> ok end, #{}}; -handle_query_async_result_pure(_Id, ok, _HasBeenSent) -> +handle_query_async_result_pure(_Id, ok, _HasBeenSent, _TraceCTX) -> {ack, fun() -> ok end, #{}}; -handle_query_async_result_pure(Id, Results, HasBeenSent) when is_list(Results) -> +handle_query_async_result_pure(Id, Results, HasBeenSent, TraceCTX) when is_list(Results) -> All = fun(L) -> case L of {ok, Pid} -> is_pid(Pid); @@ -1037,17 +1055,26 @@ handle_query_async_result_pure(Id, Results, HasBeenSent) when is_list(Results) - {ack, fun() -> ok end, #{}}; false -> PostFn = fun() -> - ?SLOG(error, #{ - id => Id, - msg => "async_batch_send_error", - reason => Results, - has_been_sent => HasBeenSent - }), + ?TRACE( + error, + "ERROR", + "async_batch_send_error", + (trace_ctx_map(TraceCTX))#{ + id => Id, + reason => Results, + has_been_sent => HasBeenSent + } + ), ok end, {nack, PostFn, #{}} end. +trace_ctx_map(undefined) -> + #{}; +trace_ctx_map(Map) -> + Map. + -spec aggregate_counters(data(), counters()) -> data(). aggregate_counters(Data = #{counters := OldCounters}, DeltaCounters) -> Counters = merge_counters(OldCounters, DeltaCounters), @@ -1526,7 +1553,7 @@ do_handle_async_reply( request_ref := Ref, buffer_worker := BufferWorkerPid, inflight_tid := InflightTID, - min_query := ?QUERY(ReplyTo, _, Sent, _ExpireAt, _TraceCtx) = _Query + min_query := ?QUERY(ReplyTo, _, Sent, _ExpireAt, TraceCtx) = _Query }, Result ) -> @@ -1534,7 +1561,7 @@ do_handle_async_reply( %% but received no ACK, NOT the number of messages queued in the %% inflight window. {Action, PostFn, DeltaCounters} = reply_caller_defer_metrics( - Id, ?REPLY(ReplyTo, Sent, Result), QueryOpts + Id, ?REPLY(ReplyTo, Sent, Result, TraceCtx), QueryOpts ), ?tp(handle_async_reply, #{ diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_apply_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_apply_SUITE.erl index 3d3e063d5..9529a21dc 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_apply_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_apply_SUITE.erl @@ -399,7 +399,7 @@ t_apply_rule_test_batch_separation_stop_after_render(_Config) -> t_apply_rule_test_format_action_failed(_Config) -> MeckOpts = [passthrough, no_link, no_history, non_strict], - catch meck:new(emqx_connector_info, MeckOpts), + catch meck:new(emqx_rule_engine_test_connector, MeckOpts), meck:expect( emqx_rule_engine_test_connector, on_query, @@ -408,8 +408,8 @@ t_apply_rule_test_format_action_failed(_Config) -> ), CheckFun = fun(Bin0) -> + %% The last line in the Bin should be the action_failed entry ?assertNotEqual(nomatch, binary:match(Bin0, [<<"action_failed">>])), - %% The last line in the Bin should be the action_success entry Bin1 = string:trim(Bin0), LastEntry = unicode:characters_to_binary(lists:last(string:split(Bin1, <<"\n">>, all))), LastEntryJSON = emqx_utils_json:decode(LastEntry, [return_maps]), @@ -437,50 +437,141 @@ t_apply_rule_test_format_action_failed(_Config) -> LastEntryJSON ) end, - do_apply_rule_test_format_action_failed_test(CheckFun). + do_apply_rule_test_format_action_failed_test(1, CheckFun). -t_apply_rule_test_format_action_out_of_service(_Config) -> +t_apply_rule_test_format_action_out_of_service_query(_Config) -> + Reason = <<"MY_RECOVERABLE_REASON">>, + CheckFun = out_of_service_check_fun(<<"send_error">>, Reason), + meck_test_connector_recoverable_errors(Reason), + do_apply_rule_test_format_action_failed_test(1, CheckFun). + +t_apply_rule_test_format_action_out_of_service_batch_query(_Config) -> + Reason = <<"MY_RECOVERABLE_REASON">>, + CheckFun = out_of_service_check_fun(<<"send_error">>, Reason), + meck_test_connector_recoverable_errors(Reason), + do_apply_rule_test_format_action_failed_test(10, CheckFun). + +t_apply_rule_test_format_action_out_of_service_async_query(_Config) -> + Reason = <<"MY_RECOVERABLE_REASON">>, + CheckFun = out_of_service_check_fun(<<"async_send_error">>, Reason), + meck_test_connector_recoverable_errors(Reason), + meck:expect( + emqx_rule_engine_test_connector, + callback_mode, + 0, + async_if_possible + ), + do_apply_rule_test_format_action_failed_test(1, CheckFun). + +t_apply_rule_test_format_action_out_of_service_async_batch_query(_Config) -> + Reason = <<"MY_RECOVERABLE_REASON">>, + CheckFun = out_of_service_check_fun(<<"async_send_error">>, Reason), + meck_test_connector_recoverable_errors(Reason), + meck:expect( + emqx_rule_engine_test_connector, + callback_mode, + 0, + async_if_possible + ), + do_apply_rule_test_format_action_failed_test(10, CheckFun). + +out_of_service_check_fun(SendErrorMsg, Reason) -> + fun(Bin0) -> + %% The last line in the Bin should be the action_failed entry + ?assertNotEqual(nomatch, binary:match(Bin0, [<<"action_failed">>])), + io:format("LOG:\n~s", [Bin0]), + Bin1 = string:trim(Bin0), + LastEntry = unicode:characters_to_binary(lists:last(string:split(Bin1, <<"\n">>, all))), + LastEntryJSON = emqx_utils_json:decode(LastEntry, [return_maps]), + ?assertMatch( + #{ + <<"level">> := <<"debug">>, + <<"meta">> := + #{ + <<"action_info">> := + #{ + <<"name">> := _, + <<"type">> := <<"rule_engine_test">> + }, + <<"clientid">> := _, + <<"reason">> := <<"request_expired">>, + <<"rule_id">> := _, + <<"rule_trigger_time">> := _, + <<"stop_action_after_render">> := false, + <<"trace_tag">> := <<"ACTION">> + }, + <<"msg">> := <<"action_failed">>, + <<"time">> := _ + }, + LastEntryJSON + ), + %% We should have at least one entry containing Reason + [ReasonLine | _] = find_lines_with(Bin1, Reason), + ReasonEntryJSON = emqx_utils_json:decode(ReasonLine, [return_maps]), + ?assertMatch( + #{ + <<"level">> := <<"debug">>, + <<"meta">> := + #{ + <<"client_ids">> := [], + <<"clientid">> := _, + <<"id">> := _, + <<"reason">> := + #{ + <<"additional_info">> := _, + <<"error_type">> := <<"recoverable_error">>, + <<"msg">> := <<"MY_RECOVERABLE_REASON">> + }, + <<"rule_id">> := _, + <<"rule_ids">> := [], + <<"rule_trigger_time">> := _, + <<"rule_trigger_times">> := [], + <<"stop_action_after_render">> := false, + <<"trace_tag">> := <<"ERROR">> + }, + <<"msg">> := SendErrorMsg, + <<"time">> := _ + }, + ReasonEntryJSON + ) + end. + +meck_test_connector_recoverable_errors(Reason) -> MeckOpts = [passthrough, no_link, no_history, non_strict], - catch meck:new(emqx_connector_info, MeckOpts), + catch meck:new(emqx_rule_engine_test_connector, MeckOpts), meck:expect( emqx_rule_engine_test_connector, on_query, 3, - {error, {recoverable_error, <<"MY RECOVERABLE REASON">>}} + {error, {recoverable_error, Reason}} ), - CheckFun = - fun(Bin0) -> - ?assertNotEqual(nomatch, binary:match(Bin0, [<<"action_failed">>])), - %% The last line in the Bin should be the action_success entry - Bin1 = string:trim(Bin0), - LastEntry = unicode:characters_to_binary(lists:last(string:split(Bin1, <<"\n">>, all))), - LastEntryJSON = emqx_utils_json:decode(LastEntry, [return_maps]), - ?assertMatch( - #{ - <<"level">> := <<"debug">>, - <<"meta">> := - #{ - <<"action_info">> := - #{ - <<"name">> := _, - <<"type">> := <<"rule_engine_test">> - }, - <<"clientid">> := _, - <<"reason">> := <<"request_expired">>, - <<"rule_id">> := _, - <<"rule_trigger_time">> := _, - <<"stop_action_after_render">> := false, - <<"trace_tag">> := <<"ACTION">> - }, - <<"msg">> := <<"action_failed">>, - <<"time">> := _ - }, - LastEntryJSON - ) - end, - do_apply_rule_test_format_action_failed_test(CheckFun). + meck:expect( + emqx_rule_engine_test_connector, + on_batch_query, + 3, + {error, {recoverable_error, Reason}} + ), + meck:expect( + emqx_rule_engine_test_connector, + on_query_async, + 4, + {error, {recoverable_error, Reason}} + ), + meck:expect( + emqx_rule_engine_test_connector, + on_batch_query_async, + 4, + {error, {recoverable_error, Reason}} + ). -do_apply_rule_test_format_action_failed_test(CheckLastTraceEntryFun) -> +find_lines_with(Data, InLineText) -> + % Split the binary data into lines + Lines = re:split(Data, "\n", [{return, binary}]), + + % Use a list comprehension to filter lines containing 'Reason' + [Line || Line <- Lines, re:run(Line, InLineText, [multiline, {capture, none}]) =/= nomatch]. + +do_apply_rule_test_format_action_failed_test(BatchSize, CheckLastTraceEntryFun) -> meck_in_test_connector(), {ok, _} = emqx_connector:create(rule_engine_test, ?FUNCTION_NAME, #{}), Name = atom_to_binary(?FUNCTION_NAME), @@ -489,8 +580,8 @@ do_apply_rule_test_format_action_failed_test(CheckLastTraceEntryFun) -> <<"connector">> => Name, <<"parameters">> => #{<<"values">> => #{}}, <<"resource_opts">> => #{ - <<"batch_size">> => 1, - <<"batch_time">> => 0, + <<"batch_size">> => BatchSize, + <<"batch_time">> => 10, <<"request_ttl">> => 200 } }, diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_test_connector.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_test_connector.erl index c22c5fbd5..6a0d6b3ec 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_test_connector.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_test_connector.erl @@ -29,7 +29,9 @@ on_start/2, on_stop/2, on_query/3, + on_query_async/4, on_batch_query/3, + on_batch_query_async/4, on_get_status/2, on_add_channel/4, on_remove_channel/3, @@ -85,6 +87,14 @@ on_query( ) -> ok. +on_query_async( + _InstId, + _Query, + _State, + _Callback +) -> + ok. + on_batch_query( _InstId, [{ChannelId, _Req} | _] = Msg, @@ -96,5 +106,13 @@ on_batch_query( emqx_trace:rendered_action_template(ChannelId, #{nothing_to_render => ok}), ok. +on_batch_query_async( + _InstId, + _Batch, + _State, + _Callback +) -> + ok. + on_get_status(_InstId, _State) -> connected. From d5324e295f4bbe9102901c922a64186e45f73fe1 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Tue, 7 May 2024 18:18:18 +0200 Subject: [PATCH 4/4] test: do cleanup in emqx_common_test_helpers:on_exit function Thanks @thalesmg for the suggestion. --- .../emqx_rule_engine_api_rule_apply_SUITE.erl | 23 ++++++++----------- 1 file changed, 9 insertions(+), 14 deletions(-) diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_apply_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_apply_SUITE.erl index 9529a21dc..9f03a7bda 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_apply_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_api_rule_apply_SUITE.erl @@ -87,6 +87,7 @@ end_per_testcase(_TestCase, _Config) -> emqx_bridge_v2_testlib:delete_all_bridges(), emqx_bridge_v2_testlib:delete_all_connectors(), emqx_common_test_helpers:call_janitor(), + meck:unload(), ok. t_basic_apply_rule_trace_ruleid(Config) -> @@ -229,7 +230,6 @@ basic_apply_rule_test_helper(Action, TraceType, StopAfterRender) -> ) || #{<<"meta">> := Meta} <- LogEntries ], - emqx_trace:delete(TraceName), ok. do_final_log_check(Action, Bin0) when is_binary(Action) -> @@ -289,7 +289,11 @@ create_trace(TraceName, TraceType, TraceValue) -> end_at => End, formatter => json }, - {ok, _} = emqx_trace:create(Trace). + {ok, _} = CreateRes = emqx_trace:create(Trace), + emqx_common_test_helpers:on_exit(fun() -> + ok = emqx_trace:delete(TraceName) + end), + CreateRes. t_apply_rule_test_batch_separation_stop_after_render(_Config) -> meck_in_test_connector(), @@ -389,12 +393,6 @@ t_apply_rule_test_batch_separation_stop_after_render(_Config) -> ) end ), - %% Cleanup - ok = emqx_trace:delete(Name), - ok = emqx_rule_engine:delete_rule(RuleID), - ok = emqx_bridge_v2:remove(rule_engine_test, ?FUNCTION_NAME), - ok = emqx_connector:remove(rule_engine_test, ?FUNCTION_NAME), - [_, _] = meck:unload(), ok. t_apply_rule_test_format_action_failed(_Config) -> @@ -610,12 +608,6 @@ do_apply_rule_test_format_action_failed_test(BatchSize, CheckLastTraceEntryFun) CheckLastTraceEntryFun(Bin) end ), - %% Cleanup - ok = emqx_trace:delete(Name), - ok = emqx_rule_engine:delete_rule(RuleID), - ok = emqx_bridge_v2:remove(rule_engine_test, ?FUNCTION_NAME), - ok = emqx_connector:remove(rule_engine_test, ?FUNCTION_NAME), - [_, _, _] = meck:unload(), ok. meck_in_test_connector() -> @@ -665,6 +657,9 @@ create_rule_with_action(ActionType, ActionName, SQL) -> case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params) of {ok, Res0} -> #{<<"id">> := RuleId} = emqx_utils_json:decode(Res0, [return_maps]), + emqx_common_test_helpers:on_exit(fun() -> + emqx_rule_engine:delete_rule(RuleId) + end), {ok, RuleId}; Error -> Error