diff --git a/apps/emqx_bridge/src/emqx_bridge_v2.erl b/apps/emqx_bridge/src/emqx_bridge_v2.erl index 70e248e56..7ce266922 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2.erl @@ -202,33 +202,36 @@ lookup(Type, Name) -> %% The connector should always exist %% ... but, in theory, there might be no channels associated to it when we try %% to delete the connector, and then this reference will become dangling... - InstanceData = + ConnectorData = case emqx_resource:get_instance(ConnectorId) of {ok, _, Data} -> Data; {error, not_found} -> #{} end, - %% Find the Bridge V2 status from the InstanceData - Channels = maps:get(added_channels, InstanceData, #{}), + %% Find the Bridge V2 status from the ConnectorData + ConnectorStatus = maps:get(status, ConnectorData, undefined), + Channels = maps:get(added_channels, ConnectorData, #{}), BridgeV2Id = id(Type, Name, BridgeConnector), ChannelStatus = maps:get(BridgeV2Id, Channels, undefined), {DisplayBridgeV2Status, ErrorMsg} = - case ChannelStatus of - #{status := connected} -> - {connected, <<"">>}; - #{status := Status, error := undefined} -> + case {ChannelStatus, ConnectorStatus} of + {#{status := ?status_connected}, _} -> + {?status_connected, <<"">>}; + {#{error := resource_not_operational}, ?status_connecting} -> + {?status_connecting, <<"Not installed">>}; + {#{status := Status, error := undefined}, _} -> {Status, <<"Unknown reason">>}; - #{status := Status, error := Error} -> + {#{status := Status, error := Error}, _} -> {Status, emqx_utils:readable_error_msg(Error)}; - undefined -> - {disconnected, <<"Pending installation">>} + {undefined, _} -> + {?status_disconnected, <<"Not installed">>} end, {ok, #{ type => bin(Type), name => bin(Name), raw_config => RawConf, - resource_data => InstanceData, + resource_data => ConnectorData, status => DisplayBridgeV2Status, error => ErrorMsg }} diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl index 2766088a1..791997fc3 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_SUITE.erl @@ -20,6 +20,7 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). +-include_lib("emqx_resource/include/emqx_resource.hrl"). -import(emqx_common_test_helpers, [on_exit/1]). @@ -43,7 +44,7 @@ con_schema() -> { con_type(), hoconsc:mk( - hoconsc:map(name, typerefl:map()), + hoconsc:map(name, hoconsc:ref(?MODULE, connector_config)), #{ desc => <<"Test Connector Config">>, required => false @@ -52,6 +53,15 @@ con_schema() -> } ]. +fields(connector_config) -> + [ + {enable, hoconsc:mk(typerefl:boolean(), #{})}, + {resource_opts, hoconsc:mk(typerefl:map(), #{})}, + {on_start_fun, hoconsc:mk(typerefl:binary(), #{})}, + {on_get_status_fun, hoconsc:mk(typerefl:binary(), #{})}, + {on_add_channel_fun, hoconsc:mk(typerefl:binary(), #{})} + ]. + con_config() -> #{ <<"enable">> => true, @@ -112,6 +122,7 @@ setup_mocks() -> catch meck:new(emqx_connector_schema, MeckOpts), meck:expect(emqx_connector_schema, fields, 1, con_schema()), + meck:expect(emqx_connector_schema, connector_type_to_bridge_types, 1, [con_type()]), catch meck:new(emqx_connector_resource, MeckOpts), meck:expect(emqx_connector_resource, connector_to_resource_type, 1, con_mod()), @@ -159,15 +170,7 @@ init_per_testcase(_TestCase, Config) -> ets:new(fun_table_name(), [named_table, public]), %% Create a fake connector {ok, _} = emqx_connector:create(con_type(), con_name(), con_config()), - [ - {mocked_mods, [ - emqx_connector_schema, - emqx_connector_resource, - - emqx_bridge_v2 - ]} - | Config - ]. + Config. end_per_testcase(_TestCase, _Config) -> ets:delete(fun_table_name()), @@ -846,6 +849,51 @@ t_start_operation_when_on_add_channel_gives_error(_Config) -> ), ok. +t_lookup_status_when_connecting(_Config) -> + ResponseETS = ets:new(response_ets, [public]), + ets:insert(ResponseETS, {on_get_status_value, ?status_connecting}), + OnGetStatusFun = wrap_fun(fun() -> + ets:lookup_element(ResponseETS, on_get_status_value, 2) + end), + + ConnectorConfig = emqx_utils_maps:deep_merge(con_config(), #{ + <<"on_get_status_fun">> => OnGetStatusFun, + <<"resource_opts">> => #{<<"start_timeout">> => 100} + }), + ConnectorName = ?FUNCTION_NAME, + ct:pal("connector config:\n ~p", [ConnectorConfig]), + {ok, _} = emqx_connector:create(con_type(), ConnectorName, ConnectorConfig), + + ActionName = my_test_action, + ChanStatusFun = wrap_fun(fun() -> ?status_disconnected end), + ActionConfig = (bridge_config())#{ + <<"on_get_channel_status_fun">> => ChanStatusFun, + <<"connector">> => atom_to_binary(ConnectorName) + }, + ct:pal("action config:\n ~p", [ActionConfig]), + {ok, _} = emqx_bridge_v2:create(bridge_type(), ActionName, ActionConfig), + + %% Top-level status is connecting if the connector status is connecting, but the + %% channel is not yet installed. `resource_data.added_channels.$channel_id.status' + %% contains true internal status. + {ok, Res} = emqx_bridge_v2:lookup(bridge_type(), ActionName), + ?assertMatch( + #{ + %% This is the action's public status + status := ?status_connecting, + resource_data := + #{ + %% This is the connector's status + status := ?status_connecting + } + }, + Res + ), + #{resource_data := #{added_channels := Channels}} = Res, + [{_Id, ChannelData}] = maps:to_list(Channels), + ?assertMatch(#{status := ?status_disconnected}, ChannelData), + ok. + %% Helper Functions wait_until(Fun) -> diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl index 059f9ac9f..b99a462b4 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl @@ -587,7 +587,7 @@ t_broken_bridge_config(Config) -> <<"type">> := ?BRIDGE_TYPE, <<"connector">> := <<"does_not_exist">>, <<"status">> := <<"disconnected">>, - <<"error">> := <<"Pending installation">> + <<"error">> := <<"Not installed">> } ]}, request_json(get, uri([?ROOT]), Config) @@ -640,7 +640,7 @@ t_fix_broken_bridge_config(Config) -> <<"type">> := ?BRIDGE_TYPE, <<"connector">> := <<"does_not_exist">>, <<"status">> := <<"disconnected">>, - <<"error">> := <<"Pending installation">> + <<"error">> := <<"Not installed">> } ]}, request_json(get, uri([?ROOT]), Config) diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_test_connector.erl b/apps/emqx_bridge/test/emqx_bridge_v2_test_connector.erl index 0138832a0..3c5204ea1 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_test_connector.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_test_connector.erl @@ -43,8 +43,8 @@ on_start( ) -> Fun = emqx_bridge_v2_SUITE:unwrap_fun(FunRef), Fun(Conf); -on_start(_InstId, _Config) -> - {ok, #{}}. +on_start(_InstId, Config) -> + {ok, Config}. on_add_channel( _InstId, diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl index 4422d8dd5..84401aaa6 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl @@ -481,11 +481,11 @@ on_get_status( case wolff_client_sup:find_client(ClientId) of {ok, Pid} -> case wolff_client:check_connectivity(Pid) of - ok -> connected; - {error, Error} -> {connecting, State, Error} + ok -> ?status_connected; + {error, Error} -> {?status_connecting, State, Error} end; {error, _Reason} -> - connecting + ?status_connecting end. on_get_channel_status( @@ -499,10 +499,10 @@ on_get_channel_status( #{kafka_topic := KafkaTopic} = maps:get(ChannelId, Channels), try ok = check_topic_and_leader_connections(ClientId, KafkaTopic), - connected + ?status_connected catch throw:#{reason := restarting} -> - conneting + ?status_connecting end. check_topic_and_leader_connections(ClientId, KafkaTopic) -> diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl index fba72a1d7..6c48146cd 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl @@ -23,6 +23,8 @@ -include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("brod/include/brod.hrl"). +-import(emqx_common_test_helpers, [on_exit/1]). + -define(TYPE, kafka_producer). %%------------------------------------------------------------------------------ @@ -55,6 +57,13 @@ end_per_suite(Config) -> emqx_cth_suite:stop(Apps), ok. +init_per_testcase(_TestCase, Config) -> + Config. + +end_per_testcase(_TestCase, _Config) -> + emqx_common_test_helpers:call_janitor(60_000), + ok. + %%------------------------------------------------------------------------------------- %% Helper fns %%------------------------------------------------------------------------------------- @@ -163,6 +172,16 @@ kafka_hosts_string() -> KafkaPort = os:getenv("KAFKA_PLAIN_PORT", "9092"), KafkaHost ++ ":" ++ KafkaPort. +create_connector(Name, Config) -> + Res = emqx_connector:create(?TYPE, Name, Config), + on_exit(fun() -> emqx_connector:remove(?TYPE, Name) end), + Res. + +create_action(Name, Config) -> + Res = emqx_bridge_v2:create(?TYPE, Name, Config), + on_exit(fun() -> emqx_bridge_v2:remove(?TYPE, Name) end), + Res. + %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ @@ -302,3 +321,24 @@ t_unknown_topic(_Config) -> emqx_bridge_v2_testlib:get_bridge_api(?TYPE, BridgeName) ), ok. + +t_bad_url(_Config) -> + ConnectorName = <<"test_connector">>, + ActionName = <<"test_action">>, + ActionConfig = bridge_v2_config(<<"test_connector">>), + ConnectorConfig0 = connector_config(), + ConnectorConfig = ConnectorConfig0#{<<"bootstrap_hosts">> := <<"bad_host:9092">>}, + ?assertMatch({ok, _}, create_connector(ConnectorName, ConnectorConfig)), + ?assertMatch({ok, _}, create_action(ActionName, ActionConfig)), + ?assertMatch( + {ok, #{ + resource_data := + #{ + status := connecting, + error := [#{reason := unresolvable_hostname}] + } + }}, + emqx_connector:lookup(?TYPE, ConnectorName) + ), + ?assertMatch({ok, #{status := connecting}}, emqx_bridge_v2:lookup(?TYPE, ActionName)), + ok. diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index fa86e68c9..b34da9a63 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -13,6 +13,16 @@ %% See the License for the specific language governing permissions and %% limitations under the License. %%-------------------------------------------------------------------- + +%% bridge/connector/action status +-define(status_connected, connected). +-define(status_connecting, connecting). +-define(status_disconnected, disconnected). +%% Note: the `stopped' status can only be emitted by `emqx_resource_manager'... Modules +%% implementing `emqx_resource' behavior should not return it. The `rm_' prefix is to +%% remind us of that. +-define(rm_status_stopped, stopped). + -type resource_type() :: module(). -type resource_id() :: binary(). -type channel_id() :: binary(). @@ -21,8 +31,12 @@ -type resource_config() :: term(). -type resource_spec() :: map(). -type resource_state() :: term(). --type resource_status() :: connected | disconnected | connecting | stopped. --type channel_status() :: connected | connecting | disconnected. +%% Note: the `stopped' status can only be emitted by `emqx_resource_manager'... Modules +%% implementing `emqx_resource' behavior should not return it. +-type resource_status() :: + ?status_connected | ?status_disconnected | ?status_connecting | ?rm_status_stopped. +-type health_check_status() :: ?status_connected | ?status_disconnected | ?status_connecting. +-type channel_status() :: ?status_connected | ?status_connecting | ?status_disconnected. -type callback_mode() :: always_sync | async_if_possible. -type query_mode() :: simple_sync