From 0d6d441f4ccd6181374711032ca01a119627a979 Mon Sep 17 00:00:00 2001 From: Stefan Strigler Date: Wed, 14 Jun 2023 09:56:50 +0200 Subject: [PATCH] test(emqx_connector): start/stop test for webhook bridge --- apps/emqx_bridge/test/emqx_bridge_testlib.erl | 111 +++++++++++++----- .../test/emqx_bridge_webhook_SUITE.erl | 11 ++ .../src/emqx_connector_http.erl | 7 +- 3 files changed, 99 insertions(+), 30 deletions(-) diff --git a/apps/emqx_bridge/test/emqx_bridge_testlib.erl b/apps/emqx_bridge/test/emqx_bridge_testlib.erl index d5fddaea8..62ba70b33 100644 --- a/apps/emqx_bridge/test/emqx_bridge_testlib.erl +++ b/apps/emqx_bridge/test/emqx_bridge_testlib.erl @@ -105,19 +105,19 @@ parse_and_check(Config, ConfigString, Name) -> resource_id(Config) -> BridgeType = ?config(bridge_type, Config), - Name = ?config(bridge_name, Config), - emqx_bridge_resource:resource_id(BridgeType, Name). + BridgeName = ?config(bridge_name, Config), + emqx_bridge_resource:resource_id(BridgeType, BridgeName). create_bridge(Config) -> create_bridge(Config, _Overrides = #{}). create_bridge(Config, Overrides) -> BridgeType = ?config(bridge_type, Config), - Name = ?config(bridge_name, Config), + BridgeName = ?config(bridge_name, Config), BridgeConfig0 = ?config(bridge_config, Config), BridgeConfig = emqx_utils_maps:deep_merge(BridgeConfig0, Overrides), ct:pal("creating bridge with config: ~p", [BridgeConfig]), - emqx_bridge:create(BridgeType, Name, BridgeConfig). + emqx_bridge:create(BridgeType, BridgeName, BridgeConfig). create_bridge_api(Config) -> create_bridge_api(Config, _Overrides = #{}). @@ -175,6 +175,8 @@ op_bridge_api(Op, BridgeType, BridgeName) -> ct:pal("calling bridge ~p (via http): ~p", [BridgeId, Op]), Res = case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, "", Opts) of + {ok, {Status = {_, 204, _}, Headers, Body}} -> + {ok, {Status, Headers, Body}}; {ok, {Status, Headers, Body}} -> {ok, {Status, Headers, emqx_utils_json:decode(Body, [return_maps])}}; {error, {Status, Headers, Body}} -> @@ -188,11 +190,15 @@ op_bridge_api(Op, BridgeType, BridgeName) -> probe_bridge_api(Config) -> probe_bridge_api(Config, _Overrides = #{}). -probe_bridge_api(Config, _Overrides) -> +probe_bridge_api(Config, Overrides) -> BridgeType = ?config(bridge_type, Config), - Name = ?config(bridge_name, Config), - BridgeConfig = ?config(bridge_config, Config), - Params = BridgeConfig#{<<"type">> => BridgeType, <<"name">> => Name}, + BridgeName = ?config(bridge_name, Config), + BridgeConfig0 = ?config(bridge_config, Config), + BridgeConfig = emqx_utils_maps:deep_merge(BridgeConfig0, Overrides), + probe_bridge_api(BridgeType, BridgeName, BridgeConfig). + +probe_bridge_api(BridgeType, BridgeName, BridgeConfig) -> + Params = BridgeConfig#{<<"type">> => BridgeType, <<"name">> => BridgeName}, Path = emqx_mgmt_api_test_util:api_path(["bridges_probe"]), AuthHeader = emqx_mgmt_api_test_util:auth_header_(), Opts = #{return_all => true}, @@ -310,10 +316,34 @@ t_create_via_http(Config) -> t_start_stop(Config, StopTracePoint) -> BridgeType = ?config(bridge_type, Config), BridgeName = ?config(bridge_name, Config), - ResourceId = resource_id(Config), + BridgeConfig = ?config(bridge_config, Config), + t_start_stop(BridgeType, BridgeName, BridgeConfig, StopTracePoint). + +t_start_stop(BridgeType, BridgeName, BridgeConfig, StopTracePoint) -> + ResourceId = emqx_bridge_resource:resource_id(BridgeType, BridgeName), ?check_trace( begin - ?assertMatch({ok, _}, create_bridge(Config)), + %% Check that the bridge probe API doesn't leak atoms. + ProbeRes0 = probe_bridge_api( + BridgeType, + BridgeName, + BridgeConfig#{<<"resource_opts">> => #{<<"health_check_interval">> => <<"1s">>}} + ), + ?assertMatch({ok, {{_, 204, _}, _Headers, _Body}}, ProbeRes0), + AtomsBefore = erlang:system_info(atom_count), + %% Probe again; shouldn't have created more atoms. + ProbeRes1 = probe_bridge_api( + BridgeType, + BridgeName, + BridgeConfig#{<<"resource_opts">> => #{<<"health_check_interval">> => <<"1s">>}} + ), + + ?assertMatch({ok, {{_, 204, _}, _Headers, _Body}}, ProbeRes1), + AtomsAfter = erlang:system_info(atom_count), + ?assertEqual(AtomsBefore, AtomsAfter), + + ?assertMatch({ok, _}, emqx_bridge:create(BridgeType, BridgeName, BridgeConfig)), + %% Since the connection process is async, we give it some time to %% stabilize and avoid flakiness. ?retry( @@ -322,24 +352,48 @@ t_start_stop(Config, StopTracePoint) -> ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) ), - %% Check that the bridge probe API doesn't leak atoms. - ProbeRes0 = probe_bridge_api( - Config, - #{<<"resource_opts">> => #{<<"health_check_interval">> => <<"1s">>}} - ), - ?assertMatch({ok, {{_, 204, _}, _Headers, _Body}}, ProbeRes0), - AtomsBefore = erlang:system_info(atom_count), - %% Probe again; shouldn't have created more atoms. - ProbeRes1 = probe_bridge_api( - Config, - #{<<"resource_opts">> => #{<<"health_check_interval">> => <<"1s">>}} + %% `start` bridge to trigger `already_started` + ?assertMatch( + {ok, {{_, 204, _}, _Headers, []}}, + emqx_bridge_testlib:op_bridge_api("start", BridgeType, BridgeName) ), - ?assertMatch({ok, {{_, 204, _}, _Headers, _Body}}, ProbeRes1), - AtomsAfter = erlang:system_info(atom_count), - ?assertEqual(AtomsBefore, AtomsAfter), + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)), - %% Now stop the bridge. + ?assertMatch( + {{ok, _}, {ok, _}}, + ?wait_async_action( + emqx_bridge_testlib:op_bridge_api("stop", BridgeType, BridgeName), + #{?snk_kind := StopTracePoint}, + 5_000 + ) + ), + + ?assertEqual( + {error, resource_is_stopped}, emqx_resource_manager:health_check(ResourceId) + ), + + ?assertMatch( + {ok, {{_, 204, _}, _Headers, []}}, + emqx_bridge_testlib:op_bridge_api("stop", BridgeType, BridgeName) + ), + + ?assertEqual( + {error, resource_is_stopped}, emqx_resource_manager:health_check(ResourceId) + ), + + ?assertMatch( + {ok, {{_, 204, _}, _Headers, []}}, + emqx_bridge_testlib:op_bridge_api("start", BridgeType, BridgeName) + ), + + ?retry( + _Sleep = 1_000, + _Attempts = 20, + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) + ), + + %% Disable the bridge, which will also stop it. ?assertMatch( {{ok, _}, {ok, _}}, ?wait_async_action( @@ -352,8 +406,11 @@ t_start_stop(Config, StopTracePoint) -> ok end, fun(Trace) -> - %% one for each probe, one for real - ?assertMatch([_, _, #{instance_id := ResourceId}], ?of_kind(StopTracePoint, Trace)), + %% one for each probe, two for real + ?assertMatch( + [_, _, #{instance_id := ResourceId}, #{instance_id := ResourceId}], + ?of_kind(StopTracePoint, Trace) + ), ok end ), diff --git a/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl index a1ff465c9..93eab438e 100644 --- a/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_webhook_SUITE.erl @@ -356,6 +356,17 @@ t_bad_bridge_config(_Config) -> ), ok. +t_start_stop(Config) -> + #{port := Port} = ?config(http_server, Config), + BridgeConfig = bridge_async_config(#{ + type => ?BRIDGE_TYPE, + name => ?BRIDGE_NAME, + port => Port + }), + emqx_bridge_testlib:t_start_stop( + ?BRIDGE_TYPE, ?BRIDGE_NAME, BridgeConfig, emqx_connector_http_stopped + ). + %% helpers do_t_async_retries(TestContext, Error, Fn) -> #{error_attempts := ErrorAttempts} = TestContext, diff --git a/apps/emqx_connector/src/emqx_connector_http.erl b/apps/emqx_connector/src/emqx_connector_http.erl index 8e836aaee..ce8a1a1a5 100644 --- a/apps/emqx_connector/src/emqx_connector_http.erl +++ b/apps/emqx_connector/src/emqx_connector_http.erl @@ -16,11 +16,10 @@ -module(emqx_connector_http). --include("emqx_connector.hrl"). - -include_lib("typerefl/include/types.hrl"). -include_lib("hocon/include/hoconsc.hrl"). -include_lib("emqx/include/logger.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -behaviour(emqx_resource). @@ -251,7 +250,9 @@ on_stop(InstId, _State) -> msg => "stopping_http_connector", connector => InstId }), - ehttpc_sup:stop_pool(InstId). + Res = ehttpc_sup:stop_pool(InstId), + ?tp(emqx_connector_http_stopped, #{instance_id => InstId}), + Res. on_query(InstId, {send_message, Msg}, State) -> case maps:get(request, State, undefined) of