diff --git a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl index d412298cf..406da9dae 100644 --- a/apps/emqx_connector/test/emqx_connector_api_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_api_SUITE.erl @@ -19,6 +19,7 @@ -compile(export_all). -import(emqx_mgmt_api_test_util, [uri/1]). +-import(emqx_common_test_helpers, [on_exit/1]). -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). @@ -830,9 +831,9 @@ t_list_disabled_channels(Config) -> ) ), ActionName = ?BRIDGE_NAME, - ActionParams = (?KAFKA_BRIDGE(ActionName))#{<<"enable">> := true}, + ActionParams = (?KAFKA_BRIDGE(ActionName))#{<<"enable">> := false}, ?assertMatch( - {ok, 201, #{<<"enable">> := true}}, + {ok, 201, #{<<"enable">> := false}}, request_json( post, uri(["actions"]), @@ -841,14 +842,35 @@ t_list_disabled_channels(Config) -> ) ), ConnectorID = emqx_connector_resource:connector_id(?CONNECTOR_TYPE, ?CONNECTOR_NAME), + ActionID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, ActionName), ?assertMatch( - {ok, 200, #{<<"actions">> := [ActionName]}}, + {ok, 200, #{ + <<"status">> := <<"disconnected">>, + <<"status_reason">> := <<"Not installed">>, + <<"error">> := <<"Not installed">> + }}, request_json( get, - uri(["connectors", ConnectorID]), + uri(["actions", ActionID]), Config ) ), + %% This should be fast even if the connector resource process is unresponsive. + ConnectorResID = emqx_connector_resource:resource_id(?CONNECTOR_TYPE, ?CONNECTOR_NAME), + suspend_connector_resource(ConnectorResID, Config), + try + ?assertMatch( + {ok, 200, #{<<"actions">> := [ActionName]}}, + request_json( + get, + uri(["connectors", ConnectorID]), + Config + ) + ), + ok + after + resume_connector_resource(ConnectorResID, Config) + end, ok. t_raw_config_response_defaults(Config) -> @@ -987,3 +1009,30 @@ json(B) when is_binary(B) -> ct:pal("Failed to decode json: ~p~n~p", [Reason, B]), Error end. + +suspend_connector_resource(ConnectorResID, Config) -> + Node = ?config(node, Config), + Pid = erpc:call(Node, fun() -> + [Pid] = [ + Pid + || {ID, Pid, worker, _} <- supervisor:which_children(emqx_resource_manager_sup), + ID =:= ConnectorResID + ], + sys:suspend(Pid), + Pid + end), + on_exit(fun() -> erpc:call(Node, fun() -> catch sys:resume(Pid) end) end), + ok. + +resume_connector_resource(ConnectorResID, Config) -> + Node = ?config(node, Config), + erpc:call(Node, fun() -> + [Pid] = [ + Pid + || {ID, Pid, worker, _} <- supervisor:which_children(emqx_resource_manager_sup), + ID =:= ConnectorResID + ], + sys:resume(Pid), + ok + end), + ok. diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 046a91458..c1ff960d6 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -453,7 +453,12 @@ channel_health_check(ResId, ChannelId) -> -spec get_channels(resource_id()) -> {ok, [{binary(), map()}]} | {error, term()}. get_channels(ResId) -> - emqx_resource_manager:get_channels(ResId). + case emqx_resource_manager:lookup_cached(ResId) of + {error, not_found} -> + {error, not_found}; + {ok, _Group, _ResourceData = #{mod := Mod}} -> + {ok, emqx_resource:call_get_channels(ResId, Mod)} + end. set_resource_status_connecting(ResId) -> emqx_resource_manager:set_resource_status_connecting(ResId).