fix(emqx_connector): report errors in on_start handler

This commit is contained in:
Stefan Strigler 2023-06-13 15:56:45 +02:00
parent 8ba5a54f2e
commit b2a5065641
4 changed files with 106 additions and 28 deletions

View File

@ -124,10 +124,13 @@ create_bridge_api(Config) ->
create_bridge_api(Config, Overrides) -> create_bridge_api(Config, Overrides) ->
BridgeType = ?config(bridge_type, Config), BridgeType = ?config(bridge_type, Config),
Name = ?config(bridge_name, Config), BridgeName = ?config(bridge_name, Config),
BridgeConfig0 = ?config(bridge_config, Config), BridgeConfig0 = ?config(bridge_config, Config),
BridgeConfig = emqx_utils_maps:deep_merge(BridgeConfig0, Overrides), BridgeConfig = emqx_utils_maps:deep_merge(BridgeConfig0, Overrides),
Params = BridgeConfig#{<<"type">> => BridgeType, <<"name">> => Name}, create_bridge_api(BridgeType, BridgeName, BridgeConfig).
create_bridge_api(BridgeType, BridgeName, BridgeConfig) ->
Params = BridgeConfig#{<<"type">> => BridgeType, <<"name">> => BridgeName},
Path = emqx_mgmt_api_test_util:api_path(["bridges"]), Path = emqx_mgmt_api_test_util:api_path(["bridges"]),
AuthHeader = emqx_mgmt_api_test_util:auth_header_(), AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
Opts = #{return_all => true}, Opts = #{return_all => true},
@ -164,6 +167,24 @@ update_bridge_api(Config, Overrides) ->
ct:pal("bridge update result: ~p", [Res]), ct:pal("bridge update result: ~p", [Res]),
Res. Res.
op_bridge_api(Op, BridgeType, BridgeName) ->
BridgeId = emqx_bridge_resource:bridge_id(BridgeType, BridgeName),
Path = emqx_mgmt_api_test_util:api_path(["bridges", BridgeId, Op]),
AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
Opts = #{return_all => true},
ct:pal("calling bridge ~p (via http): ~p", [BridgeId, Op]),
Res =
case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, "", Opts) of
{ok, {Status, Headers, Body}} ->
{ok, {Status, Headers, emqx_utils_json:decode(Body, [return_maps])}};
{error, {Status, Headers, Body}} ->
{error, {Status, Headers, emqx_utils_json:decode(Body, [return_maps])}};
Error ->
Error
end,
ct:pal("bridge op result: ~p", [Res]),
Res.
probe_bridge_api(Config) -> probe_bridge_api(Config) ->
probe_bridge_api(Config, _Overrides = #{}). probe_bridge_api(Config, _Overrides = #{}).

View File

@ -28,6 +28,9 @@
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
-define(BRIDGE_TYPE, <<"webhook">>).
-define(BRIDGE_NAME, atom_to_binary(?MODULE)).
all() -> all() ->
emqx_common_test_helpers:all(?MODULE). emqx_common_test_helpers:all(?MODULE).
@ -36,15 +39,13 @@ groups() ->
init_per_suite(_Config) -> init_per_suite(_Config) ->
emqx_common_test_helpers:render_and_load_app_config(emqx_conf), emqx_common_test_helpers:render_and_load_app_config(emqx_conf),
ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]), ok = emqx_mgmt_api_test_util:init_suite([emqx_conf, emqx_bridge]),
ok = emqx_connector_test_helpers:start_apps([emqx_resource]), ok = emqx_connector_test_helpers:start_apps([emqx_resource]),
{ok, _} = application:ensure_all_started(emqx_connector), {ok, _} = application:ensure_all_started(emqx_connector),
[]. [].
end_per_suite(_Config) -> end_per_suite(_Config) ->
ok = emqx_config:put([bridges], #{}), ok = emqx_mgmt_api_test_util:end_suite([emqx_conf, emqx_bridge]),
ok = emqx_config:put_raw([bridges], #{}),
ok = emqx_common_test_helpers:stop_apps([emqx_conf, emqx_bridge]),
ok = emqx_connector_test_helpers:stop_apps([emqx_resource]), ok = emqx_connector_test_helpers:stop_apps([emqx_resource]),
_ = application:stop(emqx_connector), _ = application:stop(emqx_connector),
_ = application:stop(emqx_bridge), _ = application:stop(emqx_bridge),
@ -53,10 +54,22 @@ end_per_suite(_Config) ->
suite() -> suite() ->
[{timetrap, {seconds, 60}}]. [{timetrap, {seconds, 60}}].
init_per_testcase(t_bad_bridge_config, Config) ->
Config;
init_per_testcase(t_send_async_connection_timeout, Config) ->
ResponseDelayMS = 500,
Server = start_http_server(#{response_delay_ms => ResponseDelayMS}),
[{http_server, Server}, {response_delay_ms, ResponseDelayMS} | Config];
init_per_testcase(_TestCase, Config) -> init_per_testcase(_TestCase, Config) ->
Config. Server = start_http_server(#{response_delay_ms => 0}),
[{http_server, Server} | Config].
end_per_testcase(_TestCase, _Config) -> end_per_testcase(_TestCase, Config) ->
case ?config(http_server, Config) of
undefined -> ok;
Server -> stop_http_server(Server)
end,
emqx_bridge_testlib:delete_all_bridges(),
emqx_common_test_helpers:call_janitor(), emqx_common_test_helpers:call_janitor(),
ok. ok.
@ -65,13 +78,14 @@ end_per_testcase(_TestCase, _Config) ->
%% (Orginally copied from emqx_bridge_api_SUITE) %% (Orginally copied from emqx_bridge_api_SUITE)
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
start_http_server(HTTPServerConfig) -> start_http_server(HTTPServerConfig) ->
ct:pal("Start server\n"),
process_flag(trap_exit, true), process_flag(trap_exit, true),
Parent = self(), Parent = self(),
ct:pal("Starting server for ~p", [Parent]),
{ok, {Port, Sock}} = listen_on_random_port(), {ok, {Port, Sock}} = listen_on_random_port(),
Acceptor = spawn(fun() -> Acceptor = spawn(fun() ->
accept_loop(Sock, Parent, HTTPServerConfig) accept_loop(Sock, Parent, HTTPServerConfig)
end), end),
ct:pal("Started server on port ~p", [Port]),
timer:sleep(100), timer:sleep(100),
#{port => Port, sock => Sock, acceptor => Acceptor}. #{port => Port, sock => Sock, acceptor => Acceptor}.
@ -160,8 +174,8 @@ parse_http_request_assertive(ReqStr0) ->
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
bridge_async_config(#{port := Port} = Config) -> bridge_async_config(#{port := Port} = Config) ->
Type = maps:get(type, Config, <<"webhook">>), Type = maps:get(type, Config, ?BRIDGE_TYPE),
Name = maps:get(name, Config, atom_to_binary(?MODULE)), Name = maps:get(name, Config, ?BRIDGE_NAME),
PoolSize = maps:get(pool_size, Config, 1), PoolSize = maps:get(pool_size, Config, 1),
QueryMode = maps:get(query_mode, Config, "async"), QueryMode = maps:get(query_mode, Config, "async"),
ConnectTimeout = maps:get(connect_timeout, Config, 1), ConnectTimeout = maps:get(connect_timeout, Config, 1),
@ -217,8 +231,8 @@ parse_and_check(ConfigString, BridgeType, Name) ->
RetConfig. RetConfig.
make_bridge(Config) -> make_bridge(Config) ->
Type = <<"webhook">>, Type = ?BRIDGE_TYPE,
Name = atom_to_binary(?MODULE), Name = ?BRIDGE_NAME,
BridgeConfig = bridge_async_config(Config#{ BridgeConfig = bridge_async_config(Config#{
name => Name, name => Name,
type => Type type => Type
@ -236,16 +250,15 @@ make_bridge(Config) ->
%% This test ensures that https://emqx.atlassian.net/browse/CI-62 is fixed. %% This test ensures that https://emqx.atlassian.net/browse/CI-62 is fixed.
%% When the connection time out all the queued requests where dropped in %% When the connection time out all the queued requests where dropped in
t_send_async_connection_timeout(_Config) -> t_send_async_connection_timeout(Config) ->
ResponseDelayMS = 90, ResponseDelayMS = ?config(response_delay_ms, Config),
#{port := Port} = Server = start_http_server(#{response_delay_ms => 900}), #{port := Port} = ?config(http_server, Config),
% Port = 9000,
BridgeID = make_bridge(#{ BridgeID = make_bridge(#{
port => Port, port => Port,
pool_size => 1, pool_size => 1,
query_mode => "async", query_mode => "async",
connect_timeout => ResponseDelayMS * 2, connect_timeout => 10_000,
request_timeout => 10000, request_timeout => ResponseDelayMS * 2,
resource_request_ttl => "infinity" resource_request_ttl => "infinity"
}), }),
NumberOfMessagesToSend = 10, NumberOfMessagesToSend = 10,
@ -257,11 +270,10 @@ t_send_async_connection_timeout(_Config) ->
ct:pal("Sent messages\n"), ct:pal("Sent messages\n"),
MessageIDs = maps:from_keys(lists:seq(1, NumberOfMessagesToSend), void), MessageIDs = maps:from_keys(lists:seq(1, NumberOfMessagesToSend), void),
receive_request_notifications(MessageIDs, ResponseDelayMS), receive_request_notifications(MessageIDs, ResponseDelayMS),
stop_http_server(Server),
ok. ok.
t_async_free_retries(_Config) -> t_async_free_retries(Config) ->
#{port := Port} = start_http_server(#{response_delay_ms => 0}), #{port := Port} = ?config(http_server, Config),
BridgeID = make_bridge(#{ BridgeID = make_bridge(#{
port => Port, port => Port,
pool_size => 1, pool_size => 1,
@ -285,8 +297,8 @@ t_async_free_retries(_Config) ->
do_t_async_retries(Context, {error, {shutdown, normal}}, Fn), do_t_async_retries(Context, {error, {shutdown, normal}}, Fn),
ok. ok.
t_async_common_retries(_Config) -> t_async_common_retries(Config) ->
#{port := Port} = start_http_server(#{response_delay_ms => 0}), #{port := Port} = ?config(http_server, Config),
BridgeID = make_bridge(#{ BridgeID = make_bridge(#{
port => Port, port => Port,
pool_size => 1, pool_size => 1,
@ -323,6 +335,28 @@ t_async_common_retries(_Config) ->
do_t_async_retries(Context, {error, something_else}, FnFail), do_t_async_retries(Context, {error, something_else}, FnFail),
ok. ok.
t_bad_bridge_config(_Config) ->
BridgeConfig = bridge_async_config(#{port => 12345}),
?assertMatch(
{ok,
{{_, 201, _}, _Headers, #{
<<"status">> := <<"disconnected">>,
<<"status_reason">> := <<"Connection refused">>
}}},
emqx_bridge_testlib:create_bridge_api(
?BRIDGE_TYPE,
?BRIDGE_NAME,
BridgeConfig
)
),
%% try `/start` bridge
?assertMatch(
{error, {{_, 400, _}, _Headers, #{<<"message">> := <<"Connection refused">>}}},
emqx_bridge_testlib:op_bridge_api("start", ?BRIDGE_TYPE, ?BRIDGE_NAME)
),
ok.
%% helpers
do_t_async_retries(TestContext, Error, Fn) -> do_t_async_retries(TestContext, Error, Fn) ->
#{error_attempts := ErrorAttempts} = TestContext, #{error_attempts := ErrorAttempts} = TestContext,
persistent_term:put({?MODULE, ?FUNCTION_NAME, attempts}, 0), persistent_term:put({?MODULE, ?FUNCTION_NAME, attempts}, 0),

View File

@ -219,10 +219,31 @@ on_start(
base_path => BasePath, base_path => BasePath,
request => preprocess_request(maps:get(request, Config, undefined)) request => preprocess_request(maps:get(request, Config, undefined))
}, },
case ehttpc_sup:start_pool(InstId, PoolOpts) of case start_pool(InstId, PoolOpts) of
{ok, _} -> {ok, State}; ok ->
{error, {already_started, _}} -> {ok, State}; case do_get_status(InstId, ConnectTimeout) of
{error, Reason} -> {error, Reason} ok ->
{ok, State};
Error ->
ok = ehttpc_sup:stop_pool(InstId),
Error
end;
Error ->
Error
end.
start_pool(PoolName, PoolOpts) ->
case ehttpc_sup:start_pool(PoolName, PoolOpts) of
{ok, _} ->
ok;
{error, {already_started, _}} ->
?SLOG(warning, #{
msg => "emqx_connector_on_start_already_started",
pool_name => PoolName
}),
ok;
Error ->
Error
end. end.
on_stop(InstId, _State) -> on_stop(InstId, _State) ->

View File

@ -24,6 +24,8 @@ wrap_auth_headers_test_() ->
fun() -> fun() ->
meck:expect(ehttpc_sup, start_pool, 2, {ok, foo}), meck:expect(ehttpc_sup, start_pool, 2, {ok, foo}),
meck:expect(ehttpc, request, fun(_, _, Req, _, _) -> {ok, 200, Req} end), meck:expect(ehttpc, request, fun(_, _, Req, _, _) -> {ok, 200, Req} end),
meck:expect(ehttpc, workers, 1, [{self, self()}]),
meck:expect(ehttpc, health_check, 2, ok),
meck:expect(ehttpc_pool, pick_worker, 1, self()), meck:expect(ehttpc_pool, pick_worker, 1, self()),
meck:expect(emqx_resource, allocate_resource, 3, ok), meck:expect(emqx_resource, allocate_resource, 3, ok),
[ehttpc_sup, ehttpc, ehttpc_pool, emqx_resource] [ehttpc_sup, ehttpc, ehttpc_pool, emqx_resource]