fix(resource): take error from action/connector before attempting query

Fixes https://emqx.atlassian.net/browse/EMQX-11284
Fixes https://emqx.atlassian.net/browse/EMQX-11298
This commit is contained in:
Thales Macedo Garitezi 2023-11-06 17:54:06 -03:00
parent ac6ad79029
commit 7dcdbc9e51
7 changed files with 96 additions and 40 deletions

View File

@ -535,11 +535,13 @@ do_send_msg_with_enabled_config(
BridgeType, BridgeName, Message, QueryOpts0, Config
) ->
QueryMode = get_query_mode(BridgeType, Config),
ConnectorName = maps:get(connector, Config),
ConnectorResId = emqx_connector_resource:resource_id(BridgeType, ConnectorName),
QueryOpts = maps:merge(
emqx_bridge:query_opts(Config),
QueryOpts0#{
query_mode => QueryMode,
query_mode_cache_override => false
connector_resource_id => ConnectorResId,
query_mode => QueryMode
}
),
BridgeV2Id = id(BridgeType, BridgeName),

View File

@ -557,11 +557,8 @@ check_topic_status(ClientId, WolffClientPid, KafkaTopic) ->
ok ->
ok;
{error, unknown_topic_or_partition} ->
throw(#{
error => unknown_kafka_topic,
kafka_client_id => ClientId,
kafka_topic => KafkaTopic
});
Msg = iolist_to_binary([<<"Unknown topic or partition: ">>, KafkaTopic]),
throw({unhealthy_target, Msg});
{error, Reason} ->
throw(#{
error => failed_to_check_topic_status,

View File

@ -574,8 +574,14 @@ t_nonexistent_topic(_Config) ->
erlang:list_to_atom(Type), erlang:list_to_atom(Name), Conf
),
% TODO: make sure the user facing APIs for Bridge V1 also get this error
#{status := disconnected, error := #{error := unknown_kafka_topic}} = emqx_bridge_v2:health_check(
?BRIDGE_TYPE_V2, list_to_atom(Name)
?assertMatch(
#{
status := disconnected,
error := {unhealthy_target, <<"Unknown topic or partition: undefined-test-topic">>}
},
emqx_bridge_v2:health_check(
?BRIDGE_TYPE_V2, list_to_atom(Name)
)
),
ok = emqx_bridge:remove(list_to_atom(Type), list_to_atom(Name)),
delete_all_bridges(),

View File

@ -140,6 +140,33 @@ t_local_topic(_) ->
ok = emqx_connector:remove(?TYPE, test_connector),
ok.
t_unknown_topic(_Config) ->
ConnectorName = <<"test_connector">>,
BridgeName = <<"test_bridge">>,
BridgeV2Config0 = bridge_v2_config(ConnectorName),
BridgeV2Config = emqx_utils_maps:deep_put(
[<<"kafka">>, <<"topic">>],
BridgeV2Config0,
<<"nonexistent">>
),
ConnectorConfig = connector_config(),
{ok, _} = emqx_connector:create(?TYPE, ConnectorName, ConnectorConfig),
{ok, _} = emqx_bridge_v2:create(?TYPE, BridgeName, BridgeV2Config),
Payload = <<"will be dropped">>,
emqx:publish(emqx_message:make(<<"kafka_t/local">>, Payload)),
BridgeV2Id = emqx_bridge_v2:id(?TYPE, BridgeName),
?retry(
_Sleep0 = 50,
_Attempts0 = 100,
begin
?assertEqual(1, emqx_resource_metrics:matched_get(BridgeV2Id)),
?assertEqual(1, emqx_resource_metrics:dropped_get(BridgeV2Id)),
?assertEqual(1, emqx_resource_metrics:dropped_resource_stopped_get(BridgeV2Id)),
ok
end
),
ok.
check_send_message_with_bridge(BridgeName) ->
%% ######################################
%% Create Kafka message

View File

@ -22,7 +22,7 @@
-type resource_spec() :: map().
-type resource_state() :: term().
-type resource_status() :: connected | disconnected | connecting | stopped.
-type channel_status() :: connected | connecting.
-type channel_status() :: connected | connecting | disconnected.
-type callback_mode() :: always_sync | async_if_possible.
-type query_mode() ::
simple_sync
@ -47,7 +47,7 @@
simple_query => boolean(),
reply_to => reply_fun(),
query_mode => query_mode(),
query_mode_cache_override => boolean()
connector_resource_id => resource_id()
}.
-type resource_data() :: #{
id := resource_id(),

View File

