diff --git a/apps/emqx_bridge/test/emqx_bridge_testlib.erl b/apps/emqx_bridge/test/emqx_bridge_testlib.erl index 025451988..62ba70b33 100644 --- a/apps/emqx_bridge/test/emqx_bridge_testlib.erl +++ b/apps/emqx_bridge/test/emqx_bridge_testlib.erl @@ -105,29 +105,32 @@ parse_and_check(Config, ConfigString, Name) -> resource_id(Config) -> BridgeType = ?config(bridge_type, Config), - Name = ?config(bridge_name, Config), - emqx_bridge_resource:resource_id(BridgeType, Name). + BridgeName = ?config(bridge_name, Config), + emqx_bridge_resource:resource_id(BridgeType, BridgeName). create_bridge(Config) -> create_bridge(Config, _Overrides = #{}). create_bridge(Config, Overrides) -> BridgeType = ?config(bridge_type, Config), - Name = ?config(bridge_name, Config), + BridgeName = ?config(bridge_name, Config), BridgeConfig0 = ?config(bridge_config, Config), BridgeConfig = emqx_utils_maps:deep_merge(BridgeConfig0, Overrides), ct:pal("creating bridge with config: ~p", [BridgeConfig]), - emqx_bridge:create(BridgeType, Name, BridgeConfig). + emqx_bridge:create(BridgeType, BridgeName, BridgeConfig). create_bridge_api(Config) -> create_bridge_api(Config, _Overrides = #{}). create_bridge_api(Config, Overrides) -> BridgeType = ?config(bridge_type, Config), - Name = ?config(bridge_name, Config), + BridgeName = ?config(bridge_name, Config), BridgeConfig0 = ?config(bridge_config, Config), BridgeConfig = emqx_utils_maps:deep_merge(BridgeConfig0, Overrides), - Params = BridgeConfig#{<<"type">> => BridgeType, <<"name">> => Name}, + create_bridge_api(BridgeType, BridgeName, BridgeConfig). + +create_bridge_api(BridgeType, BridgeName, BridgeConfig) -> + Params = BridgeConfig#{<<"type">> => BridgeType, <<"name">> => BridgeName}, Path = emqx_mgmt_api_test_util:api_path(["bridges"]), AuthHeader = emqx_mgmt_api_test_util:auth_header_(), Opts = #{return_all => true}, @@ -164,14 +167,38 @@ update_bridge_api(Config, Overrides) -> ct:pal("bridge update result: ~p", [Res]), Res. +op_bridge_api(Op, BridgeType, BridgeName) -> + BridgeId = emqx_bridge_resource:bridge_id(BridgeType, BridgeName), + Path = emqx_mgmt_api_test_util:api_path(["bridges", BridgeId, Op]), + AuthHeader = emqx_mgmt_api_test_util:auth_header_(), + Opts = #{return_all => true}, + ct:pal("calling bridge ~p (via http): ~p", [BridgeId, Op]), + Res = + case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, "", Opts) of + {ok, {Status = {_, 204, _}, Headers, Body}} -> + {ok, {Status, Headers, Body}}; + {ok, {Status, Headers, Body}} -> + {ok, {Status, Headers, emqx_utils_json:decode(Body, [return_maps])}}; + {error, {Status, Headers, Body}} -> + {error, {Status, Headers, emqx_utils_json:decode(Body, [return_maps])}}; + Error -> + Error + end, + ct:pal("bridge op result: ~p", [Res]), + Res. + probe_bridge_api(Config) -> probe_bridge_api(Config, _Overrides = #{}). -probe_bridge_api(Config, _Overrides) -> +probe_bridge_api(Config, Overrides) -> BridgeType = ?config(bridge_type, Config), - Name = ?config(bridge_name, Config), - BridgeConfig = ?config(bridge_config, Config), - Params = BridgeConfig#{<<"type">> => BridgeType, <<"name">> => Name}, + BridgeName = ?config(bridge_name, Config), + BridgeConfig0 = ?config(bridge_config, Config), + BridgeConfig = emqx_utils_maps:deep_merge(BridgeConfig0, Overrides), + probe_bridge_api(BridgeType, BridgeName, BridgeConfig). + +probe_bridge_api(BridgeType, BridgeName, BridgeConfig) -> + Params = BridgeConfig#{<<"type">> => BridgeType, <<"name">> => BridgeName}, Path = emqx_mgmt_api_test_util:api_path(["bridges_probe"]), AuthHeader = emqx_mgmt_api_test_util:auth_header_(), Opts = #{return_all => true}, @@ -289,10 +316,34 @@ t_create_via_http(Config) -> t_start_stop(Config, StopTracePoint) -> BridgeType = ?config(bridge_type, Config), BridgeName = ?config(bridge_name, Config), - ResourceId = resource_id(Config), + BridgeConfig = ?config(bridge_config, Config), + t_start_stop(BridgeType, BridgeName, BridgeConfig, StopTracePoint). + +t_start_stop(BridgeType, BridgeName, BridgeConfig, StopTracePoint) -> + ResourceId = emqx_bridge_resource:resource_id(BridgeType, BridgeName), ?check_trace( begin - ?assertMatch({ok, _}, create_bridge(Config)), + %% Check that the bridge probe API doesn't leak atoms. + ProbeRes0 = probe_bridge_api( + BridgeType, + BridgeName, + BridgeConfig#{<<"resource_opts">> => #{<<"health_check_interval">> => <<"1s">>}} + ), + ?assertMatch({ok, {{_, 204, _}, _Headers, _Body}}, ProbeRes0), + AtomsBefore = erlang:system_info(atom_count), + %% Probe again; shouldn't have created more atoms. + ProbeRes1 = probe_bridge_api( + BridgeType, + BridgeName, + BridgeConfig#{<<"resource_opts">> => #{<<"health_check_interval">> => <<"1s">>}} + ), + + ?assertMatch({ok, {{_, 204, _}, _Headers, _Body}}, ProbeRes1), + AtomsAfter = erlang:system_info(atom_count), + ?assertEqual(AtomsBefore, AtomsAfter), + + ?assertMatch({ok, _}, emqx_bridge:create(BridgeType, BridgeName, BridgeConfig)), + %% Since the connection process is async, we give it some time to %% stabilize and avoid flakiness. ?retry( @@ -301,24 +352,48 @@ t_start_stop(Config, StopTracePoint) -> ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) ), - %% Check that the bridge probe API doesn't leak atoms. - ProbeRes0 = probe_bridge_api( - Config, - #{<<"resource_opts">> => #{<<"health_check_interval">> => <<"1s">>}} - ), - ?assertMatch({ok, {{_, 204, _}, _Headers, _Body}}, ProbeRes0), - AtomsBefore = erlang:system_info(atom_count), - %% Probe again; shouldn't have created more atoms. - ProbeRes1 = probe_bridge_api( - Config, - #{<<"resource_opts">> => #{<<"health_check_interval">> => <<"1s">>}} + %% `start` bridge to trigger `already_started` + ?assertMatch( + {ok, {{_, 204, _}, _Headers, []}}, + emqx_bridge_testlib:op_bridge_api("start", BridgeType, BridgeName) ), - ?assertMatch({ok, {{_, 204, _}, _Headers, _Body}}, ProbeRes1), - AtomsAfter = erlang:system_info(atom_count), - ?assertEqual(AtomsBefore, AtomsAfter), + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)), - %% Now stop the bridge. + ?assertMatch( + {{ok, _}, {ok, _}}, + ?wait_async_action( + emqx_bridge_testlib:op_bridge_api("stop", BridgeType, BridgeName), + #{?snk_kind := StopTracePoint}, + 5_000 + ) + ), + + ?assertEqual( + {error, resource_is_stopped}, emqx_resource_manager:health_check(ResourceId) + ), + + ?assertMatch( + {ok, {{_, 204, _}, _Headers, []}}, + emqx_bridge_testlib:op_bridge_api("stop", BridgeType, BridgeName) + ), + + ?assertEqual( + {error, resource_is_stopped}, emqx_resource_manager:health_check(ResourceId) + ), + + ?assertMatch( + {ok, {{_, 204, _}, _Headers, []}}, + emqx_bridge_testlib:op_bridge_api("start", BridgeType, BridgeName) + ), + + ?retry( + _Sleep = 1_000, + _Attempts = 20, + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) + ), + + %% Disable the bridge, which will also stop it. ?assertMatch( {{ok, _}, {ok, _}}, ?wait_async_action( @@ -331,8 +406,11 @@ t_start_stop(Config, StopTracePoint) -> ok end, fun(Trace) -> - %% one for each probe, one for real - ?assertMatch([_, _, #{instance_id := ResourceId}], ?of_kind(StopTracePoint, Trace)), + %% one for each probe, two for real + ?assertMatch( + [_, _, #{instance_id := ResourceId}, #{instance_id := ResourceId}], + ?of_kind(StopTracePoint, Trace) + ), ok end ), diff --git a/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl index 4fc76fc9e..93eab438e 100644 --- a/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl @@ -28,6 +28,9 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). +-define(BRIDGE_TYPE, <<"webhook">>). +-define(BRIDGE_NAME, atom_to_binary(?MODULE)). + all() -> emqx_common_test_helpers:all(?MODULE). @@ -36,15 +39,13 @@ groups() -> init_per_suite(_Config) -> emqx_common_test_helpers:render_and_load_app_config(emqx_conf), - ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]), + ok = emqx_mgmt_api_test_util:init_suite([emqx_conf, emqx_bridge]), ok = emqx_connector_test_helpers:start_apps([emqx_resource]), {ok, _} = application:ensure_all_started(emqx_connector), []. end_per_suite(_Config) -> - ok = emqx_config:put([bridges], #{}), - ok = emqx_config:put_raw([bridges], #{}), - ok = emqx_common_test_helpers:stop_apps([emqx_conf, emqx_bridge]), + ok = emqx_mgmt_api_test_util:end_suite([emqx_conf, emqx_bridge]), ok = emqx_connector_test_helpers:stop_apps([emqx_resource]), _ = application:stop(emqx_connector), _ = application:stop(emqx_bridge), @@ -53,10 +54,22 @@ end_per_suite(_Config) -> suite() -> [{timetrap, {seconds, 60}}]. +init_per_testcase(t_bad_bridge_config, Config) -> + Config; +init_per_testcase(t_send_async_connection_timeout, Config) -> + ResponseDelayMS = 500, + Server = start_http_server(#{response_delay_ms => ResponseDelayMS}), + [{http_server, Server}, {response_delay_ms, ResponseDelayMS} | Config]; init_per_testcase(_TestCase, Config) -> - Config. + Server = start_http_server(#{response_delay_ms => 0}), + [{http_server, Server} | Config]. -end_per_testcase(_TestCase, _Config) -> +end_per_testcase(_TestCase, Config) -> + case ?config(http_server, Config) of + undefined -> ok; + Server -> stop_http_server(Server) + end, + emqx_bridge_testlib:delete_all_bridges(), emqx_common_test_helpers:call_janitor(), ok. @@ -65,13 +78,14 @@ end_per_testcase(_TestCase, _Config) -> %% (Orginally copied from emqx_bridge_api_SUITE) %%------------------------------------------------------------------------------ start_http_server(HTTPServerConfig) -> - ct:pal("Start server\n"), process_flag(trap_exit, true), Parent = self(), + ct:pal("Starting server for ~p", [Parent]), {ok, {Port, Sock}} = listen_on_random_port(), Acceptor = spawn(fun() -> accept_loop(Sock, Parent, HTTPServerConfig) end), + ct:pal("Started server on port ~p", [Port]), timer:sleep(100), #{port => Port, sock => Sock, acceptor => Acceptor}. @@ -160,8 +174,8 @@ parse_http_request_assertive(ReqStr0) -> %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% bridge_async_config(#{port := Port} = Config) -> - Type = maps:get(type, Config, <<"webhook">>), - Name = maps:get(name, Config, atom_to_binary(?MODULE)), + Type = maps:get(type, Config, ?BRIDGE_TYPE), + Name = maps:get(name, Config, ?BRIDGE_NAME), PoolSize = maps:get(pool_size, Config, 1), QueryMode = maps:get(query_mode, Config, "async"), ConnectTimeout = maps:get(connect_timeout, Config, 1), @@ -217,8 +231,8 @@ parse_and_check(ConfigString, BridgeType, Name) -> RetConfig. make_bridge(Config) -> - Type = <<"webhook">>, - Name = atom_to_binary(?MODULE), + Type = ?BRIDGE_TYPE, + Name = ?BRIDGE_NAME, BridgeConfig = bridge_async_config(Config#{ name => Name, type => Type @@ -236,16 +250,15 @@ make_bridge(Config) -> %% This test ensures that https://emqx.atlassian.net/browse/CI-62 is fixed. %% When the connection time out all the queued requests where dropped in -t_send_async_connection_timeout(_Config) -> - ResponseDelayMS = 90, - #{port := Port} = Server = start_http_server(#{response_delay_ms => 900}), - % Port = 9000, +t_send_async_connection_timeout(Config) -> + ResponseDelayMS = ?config(response_delay_ms, Config), + #{port := Port} = ?config(http_server, Config), BridgeID = make_bridge(#{ port => Port, pool_size => 1, query_mode => "async", - connect_timeout => ResponseDelayMS * 2, - request_timeout => 10000, + connect_timeout => 10_000, + request_timeout => ResponseDelayMS * 2, resource_request_ttl => "infinity" }), NumberOfMessagesToSend = 10, @@ -257,11 +270,10 @@ t_send_async_connection_timeout(_Config) -> ct:pal("Sent messages\n"), MessageIDs = maps:from_keys(lists:seq(1, NumberOfMessagesToSend), void), receive_request_notifications(MessageIDs, ResponseDelayMS), - stop_http_server(Server), ok. -t_async_free_retries(_Config) -> - #{port := Port} = start_http_server(#{response_delay_ms => 0}), +t_async_free_retries(Config) -> + #{port := Port} = ?config(http_server, Config), BridgeID = make_bridge(#{ port => Port, pool_size => 1, @@ -285,8 +297,8 @@ t_async_free_retries(_Config) -> do_t_async_retries(Context, {error, {shutdown, normal}}, Fn), ok. -t_async_common_retries(_Config) -> - #{port := Port} = start_http_server(#{response_delay_ms => 0}), +t_async_common_retries(Config) -> + #{port := Port} = ?config(http_server, Config), BridgeID = make_bridge(#{ port => Port, pool_size => 1, @@ -323,6 +335,39 @@ t_async_common_retries(_Config) -> do_t_async_retries(Context, {error, something_else}, FnFail), ok. +t_bad_bridge_config(_Config) -> + BridgeConfig = bridge_async_config(#{port => 12345}), + ?assertMatch( + {ok, + {{_, 201, _}, _Headers, #{ + <<"status">> := <<"disconnected">>, + <<"status_reason">> := <<"Connection refused">> + }}}, + emqx_bridge_testlib:create_bridge_api( + ?BRIDGE_TYPE, + ?BRIDGE_NAME, + BridgeConfig + ) + ), + %% try `/start` bridge + ?assertMatch( + {error, {{_, 400, _}, _Headers, #{<<"message">> := <<"Connection refused">>}}}, + emqx_bridge_testlib:op_bridge_api("start", ?BRIDGE_TYPE, ?BRIDGE_NAME) + ), + ok. + +t_start_stop(Config) -> + #{port := Port} = ?config(http_server, Config), + BridgeConfig = bridge_async_config(#{ + type => ?BRIDGE_TYPE, + name => ?BRIDGE_NAME, + port => Port + }), + emqx_bridge_testlib:t_start_stop( + ?BRIDGE_TYPE, ?BRIDGE_NAME, BridgeConfig, emqx_connector_http_stopped + ). + +%% helpers do_t_async_retries(TestContext, Error, Fn) -> #{error_attempts := ErrorAttempts} = TestContext, persistent_term:put({?MODULE, ?FUNCTION_NAME, attempts}, 0), diff --git a/apps/emqx_connector/src/emqx_connector_http.erl b/apps/emqx_connector/src/emqx_connector_http.erl index 149704f76..ce8a1a1a5 100644 --- a/apps/emqx_connector/src/emqx_connector_http.erl +++ b/apps/emqx_connector/src/emqx_connector_http.erl @@ -16,11 +16,10 @@ -module(emqx_connector_http). --include("emqx_connector.hrl"). - -include_lib("typerefl/include/types.hrl"). -include_lib("hocon/include/hoconsc.hrl"). -include_lib("emqx/include/logger.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -behaviour(emqx_resource). @@ -219,10 +218,31 @@ on_start( base_path => BasePath, request => preprocess_request(maps:get(request, Config, undefined)) }, - case ehttpc_sup:start_pool(InstId, PoolOpts) of - {ok, _} -> {ok, State}; - {error, {already_started, _}} -> {ok, State}; - {error, Reason} -> {error, Reason} + case start_pool(InstId, PoolOpts) of + ok -> + case do_get_status(InstId, ConnectTimeout) of + ok -> + {ok, State}; + Error -> + ok = ehttpc_sup:stop_pool(InstId), + Error + end; + Error -> + Error + end. + +start_pool(PoolName, PoolOpts) -> + case ehttpc_sup:start_pool(PoolName, PoolOpts) of + {ok, _} -> + ok; + {error, {already_started, _}} -> + ?SLOG(warning, #{ + msg => "emqx_connector_on_start_already_started", + pool_name => PoolName + }), + ok; + Error -> + Error end. on_stop(InstId, _State) -> @@ -230,7 +250,9 @@ on_stop(InstId, _State) -> msg => "stopping_http_connector", connector => InstId }), - ehttpc_sup:stop_pool(InstId). + Res = ehttpc_sup:stop_pool(InstId), + ?tp(emqx_connector_http_stopped, #{instance_id => InstId}), + Res. on_query(InstId, {send_message, Msg}, State) -> case maps:get(request, State, undefined) of diff --git a/apps/emqx_connector/test/emqx_connector_http_tests.erl b/apps/emqx_connector/test/emqx_connector_http_tests.erl index 2dc2119f7..c5f6dfe78 100644 --- a/apps/emqx_connector/test/emqx_connector_http_tests.erl +++ b/apps/emqx_connector/test/emqx_connector_http_tests.erl @@ -24,6 +24,8 @@ wrap_auth_headers_test_() -> fun() -> meck:expect(ehttpc_sup, start_pool, 2, {ok, foo}), meck:expect(ehttpc, request, fun(_, _, Req, _, _) -> {ok, 200, Req} end), + meck:expect(ehttpc, workers, 1, [{self, self()}]), + meck:expect(ehttpc, health_check, 2, ok), meck:expect(ehttpc_pool, pick_worker, 1, self()), meck:expect(emqx_resource, allocate_resource, 3, ok), [ehttpc_sup, ehttpc, ehttpc_pool, emqx_resource] diff --git a/changes/ce/fix-11037.en.md b/changes/ce/fix-11037.en.md new file mode 100644 index 000000000..39b2dc4a6 --- /dev/null +++ b/changes/ce/fix-11037.en.md @@ -0,0 +1 @@ +When starting an HTTP connector EMQX now returns a descriptive error in case the system is unable to connect to the remote target system.