diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl index 6b85cbbe4..abdabef8c 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl @@ -329,7 +329,7 @@ on_query( action_id => MessageTag, query_mode => sync }), - {error, invalid_partition_count}; + {error, {unrecoverable_error, invalid_partition_count}}; throw:{bad_kafka_header, _} = Error -> ?tp( emqx_bridge_kafka_impl_producer_sync_query_failed, @@ -395,7 +395,7 @@ on_query_async( action_id => MessageTag, query_mode => async }), - {error, invalid_partition_count}; + {error, {unrecoverable_error, invalid_partition_count}}; throw:{bad_kafka_header, _} = Error -> ?tp( emqx_bridge_kafka_impl_producer_async_query_failed, diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl index ba558792b..a7918610e 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl @@ -300,6 +300,9 @@ assert_status_api(Line, Type, Name, Status) -> ). -define(assertStatusAPI(TYPE, NAME, STATUS), assert_status_api(?LINE, TYPE, NAME, STATUS)). +get_rule_metrics(RuleId) -> + emqx_metrics_worker:get_metrics(rule_metrics, RuleId). + %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ @@ -678,6 +681,133 @@ t_ancient_v1_config_migration_without_local_topic(Config) -> ), ok. +%% Checks that, if Kafka raises `invalid_partition_count' error, we bump the corresponding +%% failure rule action metric. +t_invalid_partition_count_metrics(Config) -> + Type = proplists:get_value(type, Config, ?TYPE), + ConnectorName = proplists:get_value(connector_name, Config, <<"c">>), + ConnectorConfig = proplists:get_value(connector_config, Config, connector_config()), + ActionConfig1 = proplists:get_value(action_config, Config, action_config(ConnectorName)), + ?check_trace( + #{timetrap => 10_000}, + begin + ConnectorParams = [ + {connector_config, ConnectorConfig}, + {connector_name, ConnectorName}, + {connector_type, Type} + ], + ActionName = <<"a">>, + ActionParams = [ + {action_config, ActionConfig1}, + {action_name, ActionName}, + {action_type, Type} + ], + {ok, {{_, 201, _}, _, #{}}} = + emqx_bridge_v2_testlib:create_connector_api(ConnectorParams), + + {ok, {{_, 201, _}, _, #{}}} = + emqx_bridge_v2_testlib:create_action_api(ActionParams), + RuleTopic = <<"t/a">>, + {ok, #{<<"id">> := RuleId}} = + emqx_bridge_v2_testlib:create_rule_and_action_http(Type, RuleTopic, [ + {bridge_name, ActionName} + ]), + + {ok, C} = emqtt:start_link([]), + {ok, _} = emqtt:connect(C), + + %%-------------------------------------------- + ?tp(notice, "sync", #{}), + %%-------------------------------------------- + %% Artificially force sync query to be used; otherwise, it's only used when the + %% resource is blocked and retrying. + ok = meck:new(emqx_bridge_kafka_impl_producer, [passthrough, no_history]), + on_exit(fun() -> catch meck:unload() end), + ok = meck:expect(emqx_bridge_kafka_impl_producer, query_mode, 1, simple_sync), + + %% Simulate `invalid_partition_count' + emqx_common_test_helpers:with_mock( + wolff, + send_sync, + fun(_Producers, _Msgs, _Timeout) -> + error({invalid_partition_count, 0, partitioner}) + end, + fun() -> + {{ok, _}, {ok, _}} = + ?wait_async_action( + emqtt:publish(C, RuleTopic, <<"hi">>, 2), + #{ + ?snk_kind := "kafka_producer_invalid_partition_count", + query_mode := sync + } + ), + ?assertMatch( + #{ + counters := #{ + 'actions.total' := 1, + 'actions.failed' := 1 + } + }, + get_rule_metrics(RuleId) + ), + ok + end + ), + + %%-------------------------------------------- + %% Same thing, but async call + ?tp(notice, "async", #{}), + %%-------------------------------------------- + ok = meck:expect( + emqx_bridge_kafka_impl_producer, + query_mode, + fun(Conf) -> meck:passthrough([Conf]) end + ), + ok = emqx_bridge_v2:remove(actions, Type, ActionName), + {ok, {{_, 201, _}, _, #{}}} = + emqx_bridge_v2_testlib:create_action_api( + ActionParams, + #{<<"parameters">> => #{<<"query_mode">> => <<"async">>}} + ), + + %% Simulate `invalid_partition_count' + emqx_common_test_helpers:with_mock( + wolff, + send, + fun(_Producers, _Msgs, _Timeout) -> + error({invalid_partition_count, 0, partitioner}) + end, + fun() -> + {{ok, _}, {ok, _}} = + ?wait_async_action( + emqtt:publish(C, RuleTopic, <<"hi">>, 2), + #{?snk_kind := "rule_engine_applied_all_rules"} + ), + ?assertMatch( + #{ + counters := #{ + 'actions.total' := 2, + 'actions.failed' := 2 + } + }, + get_rule_metrics(RuleId) + ), + ok + end + ), + + ok + end, + fun(Trace) -> + ?assertMatch( + [#{query_mode := sync}, #{query_mode := async} | _], + ?of_kind("kafka_producer_invalid_partition_count", Trace) + ), + ok + end + ), + ok. + %% Tests that deleting/disabling an action that share the same Kafka topic with other %% actions do not disturb the latter. t_multiple_actions_sharing_topic(Config) -> diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 5f269c112..05d42ed1a 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -1401,16 +1401,26 @@ apply_query_fun( query_opts => QueryOpts, min_query => minimize(Query) }, + IsSimpleQuery = maps:get(simple_query, QueryOpts, false), IsRetriable = false, AsyncWorkerMRef = undefined, InflightItem = ?INFLIGHT_ITEM(Ref, Query, IsRetriable, AsyncWorkerMRef), ok = inflight_append(InflightTID, InflightItem), case pre_query_channel_check(Request, Channels, QueryOpts) of ok -> - Result = Mod:on_query_async( - extract_connector_id(Id), Request, {ReplyFun, [ReplyContext]}, ResSt - ), - {async_return, Result}; + case + Mod:on_query_async( + extract_connector_id(Id), Request, {ReplyFun, [ReplyContext]}, ResSt + ) + of + {error, _} = Error when IsSimpleQuery -> + %% If this callback returns error, we assume it won't reply + %% anything else and won't retry. + maybe_reply_to(Error, QueryOpts), + Error; + Result -> + {async_return, Result} + end; Error -> maybe_reply_to(Error, QueryOpts) end @@ -1480,16 +1490,26 @@ apply_query_fun( Requests = lists:map( fun(?QUERY(_ReplyTo, Request, _, _ExpireAt, _TraceCtx)) -> Request end, Batch ), + IsSimpleQuery = maps:get(simple_query, QueryOpts, false), IsRetriable = false, AsyncWorkerMRef = undefined, InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, AsyncWorkerMRef), ok = inflight_append(InflightTID, InflightItem), case pre_query_channel_check(FirstRequest, Channels, QueryOpts) of ok -> - Result = Mod:on_batch_query_async( - extract_connector_id(Id), Requests, {ReplyFun, [ReplyContext]}, ResSt - ), - {async_return, Result}; + case + Mod:on_batch_query_async( + extract_connector_id(Id), Requests, {ReplyFun, [ReplyContext]}, ResSt + ) + of + {error, _} = Error when IsSimpleQuery -> + %% If this callback returns error, we assume it won't reply + %% anything else and won't retry. + maybe_reply_to(Error, QueryOpts), + Error; + Result -> + {async_return, Result} + end; Error -> maybe_reply_to(Error, QueryOpts) end diff --git a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl index 21a42c283..0d2b353b1 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl @@ -20,6 +20,7 @@ -include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/emqx_trace.hrl"). -include_lib("emqx_resource/include/emqx_resource_errors.hrl"). +-include_lib("snabbkaffe/include/trace.hrl"). -export([ apply_rule/3, @@ -58,6 +59,7 @@ %%------------------------------------------------------------------------------ -spec apply_rules(list(rule()), columns(), envs()) -> ok. apply_rules([], _Columns, _Envs) -> + ?tp("rule_engine_applied_all_rules", #{}), ok; apply_rules([#{enable := false} | More], Columns, Envs) -> apply_rules(More, Columns, Envs); diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index 680aac759..da1df58ea 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -216,10 +216,8 @@ init_per_group(metrics_fail_simple, Config) -> (_) -> simple_async end), meck:expect(?BRIDGE_IMPL, on_query, 3, {error, {unrecoverable_error, mecked_failure}}), - meck:expect(?BRIDGE_IMPL, on_query_async, fun(_, _, {ReplyFun, Args}, _) -> - Result = {error, {unrecoverable_error, mecked_failure}}, - erlang:apply(ReplyFun, Args ++ [Result]), - Result + meck:expect(?BRIDGE_IMPL, on_query_async, fun(_, _, {_ReplyFun, _Args}, _) -> + {error, {unrecoverable_error, mecked_failure}} end), [{mecked, [?BRIDGE_IMPL]} | Config]; init_per_group(_Groupname, Config) ->