@ -379,32 +379,32 @@ query(ResId, Request) ->
-spec query(resource_id(), Request :: term(), query_opts()) ->
Result :: term().
query(ResId, Request, Opts) ->
case get_query_mode_error(ResId, Opts) of
case emqx_resource_manager:get_query_mode_and_last_error(ResId, Opts) of
{error, _} = ErrorTuple ->
ErrorTuple;
{_, unhealthy_target} ->
{ok, {_, unhealthy_target}} ->
emqx_resource_metrics:matched_inc(ResId),
emqx_resource_metrics:dropped_resource_stopped_inc(ResId),
?RESOURCE_ERROR(unhealthy_target, "unhealthy target");
{_, {unhealthy_target, _Message}} ->
{ok, {_, {unhealthy_target, Message}}} ->
emqx_resource_metrics:matched_inc(ResId),
emqx_resource_metrics:dropped_resource_stopped_inc(ResId),
?RESOURCE_ERROR(unhealthy_target, "unhealthy target");
{simple_async, _} ->
?RESOURCE_ERROR(unhealthy_target, Message);
{ok, {simple_async, _}} ->
%% TODO(5.1.1): pass Resource instead of ResId to simple APIs
%% so the buffer worker does not need to lookup the cache again
emqx_resource_buffer_worker:simple_async_query(ResId, Request, Opts);
{simple_sync, _} ->
{ok, {simple_sync, _}} ->
%% TODO(5.1.1): pass Resource instead of ResId to simple APIs
%% so the buffer worker does not need to lookup the cache again
emqx_resource_buffer_worker:simple_sync_query(ResId, Request, Opts);
{simple_async_internal_buffer, _} ->
{ok, {simple_async_internal_buffer, _}} ->
%% This is for bridges/connectors that have internal buffering, such
%% as Kafka and Pulsar producers.
%% TODO(5.1.1): pass Resource instead of ResId to simple APIs
%% so the buffer worker does not need to lookup the cache again
emqx_resource_buffer_worker:simple_async_query(ResId, Request, Opts);
{simple_sync_internal_buffer, _} ->
{ok, {simple_sync_internal_buffer, _}} ->
%% This is for bridges/connectors that have internal buffering, such
%% as Kafka and Pulsar producers.
%% TODO(5.1.1): pass Resource instead of ResId to simple APIs
@ -412,30 +412,12 @@ query(ResId, Request, Opts) ->
emqx_resource_buffer_worker:simple_sync_internal_buffer_query(
ResId, Request, Opts
);
{sync, _} ->
{ok, {sync, _}} ->
emqx_resource_buffer_worker:sync_query(ResId, Request, Opts);
{async, _} ->
{ok, {async, _}} ->
emqx_resource_buffer_worker:async_query(ResId, Request, Opts)
end.
get_query_mode_error(ResId, Opts) ->
case maps:get(query_mode_cache_override, Opts, true) of
false ->
case Opts of
#{query_mode := QueryMode} ->
{QueryMode, ok};
_ ->
{async, unhealthy_target}
end;
true ->
case emqx_resource_manager:lookup_cached(ResId) of
{ok, _Group, #{query_mode := QM, error := Error}} ->
{QM, Error};
{error, not_found} ->
{error, not_found}
end
end.
-spec simple_sync_query(resource_id(), Request :: term()) -> Result :: term().
simple_sync_query(ResId, Request) ->
emqx_resource_buffer_worker:simple_sync_query(ResId, Request).

View File

@ -45,7 +45,8 @@
lookup_cached/1,
get_metrics/1,
reset_metrics/1,
channel_status_is_channel_added/1
channel_status_is_channel_added/1,
get_query_mode_and_last_error/2
]).
-export([
@ -75,6 +76,7 @@
extra
}).
-type data() :: #data{}.
-type channel_status_map() :: #{status := channel_status(), error := term()}.
-define(NAME(ResId), {n, l, {?MODULE, ResId}}).
-define(REF(ResId), {via, gproc, ?NAME(ResId)}).
@ -326,6 +328,46 @@ remove_channel(ResId, ChannelId) ->
get_channels(ResId) ->
safe_call(ResId, get_channels, ?T_OPERATION).
-spec get_query_mode_and_last_error(resource_id(), query_opts()) ->
{ok, {query_mode(), LastError}} | {error, not_found}
when
LastError ::
unhealthy_target
| {unhealthy_target, binary()}
| channel_status_map()
| term().
get_query_mode_and_last_error(RequestResId, Opts = #{connector_resource_id := ResId}) ->
do_get_query_mode_error(ResId, RequestResId, Opts);
get_query_mode_and_last_error(RequestResId, Opts) ->
do_get_query_mode_error(RequestResId, RequestResId, Opts).
do_get_query_mode_error(ResId, RequestResId, Opts) ->
case emqx_resource_manager:lookup_cached(ResId) of
{ok, _Group, ResourceData} ->
QM = get_query_mode(ResourceData, Opts),
Error = get_error(RequestResId, ResourceData),
{ok, {QM, Error}};
{error, not_found} ->
{error, not_found}
end.
get_query_mode(_ResourceData, #{query_mode := QM}) ->
QM;
get_query_mode(#{query_mode := QM}, _Opts) ->
QM.
get_error(ResId, #{added_channels := #{} = Channels} = ResourceData) when
is_map_key(ResId, Channels)
->
case maps:get(ResId, Channels) of
#{error := Error} ->
Error;
_ ->
maps:get(error, ResourceData, undefined)
end;
get_error(_ResId, #{error := Error}) ->
Error.
%% Server start/stop callbacks
%% @doc Function called from the supervisor to actually start the server