test: add suite for AEH bridge v2
This commit is contained in:
parent
d0ffae56d7
commit
1dea3e1cc4
|
@ -0,0 +1,514 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%--------------------------------------------------------------------
|
||||
-module(emqx_bridge_v2_testlib).
|
||||
|
||||
-compile(nowarn_export_all).
|
||||
-compile(export_all).
|
||||
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
-include_lib("common_test/include/ct.hrl").
|
||||
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||
|
||||
-import(emqx_common_test_helpers, [on_exit/1]).
|
||||
|
||||
%% ct setup helpers
|
||||
|
||||
init_per_suite(Config, Apps) ->
|
||||
[{start_apps, Apps} | Config].
|
||||
|
||||
end_per_suite(Config) ->
|
||||
delete_all_bridges_and_connectors(),
|
||||
emqx_mgmt_api_test_util:end_suite(),
|
||||
ok = emqx_common_test_helpers:stop_apps([emqx_conf]),
|
||||
ok = emqx_connector_test_helpers:stop_apps(lists:reverse(?config(start_apps, Config))),
|
||||
_ = application:stop(emqx_connector),
|
||||
ok.
|
||||
|
||||
init_per_group(TestGroup, BridgeType, Config) ->
|
||||
ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"),
|
||||
ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")),
|
||||
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
|
||||
application:load(emqx_bridge),
|
||||
ok = emqx_common_test_helpers:start_apps([emqx_conf]),
|
||||
ok = emqx_connector_test_helpers:start_apps(?config(start_apps, Config)),
|
||||
{ok, _} = application:ensure_all_started(emqx_connector),
|
||||
emqx_mgmt_api_test_util:init_suite(),
|
||||
UniqueNum = integer_to_binary(erlang:unique_integer([positive])),
|
||||
MQTTTopic = <<"mqtt/topic/abc", UniqueNum/binary>>,
|
||||
[
|
||||
{proxy_host, ProxyHost},
|
||||
{proxy_port, ProxyPort},
|
||||
{mqtt_topic, MQTTTopic},
|
||||
{test_group, TestGroup},
|
||||
{bridge_type, BridgeType}
|
||||
| Config
|
||||
].
|
||||
|
||||
end_per_group(Config) ->
|
||||
ProxyHost = ?config(proxy_host, Config),
|
||||
ProxyPort = ?config(proxy_port, Config),
|
||||
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
|
||||
% delete_all_bridges(),
|
||||
ok.
|
||||
|
||||
init_per_testcase(TestCase, Config0, BridgeConfigCb) ->
|
||||
ct:timetrap(timer:seconds(60)),
|
||||
delete_all_bridges_and_connectors(),
|
||||
UniqueNum = integer_to_binary(erlang:unique_integer()),
|
||||
BridgeTopic =
|
||||
<<
|
||||
(atom_to_binary(TestCase))/binary,
|
||||
UniqueNum/binary
|
||||
>>,
|
||||
TestGroup = ?config(test_group, Config0),
|
||||
Config = [{bridge_topic, BridgeTopic} | Config0],
|
||||
{Name, ConfigString, BridgeConfig} = BridgeConfigCb(
|
||||
TestCase, TestGroup, Config
|
||||
),
|
||||
ok = snabbkaffe:start_trace(),
|
||||
[
|
||||
{bridge_name, Name},
|
||||
{bridge_config_string, ConfigString},
|
||||
{bridge_config, BridgeConfig}
|
||||
| Config
|
||||
].
|
||||
|
||||
end_per_testcase(_Testcase, Config) ->
|
||||
case proplists:get_bool(skip_does_not_apply, Config) of
|
||||
true ->
|
||||
ok;
|
||||
false ->
|
||||
ProxyHost = ?config(proxy_host, Config),
|
||||
ProxyPort = ?config(proxy_port, Config),
|
||||
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
|
||||
%% in CI, apparently this needs more time since the
|
||||
%% machines struggle with all the containers running...
|
||||
emqx_common_test_helpers:call_janitor(60_000),
|
||||
ok = snabbkaffe:stop(),
|
||||
ok
|
||||
end.
|
||||
|
||||
delete_all_bridges_and_connectors() ->
|
||||
delete_all_bridges(),
|
||||
delete_all_connectors().
|
||||
|
||||
delete_all_bridges() ->
|
||||
lists:foreach(
|
||||
fun(#{name := Name, type := Type}) ->
|
||||
emqx_bridge_v2:remove(Type, Name)
|
||||
end,
|
||||
emqx_bridge_v2:list()
|
||||
).
|
||||
|
||||
delete_all_connectors() ->
|
||||
lists:foreach(
|
||||
fun(#{name := Name, type := Type}) ->
|
||||
emqx_connector:remove(Type, Name)
|
||||
end,
|
||||
emqx_connector:list()
|
||||
).
|
||||
|
||||
%% test helpers
|
||||
parse_and_check(BridgeType, BridgeName, ConfigString) ->
|
||||
{ok, RawConf} = hocon:binary(ConfigString, #{format => map}),
|
||||
hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}),
|
||||
#{<<"bridges">> := #{BridgeType := #{BridgeName := BridgeConfig}}} = RawConf,
|
||||
BridgeConfig.
|
||||
|
||||
bridge_id(Config) ->
|
||||
BridgeType = ?config(bridge_type, Config),
|
||||
BridgeName = ?config(bridge_name, Config),
|
||||
BridgeId = emqx_bridge_resource:bridge_id(BridgeType, BridgeName),
|
||||
ConnectorId = emqx_bridge_resource:resource_id(BridgeType, BridgeName),
|
||||
<<"bridge_v2:", BridgeId/binary, ":", ConnectorId/binary>>.
|
||||
|
||||
resource_id(Config) ->
|
||||
BridgeType = ?config(bridge_type, Config),
|
||||
BridgeName = ?config(bridge_name, Config),
|
||||
emqx_bridge_resource:resource_id(BridgeType, BridgeName).
|
||||
|
||||
create_bridge(Config) ->
|
||||
create_bridge(Config, _Overrides = #{}).
|
||||
|
||||
create_bridge(Config, Overrides) ->
|
||||
BridgeType = ?config(bridge_type, Config),
|
||||
BridgeName = ?config(bridge_name, Config),
|
||||
BridgeConfig0 = ?config(bridge_config, Config),
|
||||
BridgeConfig = emqx_utils_maps:deep_merge(BridgeConfig0, Overrides),
|
||||
ConnectorName = ?config(connector_name, Config),
|
||||
ConnectorType = ?config(connector_type, Config),
|
||||
ConnectorConfig = ?config(connector_config, Config),
|
||||
{ok, _} =
|
||||
emqx_connector:create(ConnectorType, ConnectorName, ConnectorConfig),
|
||||
|
||||
ct:pal("creating bridge with config: ~p", [BridgeConfig]),
|
||||
emqx_bridge_v2:create(BridgeType, BridgeName, BridgeConfig).
|
||||
|
||||
create_bridge_api(Config) ->
|
||||
create_bridge_api(Config, _Overrides = #{}).
|
||||
|
||||
create_bridge_api(Config, Overrides) ->
|
||||
BridgeType = ?config(bridge_type, Config),
|
||||
BridgeName = ?config(bridge_name, Config),
|
||||
BridgeConfig0 = ?config(bridge_config, Config),
|
||||
BridgeConfig = emqx_utils_maps:deep_merge(BridgeConfig0, Overrides),
|
||||
ConnectorName = ?config(connector_name, Config),
|
||||
ConnectorType = ?config(connector_type, Config),
|
||||
ConnectorConfig = ?config(connector_config, Config),
|
||||
|
||||
{ok, _Connector} =
|
||||
emqx_connector:create(ConnectorType, ConnectorName, ConnectorConfig),
|
||||
|
||||
Params = BridgeConfig#{<<"type">> => BridgeType, <<"name">> => BridgeName},
|
||||
Path = emqx_mgmt_api_test_util:api_path(["bridges_v2"]),
|
||||
AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
|
||||
Opts = #{return_all => true},
|
||||
ct:pal("creating bridge (via http): ~p", [Params]),
|
||||
Res =
|
||||
case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params, Opts) of
|
||||
{ok, {Status, Headers, Body0}} ->
|
||||
{ok, {Status, Headers, emqx_utils_json:decode(Body0, [return_maps])}};
|
||||
Error ->
|
||||
Error
|
||||
end,
|
||||
ct:pal("bridge create result: ~p", [Res]),
|
||||
Res.
|
||||
|
||||
update_bridge_api(Config) ->
|
||||
update_bridge_api(Config, _Overrides = #{}).
|
||||
|
||||
update_bridge_api(Config, Overrides) ->
|
||||
BridgeType = ?config(bridge_type, Config),
|
||||
Name = ?config(bridge_name, Config),
|
||||
BridgeConfig0 = ?config(bridge_config, Config),
|
||||
BridgeConfig = emqx_utils_maps:deep_merge(BridgeConfig0, Overrides),
|
||||
BridgeId = emqx_bridge_resource:bridge_id(BridgeType, Name),
|
||||
Path = emqx_mgmt_api_test_util:api_path(["bridges_v2", BridgeId]),
|
||||
AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
|
||||
Opts = #{return_all => true},
|
||||
ct:pal("updating bridge (via http): ~p", [BridgeConfig]),
|
||||
Res =
|
||||
case emqx_mgmt_api_test_util:request_api(put, Path, "", AuthHeader, BridgeConfig, Opts) of
|
||||
{ok, {_Status, _Headers, Body0}} -> {ok, emqx_utils_json:decode(Body0, [return_maps])};
|
||||
Error -> Error
|
||||
end,
|
||||
ct:pal("bridge update result: ~p", [Res]),
|
||||
Res.
|
||||
|
||||
op_bridge_api(Op, BridgeType, BridgeName) ->
|
||||
BridgeId = emqx_bridge_resource:bridge_id(BridgeType, BridgeName),
|
||||
Path = emqx_mgmt_api_test_util:api_path(["bridges_v2", BridgeId, Op]),
|
||||
AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
|
||||
Opts = #{return_all => true},
|
||||
ct:pal("calling bridge ~p (via http): ~p", [BridgeId, Op]),
|
||||
Res =
|
||||
case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, "", Opts) of
|
||||
{ok, {Status = {_, 204, _}, Headers, Body}} ->
|
||||
{ok, {Status, Headers, Body}};
|
||||
{ok, {Status, Headers, Body}} ->
|
||||
{ok, {Status, Headers, emqx_utils_json:decode(Body, [return_maps])}};
|
||||
{error, {Status, Headers, Body}} ->
|
||||
{error, {Status, Headers, emqx_utils_json:decode(Body, [return_maps])}};
|
||||
Error ->
|
||||
Error
|
||||
end,
|
||||
ct:pal("bridge op result: ~p", [Res]),
|
||||
Res.
|
||||
|
||||
probe_bridge_api(Config) ->
|
||||
probe_bridge_api(Config, _Overrides = #{}).
|
||||
|
||||
probe_bridge_api(Config, Overrides) ->
|
||||
BridgeType = ?config(bridge_type, Config),
|
||||
BridgeName = ?config(bridge_name, Config),
|
||||
BridgeConfig0 = ?config(bridge_config, Config),
|
||||
BridgeConfig = emqx_utils_maps:deep_merge(BridgeConfig0, Overrides),
|
||||
probe_bridge_api(BridgeType, BridgeName, BridgeConfig).
|
||||
|
||||
probe_bridge_api(BridgeType, BridgeName, BridgeConfig) ->
|
||||
Params = BridgeConfig#{<<"type">> => BridgeType, <<"name">> => BridgeName},
|
||||
Path = emqx_mgmt_api_test_util:api_path(["bridges_v2_probe"]),
|
||||
AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
|
||||
Opts = #{return_all => true},
|
||||
ct:pal("probing bridge (via http): ~p", [Params]),
|
||||
Res =
|
||||
case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params, Opts) of
|
||||
{ok, {{_, 204, _}, _Headers, _Body0} = Res0} -> {ok, Res0};
|
||||
Error -> Error
|
||||
end,
|
||||
ct:pal("bridge probe result: ~p", [Res]),
|
||||
Res.
|
||||
|
||||
try_decode_error(Body0) ->
|
||||
case emqx_utils_json:safe_decode(Body0, [return_maps]) of
|
||||
{ok, #{<<"message">> := Msg0} = Body1} ->
|
||||
case emqx_utils_json:safe_decode(Msg0, [return_maps]) of
|
||||
{ok, Msg1} -> Body1#{<<"message">> := Msg1};
|
||||
{error, _} -> Body1
|
||||
end;
|
||||
{ok, Body1} ->
|
||||
Body1;
|
||||
{error, _} ->
|
||||
Body0
|
||||
end.
|
||||
|
||||
create_rule_and_action_http(BridgeType, RuleTopic, Config) ->
|
||||
create_rule_and_action_http(BridgeType, RuleTopic, Config, _Opts = #{}).
|
||||
|
||||
create_rule_and_action_http(BridgeType, RuleTopic, Config, Opts) ->
|
||||
BridgeName = ?config(bridge_name, Config),
|
||||
BridgeId = emqx_bridge_resource:bridge_id(BridgeType, BridgeName),
|
||||
SQL = maps:get(sql, Opts, <<"SELECT * FROM \"", RuleTopic/binary, "\"">>),
|
||||
Params = #{
|
||||
enable => true,
|
||||
sql => SQL,
|
||||
actions => [BridgeId]
|
||||
},
|
||||
Path = emqx_mgmt_api_test_util:api_path(["rules"]),
|
||||
AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
|
||||
ct:pal("rule action params: ~p", [Params]),
|
||||
case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params) of
|
||||
{ok, Res0} ->
|
||||
Res = #{<<"id">> := RuleId} = emqx_utils_json:decode(Res0, [return_maps]),
|
||||
on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
|
||||
{ok, Res};
|
||||
Error ->
|
||||
Error
|
||||
end.
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Testcases
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
t_sync_query(Config, MakeMessageFun, IsSuccessCheck, TracePoint) ->
|
||||
?check_trace(
|
||||
begin
|
||||
?assertMatch({ok, _}, create_bridge_api(Config)),
|
||||
ResourceId = resource_id(Config),
|
||||
?retry(
|
||||
_Sleep = 1_000,
|
||||
_Attempts = 20,
|
||||
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
|
||||
),
|
||||
BridgeId = bridge_id(Config),
|
||||
Message = {BridgeId, MakeMessageFun()},
|
||||
IsSuccessCheck(emqx_resource:simple_sync_query(ResourceId, Message)),
|
||||
ok
|
||||
end,
|
||||
fun(Trace) ->
|
||||
ResourceId = resource_id(Config),
|
||||
?assertMatch([#{instance_id := ResourceId}], ?of_kind(TracePoint, Trace))
|
||||
end
|
||||
),
|
||||
ok.
|
||||
|
||||
t_async_query(Config, MakeMessageFun, IsSuccessCheck, TracePoint) ->
|
||||
ReplyFun =
|
||||
fun(Pid, Result) ->
|
||||
Pid ! {result, Result}
|
||||
end,
|
||||
?check_trace(
|
||||
begin
|
||||
?assertMatch({ok, _}, create_bridge_api(Config)),
|
||||
ResourceId = resource_id(Config),
|
||||
?retry(
|
||||
_Sleep = 1_000,
|
||||
_Attempts = 20,
|
||||
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
|
||||
),
|
||||
BridgeId = bridge_id(Config),
|
||||
Message = {BridgeId, MakeMessageFun()},
|
||||
?assertMatch(
|
||||
{ok, {ok, _}},
|
||||
?wait_async_action(
|
||||
emqx_resource:query(ResourceId, Message, #{
|
||||
async_reply_fun => {ReplyFun, [self()]}
|
||||
}),
|
||||
#{?snk_kind := TracePoint, instance_id := ResourceId},
|
||||
5_000
|
||||
)
|
||||
),
|
||||
ok
|
||||
end,
|
||||
fun(Trace) ->
|
||||
ResourceId = resource_id(Config),
|
||||
?assertMatch([#{instance_id := ResourceId}], ?of_kind(TracePoint, Trace))
|
||||
end
|
||||
),
|
||||
receive
|
||||
{result, Result} -> IsSuccessCheck(Result)
|
||||
after 5_000 ->
|
||||
throw(timeout)
|
||||
end,
|
||||
ok.
|
||||
|
||||
t_create_via_http(Config) ->
|
||||
?check_trace(
|
||||
begin
|
||||
?assertMatch({ok, _}, create_bridge_api(Config)),
|
||||
|
||||
%% lightweight matrix testing some configs
|
||||
?assertMatch(
|
||||
{ok, _},
|
||||
update_bridge_api(
|
||||
Config
|
||||
)
|
||||
),
|
||||
?assertMatch(
|
||||
{ok, _},
|
||||
update_bridge_api(
|
||||
Config
|
||||
)
|
||||
),
|
||||
ok
|
||||
end,
|
||||
[]
|
||||
),
|
||||
ok.
|
||||
|
||||
t_start_stop(Config, StopTracePoint) ->
|
||||
BridgeType = ?config(bridge_type, Config),
|
||||
BridgeName = ?config(bridge_name, Config),
|
||||
BridgeConfig = ?config(bridge_config, Config),
|
||||
ConnectorName = ?config(connector_name, Config),
|
||||
ConnectorType = ?config(connector_type, Config),
|
||||
ConnectorConfig = ?config(connector_config, Config),
|
||||
|
||||
?assertMatch(
|
||||
{ok, _},
|
||||
emqx_connector:create(ConnectorType, ConnectorName, ConnectorConfig)
|
||||
),
|
||||
|
||||
?check_trace(
|
||||
begin
|
||||
ProbeRes0 = probe_bridge_api(
|
||||
BridgeType,
|
||||
BridgeName,
|
||||
BridgeConfig
|
||||
),
|
||||
?assertMatch({ok, {{_, 204, _}, _Headers, _Body}}, ProbeRes0),
|
||||
%% Check that the bridge probe API doesn't leak atoms.
|
||||
AtomsBefore = erlang:system_info(atom_count),
|
||||
%% Probe again; shouldn't have created more atoms.
|
||||
ProbeRes1 = probe_bridge_api(
|
||||
BridgeType,
|
||||
BridgeName,
|
||||
BridgeConfig
|
||||
),
|
||||
|
||||
?assertMatch({ok, {{_, 204, _}, _Headers, _Body}}, ProbeRes1),
|
||||
AtomsAfter = erlang:system_info(atom_count),
|
||||
?assertEqual(AtomsBefore, AtomsAfter),
|
||||
|
||||
?assertMatch({ok, _}, emqx_bridge_v2:create(BridgeType, BridgeName, BridgeConfig)),
|
||||
|
||||
ResourceId = emqx_bridge_resource:resource_id(BridgeType, BridgeName),
|
||||
|
||||
%% Since the connection process is async, we give it some time to
|
||||
%% stabilize and avoid flakiness.
|
||||
?retry(
|
||||
_Sleep = 1_000,
|
||||
_Attempts = 20,
|
||||
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
|
||||
),
|
||||
|
||||
%% `start` bridge to trigger `already_started`
|
||||
?assertMatch(
|
||||
{ok, {{_, 204, _}, _Headers, []}},
|
||||
emqx_bridge_v2_testlib:op_bridge_api("start", BridgeType, BridgeName)
|
||||
),
|
||||
|
||||
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)),
|
||||
|
||||
%% Not supported anymore
|
||||
|
||||
%% ?assertMatch(
|
||||
%% {{ok, _}, {ok, _}},
|
||||
%% ?wait_async_action(
|
||||
%% emqx_bridge_v2_testlib:op_bridge_api("stop", BridgeType, BridgeName),
|
||||
%% #{?snk_kind := StopTracePoint},
|
||||
%% 5_000
|
||||
%% )
|
||||
%% ),
|
||||
|
||||
%% ?assertEqual(
|
||||
%% {error, resource_is_stopped}, emqx_resource_manager:health_check(ResourceId)
|
||||
%% ),
|
||||
|
||||
%% ?assertMatch(
|
||||
%% {ok, {{_, 204, _}, _Headers, []}},
|
||||
%% emqx_bridge_v2_testlib:op_bridge_api("stop", BridgeType, BridgeName)
|
||||
%% ),
|
||||
|
||||
%% ?assertEqual(
|
||||
%% {error, resource_is_stopped}, emqx_resource_manager:health_check(ResourceId)
|
||||
%% ),
|
||||
|
||||
%% ?assertMatch(
|
||||
%% {ok, {{_, 204, _}, _Headers, []}},
|
||||
%% emqx_bridge_v2_testlib:op_bridge_api("start", BridgeType, BridgeName)
|
||||
%% ),
|
||||
|
||||
?retry(
|
||||
_Sleep = 1_000,
|
||||
_Attempts = 20,
|
||||
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
|
||||
),
|
||||
|
||||
%% Disable the connector, which will also stop it.
|
||||
?assertMatch(
|
||||
{{ok, _}, {ok, _}},
|
||||
?wait_async_action(
|
||||
emqx_connector:disable_enable(disable, ConnectorType, ConnectorName),
|
||||
#{?snk_kind := StopTracePoint},
|
||||
5_000
|
||||
)
|
||||
),
|
||||
|
||||
ok
|
||||
end,
|
||||
fun(Trace) ->
|
||||
ResourceId = emqx_bridge_resource:resource_id(BridgeType, BridgeName),
|
||||
%% one for each probe, one for real
|
||||
?assertMatch(
|
||||
[_, _, #{instance_id := ResourceId}],
|
||||
?of_kind(StopTracePoint, Trace)
|
||||
),
|
||||
ok
|
||||
end
|
||||
),
|
||||
ok.
|
||||
|
||||
t_on_get_status(Config) ->
|
||||
t_on_get_status(Config, _Opts = #{}).
|
||||
|
||||
t_on_get_status(Config, Opts) ->
|
||||
ProxyPort = ?config(proxy_port, Config),
|
||||
ProxyHost = ?config(proxy_host, Config),
|
||||
ProxyName = ?config(proxy_name, Config),
|
||||
FailureStatus = maps:get(failure_status, Opts, disconnected),
|
||||
?assertMatch({ok, _}, create_bridge(Config)),
|
||||
ResourceId = resource_id(Config),
|
||||
%% Since the connection process is async, we give it some time to
|
||||
%% stabilize and avoid flakiness.
|
||||
?retry(
|
||||
_Sleep = 1_000,
|
||||
_Attempts = 20,
|
||||
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
|
||||
),
|
||||
emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
|
||||
ct:sleep(500),
|
||||
?retry(
|
||||
_Interval0 = 200,
|
||||
_Attempts0 = 10,
|
||||
?assertEqual({ok, FailureStatus}, emqx_resource_manager:health_check(ResourceId))
|
||||
)
|
||||
end),
|
||||
%% Check that it recovers itself.
|
||||
?retry(
|
||||
_Sleep = 1_000,
|
||||
_Attempts = 20,
|
||||
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
|
||||
),
|
||||
ok.
|
|
@ -0,0 +1,341 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%--------------------------------------------------------------------
|
||||
-module(emqx_bridge_azure_event_hub_v2_SUITE).
|
||||
|
||||
-compile(nowarn_export_all).
|
||||
-compile(export_all).
|
||||
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
-include_lib("common_test/include/ct.hrl").
|
||||
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||
|
||||
-define(BRIDGE_TYPE, azure_event_hub).
|
||||
-define(BRIDGE_TYPE_BIN, <<"azure_event_hub">>).
|
||||
-define(KAFKA_BRIDGE_TYPE, kafka).
|
||||
-define(APPS, [emqx_resource, emqx_connector, emqx_bridge, emqx_rule_engine]).
|
||||
|
||||
-import(emqx_common_test_helpers, [on_exit/1]).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% CT boilerplate
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
all() ->
|
||||
emqx_common_test_helpers:all(?MODULE).
|
||||
|
||||
init_per_suite(Config) ->
|
||||
KafkaHost = os:getenv("KAFKA_SASL_SSL_HOST", "toxiproxy.emqx.net"),
|
||||
KafkaPort = list_to_integer(os:getenv("KAFKA_SASL_SSL_PORT", "9295")),
|
||||
ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"),
|
||||
ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")),
|
||||
ProxyName = "kafka_sasl_ssl",
|
||||
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
|
||||
case emqx_common_test_helpers:is_tcp_server_available(KafkaHost, KafkaPort) of
|
||||
true ->
|
||||
Apps = emqx_cth_suite:start(
|
||||
[
|
||||
emqx_conf,
|
||||
emqx,
|
||||
emqx_management,
|
||||
emqx_resource,
|
||||
emqx_bridge_azure_event_hub,
|
||||
emqx_bridge,
|
||||
{emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}
|
||||
],
|
||||
#{work_dir => ?config(priv_dir, Config)}
|
||||
),
|
||||
{ok, Api} = emqx_common_test_http:create_default_app(),
|
||||
[
|
||||
{tc_apps, Apps},
|
||||
{api, Api},
|
||||
{proxy_name, ProxyName},
|
||||
{proxy_host, ProxyHost},
|
||||
{proxy_port, ProxyPort},
|
||||
{kafka_host, KafkaHost},
|
||||
{kafka_port, KafkaPort}
|
||||
| Config
|
||||
];
|
||||
false ->
|
||||
case os:getenv("IS_CI") of
|
||||
"yes" ->
|
||||
throw(no_kafka);
|
||||
_ ->
|
||||
{skip, no_kafka}
|
||||
end
|
||||
end.
|
||||
|
||||
end_per_suite(Config) ->
|
||||
Apps = ?config(tc_apps, Config),
|
||||
emqx_cth_suite:stop(Apps),
|
||||
ok.
|
||||
|
||||
init_per_testcase(TestCase, Config) ->
|
||||
common_init_per_testcase(TestCase, Config).
|
||||
|
||||
common_init_per_testcase(TestCase, Config) ->
|
||||
ct:timetrap(timer:seconds(60)),
|
||||
emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(),
|
||||
emqx_config:delete_override_conf_files(),
|
||||
UniqueNum = integer_to_binary(erlang:unique_integer()),
|
||||
Name = iolist_to_binary([atom_to_binary(TestCase), UniqueNum]),
|
||||
KafkaHost = ?config(kafka_host, Config),
|
||||
KafkaPort = ?config(kafka_port, Config),
|
||||
KafkaTopic = Name,
|
||||
ConnectorConfig = connector_config(Name, KafkaHost, KafkaPort),
|
||||
{BridgeConfig, ExtraConfig} = bridge_config(Name, Name, KafkaTopic),
|
||||
ensure_topic(Config, KafkaTopic, _Opts = #{}),
|
||||
ok = snabbkaffe:start_trace(),
|
||||
ExtraConfig ++
|
||||
[
|
||||
{connector_type, ?BRIDGE_TYPE},
|
||||
{connector_name, Name},
|
||||
{connector_config, ConnectorConfig},
|
||||
{bridge_type, ?BRIDGE_TYPE},
|
||||
{bridge_name, Name},
|
||||
{bridge_config, BridgeConfig}
|
||||
| Config
|
||||
].
|
||||
|
||||
end_per_testcase(_Testcase, Config) ->
|
||||
case proplists:get_bool(skip_does_not_apply, Config) of
|
||||
true ->
|
||||
ok;
|
||||
false ->
|
||||
ProxyHost = ?config(proxy_host, Config),
|
||||
ProxyPort = ?config(proxy_port, Config),
|
||||
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
|
||||
emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(),
|
||||
emqx_common_test_helpers:call_janitor(60_000),
|
||||
ok = snabbkaffe:stop(),
|
||||
ok
|
||||
end.
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Helper fns
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
connector_config(Name, KafkaHost, KafkaPort) ->
|
||||
InnerConfigMap0 =
|
||||
#{
|
||||
<<"enable">> => true,
|
||||
<<"bootstrap_hosts">> => iolist_to_binary([KafkaHost, ":", integer_to_binary(KafkaPort)]),
|
||||
<<"authentication">> =>
|
||||
#{
|
||||
<<"mechanism">> => <<"plain">>,
|
||||
<<"username">> => <<"emqxuser">>,
|
||||
<<"password">> => <<"password">>
|
||||
},
|
||||
<<"connect_timeout">> => <<"5s">>,
|
||||
<<"socket_opts">> =>
|
||||
#{
|
||||
<<"nodelay">> => true,
|
||||
<<"recbuf">> => <<"1024KB">>,
|
||||
<<"sndbuf">> => <<"1024KB">>,
|
||||
<<"tcp_keepalive">> => <<"none">>
|
||||
},
|
||||
<<"ssl">> =>
|
||||
#{
|
||||
<<"cacertfile">> => shared_secret(client_cacertfile),
|
||||
<<"certfile">> => shared_secret(client_certfile),
|
||||
<<"keyfile">> => shared_secret(client_keyfile),
|
||||
<<"ciphers">> => [],
|
||||
<<"depth">> => 10,
|
||||
<<"enable">> => true,
|
||||
<<"hibernate_after">> => <<"5s">>,
|
||||
<<"log_level">> => <<"notice">>,
|
||||
<<"reuse_sessions">> => true,
|
||||
<<"secure_renegotiate">> => true,
|
||||
<<"server_name_indication">> => <<"disable">>,
|
||||
%% currently, it seems our CI kafka certs fail peer verification
|
||||
<<"verify">> => <<"verify_none">>,
|
||||
<<"versions">> => [<<"tlsv1.3">>, <<"tlsv1.2">>]
|
||||
}
|
||||
},
|
||||
InnerConfigMap = serde_roundtrip(InnerConfigMap0),
|
||||
parse_and_check_connector_config(InnerConfigMap, Name).
|
||||
|
||||
parse_and_check_connector_config(InnerConfigMap, Name) ->
|
||||
TypeBin = ?BRIDGE_TYPE_BIN,
|
||||
RawConf = #{<<"connectors">> => #{TypeBin => #{Name => InnerConfigMap}}},
|
||||
#{<<"connectors">> := #{TypeBin := #{Name := Config}}} =
|
||||
hocon_tconf:check_plain(emqx_connector_schema, RawConf, #{
|
||||
required => false, atom_key => false
|
||||
}),
|
||||
ct:pal("parsed config: ~p", [Config]),
|
||||
InnerConfigMap.
|
||||
|
||||
bridge_config(Name, ConnectorId, KafkaTopic) ->
|
||||
InnerConfigMap0 =
|
||||
#{
|
||||
<<"enable">> => true,
|
||||
<<"connector">> => ConnectorId,
|
||||
<<"kafka">> =>
|
||||
#{
|
||||
<<"buffer">> =>
|
||||
#{
|
||||
<<"memory_overload_protection">> => true,
|
||||
<<"mode">> => <<"memory">>,
|
||||
<<"per_partition_limit">> => <<"2GB">>,
|
||||
<<"segment_bytes">> => <<"100MB">>
|
||||
},
|
||||
<<"compression">> => <<"no_compression">>,
|
||||
<<"kafka_header_value_encode_mode">> => <<"none">>,
|
||||
<<"max_batch_bytes">> => <<"896KB">>,
|
||||
<<"max_inflight">> => <<"10">>,
|
||||
<<"message">> =>
|
||||
#{
|
||||
<<"key">> => <<"${.clientid}">>,
|
||||
<<"value">> => <<"${.}">>
|
||||
},
|
||||
<<"partition_count_refresh_interval">> => <<"60s">>,
|
||||
<<"partition_strategy">> => <<"random">>,
|
||||
<<"query_mode">> => <<"async">>,
|
||||
<<"required_acks">> => <<"all_isr">>,
|
||||
<<"sync_query_timeout">> => <<"5s">>,
|
||||
<<"topic">> => KafkaTopic
|
||||
},
|
||||
<<"local_topic">> => <<"t/aeh">>
|
||||
%%,
|
||||
},
|
||||
InnerConfigMap = serde_roundtrip(InnerConfigMap0),
|
||||
ExtraConfig =
|
||||
[{kafka_topic, KafkaTopic}],
|
||||
{parse_and_check_bridge_config(InnerConfigMap, Name), ExtraConfig}.
|
||||
|
||||
%% check it serializes correctly
|
||||
serde_roundtrip(InnerConfigMap0) ->
|
||||
IOList = hocon_pp:do(InnerConfigMap0, #{}),
|
||||
{ok, InnerConfigMap} = hocon:binary(IOList),
|
||||
InnerConfigMap.
|
||||
|
||||
parse_and_check_bridge_config(InnerConfigMap, Name) ->
|
||||
TypeBin = ?BRIDGE_TYPE_BIN,
|
||||
RawConf = #{<<"bridges">> => #{TypeBin => #{Name => InnerConfigMap}}},
|
||||
hocon_tconf:check_plain(emqx_bridge_v2_schema, RawConf, #{required => false, atom_key => false}),
|
||||
InnerConfigMap.
|
||||
|
||||
shared_secret_path() ->
|
||||
os:getenv("CI_SHARED_SECRET_PATH", "/var/lib/secret").
|
||||
|
||||
shared_secret(client_keyfile) ->
|
||||
filename:join([shared_secret_path(), "client.key"]);
|
||||
shared_secret(client_certfile) ->
|
||||
filename:join([shared_secret_path(), "client.crt"]);
|
||||
shared_secret(client_cacertfile) ->
|
||||
filename:join([shared_secret_path(), "ca.crt"]);
|
||||
shared_secret(rig_keytab) ->
|
||||
filename:join([shared_secret_path(), "rig.keytab"]).
|
||||
|
||||
ensure_topic(Config, KafkaTopic, Opts) ->
|
||||
KafkaHost = ?config(kafka_host, Config),
|
||||
KafkaPort = ?config(kafka_port, Config),
|
||||
NumPartitions = maps:get(num_partitions, Opts, 3),
|
||||
Endpoints = [{KafkaHost, KafkaPort}],
|
||||
TopicConfigs = [
|
||||
#{
|
||||
name => KafkaTopic,
|
||||
num_partitions => NumPartitions,
|
||||
replication_factor => 1,
|
||||
assignments => [],
|
||||
configs => []
|
||||
}
|
||||
],
|
||||
RequestConfig = #{timeout => 5_000},
|
||||
ConnConfig =
|
||||
#{
|
||||
ssl => emqx_tls_lib:to_client_opts(
|
||||
#{
|
||||
keyfile => shared_secret(client_keyfile),
|
||||
certfile => shared_secret(client_certfile),
|
||||
cacertfile => shared_secret(client_cacertfile),
|
||||
verify => verify_none,
|
||||
enable => true
|
||||
}
|
||||
),
|
||||
sasl => {plain, <<"emqxuser">>, <<"password">>}
|
||||
},
|
||||
case brod:create_topics(Endpoints, TopicConfigs, RequestConfig, ConnConfig) of
|
||||
ok -> ok;
|
||||
{error, topic_already_exists} -> ok
|
||||
end.
|
||||
|
||||
make_message() ->
|
||||
Time = erlang:unique_integer(),
|
||||
BinTime = integer_to_binary(Time),
|
||||
Payload = emqx_guid:to_hexstr(emqx_guid:gen()),
|
||||
#{
|
||||
clientid => BinTime,
|
||||
payload => Payload,
|
||||
timestamp => Time
|
||||
}.
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Testcases
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
t_start_stop(Config) ->
|
||||
emqx_bridge_v2_testlib:t_start_stop(Config, kafka_producer_stopped),
|
||||
ok.
|
||||
|
||||
t_create_via_http(Config) ->
|
||||
emqx_bridge_v2_testlib:t_create_via_http(Config),
|
||||
ok.
|
||||
|
||||
t_on_get_status(Config) ->
|
||||
emqx_bridge_v2_testlib:t_on_get_status(Config, #{failure_status => connecting}),
|
||||
ok.
|
||||
|
||||
t_sync_query(Config) ->
|
||||
ok = emqx_bridge_v2_testlib:t_sync_query(
|
||||
Config,
|
||||
fun make_message/0,
|
||||
fun(Res) -> ?assertEqual(ok, Res) end,
|
||||
emqx_bridge_kafka_impl_producer_sync_query
|
||||
),
|
||||
ok.
|
||||
|
||||
t_same_name_azure_kafka_bridges(Config) ->
|
||||
BridgeName = ?config(bridge_name, Config),
|
||||
TracePoint = emqx_bridge_kafka_impl_producer_sync_query,
|
||||
%% creates the AEH bridge and check it's working
|
||||
ok = emqx_bridge_v2_testlib:t_sync_query(
|
||||
Config,
|
||||
fun make_message/0,
|
||||
fun(Res) -> ?assertEqual(ok, Res) end,
|
||||
TracePoint
|
||||
),
|
||||
|
||||
%% then create a Kafka bridge with same name and delete it after creation
|
||||
ConfigKafka0 = lists:keyreplace(bridge_type, 1, Config, {bridge_type, ?KAFKA_BRIDGE_TYPE}),
|
||||
ConfigKafka = lists:keyreplace(
|
||||
connector_type, 1, ConfigKafka0, {connector_type, ?KAFKA_BRIDGE_TYPE}
|
||||
),
|
||||
ok = emqx_bridge_v2_testlib:t_create_via_http(ConfigKafka),
|
||||
|
||||
AehResourceId = emqx_bridge_v2_testlib:resource_id(Config),
|
||||
KafkaResourceId = emqx_bridge_v2_testlib:resource_id(ConfigKafka),
|
||||
%% check that both bridges are healthy
|
||||
?assertEqual({ok, connected}, emqx_resource_manager:health_check(AehResourceId)),
|
||||
?assertEqual({ok, connected}, emqx_resource_manager:health_check(KafkaResourceId)),
|
||||
?assertMatch(
|
||||
{{ok, _}, {ok, _}},
|
||||
?wait_async_action(
|
||||
emqx_connector:disable_enable(disable, ?KAFKA_BRIDGE_TYPE, BridgeName),
|
||||
#{?snk_kind := kafka_producer_stopped},
|
||||
5_000
|
||||
)
|
||||
),
|
||||
% check that AEH bridge is still working
|
||||
?check_trace(
|
||||
begin
|
||||
BridgeId = emqx_bridge_v2_testlib:bridge_id(Config),
|
||||
Message = {BridgeId, make_message()},
|
||||
?assertEqual(ok, emqx_resource:simple_sync_query(AehResourceId, Message)),
|
||||
ok
|
||||
end,
|
||||
fun(Trace) ->
|
||||
?assertMatch([#{instance_id := AehResourceId}], ?of_kind(TracePoint, Trace))
|
||||
end
|
||||
),
|
||||
ok.
|
|
@ -213,6 +213,7 @@ on_stop(InstanceId, _State) ->
|
|||
end,
|
||||
AllocatedResources
|
||||
),
|
||||
?tp(kafka_producer_stopped, #{instance_id => InstanceId}),
|
||||
ok.
|
||||
|
||||
deallocate_client(ClientId) ->
|
||||
|
|
Loading…
Reference in New Issue