test(sources_api): add some tests to cover `/sources` HTTP API

Also fixes a bug with `DELETE /sources/:id`
This commit is contained in:
Thales Macedo Garitezi 2024-01-12 17:27:23 -03:00
parent 8f304d3456
commit fc88a1ed1e
2 changed files with 221 additions and 58 deletions

View File

@ -786,7 +786,7 @@ handle_update(ConfRootKey, Id, Conf0) ->
handle_delete(ConfRootKey, Id, QueryStringOpts) ->
?TRY_PARSE_ID(
Id,
case emqx_bridge_v2:lookup(BridgeType, BridgeName) of
case emqx_bridge_v2:lookup(ConfRootKey, BridgeType, BridgeName) of
{ok, _} ->
AlsoDeleteActions =
case maps:get(<<"also_delete_dep_actions">>, QueryStringOpts, <<"false">>) of

View File

@ -25,8 +25,10 @@
-include_lib("snabbkaffe/include/test_macros.hrl").
-define(ACTIONS_ROOT, "actions").
-define(SOURCES_ROOT, "sources").
-define(ACTION_CONNECTOR_NAME, <<"my_connector">>).
-define(SOURCE_CONNECTOR_NAME, <<"my_connector">>).
-define(RESOURCE(NAME, TYPE), #{
<<"enable">> => true,
@ -106,6 +108,9 @@
).
-define(KAFKA_BRIDGE_UPDATE(Name), ?KAFKA_BRIDGE_UPDATE(Name, ?ACTION_CONNECTOR_NAME)).
-define(SOURCE_TYPE_STR, "mqtt").
-define(SOURCE_TYPE, <<?SOURCE_TYPE_STR>>).
-define(APPSPECS, [
emqx_conf,
emqx,
@ -162,7 +167,11 @@ init_per_group(single = Group, Config) ->
Apps = emqx_cth_suite:start(?APPSPECS ++ [?APPSPEC_DASHBOARD], #{work_dir => WorkDir}),
init_api([{group, single}, {group_apps, Apps}, {node, node()} | Config]);
init_per_group(actions, Config) ->
[{bridge_kind, action} | Config].
[{bridge_kind, action} | Config];
init_per_group(sources, Config) ->
[{bridge_kind, source} | Config];
init_per_group(_Group, Config) ->
Config.
init_api(Config) ->
Node = ?config(node, Config),
@ -209,7 +218,17 @@ init_per_testcase(_TestCase, Config) ->
Nodes ->
[erpc:call(Node, ?MODULE, init_mocks, []) || Node <- Nodes]
end,
{ok, 201, _} = request(post, uri(["connectors"]), ?ACTIONS_CONNECTOR, Config),
case ?config(bridge_kind, Config) of
action ->
{ok, 201, _} = request(post, uri(["connectors"]), ?ACTIONS_CONNECTOR, Config);
source ->
{ok, 201, _} = request(
post,
uri(["connectors"]),
source_connector_create_config(#{}),
Config
)
end,
Config.
end_per_testcase(_TestCase, Config) ->
@ -268,18 +287,7 @@ init_mocks() ->
ok.
clear_resources() ->
lists:foreach(
fun(#{type := Type, name := Name}) ->
ok = emqx_bridge_v2:remove(Type, Name)
end,
emqx_bridge_v2:list()
),
lists:foreach(
fun(#{type := Type, name := Name}) ->
ok = emqx_connector:remove(Type, Name)
end,
emqx_connector:list()
).
emqx_bridge_v2_testlib:delete_all_bridges_and_connectors().
expect_on_all_nodes(Mod, Function, Fun, Config) ->
case ?config(cluster_nodes, Config) of
@ -394,6 +402,135 @@ json(B) when is_binary(B) ->
Error
end.
group_path(Config) ->
case emqx_common_test_helpers:group_path(Config) of
[] ->
undefined;
Path ->
Path
end.
source_connector_config_base() ->
#{
<<"enable">> => true,
<<"description">> => <<"my connector">>,
<<"pool_size">> => 3,
<<"proto_ver">> => <<"v5">>,
<<"server">> => <<"127.0.0.1:1883">>,
<<"resource_opts">> => #{
<<"health_check_interval">> => <<"15s">>,
<<"start_after_created">> => true,
<<"start_timeout">> => <<"5s">>
}
}.
source_connector_create_config(Overrides0) ->
Overrides = emqx_utils_maps:binary_key_map(Overrides0),
Conf0 = maps:merge(
source_connector_config_base(),
#{
<<"enable">> => true,
<<"type">> => ?SOURCE_TYPE,
<<"name">> => ?SOURCE_CONNECTOR_NAME
}
),
maps:merge(
Conf0,
Overrides
).
source_config_base() ->
#{
<<"enable">> => true,
<<"connector">> => ?SOURCE_CONNECTOR_NAME,
<<"parameters">> =>
#{
<<"remote">> =>
#{
<<"topic">> => <<"remote/topic">>,
<<"qos">> => 2
}
},
<<"resource_opts">> => #{
<<"batch_size">> => 1,
<<"batch_time">> => <<"0ms">>,
<<"buffer_mode">> => <<"memory_only">>,
<<"buffer_seg_bytes">> => <<"10MB">>,
<<"health_check_interval">> => <<"15s">>,
<<"inflight_window">> => 100,
<<"max_buffer_bytes">> => <<"256MB">>,
<<"metrics_flush_interval">> => <<"1s">>,
<<"query_mode">> => <<"sync">>,
<<"request_ttl">> => <<"45s">>,
<<"resume_interval">> => <<"15s">>,
<<"worker_pool_size">> => <<"1">>
}
}.
source_create_config(Overrides0) ->
Overrides = emqx_utils_maps:binary_key_map(Overrides0),
Conf0 = maps:merge(
source_config_base(),
#{
<<"enable">> => true,
<<"type">> => ?SOURCE_TYPE
}
),
maps:merge(
Conf0,
Overrides
).
source_update_config(Overrides0) ->
Overrides = emqx_utils_maps:binary_key_map(Overrides0),
maps:merge(
source_config_base(),
Overrides
).
get_common_values(Kind, FnName) ->
case Kind of
actions ->
#{
api_root_key => ?ACTIONS_ROOT,
type => ?ACTION_TYPE,
default_connector_name => ?ACTION_CONNECTOR_NAME,
create_config_fn =>
fun(Overrides) ->
Name = maps:get(name, Overrides, FnName),
ConnectorName = maps:get(connector, Overrides, ?ACTION_CONNECTOR_NAME),
?KAFKA_BRIDGE(Name, ConnectorName)
end,
update_config_fn =>
fun(Overrides) ->
Name = maps:get(name, Overrides, FnName),
ConnectorName = maps:get(connector, Overrides, ?ACTION_CONNECTOR_NAME),
?KAFKA_BRIDGE_UPDATE(Name, ConnectorName)
end,
create_connector_config_fn =>
fun(Overrides) ->
ConnectorName = maps:get(name, Overrides, ?ACTION_CONNECTOR_NAME),
?ACTIONS_CONNECTOR(ConnectorName)
end
};
sources ->
#{
api_root_key => ?SOURCES_ROOT,
type => ?SOURCE_TYPE,
default_connector_name => ?SOURCE_CONNECTOR_NAME,
create_config_fn => fun(Overrides0) ->
Overrides =
case Overrides0 of
#{name := _} -> Overrides0;
_ -> Overrides0#{name => FnName}
end,
source_create_config(Overrides)
end,
update_config_fn => fun source_update_config/1,
create_connector_config_fn => fun source_connector_create_config/1
}
end.
%%------------------------------------------------------------------------------
%% Testcases
%%------------------------------------------------------------------------------
@ -404,76 +541,95 @@ json(B) when is_binary(B) ->
t_bridges_lifecycle(matrix) ->
[
[single, actions],
[cluster, actions]
[single, sources],
[cluster, actions],
[cluster, sources]
];
t_bridges_lifecycle(Config) ->
[_SingleOrCluster, Kind | _] = group_path(Config),
FnName = atom_to_binary(?FUNCTION_NAME),
#{
api_root_key := APIRootKey,
type := Type,
default_connector_name := DefaultConnectorName,
create_config_fn := CreateConfigFn,
update_config_fn := UpdateConfigFn,
create_connector_config_fn := CreateConnectorConfigFn
} = get_common_values(Kind, FnName),
%% assert we there's no bridges at first
{ok, 200, []} = request_json(get, uri([?ACTIONS_ROOT]), Config),
{ok, 200, []} = request_json(get, uri([APIRootKey]), Config),
{ok, 404, _} = request(get, uri([?ACTIONS_ROOT, "foo"]), Config),
{ok, 404, _} = request(get, uri([?ACTIONS_ROOT, "kafka_producer:foo"]), Config),
{ok, 404, _} = request(get, uri([APIRootKey, "foo"]), Config),
{ok, 404, _} = request(get, uri([APIRootKey, "kafka_producer:foo"]), Config),
%% need a var for patterns below
BridgeName = ?BRIDGE_NAME,
BridgeName = FnName,
CreateRes = request_json(
post,
uri([APIRootKey]),
CreateConfigFn(#{}),
Config
),
?assertMatch(
{ok, 201, #{
<<"type">> := ?ACTION_TYPE,
<<"type">> := Type,
<<"name">> := BridgeName,
<<"enable">> := true,
<<"status">> := <<"connected">>,
<<"node_status">> := [_ | _],
<<"connector">> := ?ACTION_CONNECTOR_NAME,
<<"connector">> := DefaultConnectorName,
<<"parameters">> := #{},
<<"local_topic">> := _,
<<"resource_opts">> := _
}},
request_json(
post,
uri([?ACTIONS_ROOT]),
?KAFKA_BRIDGE(?BRIDGE_NAME),
Config
)
CreateRes,
#{name => BridgeName, type => Type, connector => DefaultConnectorName}
),
case Kind of
actions ->
?assertMatch({ok, 201, #{<<"local_topic">> := _}}, CreateRes);
sources ->
ok
end,
%% list all bridges, assert bridge is in it
?assertMatch(
{ok, 200, [
#{
<<"type">> := ?ACTION_TYPE,
<<"type">> := Type,
<<"name">> := BridgeName,
<<"enable">> := true,
<<"status">> := _,
<<"node_status">> := [_ | _]
}
]},
request_json(get, uri([?ACTIONS_ROOT]), Config)
request_json(get, uri([APIRootKey]), Config)
),
%% list all bridges, assert bridge is in it
?assertMatch(
{ok, 200, [
#{
<<"type">> := ?ACTION_TYPE,
<<"type">> := Type,
<<"name">> := BridgeName,
<<"enable">> := true,
<<"status">> := _,
<<"node_status">> := [_ | _]
}
]},
request_json(get, uri([?ACTIONS_ROOT]), Config)
request_json(get, uri([APIRootKey]), Config)
),
%% get the bridge by id
BridgeID = emqx_bridge_resource:bridge_id(?ACTION_TYPE, ?BRIDGE_NAME),
BridgeID = emqx_bridge_resource:bridge_id(Type, ?BRIDGE_NAME),
?assertMatch(
{ok, 200, #{
<<"type">> := ?ACTION_TYPE,
<<"type">> := Type,
<<"name">> := BridgeName,
<<"enable">> := true,
<<"status">> := _,
<<"node_status">> := [_ | _]
}},
request_json(get, uri([?ACTIONS_ROOT, BridgeID]), Config)
request_json(get, uri([APIRootKey, BridgeID]), Config)
),
?assertMatch(
@ -481,14 +637,19 @@ t_bridges_lifecycle(Config) ->
<<"code">> := <<"BAD_REQUEST">>,
<<"message">> := _
}},
request_json(post, uri([?ACTIONS_ROOT, BridgeID, "brababbel"]), Config)
request_json(post, uri([APIRootKey, BridgeID, "brababbel"]), Config)
),
%% update bridge config
{ok, 201, _} = request(post, uri(["connectors"]), ?ACTIONS_CONNECTOR(<<"foobla">>), Config),
{ok, 201, _} = request(
post,
uri(["connectors"]),
CreateConnectorConfigFn(#{name => <<"foobla">>}),
Config
),
?assertMatch(
{ok, 200, #{
<<"type">> := ?ACTION_TYPE,
<<"type">> := Type,
<<"name">> := BridgeName,
<<"connector">> := <<"foobla">>,
<<"enable">> := true,
@ -497,8 +658,8 @@ t_bridges_lifecycle(Config) ->
}},
request_json(
put,
uri([?ACTIONS_ROOT, BridgeID]),
?KAFKA_BRIDGE_UPDATE(?BRIDGE_NAME, <<"foobla">>),
uri([APIRootKey, BridgeID]),
UpdateConfigFn(#{connector => <<"foobla">>}),
Config
)
),
@ -510,8 +671,8 @@ t_bridges_lifecycle(Config) ->
}} =
request_json(
put,
uri([?ACTIONS_ROOT, BridgeID]),
?KAFKA_BRIDGE_UPDATE(?BRIDGE_NAME, <<"does_not_exist">>),
uri([APIRootKey, BridgeID]),
UpdateConfigFn(#{connector => <<"does_not_exist">>}),
Config
),
?assertMatch(
@ -546,8 +707,8 @@ t_bridges_lifecycle(Config) ->
}} =
request_json(
put,
uri([?ACTIONS_ROOT, BridgeID]),
?KAFKA_BRIDGE_UPDATE(?BRIDGE_NAME, <<"foobla2">>),
uri([APIRootKey, BridgeID]),
UpdateConfigFn(#{connector => <<"foobla2">>}),
Config
),
?assertMatch(
@ -556,8 +717,8 @@ t_bridges_lifecycle(Config) ->
),
%% delete the bridge
{ok, 204, <<>>} = request(delete, uri([?ACTIONS_ROOT, BridgeID]), Config),
{ok, 200, []} = request_json(get, uri([?ACTIONS_ROOT]), Config),
{ok, 204, <<>>} = request(delete, uri([APIRootKey, BridgeID]), Config),
{ok, 200, []} = request_json(get, uri([APIRootKey]), Config),
%% try create with unknown connector name
{ok, 400, #{
@ -566,8 +727,8 @@ t_bridges_lifecycle(Config) ->
}} =
request_json(
post,
uri([?ACTIONS_ROOT]),
?KAFKA_BRIDGE(?BRIDGE_NAME, <<"does_not_exist">>),
uri([APIRootKey]),
CreateConfigFn(#{connector => <<"does_not_exist">>}),
Config
),
?assertMatch(
@ -582,8 +743,8 @@ t_bridges_lifecycle(Config) ->
}} =
request_json(
post,
uri([?ACTIONS_ROOT]),
?KAFKA_BRIDGE(?BRIDGE_NAME, <<"foobla2">>),
uri([APIRootKey]),
CreateConfigFn(#{connector => <<"foobla2">>}),
Config
),
?assertMatch(
@ -592,7 +753,7 @@ t_bridges_lifecycle(Config) ->
),
%% make sure nothing has been created above
{ok, 200, []} = request_json(get, uri([?ACTIONS_ROOT]), Config),
{ok, 200, []} = request_json(get, uri([APIRootKey]), Config),
%% update a deleted bridge returns an error
?assertMatch(
@ -602,8 +763,8 @@ t_bridges_lifecycle(Config) ->
}},
request_json(
put,
uri([?ACTIONS_ROOT, BridgeID]),
?KAFKA_BRIDGE_UPDATE(?BRIDGE_NAME),
uri([APIRootKey, BridgeID]),
UpdateConfigFn(#{}),
Config
)
),
@ -614,7 +775,7 @@ t_bridges_lifecycle(Config) ->
<<"code">> := <<"NOT_FOUND">>,
<<"message">> := _
}},
request_json(delete, uri([?ACTIONS_ROOT, BridgeID]), Config)
request_json(delete, uri([APIRootKey, BridgeID]), Config)
),
%% try delete unknown bridge id
@ -623,12 +784,14 @@ t_bridges_lifecycle(Config) ->
<<"code">> := <<"NOT_FOUND">>,
<<"message">> := <<"Invalid bridge ID", _/binary>>
}},
request_json(delete, uri([?ACTIONS_ROOT, "foo"]), Config)
request_json(delete, uri([APIRootKey, "foo"]), Config)
),
%% Try create bridge with bad characters as name
{ok, 400, _} = request(post, uri([?ACTIONS_ROOT]), ?KAFKA_BRIDGE(<<"隋达"/utf8>>), Config),
{ok, 400, _} = request(post, uri([?ACTIONS_ROOT]), ?KAFKA_BRIDGE(<<"a.b">>), Config),
{ok, 400, _} = request(
post, uri([APIRootKey]), CreateConfigFn(#{name => <<"隋达"/utf8>>}), Config
),
{ok, 400, _} = request(post, uri([APIRootKey]), CreateConfigFn(#{name => <<"a.b">>}), Config),
ok.
t_broken_bridge_config(matrix) ->