Merge pull request #12162 from thalesmg/fix-lookup-timeout-r54-20231213

fix(connector_api): avoid calling resource process to get channels
This commit is contained in:
Thales Macedo Garitezi 2023-12-14 13:36:26 -03:00 committed by GitHub
commit d5d05a1701
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 59 additions and 5 deletions

View File

@ -19,6 +19,7 @@
-compile(export_all).
-import(emqx_mgmt_api_test_util, [uri/1]).
-import(emqx_common_test_helpers, [on_exit/1]).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
@ -830,9 +831,9 @@ t_list_disabled_channels(Config) ->
)
),
ActionName = ?BRIDGE_NAME,
ActionParams = (?KAFKA_BRIDGE(ActionName))#{<<"enable">> := true},
ActionParams = (?KAFKA_BRIDGE(ActionName))#{<<"enable">> := false},
?assertMatch(
{ok, 201, #{<<"enable">> := true}},
{ok, 201, #{<<"enable">> := false}},
request_json(
post,
uri(["actions"]),
@ -841,14 +842,35 @@ t_list_disabled_channels(Config) ->
)
),
ConnectorID = emqx_connector_resource:connector_id(?CONNECTOR_TYPE, ?CONNECTOR_NAME),
ActionID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE, ActionName),
?assertMatch(
{ok, 200, #{<<"actions">> := [ActionName]}},
{ok, 200, #{
<<"status">> := <<"disconnected">>,
<<"status_reason">> := <<"Not installed">>,
<<"error">> := <<"Not installed">>
}},
request_json(
get,
uri(["connectors", ConnectorID]),
uri(["actions", ActionID]),
Config
)
),
%% This should be fast even if the connector resource process is unresponsive.
ConnectorResID = emqx_connector_resource:resource_id(?CONNECTOR_TYPE, ?CONNECTOR_NAME),
suspend_connector_resource(ConnectorResID, Config),
try
?assertMatch(
{ok, 200, #{<<"actions">> := [ActionName]}},
request_json(
get,
uri(["connectors", ConnectorID]),
Config
)
),
ok
after
resume_connector_resource(ConnectorResID, Config)
end,
ok.
t_raw_config_response_defaults(Config) ->
@ -987,3 +1009,30 @@ json(B) when is_binary(B) ->
ct:pal("Failed to decode json: ~p~n~p", [Reason, B]),
Error
end.
suspend_connector_resource(ConnectorResID, Config) ->
Node = ?config(node, Config),
Pid = erpc:call(Node, fun() ->
[Pid] = [
Pid
|| {ID, Pid, worker, _} <- supervisor:which_children(emqx_resource_manager_sup),
ID =:= ConnectorResID
],
sys:suspend(Pid),
Pid
end),
on_exit(fun() -> erpc:call(Node, fun() -> catch sys:resume(Pid) end) end),
ok.
resume_connector_resource(ConnectorResID, Config) ->
Node = ?config(node, Config),
erpc:call(Node, fun() ->
[Pid] = [
Pid
|| {ID, Pid, worker, _} <- supervisor:which_children(emqx_resource_manager_sup),
ID =:= ConnectorResID
],
sys:resume(Pid),
ok
end),
ok.

View File

@ -453,7 +453,12 @@ channel_health_check(ResId, ChannelId) ->
-spec get_channels(resource_id()) -> {ok, [{binary(), map()}]} | {error, term()}.
get_channels(ResId) ->
emqx_resource_manager:get_channels(ResId).
case emqx_resource_manager:lookup_cached(ResId) of
{error, not_found} ->
{error, not_found};
{ok, _Group, _ResourceData = #{mod := Mod}} ->
{ok, emqx_resource:call_get_channels(ResId, Mod)}
end.
set_resource_status_connecting(ResId) ->
emqx_resource_manager:set_resource_status_connecting(ResId).