Merge pull request #11037 from sstrigler/EMQX-10174-e-5-1-0-reconnect-does-not-show-any-errors-for-apache-io-tdb-or-webhook-or-kafka

fix(emqx_connector): report errors in on_start handler
This commit is contained in:
Stefan Strigler 2023-06-14 13:20:47 +02:00 committed by GitHub
commit bf62cdc3e3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 206 additions and 58 deletions

View File

@ -105,29 +105,32 @@ parse_and_check(Config, ConfigString, Name) ->
resource_id(Config) ->
BridgeType = ?config(bridge_type, Config),
Name = ?config(bridge_name, Config),
emqx_bridge_resource:resource_id(BridgeType, Name).
BridgeName = ?config(bridge_name, Config),
emqx_bridge_resource:resource_id(BridgeType, BridgeName).
create_bridge(Config) ->
create_bridge(Config, _Overrides = #{}).
create_bridge(Config, Overrides) ->
BridgeType = ?config(bridge_type, Config),
Name = ?config(bridge_name, Config),
BridgeName = ?config(bridge_name, Config),
BridgeConfig0 = ?config(bridge_config, Config),
BridgeConfig = emqx_utils_maps:deep_merge(BridgeConfig0, Overrides),
ct:pal("creating bridge with config: ~p", [BridgeConfig]),
emqx_bridge:create(BridgeType, Name, BridgeConfig).
emqx_bridge:create(BridgeType, BridgeName, BridgeConfig).
create_bridge_api(Config) ->
create_bridge_api(Config, _Overrides = #{}).
create_bridge_api(Config, Overrides) ->
BridgeType = ?config(bridge_type, Config),
Name = ?config(bridge_name, Config),
BridgeName = ?config(bridge_name, Config),
BridgeConfig0 = ?config(bridge_config, Config),
BridgeConfig = emqx_utils_maps:deep_merge(BridgeConfig0, Overrides),
Params = BridgeConfig#{<<"type">> => BridgeType, <<"name">> => Name},
create_bridge_api(BridgeType, BridgeName, BridgeConfig).
create_bridge_api(BridgeType, BridgeName, BridgeConfig) ->
Params = BridgeConfig#{<<"type">> => BridgeType, <<"name">> => BridgeName},
Path = emqx_mgmt_api_test_util:api_path(["bridges"]),
AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
Opts = #{return_all => true},
@ -164,14 +167,38 @@ update_bridge_api(Config, Overrides) ->
ct:pal("bridge update result: ~p", [Res]),
Res.
%% Performs a bridge operation through the management HTTP API by issuing a
%% POST to the path built from `["bridges", BridgeId, Op]`.
%%
%% Return value:
%%   * a 204 reply is handed back untouched;
%%   * any other `{ok, _}` / `{error, _}` reply carrying a
%%     `{Status, Headers, Body}` triple has its body JSON-decoded into maps;
%%   * anything else is passed through unchanged.
%% Both the call and its outcome are logged with `ct:pal`.
op_bridge_api(Op, BridgeType, BridgeName) ->
    BridgeId = emqx_bridge_resource:bridge_id(BridgeType, BridgeName),
    Url = emqx_mgmt_api_test_util:api_path(["bridges", BridgeId, Op]),
    Auth = emqx_mgmt_api_test_util:auth_header_(),
    ReqOpts = #{return_all => true},
    ct:pal("calling bridge ~p (via http): ~p", [BridgeId, Op]),
    Reply = emqx_mgmt_api_test_util:request_api(post, Url, "", Auth, "", ReqOpts),
    Result =
        case Reply of
            %% No content: there is no body worth decoding.
            {ok, {{_, 204, _}, _RespHeaders, _RespBody}} ->
                Reply;
            {Tag, {StatusLine, RespHeaders, RespBody}} when Tag =:= ok; Tag =:= error ->
                Decoded = emqx_utils_json:decode(RespBody, [return_maps]),
                {Tag, {StatusLine, RespHeaders, Decoded}};
            _Unexpected ->
                Reply
        end,
    ct:pal("bridge op result: ~p", [Result]),
    Result.
probe_bridge_api(Config) ->
probe_bridge_api(Config, _Overrides = #{}).
probe_bridge_api(Config, _Overrides) ->
probe_bridge_api(Config, Overrides) ->
BridgeType = ?config(bridge_type, Config),
Name = ?config(bridge_name, Config),
BridgeConfig = ?config(bridge_config, Config),
Params = BridgeConfig#{<<"type">> => BridgeType, <<"name">> => Name},
BridgeName = ?config(bridge_name, Config),
BridgeConfig0 = ?config(bridge_config, Config),
BridgeConfig = emqx_utils_maps:deep_merge(BridgeConfig0, Overrides),
probe_bridge_api(BridgeType, BridgeName, BridgeConfig).
probe_bridge_api(BridgeType, BridgeName, BridgeConfig) ->
Params = BridgeConfig#{<<"type">> => BridgeType, <<"name">> => BridgeName},
Path = emqx_mgmt_api_test_util:api_path(["bridges_probe"]),
AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
Opts = #{return_all => true},
@ -289,10 +316,34 @@ t_create_via_http(Config) ->
t_start_stop(Config, StopTracePoint) ->
BridgeType = ?config(bridge_type, Config),
BridgeName = ?config(bridge_name, Config),
ResourceId = resource_id(Config),
BridgeConfig = ?config(bridge_config, Config),
t_start_stop(BridgeType, BridgeName, BridgeConfig, StopTracePoint).
t_start_stop(BridgeType, BridgeName, BridgeConfig, StopTracePoint) ->
ResourceId = emqx_bridge_resource:resource_id(BridgeType, BridgeName),
?check_trace(
begin
?assertMatch({ok, _}, create_bridge(Config)),
%% Check that the bridge probe API doesn't leak atoms.
ProbeRes0 = probe_bridge_api(
BridgeType,
BridgeName,
BridgeConfig#{<<"resource_opts">> => #{<<"health_check_interval">> => <<"1s">>}}
),
?assertMatch({ok, {{_, 204, _}, _Headers, _Body}}, ProbeRes0),
AtomsBefore = erlang:system_info(atom_count),
%% Probe again; shouldn't have created more atoms.
ProbeRes1 = probe_bridge_api(
BridgeType,
BridgeName,
BridgeConfig#{<<"resource_opts">> => #{<<"health_check_interval">> => <<"1s">>}}
),
?assertMatch({ok, {{_, 204, _}, _Headers, _Body}}, ProbeRes1),
AtomsAfter = erlang:system_info(atom_count),
?assertEqual(AtomsBefore, AtomsAfter),
?assertMatch({ok, _}, emqx_bridge:create(BridgeType, BridgeName, BridgeConfig)),
%% Since the connection process is async, we give it some time to
%% stabilize and avoid flakiness.
?retry(
@ -301,24 +352,48 @@ t_start_stop(Config, StopTracePoint) ->
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
),
%% Check that the bridge probe API doesn't leak atoms.
ProbeRes0 = probe_bridge_api(
Config,
#{<<"resource_opts">> => #{<<"health_check_interval">> => <<"1s">>}}
),
?assertMatch({ok, {{_, 204, _}, _Headers, _Body}}, ProbeRes0),
AtomsBefore = erlang:system_info(atom_count),
%% Probe again; shouldn't have created more atoms.
ProbeRes1 = probe_bridge_api(
Config,
#{<<"resource_opts">> => #{<<"health_check_interval">> => <<"1s">>}}
%% `start` bridge to trigger `already_started`
?assertMatch(
{ok, {{_, 204, _}, _Headers, []}},
emqx_bridge_testlib:op_bridge_api("start", BridgeType, BridgeName)
),
?assertMatch({ok, {{_, 204, _}, _Headers, _Body}}, ProbeRes1),
AtomsAfter = erlang:system_info(atom_count),
?assertEqual(AtomsBefore, AtomsAfter),
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)),
%% Now stop the bridge.
?assertMatch(
{{ok, _}, {ok, _}},
?wait_async_action(
emqx_bridge_testlib:op_bridge_api("stop", BridgeType, BridgeName),
#{?snk_kind := StopTracePoint},
5_000
)
),
?assertEqual(
{error, resource_is_stopped}, emqx_resource_manager:health_check(ResourceId)
),
?assertMatch(
{ok, {{_, 204, _}, _Headers, []}},
emqx_bridge_testlib:op_bridge_api("stop", BridgeType, BridgeName)
),
?assertEqual(
{error, resource_is_stopped}, emqx_resource_manager:health_check(ResourceId)
),
?assertMatch(
{ok, {{_, 204, _}, _Headers, []}},
emqx_bridge_testlib:op_bridge_api("start", BridgeType, BridgeName)
),
?retry(
_Sleep = 1_000,
_Attempts = 20,
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
),
%% Disable the bridge, which will also stop it.
?assertMatch(
{{ok, _}, {ok, _}},
?wait_async_action(
@ -331,8 +406,11 @@ t_start_stop(Config, StopTracePoint) ->
ok
end,
fun(Trace) ->
%% one for each probe, one for real
?assertMatch([_, _, #{instance_id := ResourceId}], ?of_kind(StopTracePoint, Trace)),
%% one for each probe, two for real
?assertMatch(
[_, _, #{instance_id := ResourceId}, #{instance_id := ResourceId}],
?of_kind(StopTracePoint, Trace)
),
ok
end
),

View File

@ -28,6 +28,9 @@
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-define(BRIDGE_TYPE, <<"webhook">>).
-define(BRIDGE_NAME, atom_to_binary(?MODULE)).
all() ->
emqx_common_test_helpers:all(?MODULE).
@ -36,15 +39,13 @@ groups() ->
init_per_suite(_Config) ->
emqx_common_test_helpers:render_and_load_app_config(emqx_conf),
ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]),
ok = emqx_mgmt_api_test_util:init_suite([emqx_conf, emqx_bridge]),
ok = emqx_connector_test_helpers:start_apps([emqx_resource]),
{ok, _} = application:ensure_all_started(emqx_connector),
[].
end_per_suite(_Config) ->
ok = emqx_config:put([bridges], #{}),
ok = emqx_config:put_raw([bridges], #{}),
ok = emqx_common_test_helpers:stop_apps([emqx_conf, emqx_bridge]),
ok = emqx_mgmt_api_test_util:end_suite([emqx_conf, emqx_bridge]),
ok = emqx_connector_test_helpers:stop_apps([emqx_resource]),
_ = application:stop(emqx_connector),
_ = application:stop(emqx_bridge),
@ -53,10 +54,22 @@ end_per_suite(_Config) ->
suite() ->
[{timetrap, {seconds, 60}}].
init_per_testcase(t_bad_bridge_config, Config) ->
Config;
init_per_testcase(t_send_async_connection_timeout, Config) ->
ResponseDelayMS = 500,
Server = start_http_server(#{response_delay_ms => ResponseDelayMS}),
[{http_server, Server}, {response_delay_ms, ResponseDelayMS} | Config];
init_per_testcase(_TestCase, Config) ->
Config.
Server = start_http_server(#{response_delay_ms => 0}),
[{http_server, Server} | Config].
end_per_testcase(_TestCase, _Config) ->
end_per_testcase(_TestCase, Config) ->
case ?config(http_server, Config) of
undefined -> ok;
Server -> stop_http_server(Server)
end,
emqx_bridge_testlib:delete_all_bridges(),
emqx_common_test_helpers:call_janitor(),
ok.
@ -65,13 +78,14 @@ end_per_testcase(_TestCase, _Config) ->
%% (Originally copied from emqx_bridge_api_SUITE)
%%------------------------------------------------------------------------------
start_http_server(HTTPServerConfig) ->
ct:pal("Start server\n"),
process_flag(trap_exit, true),
Parent = self(),
ct:pal("Starting server for ~p", [Parent]),
{ok, {Port, Sock}} = listen_on_random_port(),
Acceptor = spawn(fun() ->
accept_loop(Sock, Parent, HTTPServerConfig)
end),
ct:pal("Started server on port ~p", [Port]),
timer:sleep(100),
#{port => Port, sock => Sock, acceptor => Acceptor}.
@ -160,8 +174,8 @@ parse_http_request_assertive(ReqStr0) ->
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
bridge_async_config(#{port := Port} = Config) ->
Type = maps:get(type, Config, <<"webhook">>),
Name = maps:get(name, Config, atom_to_binary(?MODULE)),
Type = maps:get(type, Config, ?BRIDGE_TYPE),
Name = maps:get(name, Config, ?BRIDGE_NAME),
PoolSize = maps:get(pool_size, Config, 1),
QueryMode = maps:get(query_mode, Config, "async"),
ConnectTimeout = maps:get(connect_timeout, Config, 1),
@ -217,8 +231,8 @@ parse_and_check(ConfigString, BridgeType, Name) ->
RetConfig.
make_bridge(Config) ->
Type = <<"webhook">>,
Name = atom_to_binary(?MODULE),
Type = ?BRIDGE_TYPE,
Name = ?BRIDGE_NAME,
BridgeConfig = bridge_async_config(Config#{
name => Name,
type => Type
@ -236,16 +250,15 @@ make_bridge(Config) ->
%% This test ensures that https://emqx.atlassian.net/browse/CI-62 is fixed.
%% When the connection timed out, all the queued requests were dropped.
t_send_async_connection_timeout(_Config) ->
ResponseDelayMS = 90,
#{port := Port} = Server = start_http_server(#{response_delay_ms => 900}),
% Port = 9000,
t_send_async_connection_timeout(Config) ->
ResponseDelayMS = ?config(response_delay_ms, Config),
#{port := Port} = ?config(http_server, Config),
BridgeID = make_bridge(#{
port => Port,
pool_size => 1,
query_mode => "async",
connect_timeout => ResponseDelayMS * 2,
request_timeout => 10000,
connect_timeout => 10_000,
request_timeout => ResponseDelayMS * 2,
resource_request_ttl => "infinity"
}),
NumberOfMessagesToSend = 10,
@ -257,11 +270,10 @@ t_send_async_connection_timeout(_Config) ->
ct:pal("Sent messages\n"),
MessageIDs = maps:from_keys(lists:seq(1, NumberOfMessagesToSend), void),
receive_request_notifications(MessageIDs, ResponseDelayMS),
stop_http_server(Server),
ok.
t_async_free_retries(_Config) ->
#{port := Port} = start_http_server(#{response_delay_ms => 0}),
t_async_free_retries(Config) ->
#{port := Port} = ?config(http_server, Config),
BridgeID = make_bridge(#{
port => Port,
pool_size => 1,
@ -285,8 +297,8 @@ t_async_free_retries(_Config) ->
do_t_async_retries(Context, {error, {shutdown, normal}}, Fn),
ok.
t_async_common_retries(_Config) ->
#{port := Port} = start_http_server(#{response_delay_ms => 0}),
t_async_common_retries(Config) ->
#{port := Port} = ?config(http_server, Config),
BridgeID = make_bridge(#{
port => Port,
pool_size => 1,
@ -323,6 +335,39 @@ t_async_common_retries(_Config) ->
do_t_async_retries(Context, {error, something_else}, FnFail),
ok.
%% Creating a bridge whose target refuses connections must still be accepted
%% by the API (201), but the bridge has to report itself as `disconnected`
%% with a "Connection refused" reason. A subsequent `start` operation must
%% then be rejected with a 400 carrying the same message.
t_bad_bridge_config(_Config) ->
    %% NOTE(review): assumes nothing is listening on this port on the test
    %% host -- confirm, otherwise the test can become flaky.
    UnreachablePort = 12345,
    BridgeConfig = bridge_async_config(#{port => UnreachablePort}),
    CreateRes = emqx_bridge_testlib:create_bridge_api(
        ?BRIDGE_TYPE,
        ?BRIDGE_NAME,
        BridgeConfig
    ),
    ?assertMatch(
        {ok,
            {{_, 201, _}, _Headers, #{
                <<"status">> := <<"disconnected">>,
                <<"status_reason">> := <<"Connection refused">>
            }}},
        CreateRes
    ),
    %% try `/start` bridge
    StartRes = emqx_bridge_testlib:op_bridge_api("start", ?BRIDGE_TYPE, ?BRIDGE_NAME),
    ?assertMatch(
        {error, {{_, 400, _}, _Headers, #{<<"message">> := <<"Connection refused">>}}},
        StartRes
    ),
    ok.
%% Runs the shared start/stop scenario from `emqx_bridge_testlib` against a
%% webhook bridge that targets the HTTP server stored under `http_server` in
%% the testcase config, using `emqx_connector_http_stopped` as the trace
%% point that signals the connector was stopped.
t_start_stop(Config) ->
    HttpServer = ?config(http_server, Config),
    #{port := ServerPort} = HttpServer,
    AsyncConfigArgs = #{
        type => ?BRIDGE_TYPE,
        name => ?BRIDGE_NAME,
        port => ServerPort
    },
    BridgeConfig = bridge_async_config(AsyncConfigArgs),
    StopTracePoint = emqx_connector_http_stopped,
    emqx_bridge_testlib:t_start_stop(
        ?BRIDGE_TYPE, ?BRIDGE_NAME, BridgeConfig, StopTracePoint
    ).
%% helpers
do_t_async_retries(TestContext, Error, Fn) ->
#{error_attempts := ErrorAttempts} = TestContext,
persistent_term:put({?MODULE, ?FUNCTION_NAME, attempts}, 0),

View File

@ -16,11 +16,10 @@
-module(emqx_connector_http).
-include("emqx_connector.hrl").
-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-behaviour(emqx_resource).
@ -219,10 +218,31 @@ on_start(
base_path => BasePath,
request => preprocess_request(maps:get(request, Config, undefined))
},
case ehttpc_sup:start_pool(InstId, PoolOpts) of
{ok, _} -> {ok, State};
{error, {already_started, _}} -> {ok, State};
{error, Reason} -> {error, Reason}
case start_pool(InstId, PoolOpts) of
ok ->
case do_get_status(InstId, ConnectTimeout) of
ok ->
{ok, State};
Error ->
ok = ehttpc_sup:stop_pool(InstId),
Error
end;
Error ->
Error
end.
%% Starts the ehttpc pool `PoolName` with `PoolOpts`.
%%
%% Returns `ok` when the pool was started, and also when it was already
%% running (a warning is logged in that case). Any other result from
%% `ehttpc_sup:start_pool/2` is returned to the caller unchanged.
start_pool(PoolName, PoolOpts) ->
    StartResult = ehttpc_sup:start_pool(PoolName, PoolOpts),
    case StartResult of
        {error, {already_started, _}} ->
            %% Treated as success: the pool is usable, but this is worth
            %% flagging in the logs.
            ?SLOG(warning, #{
                msg => "emqx_connector_on_start_already_started",
                pool_name => PoolName
            }),
            ok;
        {ok, _} ->
            ok;
        Failure ->
            Failure
    end.
on_stop(InstId, _State) ->
@ -230,7 +250,9 @@ on_stop(InstId, _State) ->
msg => "stopping_http_connector",
connector => InstId
}),
ehttpc_sup:stop_pool(InstId).
Res = ehttpc_sup:stop_pool(InstId),
?tp(emqx_connector_http_stopped, #{instance_id => InstId}),
Res.
on_query(InstId, {send_message, Msg}, State) ->
case maps:get(request, State, undefined) of

View File

@ -24,6 +24,8 @@ wrap_auth_headers_test_() ->
fun() ->
meck:expect(ehttpc_sup, start_pool, 2, {ok, foo}),
meck:expect(ehttpc, request, fun(_, _, Req, _, _) -> {ok, 200, Req} end),
meck:expect(ehttpc, workers, 1, [{self, self()}]),
meck:expect(ehttpc, health_check, 2, ok),
meck:expect(ehttpc_pool, pick_worker, 1, self()),
meck:expect(emqx_resource, allocate_resource, 3, ok),
[ehttpc_sup, ehttpc, ehttpc_pool, emqx_resource]

View File

@ -0,0 +1 @@
When starting an HTTP connector, EMQX now returns a descriptive error in case the system is unable to connect to the remote target system.