From b2a5065641df07fedf4ef97b7f3ea7714d8aa9a8 Mon Sep 17 00:00:00 2001 From: Stefan Strigler Date: Tue, 13 Jun 2023 15:56:45 +0200 Subject: [PATCH] fix(emqx_connector): report errors in on_start handler --- apps/emqx_bridge/test/emqx_bridge_testlib.erl | 25 +++++- .../test/emqx_bridge_webhook_SUITE.erl | 78 +++++++++++++------ .../src/emqx_connector_http.erl | 29 ++++++- .../test/emqx_connector_http_tests.erl | 2 + 4 files changed, 106 insertions(+), 28 deletions(-) diff --git a/apps/emqx_bridge/test/emqx_bridge_testlib.erl b/apps/emqx_bridge/test/emqx_bridge_testlib.erl index 025451988..d5fddaea8 100644 --- a/apps/emqx_bridge/test/emqx_bridge_testlib.erl +++ b/apps/emqx_bridge/test/emqx_bridge_testlib.erl @@ -124,10 +124,13 @@ create_bridge_api(Config) -> create_bridge_api(Config, Overrides) -> BridgeType = ?config(bridge_type, Config), - Name = ?config(bridge_name, Config), + BridgeName = ?config(bridge_name, Config), BridgeConfig0 = ?config(bridge_config, Config), BridgeConfig = emqx_utils_maps:deep_merge(BridgeConfig0, Overrides), - Params = BridgeConfig#{<<"type">> => BridgeType, <<"name">> => Name}, + create_bridge_api(BridgeType, BridgeName, BridgeConfig). + +create_bridge_api(BridgeType, BridgeName, BridgeConfig) -> + Params = BridgeConfig#{<<"type">> => BridgeType, <<"name">> => BridgeName}, Path = emqx_mgmt_api_test_util:api_path(["bridges"]), AuthHeader = emqx_mgmt_api_test_util:auth_header_(), Opts = #{return_all => true}, @@ -164,6 +167,24 @@ update_bridge_api(Config, Overrides) -> ct:pal("bridge update result: ~p", [Res]), Res. +op_bridge_api(Op, BridgeType, BridgeName) -> + BridgeId = emqx_bridge_resource:bridge_id(BridgeType, BridgeName), + Path = emqx_mgmt_api_test_util:api_path(["bridges", BridgeId, Op]), + AuthHeader = emqx_mgmt_api_test_util:auth_header_(), + Opts = #{return_all => true}, + ct:pal("calling bridge ~p (via http): ~p", [BridgeId, Op]), + Res = + case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, "", Opts) of + {ok, {Status, Headers, Body}} -> + {ok, {Status, Headers, emqx_utils_json:decode(Body, [return_maps])}}; + {error, {Status, Headers, Body}} -> + {error, {Status, Headers, emqx_utils_json:decode(Body, [return_maps])}}; + Error -> + Error + end, + ct:pal("bridge op result: ~p", [Res]), + Res. + probe_bridge_api(Config) -> probe_bridge_api(Config, _Overrides = #{}). diff --git a/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl index 4fc76fc9e..a1ff465c9 100644 --- a/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl @@ -28,6 +28,9 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). +-define(BRIDGE_TYPE, <<"webhook">>). +-define(BRIDGE_NAME, atom_to_binary(?MODULE)). + all() -> emqx_common_test_helpers:all(?MODULE). @@ -36,15 +39,13 @@ groups() -> init_per_suite(_Config) -> emqx_common_test_helpers:render_and_load_app_config(emqx_conf), - ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]), + ok = emqx_mgmt_api_test_util:init_suite([emqx_conf, emqx_bridge]), ok = emqx_connector_test_helpers:start_apps([emqx_resource]), {ok, _} = application:ensure_all_started(emqx_connector), []. end_per_suite(_Config) -> - ok = emqx_config:put([bridges], #{}), - ok = emqx_config:put_raw([bridges], #{}), - ok = emqx_common_test_helpers:stop_apps([emqx_conf, emqx_bridge]), + ok = emqx_mgmt_api_test_util:end_suite([emqx_conf, emqx_bridge]), ok = emqx_connector_test_helpers:stop_apps([emqx_resource]), _ = application:stop(emqx_connector), _ = application:stop(emqx_bridge), @@ -53,10 +54,22 @@ end_per_suite(_Config) -> suite() -> [{timetrap, {seconds, 60}}]. +init_per_testcase(t_bad_bridge_config, Config) -> + Config; +init_per_testcase(t_send_async_connection_timeout, Config) -> + ResponseDelayMS = 500, + Server = start_http_server(#{response_delay_ms => ResponseDelayMS}), + [{http_server, Server}, {response_delay_ms, ResponseDelayMS} | Config]; init_per_testcase(_TestCase, Config) -> - Config. + Server = start_http_server(#{response_delay_ms => 0}), + [{http_server, Server} | Config]. -end_per_testcase(_TestCase, _Config) -> +end_per_testcase(_TestCase, Config) -> + case ?config(http_server, Config) of + undefined -> ok; + Server -> stop_http_server(Server) + end, + emqx_bridge_testlib:delete_all_bridges(), emqx_common_test_helpers:call_janitor(), ok. @@ -65,13 +78,14 @@ end_per_testcase(_TestCase, _Config) -> %% (Orginally copied from emqx_bridge_api_SUITE) %%------------------------------------------------------------------------------ start_http_server(HTTPServerConfig) -> - ct:pal("Start server\n"), process_flag(trap_exit, true), Parent = self(), + ct:pal("Starting server for ~p", [Parent]), {ok, {Port, Sock}} = listen_on_random_port(), Acceptor = spawn(fun() -> accept_loop(Sock, Parent, HTTPServerConfig) end), + ct:pal("Started server on port ~p", [Port]), timer:sleep(100), #{port => Port, sock => Sock, acceptor => Acceptor}. @@ -160,8 +174,8 @@ parse_http_request_assertive(ReqStr0) -> %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% bridge_async_config(#{port := Port} = Config) -> - Type = maps:get(type, Config, <<"webhook">>), - Name = maps:get(name, Config, atom_to_binary(?MODULE)), + Type = maps:get(type, Config, ?BRIDGE_TYPE), + Name = maps:get(name, Config, ?BRIDGE_NAME), PoolSize = maps:get(pool_size, Config, 1), QueryMode = maps:get(query_mode, Config, "async"), ConnectTimeout = maps:get(connect_timeout, Config, 1), @@ -217,8 +231,8 @@ parse_and_check(ConfigString, BridgeType, Name) -> RetConfig. make_bridge(Config) -> - Type = <<"webhook">>, - Name = atom_to_binary(?MODULE), + Type = ?BRIDGE_TYPE, + Name = ?BRIDGE_NAME, BridgeConfig = bridge_async_config(Config#{ name => Name, type => Type @@ -236,16 +250,15 @@ make_bridge(Config) -> %% This test ensures that https://emqx.atlassian.net/browse/CI-62 is fixed. %% When the connection time out all the queued requests where dropped in -t_send_async_connection_timeout(_Config) -> - ResponseDelayMS = 90, - #{port := Port} = Server = start_http_server(#{response_delay_ms => 900}), - % Port = 9000, +t_send_async_connection_timeout(Config) -> + ResponseDelayMS = ?config(response_delay_ms, Config), + #{port := Port} = ?config(http_server, Config), BridgeID = make_bridge(#{ port => Port, pool_size => 1, query_mode => "async", - connect_timeout => ResponseDelayMS * 2, - request_timeout => 10000, + connect_timeout => 10_000, + request_timeout => ResponseDelayMS * 2, resource_request_ttl => "infinity" }), NumberOfMessagesToSend = 10, @@ -257,11 +270,10 @@ t_send_async_connection_timeout(_Config) -> ct:pal("Sent messages\n"), MessageIDs = maps:from_keys(lists:seq(1, NumberOfMessagesToSend), void), receive_request_notifications(MessageIDs, ResponseDelayMS), - stop_http_server(Server), ok. -t_async_free_retries(_Config) -> - #{port := Port} = start_http_server(#{response_delay_ms => 0}), +t_async_free_retries(Config) -> + #{port := Port} = ?config(http_server, Config), BridgeID = make_bridge(#{ port => Port, pool_size => 1, @@ -285,8 +297,8 @@ t_async_free_retries(_Config) -> do_t_async_retries(Context, {error, {shutdown, normal}}, Fn), ok. -t_async_common_retries(_Config) -> - #{port := Port} = start_http_server(#{response_delay_ms => 0}), +t_async_common_retries(Config) -> + #{port := Port} = ?config(http_server, Config), BridgeID = make_bridge(#{ port => Port, pool_size => 1, @@ -323,6 +335,28 @@ t_async_common_retries(_Config) -> do_t_async_retries(Context, {error, something_else}, FnFail), ok. +t_bad_bridge_config(_Config) -> + BridgeConfig = bridge_async_config(#{port => 12345}), + ?assertMatch( + {ok, + {{_, 201, _}, _Headers, #{ + <<"status">> := <<"disconnected">>, + <<"status_reason">> := <<"Connection refused">> + }}}, + emqx_bridge_testlib:create_bridge_api( + ?BRIDGE_TYPE, + ?BRIDGE_NAME, + BridgeConfig + ) + ), + %% try `/start` bridge + ?assertMatch( + {error, {{_, 400, _}, _Headers, #{<<"message">> := <<"Connection refused">>}}}, + emqx_bridge_testlib:op_bridge_api("start", ?BRIDGE_TYPE, ?BRIDGE_NAME) + ), + ok. + +%% helpers do_t_async_retries(TestContext, Error, Fn) -> #{error_attempts := ErrorAttempts} = TestContext, persistent_term:put({?MODULE, ?FUNCTION_NAME, attempts}, 0), diff --git a/apps/emqx_connector/src/emqx_connector_http.erl b/apps/emqx_connector/src/emqx_connector_http.erl index 149704f76..8e836aaee 100644 --- a/apps/emqx_connector/src/emqx_connector_http.erl +++ b/apps/emqx_connector/src/emqx_connector_http.erl @@ -219,10 +219,31 @@ on_start( base_path => BasePath, request => preprocess_request(maps:get(request, Config, undefined)) }, - case ehttpc_sup:start_pool(InstId, PoolOpts) of - {ok, _} -> {ok, State}; - {error, {already_started, _}} -> {ok, State}; - {error, Reason} -> {error, Reason} + case start_pool(InstId, PoolOpts) of + ok -> + case do_get_status(InstId, ConnectTimeout) of + ok -> + {ok, State}; + Error -> + ok = ehttpc_sup:stop_pool(InstId), + Error + end; + Error -> + Error + end. + +start_pool(PoolName, PoolOpts) -> + case ehttpc_sup:start_pool(PoolName, PoolOpts) of + {ok, _} -> + ok; + {error, {already_started, _}} -> + ?SLOG(warning, #{ + msg => "emqx_connector_on_start_already_started", + pool_name => PoolName + }), + ok; + Error -> + Error end. on_stop(InstId, _State) -> diff --git a/apps/emqx_connector/test/emqx_connector_http_tests.erl b/apps/emqx_connector/test/emqx_connector_http_tests.erl index 2dc2119f7..c5f6dfe78 100644 --- a/apps/emqx_connector/test/emqx_connector_http_tests.erl +++ b/apps/emqx_connector/test/emqx_connector_http_tests.erl @@ -24,6 +24,8 @@ wrap_auth_headers_test_() -> fun() -> meck:expect(ehttpc_sup, start_pool, 2, {ok, foo}), meck:expect(ehttpc, request, fun(_, _, Req, _, _) -> {ok, 200, Req} end), + meck:expect(ehttpc, workers, 1, [{self, self()}]), + meck:expect(ehttpc, health_check, 2, ok), meck:expect(ehttpc_pool, pick_worker, 1, self()), meck:expect(emqx_resource, allocate_resource, 3, ok), [ehttpc_sup, ehttpc, ehttpc_pool, emqx_resource]