Merge pull request #11853 from thalesmg/test-bridge-v1-compat-layer-r53-20231030
test(bridges): add bridge v1 compatibility layer test suite, and other fixes
This commit is contained in:
commit
aa8a6f2e26
|
@ -703,7 +703,7 @@ atom(Bin) when is_binary(Bin), size(Bin) > 255 ->
|
|||
erlang:throw(
|
||||
iolist_to_binary(
|
||||
io_lib:format(
|
||||
"Name is is too long."
|
||||
"Name is too long."
|
||||
" Please provide a shorter name (<= 255 bytes)."
|
||||
" The name that is too long: \"~s\"",
|
||||
[Bin]
|
||||
|
|
|
@ -387,6 +387,7 @@ schema("/bridges/:id/enable/:enable") ->
|
|||
responses =>
|
||||
#{
|
||||
204 => <<"Success">>,
|
||||
400 => error_schema('BAD_REQUEST', non_compat_bridge_msg()),
|
||||
404 => error_schema('NOT_FOUND', "Bridge not found or invalid operation"),
|
||||
503 => error_schema('SERVICE_UNAVAILABLE', "Service unavailable")
|
||||
}
|
||||
|
@ -507,7 +508,7 @@ schema("/bridges_probe") ->
|
|||
case maps:get(<<"also_delete_dep_actions">>, Qs, <<"false">>) of
|
||||
<<"true">> -> [rule_actions, connector];
|
||||
true -> [rule_actions, connector];
|
||||
_ -> []
|
||||
_ -> [connector]
|
||||
end,
|
||||
case emqx_bridge:check_deps_and_remove(BridgeType, BridgeName, AlsoDelete) of
|
||||
ok ->
|
||||
|
@ -667,6 +668,10 @@ get_metrics_from_local_node(BridgeType0, BridgeName) ->
|
|||
?SERVICE_UNAVAILABLE(<<"request timeout">>);
|
||||
{error, timeout} ->
|
||||
?SERVICE_UNAVAILABLE(<<"request timeout">>);
|
||||
{error, not_bridge_v1_compatible} ->
|
||||
?BAD_REQUEST(non_compat_bridge_msg());
|
||||
{error, bridge_not_found} ->
|
||||
?BRIDGE_NOT_FOUND(BridgeType, BridgeName);
|
||||
{error, Reason} ->
|
||||
?INTERNAL_ERROR(Reason)
|
||||
end
|
||||
|
|
|
@ -939,7 +939,7 @@ unpack_bridge_conf(Type, PackedConf, TopLevelConf) ->
|
|||
%% Check if the bridge can be converted to a valid bridge v1
|
||||
%%
|
||||
%% * The corresponding bridge v2 should exist
|
||||
%% * The connector for the bridge v2 should have exactly on channel
|
||||
%% * The connector for the bridge v2 should have exactly one channel
|
||||
is_valid_bridge_v1(BridgeV1Type, BridgeName) ->
|
||||
BridgeV2Type = ?MODULE:bridge_v1_type_to_bridge_v2_type(BridgeV1Type),
|
||||
case lookup_conf(BridgeV2Type, BridgeName) of
|
||||
|
@ -986,7 +986,7 @@ list_and_transform_to_bridge_v1() ->
|
|||
[B || B <- Bridges, B =/= not_bridge_v1_compatible_error()].
|
||||
|
||||
lookup_and_transform_to_bridge_v1(BridgeV1Type, Name) ->
|
||||
case is_valid_bridge_v1(BridgeV1Type, Name) of
|
||||
case ?MODULE:is_valid_bridge_v1(BridgeV1Type, Name) of
|
||||
true ->
|
||||
Type = ?MODULE:bridge_v1_type_to_bridge_v2_type(BridgeV1Type),
|
||||
case lookup(Type, Name) of
|
||||
|
@ -1068,21 +1068,29 @@ split_bridge_v1_config_and_create(BridgeV1Type, BridgeName, RawConf) ->
|
|||
case lookup_conf(BridgeV2Type, BridgeName) of
|
||||
{error, _} ->
|
||||
%% If the bridge v2 does not exist, it is a valid bridge v1
|
||||
split_bridge_v1_config_and_create_helper(BridgeV1Type, BridgeName, RawConf);
|
||||
PreviousRawConf = undefined,
|
||||
split_bridge_v1_config_and_create_helper(
|
||||
BridgeV1Type, BridgeName, RawConf, PreviousRawConf
|
||||
);
|
||||
_Conf ->
|
||||
case is_valid_bridge_v1(BridgeV1Type, BridgeName) of
|
||||
case ?MODULE:is_valid_bridge_v1(BridgeV1Type, BridgeName) of
|
||||
true ->
|
||||
%% Using remove + create as update, hence do not delete deps.
|
||||
RemoveDeps = [],
|
||||
PreviousRawConf = emqx:get_raw_config(
|
||||
[?ROOT_KEY, BridgeV2Type, BridgeName], undefined
|
||||
),
|
||||
bridge_v1_check_deps_and_remove(BridgeV1Type, BridgeName, RemoveDeps),
|
||||
split_bridge_v1_config_and_create_helper(BridgeV1Type, BridgeName, RawConf);
|
||||
split_bridge_v1_config_and_create_helper(
|
||||
BridgeV1Type, BridgeName, RawConf, PreviousRawConf
|
||||
);
|
||||
false ->
|
||||
%% If the bridge v2 exists, it is not a valid bridge v1
|
||||
{error, non_compatible_bridge_v2_exists}
|
||||
end
|
||||
end.
|
||||
|
||||
split_bridge_v1_config_and_create_helper(BridgeV1Type, BridgeName, RawConf) ->
|
||||
split_bridge_v1_config_and_create_helper(BridgeV1Type, BridgeName, RawConf, PreviousRawConf) ->
|
||||
#{
|
||||
connector_type := ConnectorType,
|
||||
connector_name := NewConnectorName,
|
||||
|
@ -1091,16 +1099,14 @@ split_bridge_v1_config_and_create_helper(BridgeV1Type, BridgeName, RawConf) ->
|
|||
bridge_v2_name := BridgeName,
|
||||
bridge_v2_conf := NewBridgeV2RawConf
|
||||
} =
|
||||
split_and_validate_bridge_v1_config(BridgeV1Type, BridgeName, RawConf),
|
||||
%% TODO should we really create an atom here?
|
||||
ConnectorNameAtom = binary_to_atom(NewConnectorName),
|
||||
case emqx_connector:create(ConnectorType, ConnectorNameAtom, NewConnectorRawConf) of
|
||||
split_and_validate_bridge_v1_config(BridgeV1Type, BridgeName, RawConf, PreviousRawConf),
|
||||
case emqx_connector:create(ConnectorType, NewConnectorName, NewConnectorRawConf) of
|
||||
{ok, _} ->
|
||||
case create(BridgeType, BridgeName, NewBridgeV2RawConf) of
|
||||
{ok, _} = Result ->
|
||||
Result;
|
||||
{error, Reason1} ->
|
||||
case emqx_connector:remove(ConnectorType, ConnectorNameAtom) of
|
||||
case emqx_connector:remove(ConnectorType, NewConnectorName) of
|
||||
ok ->
|
||||
{error, Reason1};
|
||||
{error, Reason2} ->
|
||||
|
@ -1118,14 +1124,14 @@ split_bridge_v1_config_and_create_helper(BridgeV1Type, BridgeName, RawConf) ->
|
|||
Error
|
||||
end.
|
||||
|
||||
split_and_validate_bridge_v1_config(BridgeV1Type, BridgeName, RawConf) ->
|
||||
split_and_validate_bridge_v1_config(BridgeV1Type, BridgeName, RawConf, PreviousRawConf) ->
|
||||
%% Create fake global config for the transformation and then call
|
||||
%% emqx_connector_schema:transform_bridges_v1_to_connectors_and_bridges_v2/1
|
||||
%% `emqx_connector_schema:transform_bridges_v1_to_connectors_and_bridges_v2/1'
|
||||
BridgeV2Type = ?MODULE:bridge_v1_type_to_bridge_v2_type(BridgeV1Type),
|
||||
ConnectorType = connector_type(BridgeV2Type),
|
||||
%% Needed so name confligts will ba avoided
|
||||
%% Needed to avoid name conflicts
|
||||
CurrentConnectorsConfig = emqx:get_raw_config([connectors], #{}),
|
||||
FakeGlobalConfig = #{
|
||||
FakeGlobalConfig0 = #{
|
||||
<<"connectors">> => CurrentConnectorsConfig,
|
||||
<<"bridges">> => #{
|
||||
bin(BridgeV1Type) => #{
|
||||
|
@ -1133,6 +1139,13 @@ split_and_validate_bridge_v1_config(BridgeV1Type, BridgeName, RawConf) ->
|
|||
}
|
||||
}
|
||||
},
|
||||
FakeGlobalConfig =
|
||||
emqx_utils_maps:put_if(
|
||||
FakeGlobalConfig0,
|
||||
bin(?ROOT_KEY),
|
||||
#{bin(BridgeV2Type) => #{bin(BridgeName) => PreviousRawConf}},
|
||||
PreviousRawConf =/= undefined
|
||||
),
|
||||
Output = emqx_connector_schema:transform_bridges_v1_to_connectors_and_bridges_v2(
|
||||
FakeGlobalConfig
|
||||
),
|
||||
|
@ -1145,34 +1158,21 @@ split_and_validate_bridge_v1_config(BridgeV1Type, BridgeName, RawConf) ->
|
|||
],
|
||||
Output
|
||||
),
|
||||
ConnectorsBefore =
|
||||
maps:keys(
|
||||
emqx_utils_maps:deep_get(
|
||||
[
|
||||
<<"connectors">>,
|
||||
bin(ConnectorType)
|
||||
],
|
||||
FakeGlobalConfig,
|
||||
#{}
|
||||
)
|
||||
),
|
||||
ConnectorsAfter =
|
||||
maps:keys(
|
||||
emqx_utils_maps:deep_get(
|
||||
[
|
||||
<<"connectors">>,
|
||||
bin(ConnectorType)
|
||||
],
|
||||
Output
|
||||
)
|
||||
),
|
||||
[NewConnectorName] = ConnectorsAfter -- ConnectorsBefore,
|
||||
ConnectorName = emqx_utils_maps:deep_get(
|
||||
[
|
||||
bin(?ROOT_KEY),
|
||||
bin(BridgeV2Type),
|
||||
bin(BridgeName),
|
||||
<<"connector">>
|
||||
],
|
||||
Output
|
||||
),
|
||||
NewConnectorRawConf =
|
||||
emqx_utils_maps:deep_get(
|
||||
[
|
||||
<<"connectors">>,
|
||||
bin(ConnectorType),
|
||||
bin(NewConnectorName)
|
||||
bin(ConnectorName)
|
||||
],
|
||||
Output
|
||||
),
|
||||
|
@ -1180,7 +1180,7 @@ split_and_validate_bridge_v1_config(BridgeV1Type, BridgeName, RawConf) ->
|
|||
NewFakeGlobalConfig = #{
|
||||
<<"connectors">> => #{
|
||||
bin(ConnectorType) => #{
|
||||
bin(NewConnectorName) => NewConnectorRawConf
|
||||
bin(ConnectorName) => NewConnectorRawConf
|
||||
}
|
||||
},
|
||||
<<"bridges_v2">> => #{
|
||||
|
@ -1199,7 +1199,7 @@ split_and_validate_bridge_v1_config(BridgeV1Type, BridgeName, RawConf) ->
|
|||
_ ->
|
||||
#{
|
||||
connector_type => ConnectorType,
|
||||
connector_name => NewConnectorName,
|
||||
connector_name => ConnectorName,
|
||||
connector_conf => NewConnectorRawConf,
|
||||
bridge_v2_type => BridgeV2Type,
|
||||
bridge_v2_name => BridgeName,
|
||||
|
@ -1214,6 +1214,7 @@ split_and_validate_bridge_v1_config(BridgeV1Type, BridgeName, RawConf) ->
|
|||
bridge_v1_create_dry_run(BridgeType, RawConfig0) ->
|
||||
RawConf = maps:without([<<"name">>], RawConfig0),
|
||||
TmpName = iolist_to_binary([?TEST_ID_PREFIX, emqx_utils:gen_id(8)]),
|
||||
PreviousRawConf = undefined,
|
||||
#{
|
||||
connector_type := _ConnectorType,
|
||||
connector_name := _NewConnectorName,
|
||||
|
@ -1221,7 +1222,7 @@ bridge_v1_create_dry_run(BridgeType, RawConfig0) ->
|
|||
bridge_v2_type := BridgeV2Type,
|
||||
bridge_v2_name := _BridgeName,
|
||||
bridge_v2_conf := BridgeV2RawConf
|
||||
} = split_and_validate_bridge_v1_config(BridgeType, TmpName, RawConf),
|
||||
} = split_and_validate_bridge_v1_config(BridgeType, TmpName, RawConf, PreviousRawConf),
|
||||
create_dry_run_helper(BridgeV2Type, ConnectorRawConf, BridgeV2RawConf).
|
||||
|
||||
bridge_v1_check_deps_and_remove(BridgeV1Type, BridgeName, RemoveDeps) ->
|
||||
|
|
|
@ -394,7 +394,7 @@ schema("/bridges_v2_probe") ->
|
|||
case emqx_bridge_v2:disable_enable(enable_func(Enable), BridgeType, BridgeName) of
|
||||
{ok, _} ->
|
||||
?NO_CONTENT;
|
||||
{error, {pre_config_update, _, not_found}} ->
|
||||
{error, {pre_config_update, _, bridge_not_found}} ->
|
||||
?BRIDGE_NOT_FOUND(BridgeType, BridgeName);
|
||||
{error, {_, _, timeout}} ->
|
||||
?SERVICE_UNAVAILABLE(<<"request timeout">>);
|
||||
|
|
|
@ -29,6 +29,8 @@
|
|||
post_request/0
|
||||
]).
|
||||
|
||||
-export([enterprise_api_schemas/1]).
|
||||
|
||||
-if(?EMQX_RELEASE_EDITION == ee).
|
||||
enterprise_api_schemas(Method) ->
|
||||
%% We *must* do this to ensure the module is really loaded, especially when we use
|
||||
|
@ -70,7 +72,7 @@ post_request() ->
|
|||
api_schema("post").
|
||||
|
||||
api_schema(Method) ->
|
||||
EE = enterprise_api_schemas(Method),
|
||||
EE = ?MODULE:enterprise_api_schemas(Method),
|
||||
hoconsc:union(bridge_api_union(EE)).
|
||||
|
||||
bridge_api_union(Refs) ->
|
||||
|
|
|
@ -0,0 +1,683 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_bridge_v1_compatibility_layer_SUITE).
|
||||
|
||||
-compile(nowarn_export_all).
|
||||
-compile(export_all).
|
||||
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
-include_lib("common_test/include/ct.hrl").
|
||||
-include_lib("typerefl/include/types.hrl").
|
||||
|
||||
-import(emqx_common_test_helpers, [on_exit/1]).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% CT boilerplate
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
all() ->
|
||||
emqx_common_test_helpers:all(?MODULE).
|
||||
|
||||
init_per_suite(Config) ->
|
||||
Apps = emqx_cth_suite:start(
|
||||
app_specs(),
|
||||
#{work_dir => emqx_cth_suite:work_dir(Config)}
|
||||
),
|
||||
emqx_mgmt_api_test_util:init_suite(),
|
||||
[{apps, Apps} | Config].
|
||||
|
||||
end_per_suite(Config) ->
|
||||
Apps = ?config(apps, Config),
|
||||
emqx_mgmt_api_test_util:end_suite(),
|
||||
emqx_cth_suite:stop(Apps),
|
||||
ok.
|
||||
|
||||
app_specs() ->
|
||||
[
|
||||
emqx,
|
||||
emqx_conf,
|
||||
emqx_connector,
|
||||
emqx_bridge,
|
||||
emqx_rule_engine
|
||||
].
|
||||
|
||||
init_per_testcase(_TestCase, Config) ->
|
||||
%% Setting up mocks for fake connector and bridge V2
|
||||
setup_mocks(),
|
||||
ets:new(fun_table_name(), [named_table, public]),
|
||||
%% Create a fake connector
|
||||
{ok, _} = emqx_connector:create(con_type(), con_name(), con_config()),
|
||||
[
|
||||
{mocked_mods, [
|
||||
emqx_connector_schema,
|
||||
emqx_connector_resource,
|
||||
|
||||
emqx_bridge_v2
|
||||
]}
|
||||
| Config
|
||||
].
|
||||
|
||||
end_per_testcase(_TestCase, _Config) ->
|
||||
ets:delete(fun_table_name()),
|
||||
delete_all_bridges_and_connectors(),
|
||||
meck:unload(),
|
||||
emqx_common_test_helpers:call_janitor(),
|
||||
ok.
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Helper fns
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
setup_mocks() ->
|
||||
MeckOpts = [passthrough, no_link, no_history],
|
||||
|
||||
catch meck:new(emqx_connector_schema, MeckOpts),
|
||||
meck:expect(emqx_connector_schema, fields, 1, con_schema()),
|
||||
meck:expect(emqx_connector_schema, connector_type_to_bridge_types, 1, [con_type()]),
|
||||
|
||||
catch meck:new(emqx_connector_resource, MeckOpts),
|
||||
meck:expect(emqx_connector_resource, connector_to_resource_type, 1, con_mod()),
|
||||
|
||||
catch meck:new(emqx_bridge_v2_schema, MeckOpts),
|
||||
meck:expect(emqx_bridge_v2_schema, fields, 1, bridge_schema()),
|
||||
|
||||
catch meck:new(emqx_bridge_v2, MeckOpts),
|
||||
meck:expect(emqx_bridge_v2, bridge_v2_type_to_connector_type, 1, con_type()),
|
||||
meck:expect(emqx_bridge_v2, bridge_v1_type_to_bridge_v2_type, 1, bridge_type()),
|
||||
IsBridgeV2TypeFun = fun(Type) ->
|
||||
BridgeV2Type = bridge_type(),
|
||||
BridgeV2TypeBin = bridge_type_bin(),
|
||||
case Type of
|
||||
BridgeV2Type -> true;
|
||||
BridgeV2TypeBin -> true;
|
||||
_ -> false
|
||||
end
|
||||
end,
|
||||
meck:expect(emqx_bridge_v2, is_bridge_v2_type, 1, IsBridgeV2TypeFun),
|
||||
|
||||
catch meck:new(emqx_bridge_v2_schema, MeckOpts),
|
||||
meck:expect(
|
||||
emqx_bridge_v2_schema,
|
||||
enterprise_api_schemas,
|
||||
1,
|
||||
fun(Method) -> [{bridge_type_bin(), hoconsc:ref(?MODULE, "api_" ++ Method)}] end
|
||||
),
|
||||
|
||||
ok.
|
||||
|
||||
con_mod() ->
|
||||
emqx_bridge_v2_test_connector.
|
||||
|
||||
con_type() ->
|
||||
bridge_type().
|
||||
|
||||
con_name() ->
|
||||
my_connector.
|
||||
|
||||
bridge_type() ->
|
||||
test_bridge_type.
|
||||
|
||||
bridge_type_bin() ->
|
||||
atom_to_binary(bridge_type(), utf8).
|
||||
|
||||
con_schema() ->
|
||||
[
|
||||
{
|
||||
con_type(),
|
||||
hoconsc:mk(
|
||||
hoconsc:map(name, hoconsc:ref(?MODULE, "connector")),
|
||||
#{
|
||||
desc => <<"Test Connector Config">>,
|
||||
required => false
|
||||
}
|
||||
)
|
||||
}
|
||||
].
|
||||
|
||||
fields("connector") ->
|
||||
[
|
||||
{enable, hoconsc:mk(any(), #{})},
|
||||
{resource_opts, hoconsc:mk(map(), #{})}
|
||||
];
|
||||
fields("api_post") ->
|
||||
[
|
||||
{connector, hoconsc:mk(binary(), #{})},
|
||||
{name, hoconsc:mk(binary(), #{})},
|
||||
{type, hoconsc:mk(bridge_type(), #{})},
|
||||
{send_to, hoconsc:mk(atom(), #{})}
|
||||
| fields("connector")
|
||||
].
|
||||
|
||||
con_config() ->
|
||||
#{
|
||||
<<"enable">> => true,
|
||||
<<"resource_opts">> => #{
|
||||
%% Set this to a low value to make the test run faster
|
||||
<<"health_check_interval">> => 100
|
||||
}
|
||||
}.
|
||||
|
||||
bridge_schema() ->
|
||||
bridge_schema(_Opts = #{}).
|
||||
|
||||
bridge_schema(Opts) ->
|
||||
Type = maps:get(bridge_type, Opts, bridge_type()),
|
||||
[
|
||||
{
|
||||
Type,
|
||||
hoconsc:mk(
|
||||
hoconsc:map(name, typerefl:map()),
|
||||
#{
|
||||
desc => <<"Test Bridge Config">>,
|
||||
required => false
|
||||
}
|
||||
)
|
||||
}
|
||||
].
|
||||
|
||||
bridge_config() ->
|
||||
#{
|
||||
<<"connector">> => atom_to_binary(con_name()),
|
||||
<<"enable">> => true,
|
||||
<<"send_to">> => registered_process_name(),
|
||||
<<"resource_opts">> => #{
|
||||
<<"resume_interval">> => 100
|
||||
}
|
||||
}.
|
||||
|
||||
fun_table_name() ->
|
||||
emqx_bridge_v1_compatibility_layer_SUITE_fun_table.
|
||||
|
||||
registered_process_name() ->
|
||||
my_registered_process.
|
||||
|
||||
delete_all_bridges_and_connectors() ->
|
||||
lists:foreach(
|
||||
fun(#{name := Name, type := Type}) ->
|
||||
ct:pal("removing bridge ~p", [{Type, Name}]),
|
||||
emqx_bridge_v2:remove(Type, Name)
|
||||
end,
|
||||
emqx_bridge_v2:list()
|
||||
),
|
||||
lists:foreach(
|
||||
fun(#{name := Name, type := Type}) ->
|
||||
ct:pal("removing connector ~p", [{Type, Name}]),
|
||||
emqx_connector:remove(Type, Name)
|
||||
end,
|
||||
emqx_connector:list()
|
||||
),
|
||||
update_root_config(#{}),
|
||||
ok.
|
||||
|
||||
%% Hocon does not support placing a fun in a config map so we replace it with a string
|
||||
wrap_fun(Fun) ->
|
||||
UniqRef = make_ref(),
|
||||
UniqRefBin = term_to_binary(UniqRef),
|
||||
UniqRefStr = iolist_to_binary(base64:encode(UniqRefBin)),
|
||||
ets:insert(fun_table_name(), {UniqRefStr, Fun}),
|
||||
UniqRefStr.
|
||||
|
||||
unwrap_fun(UniqRefStr) ->
|
||||
ets:lookup_element(fun_table_name(), UniqRefStr, 2).
|
||||
|
||||
update_root_config(RootConf) ->
|
||||
emqx_conf:update([bridges_v2], RootConf, #{override_to => cluster}).
|
||||
|
||||
delete_all_bridges() ->
|
||||
lists:foreach(
|
||||
fun(#{name := Name, type := Type}) ->
|
||||
ok = emqx_bridge:remove(Type, Name)
|
||||
end,
|
||||
emqx_bridge:list()
|
||||
),
|
||||
%% at some point during the tests, sometimes `emqx_bridge:list()'
|
||||
%% returns an empty list, but `emqx:get_config([bridges])' returns
|
||||
%% a bunch of orphan test bridges...
|
||||
lists:foreach(fun emqx_resource:remove/1, emqx_resource:list_instances()),
|
||||
emqx_config:put([bridges], #{}),
|
||||
ok.
|
||||
|
||||
maybe_json_decode(X) ->
|
||||
case emqx_utils_json:safe_decode(X, [return_maps]) of
|
||||
{ok, Decoded} -> Decoded;
|
||||
{error, _} -> X
|
||||
end.
|
||||
|
||||
request(Method, Path, Params) ->
|
||||
AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
|
||||
Opts = #{return_all => true},
|
||||
case emqx_mgmt_api_test_util:request_api(Method, Path, "", AuthHeader, Params, Opts) of
|
||||
{ok, {Status, Headers, Body0}} ->
|
||||
Body = maybe_json_decode(Body0),
|
||||
{ok, {Status, Headers, Body}};
|
||||
{error, {Status, Headers, Body0}} ->
|
||||
Body =
|
||||
case emqx_utils_json:safe_decode(Body0, [return_maps]) of
|
||||
{ok, Decoded0 = #{<<"message">> := Msg0}} ->
|
||||
Msg = maybe_json_decode(Msg0),
|
||||
Decoded0#{<<"message">> := Msg};
|
||||
{ok, Decoded0} ->
|
||||
Decoded0;
|
||||
{error, _} ->
|
||||
Body0
|
||||
end,
|
||||
{error, {Status, Headers, Body}};
|
||||
Error ->
|
||||
Error
|
||||
end.
|
||||
|
||||
list_bridges_http_api_v1() ->
|
||||
Path = emqx_mgmt_api_test_util:api_path(["bridges"]),
|
||||
ct:pal("list bridges (http v1)"),
|
||||
Res = request(get, Path, _Params = []),
|
||||
ct:pal("list bridges (http v1) result:\n ~p", [Res]),
|
||||
Res.
|
||||
|
||||
list_bridges_http_api_v2() ->
|
||||
Path = emqx_mgmt_api_test_util:api_path(["bridges_v2"]),
|
||||
ct:pal("list bridges (http v2)"),
|
||||
Res = request(get, Path, _Params = []),
|
||||
ct:pal("list bridges (http v2) result:\n ~p", [Res]),
|
||||
Res.
|
||||
|
||||
list_connectors_http() ->
|
||||
Path = emqx_mgmt_api_test_util:api_path(["connectors"]),
|
||||
ct:pal("list connectors"),
|
||||
Res = request(get, Path, _Params = []),
|
||||
ct:pal("list connectors result:\n ~p", [Res]),
|
||||
Res.
|
||||
|
||||
get_bridge_http_api_v1(Name) ->
|
||||
BridgeId = emqx_bridge_resource:bridge_id(bridge_type(), Name),
|
||||
Path = emqx_mgmt_api_test_util:api_path(["bridges", BridgeId]),
|
||||
ct:pal("get bridge (http v1) (~p)", [#{name => Name}]),
|
||||
Res = request(get, Path, _Params = []),
|
||||
ct:pal("get bridge (http v1) (~p) result:\n ~p", [#{name => Name}, Res]),
|
||||
Res.
|
||||
|
||||
get_bridge_http_api_v2(Name) ->
|
||||
BridgeId = emqx_bridge_resource:bridge_id(bridge_type(), Name),
|
||||
Path = emqx_mgmt_api_test_util:api_path(["bridges_v2", BridgeId]),
|
||||
ct:pal("get bridge (http v2) (~p)", [#{name => Name}]),
|
||||
Res = request(get, Path, _Params = []),
|
||||
ct:pal("get bridge (http v2) (~p) result:\n ~p", [#{name => Name}, Res]),
|
||||
Res.
|
||||
|
||||
get_connector_http(Name) ->
|
||||
ConnectorId = emqx_connector_resource:connector_id(con_type(), Name),
|
||||
Path = emqx_mgmt_api_test_util:api_path(["connectors", ConnectorId]),
|
||||
ct:pal("get connector (~p)", [#{name => Name, id => ConnectorId}]),
|
||||
Res = request(get, Path, _Params = []),
|
||||
ct:pal("get connector (~p) result:\n ~p", [#{name => Name}, Res]),
|
||||
Res.
|
||||
|
||||
create_bridge_http_api_v1(Opts) ->
|
||||
Name = maps:get(name, Opts),
|
||||
Overrides = maps:get(overrides, Opts, #{}),
|
||||
BridgeConfig0 = emqx_utils_maps:deep_merge(bridge_config(), Overrides),
|
||||
BridgeConfig = maps:without([<<"connector">>], BridgeConfig0),
|
||||
Params = BridgeConfig#{<<"type">> => bridge_type_bin(), <<"name">> => Name},
|
||||
Path = emqx_mgmt_api_test_util:api_path(["bridges"]),
|
||||
ct:pal("creating bridge (http v1): ~p", [Params]),
|
||||
Res = request(post, Path, Params),
|
||||
ct:pal("bridge create (http v1) result:\n ~p", [Res]),
|
||||
Res.
|
||||
|
||||
create_bridge_http_api_v2(Opts) ->
|
||||
Name = maps:get(name, Opts),
|
||||
Overrides = maps:get(overrides, Opts, #{}),
|
||||
BridgeConfig = emqx_utils_maps:deep_merge(bridge_config(), Overrides),
|
||||
Params = BridgeConfig#{<<"type">> => bridge_type_bin(), <<"name">> => Name},
|
||||
Path = emqx_mgmt_api_test_util:api_path(["bridges_v2"]),
|
||||
ct:pal("creating bridge (http v2): ~p", [Params]),
|
||||
Res = request(post, Path, Params),
|
||||
ct:pal("bridge create (http v2) result:\n ~p", [Res]),
|
||||
Res.
|
||||
|
||||
update_bridge_http_api_v1(Opts) ->
|
||||
Name = maps:get(name, Opts),
|
||||
BridgeId = emqx_bridge_resource:bridge_id(bridge_type(), Name),
|
||||
Overrides = maps:get(overrides, Opts, #{}),
|
||||
BridgeConfig0 = emqx_utils_maps:deep_merge(bridge_config(), Overrides),
|
||||
BridgeConfig = maps:without([<<"connector">>], BridgeConfig0),
|
||||
Params = BridgeConfig,
|
||||
Path = emqx_mgmt_api_test_util:api_path(["bridges", BridgeId]),
|
||||
ct:pal("updating bridge (http v1): ~p", [Params]),
|
||||
Res = request(put, Path, Params),
|
||||
ct:pal("bridge update (http v1) result:\n ~p", [Res]),
|
||||
Res.
|
||||
|
||||
delete_bridge_http_api_v1(Opts) ->
|
||||
Name = maps:get(name, Opts),
|
||||
BridgeId = emqx_bridge_resource:bridge_id(bridge_type(), Name),
|
||||
Path = emqx_mgmt_api_test_util:api_path(["bridges", BridgeId]),
|
||||
ct:pal("deleting bridge (http v1)"),
|
||||
Res = request(delete, Path, _Params = []),
|
||||
ct:pal("bridge delete (http v1) result:\n ~p", [Res]),
|
||||
Res.
|
||||
|
||||
delete_bridge_http_api_v2(Opts) ->
|
||||
Name = maps:get(name, Opts),
|
||||
BridgeId = emqx_bridge_resource:bridge_id(bridge_type(), Name),
|
||||
Path = emqx_mgmt_api_test_util:api_path(["bridges_v2", BridgeId]),
|
||||
ct:pal("deleting bridge (http v2)"),
|
||||
Res = request(delete, Path, _Params = []),
|
||||
ct:pal("bridge delete (http v2) result:\n ~p", [Res]),
|
||||
Res.
|
||||
|
||||
enable_bridge_http_api_v1(Name) ->
|
||||
BridgeId = emqx_bridge_resource:bridge_id(bridge_type(), Name),
|
||||
Path = emqx_mgmt_api_test_util:api_path(["bridges", BridgeId, "enable", "true"]),
|
||||
ct:pal("enabling bridge (http v1)"),
|
||||
Res = request(put, Path, _Params = []),
|
||||
ct:pal("bridge enable (http v1) result:\n ~p", [Res]),
|
||||
Res.
|
||||
|
||||
enable_bridge_http_api_v2(Name) ->
|
||||
BridgeId = emqx_bridge_resource:bridge_id(bridge_type(), Name),
|
||||
Path = emqx_mgmt_api_test_util:api_path(["bridges_v2", BridgeId, "enable", "true"]),
|
||||
ct:pal("enabling bridge (http v2)"),
|
||||
Res = request(put, Path, _Params = []),
|
||||
ct:pal("bridge enable (http v2) result:\n ~p", [Res]),
|
||||
Res.
|
||||
|
||||
disable_bridge_http_api_v1(Name) ->
|
||||
BridgeId = emqx_bridge_resource:bridge_id(bridge_type(), Name),
|
||||
Path = emqx_mgmt_api_test_util:api_path(["bridges", BridgeId, "enable", "false"]),
|
||||
ct:pal("disabling bridge (http v1)"),
|
||||
Res = request(put, Path, _Params = []),
|
||||
ct:pal("bridge disable (http v1) result:\n ~p", [Res]),
|
||||
Res.
|
||||
|
||||
disable_bridge_http_api_v2(Name) ->
|
||||
BridgeId = emqx_bridge_resource:bridge_id(bridge_type(), Name),
|
||||
Path = emqx_mgmt_api_test_util:api_path(["bridges_v2", BridgeId, "enable", "false"]),
|
||||
ct:pal("disabling bridge (http v2)"),
|
||||
Res = request(put, Path, _Params = []),
|
||||
ct:pal("bridge disable (http v2) result:\n ~p", [Res]),
|
||||
Res.
|
||||
|
||||
bridge_operation_http_api_v1(Name, Op0) ->
|
||||
Op = atom_to_list(Op0),
|
||||
BridgeId = emqx_bridge_resource:bridge_id(bridge_type(), Name),
|
||||
Path = emqx_mgmt_api_test_util:api_path(["bridges", BridgeId, Op]),
|
||||
ct:pal("bridge op ~p (http v1)", [Op]),
|
||||
Res = request(post, Path, _Params = []),
|
||||
ct:pal("bridge op ~p (http v1) result:\n ~p", [Op, Res]),
|
||||
Res.
|
||||
|
||||
bridge_operation_http_api_v2(Name, Op0) ->
|
||||
Op = atom_to_list(Op0),
|
||||
BridgeId = emqx_bridge_resource:bridge_id(bridge_type(), Name),
|
||||
Path = emqx_mgmt_api_test_util:api_path(["bridges_v2", BridgeId, Op]),
|
||||
ct:pal("bridge op ~p (http v2)", [Op]),
|
||||
Res = request(post, Path, _Params = []),
|
||||
ct:pal("bridge op ~p (http v2) result:\n ~p", [Op, Res]),
|
||||
Res.
|
||||
|
||||
bridge_node_operation_http_api_v1(Name, Node0, Op0) ->
|
||||
Op = atom_to_list(Op0),
|
||||
Node = atom_to_list(Node0),
|
||||
BridgeId = emqx_bridge_resource:bridge_id(bridge_type(), Name),
|
||||
Path = emqx_mgmt_api_test_util:api_path(["nodes", Node, "bridges", BridgeId, Op]),
|
||||
ct:pal("bridge node op ~p (http v1)", [{Node, Op}]),
|
||||
Res = request(post, Path, _Params = []),
|
||||
ct:pal("bridge node op ~p (http v1) result:\n ~p", [{Node, Op}, Res]),
|
||||
Res.
|
||||
|
||||
bridge_node_operation_http_api_v2(Name, Node0, Op0) ->
|
||||
Op = atom_to_list(Op0),
|
||||
Node = atom_to_list(Node0),
|
||||
BridgeId = emqx_bridge_resource:bridge_id(bridge_type(), Name),
|
||||
Path = emqx_mgmt_api_test_util:api_path(["nodes", Node, "bridges_v2", BridgeId, Op]),
|
||||
ct:pal("bridge node op ~p (http v2)", [{Node, Op}]),
|
||||
Res = request(post, Path, _Params = []),
|
||||
ct:pal("bridge node op ~p (http v2) result:\n ~p", [{Node, Op}, Res]),
|
||||
Res.
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Test cases
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
t_name_too_long(_Config) ->
|
||||
LongName = list_to_binary(lists:duplicate(256, $a)),
|
||||
?assertMatch(
|
||||
{error,
|
||||
{{_, 400, _}, _, #{<<"message">> := #{<<"reason">> := <<"Name is too long", _/binary>>}}}},
|
||||
create_bridge_http_api_v1(#{name => LongName})
|
||||
),
|
||||
ok.
|
||||
|
||||
t_scenario_1(_Config) ->
|
||||
%% ===================================================================================
|
||||
%% Pre-conditions
|
||||
%% ===================================================================================
|
||||
?assertMatch({ok, {{_, 200, _}, _, []}}, list_bridges_http_api_v1()),
|
||||
?assertMatch({ok, {{_, 200, _}, _, []}}, list_bridges_http_api_v2()),
|
||||
%% created in the test case init
|
||||
?assertMatch({ok, {{_, 200, _}, _, [#{}]}}, list_connectors_http()),
|
||||
{ok, {{_, 200, _}, _, [#{<<"name">> := PreexistentConnectorName}]}} = list_connectors_http(),
|
||||
|
||||
%% ===================================================================================
|
||||
%% Create a single bridge v2. It should still be listed and functional when using v1
|
||||
%% APIs.
|
||||
%% ===================================================================================
|
||||
NameA = <<"bridgev2a">>,
|
||||
?assertMatch(
|
||||
{ok, {{_, 201, _}, _, #{}}},
|
||||
create_bridge_http_api_v1(#{name => NameA})
|
||||
),
|
||||
?assertMatch({ok, {{_, 200, _}, _, [#{<<"name">> := NameA}]}}, list_bridges_http_api_v1()),
|
||||
?assertMatch({ok, {{_, 200, _}, _, [#{<<"name">> := NameA}]}}, list_bridges_http_api_v2()),
|
||||
%% created a new one from the v1 API
|
||||
?assertMatch({ok, {{_, 200, _}, _, [#{}, #{}]}}, list_connectors_http()),
|
||||
?assertMatch({ok, {{_, 200, _}, _, #{<<"name">> := NameA}}}, get_bridge_http_api_v1(NameA)),
|
||||
?assertMatch({ok, {{_, 200, _}, _, #{<<"name">> := NameA}}}, get_bridge_http_api_v2(NameA)),
|
||||
|
||||
?assertMatch({ok, {{_, 204, _}, _, _}}, disable_bridge_http_api_v1(NameA)),
|
||||
?assertMatch({ok, {{_, 204, _}, _, _}}, enable_bridge_http_api_v1(NameA)),
|
||||
?assertMatch({ok, {{_, 204, _}, _, _}}, disable_bridge_http_api_v2(NameA)),
|
||||
?assertMatch({ok, {{_, 204, _}, _, _}}, enable_bridge_http_api_v2(NameA)),
|
||||
|
||||
?assertMatch({ok, {{_, 204, _}, _, _}}, bridge_operation_http_api_v1(NameA, stop)),
|
||||
?assertMatch({ok, {{_, 204, _}, _, _}}, bridge_operation_http_api_v1(NameA, start)),
|
||||
?assertMatch({ok, {{_, 204, _}, _, _}}, bridge_operation_http_api_v1(NameA, restart)),
|
||||
%% TODO: currently, only `start' op is supported by the v2 API.
|
||||
%% ?assertMatch({ok, {{_, 204, _}, _, _}}, bridge_operation_http_api_v2(NameA, stop)),
|
||||
?assertMatch({ok, {{_, 204, _}, _, _}}, bridge_operation_http_api_v2(NameA, start)),
|
||||
%% TODO: currently, only `start' op is supported by the v2 API.
|
||||
%% ?assertMatch({ok, {{_, 204, _}, _, _}}, bridge_operation_http_api_v2(NameA, restart)),
|
||||
|
||||
?assertMatch({ok, {{_, 204, _}, _, _}}, bridge_node_operation_http_api_v1(NameA, node(), stop)),
|
||||
?assertMatch(
|
||||
{ok, {{_, 204, _}, _, _}}, bridge_node_operation_http_api_v1(NameA, node(), start)
|
||||
),
|
||||
?assertMatch(
|
||||
{ok, {{_, 204, _}, _, _}}, bridge_node_operation_http_api_v1(NameA, node(), restart)
|
||||
),
|
||||
%% TODO: currently, only `start' op is supported by the v2 API.
|
||||
%% ?assertMatch({ok, {{_, 204, _}, _, _}}, bridge_node_operation_http_api_v2(NameA, stop)),
|
||||
?assertMatch(
|
||||
{ok, {{_, 204, _}, _, _}}, bridge_node_operation_http_api_v2(NameA, node(), start)
|
||||
),
|
||||
%% TODO: currently, only `start' op is supported by the v2 API.
|
||||
%% ?assertMatch({ok, {{_, 204, _}, _, _}}, bridge_node_operation_http_api_v2(NameA, restart)),
|
||||
|
||||
{ok, {{_, 200, _}, _, #{<<"connector">> := GeneratedConnName}}} = get_bridge_http_api_v2(NameA),
|
||||
?assertMatch(
|
||||
{ok, {{_, 200, _}, _, #{<<"name">> := GeneratedConnName}}},
|
||||
get_connector_http(GeneratedConnName)
|
||||
),
|
||||
|
||||
%% ===================================================================================
|
||||
%% Update the bridge using v1 API.
|
||||
%% ===================================================================================
|
||||
?assertMatch(
|
||||
{ok, {{_, 200, _}, _, _}},
|
||||
update_bridge_http_api_v1(#{name => NameA})
|
||||
),
|
||||
?assertMatch({ok, {{_, 200, _}, _, [#{<<"name">> := NameA}]}}, list_bridges_http_api_v1()),
|
||||
?assertMatch({ok, {{_, 200, _}, _, [#{<<"name">> := NameA}]}}, list_bridges_http_api_v2()),
|
||||
?assertMatch({ok, {{_, 200, _}, _, [#{}, #{}]}}, list_connectors_http()),
|
||||
?assertMatch({ok, {{_, 200, _}, _, #{<<"name">> := NameA}}}, get_bridge_http_api_v1(NameA)),
|
||||
?assertMatch({ok, {{_, 200, _}, _, #{<<"name">> := NameA}}}, get_bridge_http_api_v2(NameA)),
|
||||
|
||||
%% ===================================================================================
|
||||
%% Now create a new bridge_v2 pointing to the same connector. It should no longer be
|
||||
%% functions via v1 API, nor be listed in it. The new bridge must create a new
|
||||
%% channel, so that this bridge is no longer considered v1.
|
||||
%% ===================================================================================
|
||||
NameB = <<"bridgev2b">>,
|
||||
?assertMatch(
|
||||
{ok, {{_, 201, _}, _, #{}}},
|
||||
create_bridge_http_api_v2(#{
|
||||
name => NameB, overrides => #{<<"connector">> => GeneratedConnName}
|
||||
})
|
||||
),
|
||||
?assertMatch({ok, {{_, 200, _}, _, []}}, list_bridges_http_api_v1()),
|
||||
?assertMatch(
|
||||
{ok, {{_, 200, _}, _, [#{<<"name">> := _}, #{<<"name">> := _}]}}, list_bridges_http_api_v2()
|
||||
),
|
||||
?assertMatch({ok, {{_, 200, _}, _, [#{}, #{}]}}, list_connectors_http()),
|
||||
?assertMatch({error, {{_, 404, _}, _, #{}}}, get_bridge_http_api_v1(NameA)),
|
||||
?assertMatch({error, {{_, 404, _}, _, #{}}}, get_bridge_http_api_v1(NameB)),
|
||||
?assertMatch({ok, {{_, 200, _}, _, #{<<"name">> := NameA}}}, get_bridge_http_api_v2(NameA)),
|
||||
?assertMatch({ok, {{_, 200, _}, _, #{<<"name">> := NameB}}}, get_bridge_http_api_v2(NameB)),
|
||||
?assertMatch(
|
||||
{ok, {{_, 200, _}, _, #{<<"name">> := GeneratedConnName}}},
|
||||
get_connector_http(GeneratedConnName)
|
||||
),
|
||||
|
||||
?assertMatch({error, {{_, 400, _}, _, _}}, disable_bridge_http_api_v1(NameA)),
|
||||
?assertMatch({error, {{_, 400, _}, _, _}}, enable_bridge_http_api_v1(NameA)),
|
||||
?assertMatch({error, {{_, 400, _}, _, _}}, disable_bridge_http_api_v1(NameB)),
|
||||
?assertMatch({error, {{_, 400, _}, _, _}}, enable_bridge_http_api_v1(NameB)),
|
||||
?assertMatch({ok, {{_, 204, _}, _, _}}, disable_bridge_http_api_v2(NameA)),
|
||||
?assertMatch({ok, {{_, 204, _}, _, _}}, enable_bridge_http_api_v2(NameA)),
|
||||
|
||||
?assertMatch({error, {{_, 400, _}, _, _}}, bridge_operation_http_api_v1(NameA, stop)),
|
||||
?assertMatch({error, {{_, 400, _}, _, _}}, bridge_operation_http_api_v1(NameA, start)),
|
||||
?assertMatch({error, {{_, 400, _}, _, _}}, bridge_operation_http_api_v1(NameA, restart)),
|
||||
?assertMatch({error, {{_, 400, _}, _, _}}, bridge_operation_http_api_v1(NameB, stop)),
|
||||
?assertMatch({error, {{_, 400, _}, _, _}}, bridge_operation_http_api_v1(NameB, start)),
|
||||
?assertMatch({error, {{_, 400, _}, _, _}}, bridge_operation_http_api_v1(NameB, restart)),
|
||||
%% TODO: currently, only `start' op is supported by the v2 API.
|
||||
%% ?assertMatch({ok, {{_, 204, _}, _, _}}, bridge_operation_http_api_v2(NameA, stop)),
|
||||
?assertMatch({ok, {{_, 204, _}, _, _}}, bridge_operation_http_api_v2(NameA, start)),
|
||||
%% TODO: currently, only `start' op is supported by the v2 API.
|
||||
%% ?assertMatch({ok, {{_, 204, _}, _, _}}, bridge_operation_http_api_v2(NameA, restart)),
|
||||
%% TODO: currently, only `start' op is supported by the v2 API.
|
||||
%% ?assertMatch({ok, {{_, 204, _}, _, _}}, bridge_operation_http_api_v2(NameB, stop)),
|
||||
?assertMatch({ok, {{_, 204, _}, _, _}}, bridge_operation_http_api_v2(NameB, start)),
|
||||
%% TODO: currently, only `start' op is supported by the v2 API.
|
||||
%% ?assertMatch({ok, {{_, 204, _}, _, _}}, bridge_operation_http_api_v2(NameB, restart)),
|
||||
|
||||
?assertMatch(
|
||||
{error, {{_, 400, _}, _, _}}, bridge_node_operation_http_api_v1(NameA, node(), stop)
|
||||
),
|
||||
?assertMatch(
|
||||
{error, {{_, 400, _}, _, _}}, bridge_node_operation_http_api_v1(NameA, node(), start)
|
||||
),
|
||||
?assertMatch(
|
||||
{error, {{_, 400, _}, _, _}}, bridge_node_operation_http_api_v1(NameA, node(), restart)
|
||||
),
|
||||
?assertMatch(
|
||||
{error, {{_, 400, _}, _, _}}, bridge_node_operation_http_api_v1(NameB, node(), stop)
|
||||
),
|
||||
?assertMatch(
|
||||
{error, {{_, 400, _}, _, _}}, bridge_node_operation_http_api_v1(NameB, node(), start)
|
||||
),
|
||||
?assertMatch(
|
||||
{error, {{_, 400, _}, _, _}}, bridge_node_operation_http_api_v1(NameB, node(), restart)
|
||||
),
|
||||
%% TODO: currently, only `start' op is supported by the v2 API.
|
||||
%% ?assertMatch({ok, {{_, 204, _}, _, _}}, bridge_node_operation_http_api_v2(NameA, stop)),
|
||||
%% ?assertMatch({ok, {{_, 204, _}, _, _}}, bridge_node_operation_http_api_v2(NameB, stop)),
|
||||
?assertMatch(
|
||||
{ok, {{_, 204, _}, _, _}}, bridge_node_operation_http_api_v2(NameA, node(), start)
|
||||
),
|
||||
?assertMatch(
|
||||
{ok, {{_, 204, _}, _, _}}, bridge_node_operation_http_api_v2(NameB, node(), start)
|
||||
),
|
||||
%% TODO: currently, only `start' op is supported by the v2 API.
|
||||
%% ?assertMatch({ok, {{_, 204, _}, _, _}}, bridge_node_operation_http_api_v2(NameA, restart)),
|
||||
%% ?assertMatch({ok, {{_, 204, _}, _, _}}, bridge_node_operation_http_api_v2(NameB, restart)),
|
||||
|
||||
%% ===================================================================================
|
||||
%% Delete the 2nd new bridge so it appears again in the V1 API.
|
||||
%% ===================================================================================
|
||||
?assertMatch(
|
||||
{ok, {{_, 204, _}, _, _}},
|
||||
delete_bridge_http_api_v2(#{name => NameB})
|
||||
),
|
||||
?assertMatch({ok, {{_, 200, _}, _, [#{<<"name">> := NameA}]}}, list_bridges_http_api_v1()),
|
||||
?assertMatch({ok, {{_, 200, _}, _, [#{<<"name">> := NameA}]}}, list_bridges_http_api_v2()),
|
||||
?assertMatch({ok, {{_, 200, _}, _, [#{}, #{}]}}, list_connectors_http()),
|
||||
?assertMatch({ok, {{_, 200, _}, _, #{<<"name">> := NameA}}}, get_bridge_http_api_v1(NameA)),
|
||||
?assertMatch({ok, {{_, 200, _}, _, #{<<"name">> := NameA}}}, get_bridge_http_api_v2(NameA)),
|
||||
?assertMatch(
|
||||
{ok, {{_, 200, _}, _, #{<<"name">> := GeneratedConnName}}},
|
||||
get_connector_http(GeneratedConnName)
|
||||
),
|
||||
?assertMatch({ok, {{_, 204, _}, _, _}}, disable_bridge_http_api_v1(NameA)),
|
||||
?assertMatch({ok, {{_, 204, _}, _, _}}, enable_bridge_http_api_v1(NameA)),
|
||||
?assertMatch({ok, {{_, 204, _}, _, _}}, disable_bridge_http_api_v2(NameA)),
|
||||
?assertMatch({ok, {{_, 204, _}, _, _}}, enable_bridge_http_api_v2(NameA)),
|
||||
?assertMatch({ok, {{_, 204, _}, _, _}}, bridge_operation_http_api_v1(NameA, stop)),
|
||||
?assertMatch({ok, {{_, 204, _}, _, _}}, bridge_operation_http_api_v1(NameA, start)),
|
||||
?assertMatch({ok, {{_, 204, _}, _, _}}, bridge_operation_http_api_v1(NameA, restart)),
|
||||
%% TODO: currently, only `start' op is supported by the v2 API.
|
||||
%% ?assertMatch({ok, {{_, 204, _}, _, _}}, bridge_operation_http_api_v2(NameA, stop)),
|
||||
?assertMatch({ok, {{_, 204, _}, _, _}}, bridge_operation_http_api_v2(NameA, start)),
|
||||
%% TODO: currently, only `start' op is supported by the v2 API.
|
||||
%% ?assertMatch({ok, {{_, 204, _}, _, _}}, bridge_operation_http_api_v2(NameA, restart)),
|
||||
|
||||
%% ===================================================================================
|
||||
%% Delete the last bridge using API v1. The generated connector should also be
|
||||
%% removed.
|
||||
%% ===================================================================================
|
||||
?assertMatch(
|
||||
{ok, {{_, 204, _}, _, _}},
|
||||
delete_bridge_http_api_v1(#{name => NameA})
|
||||
),
|
||||
?assertMatch({ok, {{_, 200, _}, _, []}}, list_bridges_http_api_v1()),
|
||||
?assertMatch({ok, {{_, 200, _}, _, []}}, list_bridges_http_api_v2()),
|
||||
%% only the pre-existing one should remain.
|
||||
?assertMatch(
|
||||
{ok, {{_, 200, _}, _, [#{<<"name">> := PreexistentConnectorName}]}},
|
||||
list_connectors_http()
|
||||
),
|
||||
?assertMatch(
|
||||
{ok, {{_, 200, _}, _, #{<<"name">> := PreexistentConnectorName}}},
|
||||
get_connector_http(PreexistentConnectorName)
|
||||
),
|
||||
?assertMatch({error, {{_, 404, _}, _, _}}, get_bridge_http_api_v1(NameA)),
|
||||
?assertMatch({error, {{_, 404, _}, _, _}}, get_bridge_http_api_v2(NameA)),
|
||||
?assertMatch({error, {{_, 404, _}, _, _}}, get_connector_http(GeneratedConnName)),
|
||||
?assertMatch({error, {{_, 404, _}, _, _}}, disable_bridge_http_api_v1(NameA)),
|
||||
?assertMatch({error, {{_, 404, _}, _, _}}, enable_bridge_http_api_v1(NameA)),
|
||||
?assertMatch({error, {{_, 404, _}, _, _}}, disable_bridge_http_api_v2(NameA)),
|
||||
?assertMatch({error, {{_, 404, _}, _, _}}, enable_bridge_http_api_v2(NameA)),
|
||||
?assertMatch({error, {{_, 404, _}, _, _}}, bridge_operation_http_api_v1(NameA, stop)),
|
||||
?assertMatch({error, {{_, 404, _}, _, _}}, bridge_operation_http_api_v1(NameA, start)),
|
||||
?assertMatch({error, {{_, 404, _}, _, _}}, bridge_operation_http_api_v1(NameA, restart)),
|
||||
%% TODO: currently, only `start' op is supported by the v2 API.
|
||||
%% ?assertMatch({error, {{_, 404, _}, _, _}}, bridge_operation_http_api_v2(NameA, stop)),
|
||||
?assertMatch({error, {{_, 404, _}, _, _}}, bridge_operation_http_api_v2(NameA, start)),
|
||||
%% TODO: currently, only `start' op is supported by the v2 API.
|
||||
%% ?assertMatch({error, {{_, 404, _}, _, _}}, bridge_operation_http_api_v2(NameA, restart)),
|
||||
|
||||
ok.
|
|
@ -222,13 +222,8 @@ encode_payload(State, Selected) ->
|
|||
OrderingKey = render_key(OrderingKeyTemplate, Selected),
|
||||
Attributes = proc_attributes(AttributesTemplate, Selected),
|
||||
Payload0 = #{data => base64:encode(Data)},
|
||||
Payload1 = put_if(Payload0, attributes, Attributes, map_size(Attributes) > 0),
|
||||
put_if(Payload1, 'orderingKey', OrderingKey, OrderingKey =/= <<>>).
|
||||
|
||||
put_if(Acc, K, V, true) ->
|
||||
Acc#{K => V};
|
||||
put_if(Acc, _K, _V, false) ->
|
||||
Acc.
|
||||
Payload1 = emqx_utils_maps:put_if(Payload0, attributes, Attributes, map_size(Attributes) > 0),
|
||||
emqx_utils_maps:put_if(Payload1, 'orderingKey', OrderingKey, OrderingKey =/= <<>>).
|
||||
|
||||
-spec render_payload(emqx_placeholder:tmpl_token(), map()) -> binary().
|
||||
render_payload([] = _Template, Selected) ->
|
||||
|
|
|
@ -27,6 +27,8 @@
|
|||
|
||||
-export([get_response/0, put_request/0, post_request/0]).
|
||||
|
||||
-export([connector_type_to_bridge_types/1]).
|
||||
|
||||
-if(?EMQX_RELEASE_EDITION == ee).
|
||||
enterprise_api_schemas(Method) ->
|
||||
%% We *must* do this to ensure the module is really loaded, especially when we use
|
||||
|
@ -69,21 +71,31 @@ has_connector_field(BridgeConf, ConnectorFields) ->
|
|||
ConnectorFields
|
||||
).
|
||||
|
||||
bridge_configs_to_transform(_BridgeType, [] = _BridgeNameBridgeConfList, _ConnectorFields) ->
|
||||
bridge_configs_to_transform(
|
||||
_BridgeType, [] = _BridgeNameBridgeConfList, _ConnectorFields, _RawConfig
|
||||
) ->
|
||||
[];
|
||||
bridge_configs_to_transform(BridgeType, [{BridgeName, BridgeConf} | Rest], ConnectorFields) ->
|
||||
bridge_configs_to_transform(
|
||||
BridgeType, [{BridgeName, BridgeConf} | Rest], ConnectorFields, RawConfig
|
||||
) ->
|
||||
case has_connector_field(BridgeConf, ConnectorFields) of
|
||||
true ->
|
||||
PreviousRawConfig =
|
||||
emqx_utils_maps:deep_get(
|
||||
[<<"bridges_v2">>, to_bin(BridgeType), to_bin(BridgeName)],
|
||||
RawConfig,
|
||||
undefined
|
||||
),
|
||||
[
|
||||
{BridgeType, BridgeName, BridgeConf, ConnectorFields}
|
||||
| bridge_configs_to_transform(BridgeType, Rest, ConnectorFields)
|
||||
{BridgeType, BridgeName, BridgeConf, ConnectorFields, PreviousRawConfig}
|
||||
| bridge_configs_to_transform(BridgeType, Rest, ConnectorFields, RawConfig)
|
||||
];
|
||||
false ->
|
||||
bridge_configs_to_transform(BridgeType, Rest, ConnectorFields)
|
||||
bridge_configs_to_transform(BridgeType, Rest, ConnectorFields, RawConfig)
|
||||
end.
|
||||
|
||||
split_bridge_to_connector_and_action(
|
||||
{ConnectorsMap, {BridgeType, BridgeName, BridgeConf, ConnectorFields}}
|
||||
{ConnectorsMap, {BridgeType, BridgeName, BridgeConf, ConnectorFields, PreviousRawConfig}}
|
||||
) ->
|
||||
%% Get connector fields from bridge config
|
||||
ConnectorMap = lists:foldl(
|
||||
|
@ -120,8 +132,12 @@ split_bridge_to_connector_and_action(
|
|||
BridgeConf,
|
||||
ConnectorFields
|
||||
),
|
||||
%% Generate a connector name
|
||||
ConnectorName = generate_connector_name(ConnectorsMap, BridgeName, 0),
|
||||
%% Generate a connector name, if needed. Avoid doing so if there was a previous config.
|
||||
ConnectorName =
|
||||
case PreviousRawConfig of
|
||||
#{<<"connector">> := ConnectorName0} -> ConnectorName0;
|
||||
_ -> generate_connector_name(ConnectorsMap, BridgeName, 0)
|
||||
end,
|
||||
%% Add connector field to action map
|
||||
ActionMap = maps:put(<<"connector">>, ConnectorName, ActionMap0),
|
||||
{BridgeType, BridgeName, ActionMap, ConnectorName, ConnectorMap}.
|
||||
|
@ -143,27 +159,24 @@ generate_connector_name(ConnectorsMap, BridgeName, Attempt) ->
|
|||
end.
|
||||
|
||||
transform_old_style_bridges_to_connector_and_actions_of_type(
|
||||
{ConnectorType, #{type := {map, name, {ref, ConnectorConfSchemaMod, ConnectorConfSchemaName}}}},
|
||||
{ConnectorType, #{type := ?MAP(_Name, ?R_REF(ConnectorConfSchemaMod, ConnectorConfSchemaName))}},
|
||||
RawConfig
|
||||
) ->
|
||||
ConnectorFields = ConnectorConfSchemaMod:fields(ConnectorConfSchemaName),
|
||||
BridgeTypes = connector_type_to_bridge_types(ConnectorType),
|
||||
BridgeTypes = ?MODULE:connector_type_to_bridge_types(ConnectorType),
|
||||
BridgesConfMap = maps:get(<<"bridges">>, RawConfig, #{}),
|
||||
ConnectorsConfMap = maps:get(<<"connectors">>, RawConfig, #{}),
|
||||
BridgeConfigsToTransform1 =
|
||||
lists:foldl(
|
||||
fun(BridgeType, ToTranformSoFar) ->
|
||||
BridgeConfigsToTransform =
|
||||
lists:flatmap(
|
||||
fun(BridgeType) ->
|
||||
BridgeNameToBridgeMap = maps:get(to_bin(BridgeType), BridgesConfMap, #{}),
|
||||
BridgeNameBridgeConfList = maps:to_list(BridgeNameToBridgeMap),
|
||||
NewToTransform = bridge_configs_to_transform(
|
||||
BridgeType, BridgeNameBridgeConfList, ConnectorFields
|
||||
),
|
||||
[NewToTransform, ToTranformSoFar]
|
||||
bridge_configs_to_transform(
|
||||
BridgeType, BridgeNameBridgeConfList, ConnectorFields, RawConfig
|
||||
)
|
||||
end,
|
||||
[],
|
||||
BridgeTypes
|
||||
),
|
||||
BridgeConfigsToTransform = lists:flatten(BridgeConfigsToTransform1),
|
||||
ConnectorsWithTypeMap = maps:get(to_bin(ConnectorType), ConnectorsConfMap, #{}),
|
||||
BridgeConfigsToTransformWithConnectorConf = lists:zip(
|
||||
lists:duplicate(length(BridgeConfigsToTransform), ConnectorsWithTypeMap),
|
||||
|
@ -200,7 +213,7 @@ transform_old_style_bridges_to_connector_and_actions_of_type(
|
|||
).
|
||||
|
||||
transform_bridges_v1_to_connectors_and_bridges_v2(RawConfig) ->
|
||||
ConnectorFields = fields(connectors),
|
||||
ConnectorFields = ?MODULE:fields(connectors),
|
||||
NewRawConf = lists:foldl(
|
||||
fun transform_old_style_bridges_to_connector_and_actions_of_type/2,
|
||||
RawConfig,
|
||||
|
|
|
@ -33,7 +33,8 @@
|
|||
diff_maps/2,
|
||||
best_effort_recursive_sum/3,
|
||||
if_only_to_toggle_enable/2,
|
||||
update_if_present/3
|
||||
update_if_present/3,
|
||||
put_if/4
|
||||
]).
|
||||
|
||||
-export_type([config_key/0, config_key_path/0]).
|
||||
|
@ -303,3 +304,8 @@ update_if_present(Key, Fun, Map) ->
|
|||
_ ->
|
||||
Map
|
||||
end.
|
||||
|
||||
put_if(Acc, K, V, true) ->
|
||||
Acc#{K => V};
|
||||
put_if(Acc, _K, _V, false) ->
|
||||
Acc.
|
||||
|
|
Loading…
Reference in New Issue