test(emqx_connector): start/stop test for webhook bridge
This commit is contained in:
parent
b2a5065641
commit
0d6d441f4c
|
@ -105,19 +105,19 @@ parse_and_check(Config, ConfigString, Name) ->
|
||||||
|
|
||||||
resource_id(Config) ->
|
resource_id(Config) ->
|
||||||
BridgeType = ?config(bridge_type, Config),
|
BridgeType = ?config(bridge_type, Config),
|
||||||
Name = ?config(bridge_name, Config),
|
BridgeName = ?config(bridge_name, Config),
|
||||||
emqx_bridge_resource:resource_id(BridgeType, Name).
|
emqx_bridge_resource:resource_id(BridgeType, BridgeName).
|
||||||
|
|
||||||
create_bridge(Config) ->
|
create_bridge(Config) ->
|
||||||
create_bridge(Config, _Overrides = #{}).
|
create_bridge(Config, _Overrides = #{}).
|
||||||
|
|
||||||
create_bridge(Config, Overrides) ->
|
create_bridge(Config, Overrides) ->
|
||||||
BridgeType = ?config(bridge_type, Config),
|
BridgeType = ?config(bridge_type, Config),
|
||||||
Name = ?config(bridge_name, Config),
|
BridgeName = ?config(bridge_name, Config),
|
||||||
BridgeConfig0 = ?config(bridge_config, Config),
|
BridgeConfig0 = ?config(bridge_config, Config),
|
||||||
BridgeConfig = emqx_utils_maps:deep_merge(BridgeConfig0, Overrides),
|
BridgeConfig = emqx_utils_maps:deep_merge(BridgeConfig0, Overrides),
|
||||||
ct:pal("creating bridge with config: ~p", [BridgeConfig]),
|
ct:pal("creating bridge with config: ~p", [BridgeConfig]),
|
||||||
emqx_bridge:create(BridgeType, Name, BridgeConfig).
|
emqx_bridge:create(BridgeType, BridgeName, BridgeConfig).
|
||||||
|
|
||||||
create_bridge_api(Config) ->
|
create_bridge_api(Config) ->
|
||||||
create_bridge_api(Config, _Overrides = #{}).
|
create_bridge_api(Config, _Overrides = #{}).
|
||||||
|
@ -175,6 +175,8 @@ op_bridge_api(Op, BridgeType, BridgeName) ->
|
||||||
ct:pal("calling bridge ~p (via http): ~p", [BridgeId, Op]),
|
ct:pal("calling bridge ~p (via http): ~p", [BridgeId, Op]),
|
||||||
Res =
|
Res =
|
||||||
case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, "", Opts) of
|
case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, "", Opts) of
|
||||||
|
{ok, {Status = {_, 204, _}, Headers, Body}} ->
|
||||||
|
{ok, {Status, Headers, Body}};
|
||||||
{ok, {Status, Headers, Body}} ->
|
{ok, {Status, Headers, Body}} ->
|
||||||
{ok, {Status, Headers, emqx_utils_json:decode(Body, [return_maps])}};
|
{ok, {Status, Headers, emqx_utils_json:decode(Body, [return_maps])}};
|
||||||
{error, {Status, Headers, Body}} ->
|
{error, {Status, Headers, Body}} ->
|
||||||
|
@ -188,11 +190,15 @@ op_bridge_api(Op, BridgeType, BridgeName) ->
|
||||||
probe_bridge_api(Config) ->
|
probe_bridge_api(Config) ->
|
||||||
probe_bridge_api(Config, _Overrides = #{}).
|
probe_bridge_api(Config, _Overrides = #{}).
|
||||||
|
|
||||||
probe_bridge_api(Config, _Overrides) ->
|
probe_bridge_api(Config, Overrides) ->
|
||||||
BridgeType = ?config(bridge_type, Config),
|
BridgeType = ?config(bridge_type, Config),
|
||||||
Name = ?config(bridge_name, Config),
|
BridgeName = ?config(bridge_name, Config),
|
||||||
BridgeConfig = ?config(bridge_config, Config),
|
BridgeConfig0 = ?config(bridge_config, Config),
|
||||||
Params = BridgeConfig#{<<"type">> => BridgeType, <<"name">> => Name},
|
BridgeConfig = emqx_utils_maps:deep_merge(BridgeConfig0, Overrides),
|
||||||
|
probe_bridge_api(BridgeType, BridgeName, BridgeConfig).
|
||||||
|
|
||||||
|
probe_bridge_api(BridgeType, BridgeName, BridgeConfig) ->
|
||||||
|
Params = BridgeConfig#{<<"type">> => BridgeType, <<"name">> => BridgeName},
|
||||||
Path = emqx_mgmt_api_test_util:api_path(["bridges_probe"]),
|
Path = emqx_mgmt_api_test_util:api_path(["bridges_probe"]),
|
||||||
AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
|
AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
|
||||||
Opts = #{return_all => true},
|
Opts = #{return_all => true},
|
||||||
|
@ -310,10 +316,34 @@ t_create_via_http(Config) ->
|
||||||
t_start_stop(Config, StopTracePoint) ->
|
t_start_stop(Config, StopTracePoint) ->
|
||||||
BridgeType = ?config(bridge_type, Config),
|
BridgeType = ?config(bridge_type, Config),
|
||||||
BridgeName = ?config(bridge_name, Config),
|
BridgeName = ?config(bridge_name, Config),
|
||||||
ResourceId = resource_id(Config),
|
BridgeConfig = ?config(bridge_config, Config),
|
||||||
|
t_start_stop(BridgeType, BridgeName, BridgeConfig, StopTracePoint).
|
||||||
|
|
||||||
|
t_start_stop(BridgeType, BridgeName, BridgeConfig, StopTracePoint) ->
|
||||||
|
ResourceId = emqx_bridge_resource:resource_id(BridgeType, BridgeName),
|
||||||
?check_trace(
|
?check_trace(
|
||||||
begin
|
begin
|
||||||
?assertMatch({ok, _}, create_bridge(Config)),
|
%% Check that the bridge probe API doesn't leak atoms.
|
||||||
|
ProbeRes0 = probe_bridge_api(
|
||||||
|
BridgeType,
|
||||||
|
BridgeName,
|
||||||
|
BridgeConfig#{<<"resource_opts">> => #{<<"health_check_interval">> => <<"1s">>}}
|
||||||
|
),
|
||||||
|
?assertMatch({ok, {{_, 204, _}, _Headers, _Body}}, ProbeRes0),
|
||||||
|
AtomsBefore = erlang:system_info(atom_count),
|
||||||
|
%% Probe again; shouldn't have created more atoms.
|
||||||
|
ProbeRes1 = probe_bridge_api(
|
||||||
|
BridgeType,
|
||||||
|
BridgeName,
|
||||||
|
BridgeConfig#{<<"resource_opts">> => #{<<"health_check_interval">> => <<"1s">>}}
|
||||||
|
),
|
||||||
|
|
||||||
|
?assertMatch({ok, {{_, 204, _}, _Headers, _Body}}, ProbeRes1),
|
||||||
|
AtomsAfter = erlang:system_info(atom_count),
|
||||||
|
?assertEqual(AtomsBefore, AtomsAfter),
|
||||||
|
|
||||||
|
?assertMatch({ok, _}, emqx_bridge:create(BridgeType, BridgeName, BridgeConfig)),
|
||||||
|
|
||||||
%% Since the connection process is async, we give it some time to
|
%% Since the connection process is async, we give it some time to
|
||||||
%% stabilize and avoid flakiness.
|
%% stabilize and avoid flakiness.
|
||||||
?retry(
|
?retry(
|
||||||
|
@ -322,24 +352,48 @@ t_start_stop(Config, StopTracePoint) ->
|
||||||
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
|
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
|
||||||
),
|
),
|
||||||
|
|
||||||
%% Check that the bridge probe API doesn't leak atoms.
|
%% `start` bridge to trigger `already_started`
|
||||||
ProbeRes0 = probe_bridge_api(
|
?assertMatch(
|
||||||
Config,
|
{ok, {{_, 204, _}, _Headers, []}},
|
||||||
#{<<"resource_opts">> => #{<<"health_check_interval">> => <<"1s">>}}
|
emqx_bridge_testlib:op_bridge_api("start", BridgeType, BridgeName)
|
||||||
),
|
|
||||||
?assertMatch({ok, {{_, 204, _}, _Headers, _Body}}, ProbeRes0),
|
|
||||||
AtomsBefore = erlang:system_info(atom_count),
|
|
||||||
%% Probe again; shouldn't have created more atoms.
|
|
||||||
ProbeRes1 = probe_bridge_api(
|
|
||||||
Config,
|
|
||||||
#{<<"resource_opts">> => #{<<"health_check_interval">> => <<"1s">>}}
|
|
||||||
),
|
),
|
||||||
|
|
||||||
?assertMatch({ok, {{_, 204, _}, _Headers, _Body}}, ProbeRes1),
|
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)),
|
||||||
AtomsAfter = erlang:system_info(atom_count),
|
|
||||||
?assertEqual(AtomsBefore, AtomsAfter),
|
|
||||||
|
|
||||||
%% Now stop the bridge.
|
?assertMatch(
|
||||||
|
{{ok, _}, {ok, _}},
|
||||||
|
?wait_async_action(
|
||||||
|
emqx_bridge_testlib:op_bridge_api("stop", BridgeType, BridgeName),
|
||||||
|
#{?snk_kind := StopTracePoint},
|
||||||
|
5_000
|
||||||
|
)
|
||||||
|
),
|
||||||
|
|
||||||
|
?assertEqual(
|
||||||
|
{error, resource_is_stopped}, emqx_resource_manager:health_check(ResourceId)
|
||||||
|
),
|
||||||
|
|
||||||
|
?assertMatch(
|
||||||
|
{ok, {{_, 204, _}, _Headers, []}},
|
||||||
|
emqx_bridge_testlib:op_bridge_api("stop", BridgeType, BridgeName)
|
||||||
|
),
|
||||||
|
|
||||||
|
?assertEqual(
|
||||||
|
{error, resource_is_stopped}, emqx_resource_manager:health_check(ResourceId)
|
||||||
|
),
|
||||||
|
|
||||||
|
?assertMatch(
|
||||||
|
{ok, {{_, 204, _}, _Headers, []}},
|
||||||
|
emqx_bridge_testlib:op_bridge_api("start", BridgeType, BridgeName)
|
||||||
|
),
|
||||||
|
|
||||||
|
?retry(
|
||||||
|
_Sleep = 1_000,
|
||||||
|
_Attempts = 20,
|
||||||
|
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
|
||||||
|
),
|
||||||
|
|
||||||
|
%% Disable the bridge, which will also stop it.
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
{{ok, _}, {ok, _}},
|
{{ok, _}, {ok, _}},
|
||||||
?wait_async_action(
|
?wait_async_action(
|
||||||
|
@ -352,8 +406,11 @@ t_start_stop(Config, StopTracePoint) ->
|
||||||
ok
|
ok
|
||||||
end,
|
end,
|
||||||
fun(Trace) ->
|
fun(Trace) ->
|
||||||
%% one for each probe, one for real
|
%% one for each probe, two for real
|
||||||
?assertMatch([_, _, #{instance_id := ResourceId}], ?of_kind(StopTracePoint, Trace)),
|
?assertMatch(
|
||||||
|
[_, _, #{instance_id := ResourceId}, #{instance_id := ResourceId}],
|
||||||
|
?of_kind(StopTracePoint, Trace)
|
||||||
|
),
|
||||||
ok
|
ok
|
||||||
end
|
end
|
||||||
),
|
),
|
||||||
|
|
|
@ -356,6 +356,17 @@ t_bad_bridge_config(_Config) ->
|
||||||
),
|
),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
t_start_stop(Config) ->
|
||||||
|
#{port := Port} = ?config(http_server, Config),
|
||||||
|
BridgeConfig = bridge_async_config(#{
|
||||||
|
type => ?BRIDGE_TYPE,
|
||||||
|
name => ?BRIDGE_NAME,
|
||||||
|
port => Port
|
||||||
|
}),
|
||||||
|
emqx_bridge_testlib:t_start_stop(
|
||||||
|
?BRIDGE_TYPE, ?BRIDGE_NAME, BridgeConfig, emqx_connector_http_stopped
|
||||||
|
).
|
||||||
|
|
||||||
%% helpers
|
%% helpers
|
||||||
do_t_async_retries(TestContext, Error, Fn) ->
|
do_t_async_retries(TestContext, Error, Fn) ->
|
||||||
#{error_attempts := ErrorAttempts} = TestContext,
|
#{error_attempts := ErrorAttempts} = TestContext,
|
||||||
|
|
|
@ -16,11 +16,10 @@
|
||||||
|
|
||||||
-module(emqx_connector_http).
|
-module(emqx_connector_http).
|
||||||
|
|
||||||
-include("emqx_connector.hrl").
|
|
||||||
|
|
||||||
-include_lib("typerefl/include/types.hrl").
|
-include_lib("typerefl/include/types.hrl").
|
||||||
-include_lib("hocon/include/hoconsc.hrl").
|
-include_lib("hocon/include/hoconsc.hrl").
|
||||||
-include_lib("emqx/include/logger.hrl").
|
-include_lib("emqx/include/logger.hrl").
|
||||||
|
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||||
|
|
||||||
-behaviour(emqx_resource).
|
-behaviour(emqx_resource).
|
||||||
|
|
||||||
|
@ -251,7 +250,9 @@ on_stop(InstId, _State) ->
|
||||||
msg => "stopping_http_connector",
|
msg => "stopping_http_connector",
|
||||||
connector => InstId
|
connector => InstId
|
||||||
}),
|
}),
|
||||||
ehttpc_sup:stop_pool(InstId).
|
Res = ehttpc_sup:stop_pool(InstId),
|
||||||
|
?tp(emqx_connector_http_stopped, #{instance_id => InstId}),
|
||||||
|
Res.
|
||||||
|
|
||||||
on_query(InstId, {send_message, Msg}, State) ->
|
on_query(InstId, {send_message, Msg}, State) ->
|
||||||
case maps:get(request, State, undefined) of
|
case maps:get(request, State, undefined) of
|
||||||
|
|
Loading…
Reference in New Issue