diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl index ef6bd4cd6..5e42b4881 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl @@ -537,11 +537,13 @@ do_send_msg_with_enabled_config( BridgeType, BridgeName, Message, QueryOpts0, Config ) -> QueryMode = get_query_mode(BridgeType, Config), + ConnectorName = maps:get(connector, Config), + ConnectorResId = emqx_connector_resource:resource_id(BridgeType, ConnectorName), QueryOpts = maps:merge( emqx_bridge:query_opts(Config), QueryOpts0#{ - query_mode => QueryMode, - query_mode_cache_override => false + connector_resource_id => ConnectorResId, + query_mode => QueryMode } ), BridgeV2Id = id(BridgeType, BridgeName), diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl index 5461829fa..4422d8dd5 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl @@ -557,11 +557,8 @@ check_topic_status(ClientId, WolffClientPid, KafkaTopic) -> ok -> ok; {error, unknown_topic_or_partition} -> - throw(#{ - error => unknown_kafka_topic, - kafka_client_id => ClientId, - kafka_topic => KafkaTopic - }); + Msg = iolist_to_binary([<<"Unknown topic or partition: ">>, KafkaTopic]), + throw({unhealthy_target, Msg}); {error, Reason} -> throw(#{ error => failed_to_check_topic_status, diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl index 9915582ac..b37ef00e9 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl @@ -574,8 +574,14 @@ t_nonexistent_topic(_Config) -> erlang:list_to_atom(Type), erlang:list_to_atom(Name), Conf ), % TODO: make sure the user facing APIs for Bridge V1 also get this error - #{status := disconnected, error := #{error := unknown_kafka_topic}} = emqx_bridge_v2:health_check( - ?BRIDGE_TYPE_V2, list_to_atom(Name) + ?assertMatch( + #{ + status := disconnected, + error := {unhealthy_target, <<"Unknown topic or partition: undefined-test-topic">>} + }, + emqx_bridge_v2:health_check( + ?BRIDGE_TYPE_V2, list_to_atom(Name) + ) ), ok = emqx_bridge:remove(list_to_atom(Type), list_to_atom(Name)), delete_all_bridges(), diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl index 37c2e2325..58a16ea67 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl @@ -140,6 +140,33 @@ t_local_topic(_) -> ok = emqx_connector:remove(?TYPE, test_connector), ok. +t_unknown_topic(_Config) -> + ConnectorName = <<"test_connector">>, + BridgeName = <<"test_bridge">>, + BridgeV2Config0 = bridge_v2_config(ConnectorName), + BridgeV2Config = emqx_utils_maps:deep_put( + [<<"kafka">>, <<"topic">>], + BridgeV2Config0, + <<"nonexistent">> + ), + ConnectorConfig = connector_config(), + {ok, _} = emqx_connector:create(?TYPE, ConnectorName, ConnectorConfig), + {ok, _} = emqx_bridge_v2:create(?TYPE, BridgeName, BridgeV2Config), + Payload = <<"will be dropped">>, + emqx:publish(emqx_message:make(<<"kafka_t/local">>, Payload)), + BridgeV2Id = emqx_bridge_v2:id(?TYPE, BridgeName), + ?retry( + _Sleep0 = 50, + _Attempts0 = 100, + begin + ?assertEqual(1, emqx_resource_metrics:matched_get(BridgeV2Id)), + ?assertEqual(1, emqx_resource_metrics:dropped_get(BridgeV2Id)), + ?assertEqual(1, emqx_resource_metrics:dropped_resource_stopped_get(BridgeV2Id)), + ok + end + ), + ok. + check_send_message_with_bridge(BridgeName) -> %% ###################################### %% Create Kafka message diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index beaea0c99..fa86e68c9 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -22,7 +22,7 @@ -type resource_spec() :: map(). -type resource_state() :: term(). -type resource_status() :: connected | disconnected | connecting | stopped. --type channel_status() :: connected | connecting. +-type channel_status() :: connected | connecting | disconnected. -type callback_mode() :: always_sync | async_if_possible. -type query_mode() :: simple_sync @@ -47,7 +47,7 @@ simple_query => boolean(), reply_to => reply_fun(), query_mode => query_mode(), - query_mode_cache_override => boolean() + connector_resource_id => resource_id() }. -type resource_data() :: #{ id := resource_id(), diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index b0c0ed377..60e94d7e3 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -379,32 +379,32 @@ query(ResId, Request) -> -spec query(resource_id(), Request :: term(), query_opts()) -> Result :: term(). query(ResId, Request, Opts) -> - case get_query_mode_error(ResId, Opts) of + case emqx_resource_manager:get_query_mode_and_last_error(ResId, Opts) of {error, _} = ErrorTuple -> ErrorTuple; - {_, unhealthy_target} -> + {ok, {_, unhealthy_target}} -> emqx_resource_metrics:matched_inc(ResId), emqx_resource_metrics:dropped_resource_stopped_inc(ResId), ?RESOURCE_ERROR(unhealthy_target, "unhealthy target"); - {_, {unhealthy_target, _Message}} -> + {ok, {_, {unhealthy_target, Message}}} -> emqx_resource_metrics:matched_inc(ResId), emqx_resource_metrics:dropped_resource_stopped_inc(ResId), - ?RESOURCE_ERROR(unhealthy_target, "unhealthy target"); - {simple_async, _} -> + ?RESOURCE_ERROR(unhealthy_target, Message); + {ok, {simple_async, _}} -> %% TODO(5.1.1): pass Resource instead of ResId to simple APIs %% so the buffer worker does not need to lookup the cache again emqx_resource_buffer_worker:simple_async_query(ResId, Request, Opts); - {simple_sync, _} -> + {ok, {simple_sync, _}} -> %% TODO(5.1.1): pass Resource instead of ResId to simple APIs %% so the buffer worker does not need to lookup the cache again emqx_resource_buffer_worker:simple_sync_query(ResId, Request, Opts); - {simple_async_internal_buffer, _} -> + {ok, {simple_async_internal_buffer, _}} -> %% This is for bridges/connectors that have internal buffering, such %% as Kafka and Pulsar producers. %% TODO(5.1.1): pass Resource instead of ResId to simple APIs %% so the buffer worker does not need to lookup the cache again emqx_resource_buffer_worker:simple_async_query(ResId, Request, Opts); - {simple_sync_internal_buffer, _} -> + {ok, {simple_sync_internal_buffer, _}} -> %% This is for bridges/connectors that have internal buffering, such %% as Kafka and Pulsar producers. %% TODO(5.1.1): pass Resource instead of ResId to simple APIs @@ -412,30 +412,12 @@ query(ResId, Request, Opts) -> emqx_resource_buffer_worker:simple_sync_internal_buffer_query( ResId, Request, Opts ); - {sync, _} -> + {ok, {sync, _}} -> emqx_resource_buffer_worker:sync_query(ResId, Request, Opts); - {async, _} -> + {ok, {async, _}} -> emqx_resource_buffer_worker:async_query(ResId, Request, Opts) end. -get_query_mode_error(ResId, Opts) -> - case maps:get(query_mode_cache_override, Opts, true) of - false -> - case Opts of - #{query_mode := QueryMode} -> - {QueryMode, ok}; - _ -> - {async, unhealthy_target} - end; - true -> - case emqx_resource_manager:lookup_cached(ResId) of - {ok, _Group, #{query_mode := QM, error := Error}} -> - {QM, Error}; - {error, not_found} -> - {error, not_found} - end - end. - -spec simple_sync_query(resource_id(), Request :: term()) -> Result :: term(). simple_sync_query(ResId, Request) -> emqx_resource_buffer_worker:simple_sync_query(ResId, Request). diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index 04d40d581..a030080b7 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -45,7 +45,8 @@ lookup_cached/1, get_metrics/1, reset_metrics/1, - channel_status_is_channel_added/1 + channel_status_is_channel_added/1, + get_query_mode_and_last_error/2 ]). -export([ @@ -75,6 +76,7 @@ extra }). -type data() :: #data{}. +-type channel_status_map() :: #{status := channel_status(), error := term()}. -define(NAME(ResId), {n, l, {?MODULE, ResId}}). -define(REF(ResId), {via, gproc, ?NAME(ResId)}). @@ -326,6 +328,46 @@ remove_channel(ResId, ChannelId) -> get_channels(ResId) -> safe_call(ResId, get_channels, ?T_OPERATION). +-spec get_query_mode_and_last_error(resource_id(), query_opts()) -> + {ok, {query_mode(), LastError}} | {error, not_found} +when + LastError :: + unhealthy_target + | {unhealthy_target, binary()} + | channel_status_map() + | term(). +get_query_mode_and_last_error(RequestResId, Opts = #{connector_resource_id := ResId}) -> + do_get_query_mode_error(ResId, RequestResId, Opts); +get_query_mode_and_last_error(RequestResId, Opts) -> + do_get_query_mode_error(RequestResId, RequestResId, Opts). + +do_get_query_mode_error(ResId, RequestResId, Opts) -> + case emqx_resource_manager:lookup_cached(ResId) of + {ok, _Group, ResourceData} -> + QM = get_query_mode(ResourceData, Opts), + Error = get_error(RequestResId, ResourceData), + {ok, {QM, Error}}; + {error, not_found} -> + {error, not_found} + end. + +get_query_mode(_ResourceData, #{query_mode := QM}) -> + QM; +get_query_mode(#{query_mode := QM}, _Opts) -> + QM. + +get_error(ResId, #{added_channels := #{} = Channels} = ResourceData) when + is_map_key(ResId, Channels) +-> + case maps:get(ResId, Channels) of + #{error := Error} -> + Error; + _ -> + maps:get(error, ResourceData, undefined) + end; +get_error(_ResId, #{error := Error}) -> + Error. + %% Server start/stop callbacks %% @doc Function called from the supervisor to actually start the server