From 0388e1c1c47239b3c92822cb6912124419160373 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 30 Nov 2023 15:31:50 -0300 Subject: [PATCH] fix(kafka_producer): add `resource_opts` to connector schema, and check for client connectivity Fixes https://emqx.atlassian.net/browse/EMQX-11494 --- .../test/emqx_bridge_v2_testlib.erl | 73 +++++ .../src/emqx_bridge_kafka.erl | 19 +- .../src/emqx_bridge_kafka_impl_producer.erl | 68 ++++- .../emqx_bridge_v2_kafka_producer_SUITE.erl | 267 +++++++++++++++--- .../src/emqx_resource_buffer_worker.erl | 33 ++- .../src/emqx_resource_manager.erl | 5 + 6 files changed, 394 insertions(+), 71 deletions(-) diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl b/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl index 1ed0eb31b..a6b92caaa 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_testlib.erl @@ -146,6 +146,35 @@ create_bridge(Config, Overrides) -> ct:pal("creating bridge with config: ~p", [BridgeConfig]), emqx_bridge_v2:create(BridgeType, BridgeName, BridgeConfig). +maybe_json_decode(X) -> + case emqx_utils_json:safe_decode(X, [return_maps]) of + {ok, Decoded} -> Decoded; + {error, _} -> X + end. + +request(Method, Path, Params) -> + AuthHeader = emqx_mgmt_api_test_util:auth_header_(), + Opts = #{return_all => true}, + case emqx_mgmt_api_test_util:request_api(Method, Path, "", AuthHeader, Params, Opts) of + {ok, {Status, Headers, Body0}} -> + Body = maybe_json_decode(Body0), + {ok, {Status, Headers, Body}}; + {error, {Status, Headers, Body0}} -> + Body = + case emqx_utils_json:safe_decode(Body0, [return_maps]) of + {ok, Decoded0 = #{<<"message">> := Msg0}} -> + Msg = maybe_json_decode(Msg0), + Decoded0#{<<"message">> := Msg}; + {ok, Decoded0} -> + Decoded0; + {error, _} -> + Body0 + end, + {error, {Status, Headers, Body}}; + Error -> + Error + end. + list_bridges_api() -> Params = [], Path = emqx_mgmt_api_test_util:api_path(["actions"]), @@ -209,6 +238,50 @@ create_bridge_api(Config, Overrides) -> ct:pal("bridge create result: ~p", [Res]), Res. +create_connector_api(Config) -> + create_connector_api(Config, _Overrides = #{}). + +create_connector_api(Config, Overrides) -> + ConnectorConfig0 = ?config(connector_config, Config), + ConnectorName = ?config(connector_name, Config), + ConnectorType = ?config(connector_type, Config), + Method = post, + Path = emqx_mgmt_api_test_util:api_path(["connectors"]), + ConnectorConfig = emqx_utils_maps:deep_merge(ConnectorConfig0, Overrides), + Params = ConnectorConfig#{<<"type">> => ConnectorType, <<"name">> => ConnectorName}, + ct:pal("creating connector (http):\n ~p", [Params]), + Res = request(Method, Path, Params), + ct:pal("connector create (http) result:\n ~p", [Res]), + Res. + +create_action_api(Config) -> + create_action_api(Config, _Overrides = #{}). + +create_action_api(Config, Overrides) -> + ActionName = ?config(action_name, Config), + ActionType = ?config(action_type, Config), + ActionConfig0 = ?config(action_config, Config), + ActionConfig = emqx_utils_maps:deep_merge(ActionConfig0, Overrides), + Params = ActionConfig#{<<"type">> => ActionType, <<"name">> => ActionName}, + Method = post, + Path = emqx_mgmt_api_test_util:api_path(["actions"]), + ct:pal("creating action (http):\n ~p", [Params]), + Res = request(Method, Path, Params), + ct:pal("action create (http) result:\n ~p", [Res]), + Res. + +get_action_api(Config) -> + ActionName = ?config(action_name, Config), + ActionType = ?config(action_type, Config), + ActionId = emqx_bridge_resource:bridge_id(ActionType, ActionName), + Params = [], + Method = get, + Path = emqx_mgmt_api_test_util:api_path(["actions", ActionId]), + ct:pal("getting action (http)"), + Res = request(Method, Path, Params), + ct:pal("get action (http) result:\n ~p", [Res]), + Res. + update_bridge_api(Config) -> update_bridge_api(Config, _Overrides = #{}). diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl index 28050d368..951fb5ef5 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl @@ -269,7 +269,11 @@ fields(Field) when Field == "put_connector"; Field == "post_connector" -> - emqx_connector_schema:api_fields(Field, ?CONNECTOR_TYPE, kafka_connector_config_fields()); + emqx_connector_schema:api_fields( + Field, + ?CONNECTOR_TYPE, + kafka_connector_config_fields() + ); fields("post_" ++ Type) -> [type_field(Type), name_field() | fields("config_" ++ Type)]; fields("put_" ++ Type) -> @@ -508,8 +512,7 @@ fields(consumer_opts) -> {value_encoding_mode, mk(enum([none, base64]), #{ default => none, desc => ?DESC(consumer_value_encoding_mode) - })}, - {resource_opts, mk(ref(resource_opts), #{default => #{}})} + })} ]; fields(consumer_topic_mapping) -> [ @@ -623,7 +626,7 @@ kafka_connector_config_fields() -> })}, {socket_opts, mk(ref(socket_opts), #{required => false, desc => ?DESC(socket_opts)})}, {ssl, mk(ref(ssl_client_opts), #{})} - ]. + ] ++ [resource_opts()]. producer_opts(ActionOrBridgeV1) -> [ @@ -631,9 +634,11 @@ producer_opts(ActionOrBridgeV1) -> %% for egress bridges with this config, the published messages %% will be forwarded to such bridges. {local_topic, mk(binary(), #{required => false, desc => ?DESC(mqtt_topic)})}, - parameters_field(ActionOrBridgeV1), - {resource_opts, mk(ref(resource_opts), #{default => #{}, desc => ?DESC(resource_opts)})} - ]. + parameters_field(ActionOrBridgeV1) + ] ++ [resource_opts() || ActionOrBridgeV1 =:= action]. + +resource_opts() -> + {resource_opts, mk(ref(resource_opts), #{default => #{}, desc => ?DESC(resource_opts)})}. %% Since e5.3.1, we want to rename the field 'kafka' to 'parameters' %% However we need to keep it backward compatible for generated schema json (version 0.1.0) diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl index 702e4592b..bf8c76bee 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl @@ -81,11 +81,24 @@ on_start(InstId, Config) -> ClientId = InstId, emqx_resource:allocate_resource(InstId, ?kafka_client_id, ClientId), ok = ensure_client(ClientId, Hosts, ClientConfig), - %% Check if this is a dry run - {ok, #{ - client_id => ClientId, - installed_bridge_v2s => #{} - }}. + %% Note: we must return `{error, _}' here if the client cannot connect so that the + %% connector will immediately enter the `?status_disconnected' state, and then avoid + %% giving the impression that channels/actions may be added immediately and start + %% buffering, which won't happen if it's `?status_connecting'. That would lead to + %% data loss, since Kafka Producer uses wolff's internal buffering, which is started + %% only when its producers start. + case check_client_connectivity(ClientId) of + ok -> + {ok, #{ + client_id => ClientId, + installed_bridge_v2s => #{} + }}; + {error, {find_client, Reason}} -> + %% Race condition? Crash? We just checked it with `ensure_client'... + {error, Reason}; + {error, {connectivity, Reason}} -> + {error, Reason} + end. on_add_channel( InstId, @@ -478,14 +491,18 @@ on_get_status( _InstId, #{client_id := ClientId} = State ) -> - case wolff_client_sup:find_client(ClientId) of - {ok, Pid} -> - case wolff_client:check_connectivity(Pid) of - ok -> ?status_connected; - {error, Error} -> {?status_connecting, State, Error} - end; - {error, _Reason} -> - ?status_connecting + %% Note: we must avoid returning `?status_disconnected' here if the connector ever was + %% connected. If the connector ever connected, wolff producers might have been + %% sucessfully started, and returning `?status_disconnected' will make resource + %% manager try to restart the producers / connector, thus potentially dropping data + %% held in wolff producer's replayq. + case check_client_connectivity(ClientId) of + ok -> + ?status_connected; + {error, {find_client, _Error}} -> + ?status_connecting; + {error, {connectivity, Error}} -> + {?status_connecting, State, Error} end. on_get_channel_status( @@ -496,13 +513,19 @@ on_get_channel_status( installed_bridge_v2s := Channels } = _State ) -> + %% Note: we must avoid returning `?status_disconnected' here. Returning + %% `?status_disconnected' will make resource manager try to restart the producers / + %% connector, thus potentially dropping data held in wolff producer's replayq. The + %% only exception is if the topic does not exist ("unhealthy target"). #{kafka_topic := KafkaTopic} = maps:get(ChannelId, Channels), try ok = check_topic_and_leader_connections(ClientId, KafkaTopic), ?status_connected catch - throw:#{reason := restarting} -> - ?status_connecting + throw:{unhealthy_target, Msg} -> + throw({unhealthy_target, Msg}); + K:E -> + {?status_connecting, {K, E}} end. check_topic_and_leader_connections(ClientId, KafkaTopic) -> @@ -524,6 +547,21 @@ check_topic_and_leader_connections(ClientId, KafkaTopic) -> }) end. +-spec check_client_connectivity(wolff:client_id()) -> + ok | {error, {connectivity | find_client, term()}}. +check_client_connectivity(ClientId) -> + case wolff_client_sup:find_client(ClientId) of + {ok, Pid} -> + case wolff_client:check_connectivity(Pid) of + ok -> + ok; + {error, Error} -> + {error, {connectivity, Error}} + end; + {error, Reason} -> + {error, {find_client, Reason}} + end. + check_if_healthy_leaders(ClientId, ClientPid, KafkaTopic) when is_pid(ClientPid) -> Leaders = case wolff_client:get_leader_connections(ClientPid, KafkaTopic) of diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl index 2ad0504b4..2913e178a 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl @@ -22,6 +22,7 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("brod/include/brod.hrl"). +-include_lib("emqx_resource/include/emqx_resource.hrl"). -import(emqx_common_test_helpers, [on_exit/1]). @@ -35,6 +36,14 @@ all() -> emqx_common_test_helpers:all(?MODULE). init_per_suite(Config) -> + ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"), + ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")), + KafkaHost = os:getenv("KAFKA_PLAIN_HOST", "toxiproxy.emqx.net"), + KafkaPort = list_to_integer(os:getenv("KAFKA_PLAIN_PORT", "9292")), + ProxyName = "kafka_plain", + DirectKafkaHost = os:getenv("KAFKA_DIRECT_PLAIN_HOST", "kafka-1.emqx.net"), + DirectKafkaPort = list_to_integer(os:getenv("KAFKA_DIRECT_PLAIN_PORT", "9092")), + emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), Apps = emqx_cth_suite:start( [ emqx, @@ -50,17 +59,34 @@ init_per_suite(Config) -> ), {ok, _} = emqx_common_test_http:create_default_app(), emqx_bridge_kafka_impl_producer_SUITE:wait_until_kafka_is_up(), - [{apps, Apps} | Config]. + [ + {apps, Apps}, + {proxy_host, ProxyHost}, + {proxy_port, ProxyPort}, + {proxy_name, ProxyName}, + {kafka_host, KafkaHost}, + {kafka_port, KafkaPort}, + {direct_kafka_host, DirectKafkaHost}, + {direct_kafka_port, DirectKafkaPort} + | Config + ]. end_per_suite(Config) -> Apps = ?config(apps, Config), + ProxyHost = ?config(proxy_host, Config), + ProxyPort = ?config(proxy_port, Config), + emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), emqx_cth_suite:stop(Apps), ok. init_per_testcase(_TestCase, Config) -> Config. -end_per_testcase(_TestCase, _Config) -> +end_per_testcase(_TestCase, Config) -> + ProxyHost = ?config(proxy_host, Config), + ProxyPort = ?config(proxy_port, Config), + emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), + emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(), emqx_common_test_helpers:call_janitor(60_000), ok. @@ -69,6 +95,13 @@ end_per_testcase(_TestCase, _Config) -> %%------------------------------------------------------------------------------------- check_send_message_with_bridge(BridgeName) -> + #{offset := Offset, payload := Payload} = send_message(BridgeName), + %% ###################################### + %% Check if message is sent to Kafka + %% ###################################### + check_kafka_message_payload(Offset, Payload). + +send_message(ActionName) -> %% ###################################### %% Create Kafka message %% ###################################### @@ -84,11 +117,8 @@ check_send_message_with_bridge(BridgeName) -> %% ###################################### %% Send message %% ###################################### - emqx_bridge_v2:send_message(?TYPE, BridgeName, Msg, #{}), - %% ###################################### - %% Check if message is sent to Kafka - %% ###################################### - check_kafka_message_payload(Offset, Payload). + emqx_bridge_v2:send_message(?TYPE, ActionName, Msg, #{}), + #{offset => Offset, payload => Payload}. resolve_kafka_offset() -> KafkaTopic = emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition(), @@ -106,6 +136,14 @@ check_kafka_message_payload(Offset, ExpectedPayload) -> {ok, {_, [KafkaMsg0]}} = brod:fetch(Hosts, KafkaTopic, Partition, Offset), ?assertMatch(#kafka_message{value = ExpectedPayload}, KafkaMsg0). +action_config(ConnectorName) -> + action_config(ConnectorName, _Overrides = #{}). + +action_config(ConnectorName, Overrides) -> + Cfg0 = bridge_v2_config(ConnectorName), + Cfg1 = emqx_utils_maps:rename(<<"kafka">>, <<"parameters">>, Cfg0), + emqx_utils_maps:deep_merge(Cfg1, Overrides). + bridge_v2_config(ConnectorName) -> #{ <<"connector">> => ConnectorName, @@ -131,7 +169,9 @@ bridge_v2_config(ConnectorName) -> <<"query_mode">> => <<"sync">>, <<"required_acks">> => <<"all_isr">>, <<"sync_query_timeout">> => <<"5s">>, - <<"topic">> => emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition() + <<"topic">> => list_to_binary( + emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition() + ) }, <<"local_topic">> => <<"kafka_t/#">>, <<"resource_opts">> => #{ @@ -140,32 +180,37 @@ bridge_v2_config(ConnectorName) -> }. connector_config() -> - #{ - <<"authentication">> => <<"none">>, - <<"bootstrap_hosts">> => iolist_to_binary(kafka_hosts_string()), - <<"connect_timeout">> => <<"5s">>, - <<"enable">> => true, - <<"metadata_request_timeout">> => <<"5s">>, - <<"min_metadata_refresh_interval">> => <<"3s">>, - <<"socket_opts">> => - #{ - <<"recbuf">> => <<"1024KB">>, - <<"sndbuf">> => <<"1024KB">>, - <<"tcp_keepalive">> => <<"none">> - }, - <<"ssl">> => - #{ - <<"ciphers">> => [], - <<"depth">> => 10, - <<"enable">> => false, - <<"hibernate_after">> => <<"5s">>, - <<"log_level">> => <<"notice">>, - <<"reuse_sessions">> => true, - <<"secure_renegotiate">> => true, - <<"verify">> => <<"verify_peer">>, - <<"versions">> => [<<"tlsv1.3">>, <<"tlsv1.2">>] - } - }. + connector_config(_Overrides = #{}). + +connector_config(Overrides) -> + Defaults = + #{ + <<"authentication">> => <<"none">>, + <<"bootstrap_hosts">> => iolist_to_binary(kafka_hosts_string()), + <<"connect_timeout">> => <<"5s">>, + <<"enable">> => true, + <<"metadata_request_timeout">> => <<"5s">>, + <<"min_metadata_refresh_interval">> => <<"3s">>, + <<"socket_opts">> => + #{ + <<"recbuf">> => <<"1024KB">>, + <<"sndbuf">> => <<"1024KB">>, + <<"tcp_keepalive">> => <<"none">> + }, + <<"ssl">> => + #{ + <<"ciphers">> => [], + <<"depth">> => 10, + <<"enable">> => false, + <<"hibernate_after">> => <<"5s">>, + <<"log_level">> => <<"notice">>, + <<"reuse_sessions">> => true, + <<"secure_renegotiate">> => true, + <<"verify">> => <<"verify_peer">>, + <<"versions">> => [<<"tlsv1.3">>, <<"tlsv1.2">>] + } + }, + emqx_utils_maps:deep_merge(Defaults, Overrides). kafka_hosts_string() -> KafkaHost = os:getenv("KAFKA_PLAIN_HOST", "kafka-1.emqx.net"), @@ -350,13 +395,13 @@ t_bad_url(_Config) -> {ok, #{ resource_data := #{ - status := connecting, + status := ?status_disconnected, error := [#{reason := unresolvable_hostname}] } }}, emqx_connector:lookup(?TYPE, ConnectorName) ), - ?assertMatch({ok, #{status := connecting}}, emqx_bridge_v2:lookup(?TYPE, ActionName)), + ?assertMatch({ok, #{status := ?status_disconnected}}, emqx_bridge_v2:lookup(?TYPE, ActionName)), ok. t_parameters_key_api_spec(_Config) -> @@ -383,3 +428,153 @@ t_http_api_get(_Config) -> emqx_bridge_testlib:list_bridges_api() ), ok. + +t_create_connector_while_connection_is_down(Config) -> + ProxyName = ?config(proxy_name, Config), + ProxyHost = ?config(proxy_host, Config), + ProxyPort = ?config(proxy_port, Config), + KafkaHost = ?config(kafka_host, Config), + KafkaPort = ?config(kafka_port, Config), + Host = iolist_to_binary([KafkaHost, ":", integer_to_binary(KafkaPort)]), + ?check_trace( + begin + Type = ?TYPE, + ConnectorConfig = connector_config(#{ + <<"bootstrap_hosts">> => Host, + <<"resource_opts">> => + #{<<"health_check_interval">> => <<"500ms">>} + }), + ConnectorName = <<"c1">>, + ConnectorId = emqx_connector_resource:resource_id(Type, ConnectorName), + ConnectorParams = [ + {connector_config, ConnectorConfig}, + {connector_name, ConnectorName}, + {connector_type, Type} + ], + ActionName = ConnectorName, + ActionId = emqx_bridge_v2:id(?TYPE, ActionName, ConnectorName), + ActionConfig = action_config( + ConnectorName + ), + ActionParams = [ + {action_config, ActionConfig}, + {action_name, ActionName}, + {action_type, Type} + ], + Disconnected = atom_to_binary(?status_disconnected), + %% Initially, the connection cannot be stablished. Messages are not buffered, + %% hence the status is `?status_disconnected'. + emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() -> + {ok, {{_, 201, _}, _, #{<<"status">> := Disconnected}}} = + emqx_bridge_v2_testlib:create_connector_api(ConnectorParams), + {ok, {{_, 201, _}, _, #{<<"status">> := Disconnected}}} = + emqx_bridge_v2_testlib:create_action_api(ActionParams), + #{offset := Offset1} = send_message(ActionName), + #{offset := Offset2} = send_message(ActionName), + #{offset := Offset3} = send_message(ActionName), + ?assertEqual([Offset1], lists:usort([Offset1, Offset2, Offset3])), + ?assertEqual(3, emqx_resource_metrics:matched_get(ActionId)), + ?assertEqual(3, emqx_resource_metrics:failed_get(ActionId)), + ?assertEqual(0, emqx_resource_metrics:queuing_get(ActionId)), + ?assertEqual(0, emqx_resource_metrics:inflight_get(ActionId)), + ?assertEqual(0, emqx_resource_metrics:dropped_get(ActionId)), + ok + end), + %% Let the connector and action recover + Connected = atom_to_binary(?status_connected), + ?retry( + _Sleep0 = 1_100, + _Attempts0 = 10, + begin + _ = emqx_resource:health_check(ConnectorId), + _ = emqx_resource:health_check(ActionId), + ?assertMatch( + {ok, #{ + status := ?status_connected, + resource_data := + #{ + status := ?status_connected, + added_channels := + #{ + ActionId := #{ + status := ?status_connected + } + } + } + }}, + emqx_bridge_v2:lookup(Type, ActionName), + #{action_id => ActionId} + ), + ?assertMatch( + {ok, {{_, 200, _}, _, #{<<"status">> := Connected}}}, + emqx_bridge_v2_testlib:get_action_api(ActionParams) + ) + end + ), + %% Now the connection drops again; this time, status should be + %% `?status_connecting' to avoid destroying wolff_producers and their replayq + %% buffers. + Connecting = atom_to_binary(?status_connecting), + emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() -> + ?retry( + _Sleep0 = 1_100, + _Attempts0 = 10, + begin + _ = emqx_resource:health_check(ConnectorId), + _ = emqx_resource:health_check(ActionId), + ?assertMatch( + {ok, #{ + status := ?status_connecting, + resource_data := + #{ + status := ?status_connecting, + added_channels := + #{ + ActionId := #{ + status := ?status_connecting + } + } + } + }}, + emqx_bridge_v2:lookup(Type, ActionName), + #{action_id => ActionId} + ), + ?assertMatch( + {ok, {{_, 200, _}, _, #{<<"status">> := Connecting}}}, + emqx_bridge_v2_testlib:get_action_api(ActionParams) + ) + end + ), + %% This should get enqueued by wolff producers. + spawn_link(fun() -> send_message(ActionName) end), + PreviousMatched = 3, + PreviousFailed = 3, + ?retry( + _Sleep2 = 100, + _Attempts2 = 10, + ?assertEqual(PreviousMatched + 1, emqx_resource_metrics:matched_get(ActionId)) + ), + ?assertEqual(PreviousFailed, emqx_resource_metrics:failed_get(ActionId)), + ?assertEqual(1, emqx_resource_metrics:queuing_get(ActionId)), + ?assertEqual(0, emqx_resource_metrics:inflight_get(ActionId)), + ?assertEqual(0, emqx_resource_metrics:dropped_get(ActionId)), + ?assertEqual(0, emqx_resource_metrics:success_get(ActionId)), + ok + end), + ?retry( + _Sleep2 = 600, + _Attempts2 = 20, + begin + _ = emqx_resource:health_check(ConnectorId), + _ = emqx_resource:health_check(ActionId), + ?assertEqual(1, emqx_resource_metrics:success_get(ActionId), #{ + metrics => emqx_bridge_v2:get_metrics(Type, ActionName) + }), + ok + end + ), + ok + end, + [] + ), + ok. diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index df038a434..f67f1edb8 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -1111,7 +1111,7 @@ is_channel_id(Id) -> %% Check if channel is installed in the connector state. %% There is no need to query the conncector if the channel is not %% installed as the query will fail anyway. -pre_query_channel_check({Id, _} = _Request, Channels) when +pre_query_channel_check({Id, _} = _Request, Channels, QueryOpts) when is_map_key(Id, Channels) -> ChannelStatus = maps:get(Id, Channels), @@ -1119,18 +1119,25 @@ pre_query_channel_check({Id, _} = _Request, Channels) when true -> ok; false -> - maybe_throw_channel_not_installed(Id) + maybe_throw_channel_not_installed(Id, QueryOpts) end; -pre_query_channel_check({Id, _} = _Request, _Channels) -> - maybe_throw_channel_not_installed(Id); -pre_query_channel_check(_Request, _Channels) -> +pre_query_channel_check({Id, _} = _Request, _Channels, QueryOpts) -> + maybe_throw_channel_not_installed(Id, QueryOpts); +pre_query_channel_check(_Request, _Channels, _QueryOpts) -> ok. -maybe_throw_channel_not_installed(Id) -> - %% Fail with a recoverable error if the channel is not installed - %% so that the operation can be retried. It is emqx_resource_manager's - %% responsibility to ensure that the channel installation is retried. +maybe_throw_channel_not_installed(Id, QueryOpts) -> + %% Fail with a recoverable error if the channel is not installed and there are buffer + %% workers involved so that the operation can be retried. Otherwise, this is + %% unrecoverable. It is emqx_resource_manager's responsibility to ensure that the + %% channel installation is retried. + IsSimpleQuery = maps:get(simple_query, QueryOpts, false), case is_channel_id(Id) of + true when IsSimpleQuery -> + error( + {unrecoverable_error, + iolist_to_binary(io_lib:format("channel: \"~s\" not operational", [Id]))} + ); true -> error( {recoverable_error, @@ -1191,7 +1198,7 @@ apply_query_fun( ?APPLY_RESOURCE( call_query, begin - pre_query_channel_check(Request, Channels), + pre_query_channel_check(Request, Channels, QueryOpts), Mod:on_query(extract_connector_id(Id), Request, ResSt) end, Request @@ -1222,7 +1229,7 @@ apply_query_fun( AsyncWorkerMRef = undefined, InflightItem = ?INFLIGHT_ITEM(Ref, Query, IsRetriable, AsyncWorkerMRef), ok = inflight_append(InflightTID, InflightItem), - pre_query_channel_check(Request, Channels), + pre_query_channel_check(Request, Channels, QueryOpts), Result = Mod:on_query_async( extract_connector_id(Id), Request, {ReplyFun, [ReplyContext]}, ResSt ), @@ -1249,7 +1256,7 @@ apply_query_fun( ?APPLY_RESOURCE( call_batch_query, begin - pre_query_channel_check(FirstRequest, Channels), + pre_query_channel_check(FirstRequest, Channels, QueryOpts), Mod:on_batch_query(extract_connector_id(Id), Requests, ResSt) end, Batch @@ -1291,7 +1298,7 @@ apply_query_fun( AsyncWorkerMRef = undefined, InflightItem = ?INFLIGHT_ITEM(Ref, Batch, IsRetriable, AsyncWorkerMRef), ok = inflight_append(InflightTID, InflightItem), - pre_query_channel_check(FirstRequest, Channels), + pre_query_channel_check(FirstRequest, Channels, QueryOpts), Result = Mod:on_batch_query_async( extract_connector_id(Id), Requests, {ReplyFun, [ReplyContext]}, ResSt ), diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index 11391fb2b..67f22faee 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -1228,6 +1228,11 @@ channel_status({connecting, Error}) -> status => connecting, error => Error }; +channel_status(?status_disconnected) -> + #{ + status => ?status_disconnected, + error => <<"Disconnected for unknown reason">> + }; channel_status(connecting) -> #{ status => connecting,