From 90571b7d8eb05b7badcec810cce2d56d569d1c08 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Tue, 14 Nov 2023 13:33:07 -0300 Subject: [PATCH 1/3] test: fix noise about undefined unofficial callbacks --- apps/emqx/test/emqx_cth_suite.erl | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/apps/emqx/test/emqx_cth_suite.erl b/apps/emqx/test/emqx_cth_suite.erl index 5a59238de..401d4f59d 100644 --- a/apps/emqx/test/emqx_cth_suite.erl +++ b/apps/emqx/test/emqx_cth_suite.erl @@ -74,6 +74,9 @@ -export([merge_appspec/2]). +%% "Unofficial" `emqx_config_handler' and `emqx_conf' APIs +-export([schema_module/0, upgrade_raw_conf/1]). + -export_type([appspec/0]). -export_type([appspec_opts/0]). @@ -477,3 +480,18 @@ render_config(Config = #{}) -> unicode:characters_to_binary(hocon_pp:do(Config, #{})); render_config(Config) -> unicode:characters_to_binary(Config). + +%% + +%% "Unofficial" `emqx_config_handler' API +schema_module() -> + ?MODULE. + +%% "Unofficial" `emqx_conf' API +upgrade_raw_conf(Conf) -> + case emqx_release:edition() of + ee -> + emqx_enterprise_schema:upgrade_raw_conf(Conf); + ce -> + emqx_conf_schema:upgrade_raw_conf(Conf) + end. From 36b5d58957050790ebbc9d3d3068c8e5d76c71be Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 16 Nov 2023 09:17:54 -0300 Subject: [PATCH 2/3] test: reorganize test suite a bit --- .../emqx_bridge_v2_kafka_producer_SUITE.erl | 220 +++++++++--------- 1 file changed, 116 insertions(+), 104 deletions(-) diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl index 6adb66357..fba72a1d7 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl @@ -25,6 +25,10 @@ -define(TYPE, kafka_producer). +%%------------------------------------------------------------------------------ +%% CT boilerplate +%%------------------------------------------------------------------------------ + all() -> emqx_common_test_helpers:all(?MODULE). @@ -51,6 +55,118 @@ end_per_suite(Config) -> emqx_cth_suite:stop(Apps), ok. +%%------------------------------------------------------------------------------------- +%% Helper fns +%%------------------------------------------------------------------------------------- + +check_send_message_with_bridge(BridgeName) -> + %% ###################################### + %% Create Kafka message + %% ###################################### + Time = erlang:unique_integer(), + BinTime = integer_to_binary(Time), + Payload = list_to_binary("payload" ++ integer_to_list(Time)), + Msg = #{ + clientid => BinTime, + payload => Payload, + timestamp => Time + }, + Offset = resolve_kafka_offset(), + %% ###################################### + %% Send message + %% ###################################### + emqx_bridge_v2:send_message(?TYPE, BridgeName, Msg, #{}), + %% ###################################### + %% Check if message is sent to Kafka + %% ###################################### + check_kafka_message_payload(Offset, Payload). + +resolve_kafka_offset() -> + KafkaTopic = emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition(), + Partition = 0, + Hosts = emqx_bridge_kafka_impl_producer_SUITE:kafka_hosts(), + {ok, Offset0} = emqx_bridge_kafka_impl_producer_SUITE:resolve_kafka_offset( + Hosts, KafkaTopic, Partition + ), + Offset0. + +check_kafka_message_payload(Offset, ExpectedPayload) -> + KafkaTopic = emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition(), + Partition = 0, + Hosts = emqx_bridge_kafka_impl_producer_SUITE:kafka_hosts(), + {ok, {_, [KafkaMsg0]}} = brod:fetch(Hosts, KafkaTopic, Partition, Offset), + ?assertMatch(#kafka_message{value = ExpectedPayload}, KafkaMsg0). + +bridge_v2_config(ConnectorName) -> + #{ + <<"connector">> => ConnectorName, + <<"enable">> => true, + <<"kafka">> => #{ + <<"buffer">> => #{ + <<"memory_overload_protection">> => false, + <<"mode">> => <<"memory">>, + <<"per_partition_limit">> => <<"2GB">>, + <<"segment_bytes">> => <<"100MB">> + }, + <<"compression">> => <<"no_compression">>, + <<"kafka_header_value_encode_mode">> => <<"none">>, + <<"max_batch_bytes">> => <<"896KB">>, + <<"max_inflight">> => 10, + <<"message">> => #{ + <<"key">> => <<"${.clientid}">>, + <<"timestamp">> => <<"${.timestamp}">>, + <<"value">> => <<"${.payload}">> + }, + <<"partition_count_refresh_interval">> => <<"60s">>, + <<"partition_strategy">> => <<"random">>, + <<"query_mode">> => <<"sync">>, + <<"required_acks">> => <<"all_isr">>, + <<"sync_query_timeout">> => <<"5s">>, + <<"topic">> => emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition() + }, + <<"local_topic">> => <<"kafka_t/#">>, + <<"resource_opts">> => #{ + <<"health_check_interval">> => <<"15s">> + } + }. + +connector_config() -> + #{ + <<"authentication">> => <<"none">>, + <<"bootstrap_hosts">> => iolist_to_binary(kafka_hosts_string()), + <<"connect_timeout">> => <<"5s">>, + <<"enable">> => true, + <<"metadata_request_timeout">> => <<"5s">>, + <<"min_metadata_refresh_interval">> => <<"3s">>, + <<"socket_opts">> => + #{ + <<"recbuf">> => <<"1024KB">>, + <<"sndbuf">> => <<"1024KB">>, + <<"tcp_keepalive">> => <<"none">> + }, + <<"ssl">> => + #{ + <<"ciphers">> => [], + <<"depth">> => 10, + <<"enable">> => false, + <<"hibernate_after">> => <<"5s">>, + <<"log_level">> => <<"notice">>, + <<"reuse_sessions">> => true, + <<"secure_renegotiate">> => true, + <<"verify">> => <<"verify_peer">>, + <<"versions">> => [<<"tlsv1.3">>, <<"tlsv1.2">>] + } + }. + +kafka_hosts_string() -> + KafkaHost = os:getenv("KAFKA_PLAIN_HOST", "kafka-1.emqx.net"), + KafkaPort = os:getenv("KAFKA_PLAIN_PORT", "9092"), + KafkaHost ++ ":" ++ KafkaPort. + +%%------------------------------------------------------------------------------ +%% Testcases +%%------------------------------------------------------------------------------ + t_create_remove_list(_) -> [] = emqx_bridge_v2:list(), ConnectorConfig = connector_config(), @@ -186,107 +302,3 @@ t_unknown_topic(_Config) -> emqx_bridge_v2_testlib:get_bridge_api(?TYPE, BridgeName) ), ok. - -check_send_message_with_bridge(BridgeName) -> - %% ###################################### - %% Create Kafka message - %% ###################################### - Time = erlang:unique_integer(), - BinTime = integer_to_binary(Time), - Payload = list_to_binary("payload" ++ integer_to_list(Time)), - Msg = #{ - clientid => BinTime, - payload => Payload, - timestamp => Time - }, - Offset = resolve_kafka_offset(), - %% ###################################### - %% Send message - %% ###################################### - emqx_bridge_v2:send_message(?TYPE, BridgeName, Msg, #{}), - %% ###################################### - %% Check if message is sent to Kafka - %% ###################################### - check_kafka_message_payload(Offset, Payload). - -resolve_kafka_offset() -> - KafkaTopic = emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition(), - Partition = 0, - Hosts = emqx_bridge_kafka_impl_producer_SUITE:kafka_hosts(), - {ok, Offset0} = emqx_bridge_kafka_impl_producer_SUITE:resolve_kafka_offset( - Hosts, KafkaTopic, Partition - ), - Offset0. - -check_kafka_message_payload(Offset, ExpectedPayload) -> - KafkaTopic = emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition(), - Partition = 0, - Hosts = emqx_bridge_kafka_impl_producer_SUITE:kafka_hosts(), - {ok, {_, [KafkaMsg0]}} = brod:fetch(Hosts, KafkaTopic, Partition, Offset), - ?assertMatch(#kafka_message{value = ExpectedPayload}, KafkaMsg0). - -bridge_v2_config(ConnectorName) -> - #{ - <<"connector">> => ConnectorName, - <<"enable">> => true, - <<"kafka">> => #{ - <<"buffer">> => #{ - <<"memory_overload_protection">> => false, - <<"mode">> => <<"memory">>, - <<"per_partition_limit">> => <<"2GB">>, - <<"segment_bytes">> => <<"100MB">> - }, - <<"compression">> => <<"no_compression">>, - <<"kafka_header_value_encode_mode">> => <<"none">>, - <<"max_batch_bytes">> => <<"896KB">>, - <<"max_inflight">> => 10, - <<"message">> => #{ - <<"key">> => <<"${.clientid}">>, - <<"timestamp">> => <<"${.timestamp}">>, - <<"value">> => <<"${.payload}">> - }, - <<"partition_count_refresh_interval">> => <<"60s">>, - <<"partition_strategy">> => <<"random">>, - <<"query_mode">> => <<"sync">>, - <<"required_acks">> => <<"all_isr">>, - <<"sync_query_timeout">> => <<"5s">>, - <<"topic">> => emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition() - }, - <<"local_topic">> => <<"kafka_t/#">>, - <<"resource_opts">> => #{ - <<"health_check_interval">> => <<"15s">> - } - }. - -connector_config() -> - #{ - <<"authentication">> => <<"none">>, - <<"bootstrap_hosts">> => iolist_to_binary(kafka_hosts_string()), - <<"connect_timeout">> => <<"5s">>, - <<"enable">> => true, - <<"metadata_request_timeout">> => <<"5s">>, - <<"min_metadata_refresh_interval">> => <<"3s">>, - <<"socket_opts">> => - #{ - <<"recbuf">> => <<"1024KB">>, - <<"sndbuf">> => <<"1024KB">>, - <<"tcp_keepalive">> => <<"none">> - }, - <<"ssl">> => - #{ - <<"ciphers">> => [], - <<"depth">> => 10, - <<"enable">> => false, - <<"hibernate_after">> => <<"5s">>, - <<"log_level">> => <<"notice">>, - <<"reuse_sessions">> => true, - <<"secure_renegotiate">> => true, - <<"verify">> => <<"verify_peer">>, - <<"versions">> => [<<"tlsv1.3">>, <<"tlsv1.2">>] - } - }. - -kafka_hosts_string() -> - KafkaHost = os:getenv("KAFKA_PLAIN_HOST", "kafka-1.emqx.net"), - KafkaPort = os:getenv("KAFKA_PLAIN_PORT", "9092"), - KafkaHost ++ ":" ++ KafkaPort. From b92821188b43466f449f73572613ce1d9865dc1f Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 16 Nov 2023 10:08:41 -0300 Subject: [PATCH 3/3] fix(kafka_producer): make status `connecting` while the client fails to connect Fixes https://emqx.atlassian.net/browse/EMQX-11408 To make it consistent with the previous bridge behavior. Also, introduces macros for resource status to avoid problems with typos. --- apps/emqx_bridge/src/emqx_bridge_v2.erl | 25 ++++--- .../emqx_bridge/test/emqx_bridge_v2_SUITE.erl | 68 ++++++++++++++++--- .../test/emqx_bridge_v2_api_SUITE.erl | 4 +- .../test/emqx_bridge_v2_test_connector.erl | 4 +- .../src/emqx_bridge_kafka_impl_producer.erl | 10 +-- .../emqx_bridge_v2_kafka_producer_SUITE.erl | 40 +++++++++++ apps/emqx_resource/include/emqx_resource.hrl | 18 ++++- 7 files changed, 137 insertions(+), 32 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl index 70e248e56..7ce266922 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl @@ -202,33 +202,36 @@ lookup(Type, Name) -> %% The connector should always exist %% ... but, in theory, there might be no channels associated to it when we try %% to delete the connector, and then this reference will become dangling... - InstanceData = + ConnectorData = case emqx_resource:get_instance(ConnectorId) of {ok, _, Data} -> Data; {error, not_found} -> #{} end, - %% Find the Bridge V2 status from the InstanceData - Channels = maps:get(added_channels, InstanceData, #{}), + %% Find the Bridge V2 status from the ConnectorData + ConnectorStatus = maps:get(status, ConnectorData, undefined), + Channels = maps:get(added_channels, ConnectorData, #{}), BridgeV2Id = id(Type, Name, BridgeConnector), ChannelStatus = maps:get(BridgeV2Id, Channels, undefined), {DisplayBridgeV2Status, ErrorMsg} = - case ChannelStatus of - #{status := connected} -> - {connected, <<"">>}; - #{status := Status, error := undefined} -> + case {ChannelStatus, ConnectorStatus} of + {#{status := ?status_connected}, _} -> + {?status_connected, <<"">>}; + {#{error := resource_not_operational}, ?status_connecting} -> + {?status_connecting, <<"Not installed">>}; + {#{status := Status, error := undefined}, _} -> {Status, <<"Unknown reason">>}; - #{status := Status, error := Error} -> + {#{status := Status, error := Error}, _} -> {Status, emqx_utils:readable_error_msg(Error)}; - undefined -> - {disconnected, <<"Pending installation">>} + {undefined, _} -> + {?status_disconnected, <<"Not installed">>} end, {ok, #{ type => bin(Type), name => bin(Name), raw_config => RawConf, - resource_data => InstanceData, + resource_data => ConnectorData, status => DisplayBridgeV2Status, error => ErrorMsg }} diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl index 2766088a1..791997fc3 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl @@ -20,6 +20,7 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). +-include_lib("emqx_resource/include/emqx_resource.hrl"). -import(emqx_common_test_helpers, [on_exit/1]). @@ -43,7 +44,7 @@ con_schema() -> { con_type(), hoconsc:mk( - hoconsc:map(name, typerefl:map()), + hoconsc:map(name, hoconsc:ref(?MODULE, connector_config)), #{ desc => <<"Test Connector Config">>, required => false @@ -52,6 +53,15 @@ con_schema() -> } ]. +fields(connector_config) -> + [ + {enable, hoconsc:mk(typerefl:boolean(), #{})}, + {resource_opts, hoconsc:mk(typerefl:map(), #{})}, + {on_start_fun, hoconsc:mk(typerefl:binary(), #{})}, + {on_get_status_fun, hoconsc:mk(typerefl:binary(), #{})}, + {on_add_channel_fun, hoconsc:mk(typerefl:binary(), #{})} + ]. + con_config() -> #{ <<"enable">> => true, @@ -112,6 +122,7 @@ setup_mocks() -> catch meck:new(emqx_connector_schema, MeckOpts), meck:expect(emqx_connector_schema, fields, 1, con_schema()), + meck:expect(emqx_connector_schema, connector_type_to_bridge_types, 1, [con_type()]), catch meck:new(emqx_connector_resource, MeckOpts), meck:expect(emqx_connector_resource, connector_to_resource_type, 1, con_mod()), @@ -159,15 +170,7 @@ init_per_testcase(_TestCase, Config) -> ets:new(fun_table_name(), [named_table, public]), %% Create a fake connector {ok, _} = emqx_connector:create(con_type(), con_name(), con_config()), - [ - {mocked_mods, [ - emqx_connector_schema, - emqx_connector_resource, - - emqx_bridge_v2 - ]} - | Config - ]. + Config. end_per_testcase(_TestCase, _Config) -> ets:delete(fun_table_name()), @@ -846,6 +849,51 @@ t_start_operation_when_on_add_channel_gives_error(_Config) -> ), ok. +t_lookup_status_when_connecting(_Config) -> + ResponseETS = ets:new(response_ets, [public]), + ets:insert(ResponseETS, {on_get_status_value, ?status_connecting}), + OnGetStatusFun = wrap_fun(fun() -> + ets:lookup_element(ResponseETS, on_get_status_value, 2) + end), + + ConnectorConfig = emqx_utils_maps:deep_merge(con_config(), #{ + <<"on_get_status_fun">> => OnGetStatusFun, + <<"resource_opts">> => #{<<"start_timeout">> => 100} + }), + ConnectorName = ?FUNCTION_NAME, + ct:pal("connector config:\n ~p", [ConnectorConfig]), + {ok, _} = emqx_connector:create(con_type(), ConnectorName, ConnectorConfig), + + ActionName = my_test_action, + ChanStatusFun = wrap_fun(fun() -> ?status_disconnected end), + ActionConfig = (bridge_config())#{ + <<"on_get_channel_status_fun">> => ChanStatusFun, + <<"connector">> => atom_to_binary(ConnectorName) + }, + ct:pal("action config:\n ~p", [ActionConfig]), + {ok, _} = emqx_bridge_v2:create(bridge_type(), ActionName, ActionConfig), + + %% Top-level status is connecting if the connector status is connecting, but the + %% channel is not yet installed. `resource_data.added_channels.$channel_id.status' + %% contains true internal status. + {ok, Res} = emqx_bridge_v2:lookup(bridge_type(), ActionName), + ?assertMatch( + #{ + %% This is the action's public status + status := ?status_connecting, + resource_data := + #{ + %% This is the connector's status + status := ?status_connecting + } + }, + Res + ), + #{resource_data := #{added_channels := Channels}} = Res, + [{_Id, ChannelData}] = maps:to_list(Channels), + ?assertMatch(#{status := ?status_disconnected}, ChannelData), + ok. + %% Helper Functions wait_until(Fun) -> diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl index 059f9ac9f..b99a462b4 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl @@ -587,7 +587,7 @@ t_broken_bridge_config(Config) -> <<"type">> := ?BRIDGE_TYPE, <<"connector">> := <<"does_not_exist">>, <<"status">> := <<"disconnected">>, - <<"error">> := <<"Pending installation">> + <<"error">> := <<"Not installed">> } ]}, request_json(get, uri([?ROOT]), Config) @@ -640,7 +640,7 @@ t_fix_broken_bridge_config(Config) -> <<"type">> := ?BRIDGE_TYPE, <<"connector">> := <<"does_not_exist">>, <<"status">> := <<"disconnected">>, - <<"error">> := <<"Pending installation">> + <<"error">> := <<"Not installed">> } ]}, request_json(get, uri([?ROOT]), Config) diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_test_connector.erl b/apps/emqx_bridge/test/emqx_bridge_v2_test_connector.erl index 0138832a0..3c5204ea1 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_test_connector.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_test_connector.erl @@ -43,8 +43,8 @@ on_start( ) -> Fun = emqx_bridge_v2_SUITE:unwrap_fun(FunRef), Fun(Conf); -on_start(_InstId, _Config) -> - {ok, #{}}. +on_start(_InstId, Config) -> + {ok, Config}. on_add_channel( _InstId, diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl index 4422d8dd5..84401aaa6 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl @@ -481,11 +481,11 @@ on_get_status( case wolff_client_sup:find_client(ClientId) of {ok, Pid} -> case wolff_client:check_connectivity(Pid) of - ok -> connected; - {error, Error} -> {connecting, State, Error} + ok -> ?status_connected; + {error, Error} -> {?status_connecting, State, Error} end; {error, _Reason} -> - connecting + ?status_connecting end. on_get_channel_status( @@ -499,10 +499,10 @@ on_get_channel_status( #{kafka_topic := KafkaTopic} = maps:get(ChannelId, Channels), try ok = check_topic_and_leader_connections(ClientId, KafkaTopic), - connected + ?status_connected catch throw:#{reason := restarting} -> - conneting + ?status_connecting end. check_topic_and_leader_connections(ClientId, KafkaTopic) -> diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl index fba72a1d7..6c48146cd 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl @@ -23,6 +23,8 @@ -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("brod/include/brod.hrl"). +-import(emqx_common_test_helpers, [on_exit/1]). + -define(TYPE, kafka_producer). %%------------------------------------------------------------------------------ @@ -55,6 +57,13 @@ end_per_suite(Config) -> emqx_cth_suite:stop(Apps), ok. +init_per_testcase(_TestCase, Config) -> + Config. + +end_per_testcase(_TestCase, _Config) -> + emqx_common_test_helpers:call_janitor(60_000), + ok. + %%------------------------------------------------------------------------------------- %% Helper fns %%------------------------------------------------------------------------------------- @@ -163,6 +172,16 @@ kafka_hosts_string() -> KafkaPort = os:getenv("KAFKA_PLAIN_PORT", "9092"), KafkaHost ++ ":" ++ KafkaPort. +create_connector(Name, Config) -> + Res = emqx_connector:create(?TYPE, Name, Config), + on_exit(fun() -> emqx_connector:remove(?TYPE, Name) end), + Res. + +create_action(Name, Config) -> + Res = emqx_bridge_v2:create(?TYPE, Name, Config), + on_exit(fun() -> emqx_bridge_v2:remove(?TYPE, Name) end), + Res. + %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ @@ -302,3 +321,24 @@ t_unknown_topic(_Config) -> emqx_bridge_v2_testlib:get_bridge_api(?TYPE, BridgeName) ), ok. + +t_bad_url(_Config) -> + ConnectorName = <<"test_connector">>, + ActionName = <<"test_action">>, + ActionConfig = bridge_v2_config(<<"test_connector">>), + ConnectorConfig0 = connector_config(), + ConnectorConfig = ConnectorConfig0#{<<"bootstrap_hosts">> := <<"bad_host:9092">>}, + ?assertMatch({ok, _}, create_connector(ConnectorName, ConnectorConfig)), + ?assertMatch({ok, _}, create_action(ActionName, ActionConfig)), + ?assertMatch( + {ok, #{ + resource_data := + #{ + status := connecting, + error := [#{reason := unresolvable_hostname}] + } + }}, + emqx_connector:lookup(?TYPE, ConnectorName) + ), + ?assertMatch({ok, #{status := connecting}}, emqx_bridge_v2:lookup(?TYPE, ActionName)), + ok. diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index fa86e68c9..b34da9a63 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -13,6 +13,16 @@ %% See the License for the specific language governing permissions and %% limitations under the License. %%-------------------------------------------------------------------- + +%% bridge/connector/action status +-define(status_connected, connected). +-define(status_connecting, connecting). +-define(status_disconnected, disconnected). +%% Note: the `stopped' status can only be emitted by `emqx_resource_manager'... Modules +%% implementing `emqx_resource' behavior should not return it. The `rm_' prefix is to +%% remind us of that. +-define(rm_status_stopped, stopped). + -type resource_type() :: module(). -type resource_id() :: binary(). -type channel_id() :: binary(). @@ -21,8 +31,12 @@ -type resource_config() :: term(). -type resource_spec() :: map(). -type resource_state() :: term(). --type resource_status() :: connected | disconnected | connecting | stopped. --type channel_status() :: connected | connecting | disconnected. +%% Note: the `stopped' status can only be emitted by `emqx_resource_manager'... Modules +%% implementing `emqx_resource' behavior should not return it. +-type resource_status() :: + ?status_connected | ?status_disconnected | ?status_connecting | ?rm_status_stopped. +-type health_check_status() :: ?status_connected | ?status_disconnected | ?status_connecting. +-type channel_status() :: ?status_connected | ?status_connecting | ?status_disconnected. -type callback_mode() :: always_sync | async_if_possible. -type query_mode() :: simple_sync