diff --git a/apps/emqx_bridge/src/emqx_bridge_v2_api.erl b/apps/emqx_bridge/src/emqx_bridge_v2_api.erl index d4401cfd0..e8a500e85 100644 --- a/apps/emqx_bridge/src/emqx_bridge_v2_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_v2_api.erl @@ -786,7 +786,7 @@ handle_update(ConfRootKey, Id, Conf0) -> handle_delete(ConfRootKey, Id, QueryStringOpts) -> ?TRY_PARSE_ID( Id, - case emqx_bridge_v2:lookup(BridgeType, BridgeName) of + case emqx_bridge_v2:lookup(ConfRootKey, BridgeType, BridgeName) of {ok, _} -> AlsoDeleteActions = case maps:get(<<"also_delete_dep_actions">>, QueryStringOpts, <<"false">>) of diff --git a/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl index 0ce8c620e..5ef897369 100644 --- a/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_v2_api_SUITE.erl @@ -25,8 +25,10 @@ -include_lib("snabbkaffe/include/test_macros.hrl"). -define(ACTIONS_ROOT, "actions"). +-define(SOURCES_ROOT, "sources"). -define(ACTION_CONNECTOR_NAME, <<"my_connector">>). +-define(SOURCE_CONNECTOR_NAME, <<"my_connector">>). -define(RESOURCE(NAME, TYPE), #{ <<"enable">> => true, @@ -106,6 +108,9 @@ ). -define(KAFKA_BRIDGE_UPDATE(Name), ?KAFKA_BRIDGE_UPDATE(Name, ?ACTION_CONNECTOR_NAME)). +-define(SOURCE_TYPE_STR, "mqtt"). +-define(SOURCE_TYPE, <>). + -define(APPSPECS, [ emqx_conf, emqx, @@ -162,7 +167,11 @@ init_per_group(single = Group, Config) -> Apps = emqx_cth_suite:start(?APPSPECS ++ [?APPSPEC_DASHBOARD], #{work_dir => WorkDir}), init_api([{group, single}, {group_apps, Apps}, {node, node()} | Config]); init_per_group(actions, Config) -> - [{bridge_kind, action} | Config]. + [{bridge_kind, action} | Config]; +init_per_group(sources, Config) -> + [{bridge_kind, source} | Config]; +init_per_group(_Group, Config) -> + Config. init_api(Config) -> Node = ?config(node, Config), @@ -209,7 +218,17 @@ init_per_testcase(_TestCase, Config) -> Nodes -> [erpc:call(Node, ?MODULE, init_mocks, []) || Node <- Nodes] end, - {ok, 201, _} = request(post, uri(["connectors"]), ?ACTIONS_CONNECTOR, Config), + case ?config(bridge_kind, Config) of + action -> + {ok, 201, _} = request(post, uri(["connectors"]), ?ACTIONS_CONNECTOR, Config); + source -> + {ok, 201, _} = request( + post, + uri(["connectors"]), + source_connector_create_config(#{}), + Config + ) + end, Config. end_per_testcase(_TestCase, Config) -> @@ -268,18 +287,7 @@ init_mocks() -> ok. clear_resources() -> - lists:foreach( - fun(#{type := Type, name := Name}) -> - ok = emqx_bridge_v2:remove(Type, Name) - end, - emqx_bridge_v2:list() - ), - lists:foreach( - fun(#{type := Type, name := Name}) -> - ok = emqx_connector:remove(Type, Name) - end, - emqx_connector:list() - ). + emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(). expect_on_all_nodes(Mod, Function, Fun, Config) -> case ?config(cluster_nodes, Config) of @@ -394,6 +402,135 @@ json(B) when is_binary(B) -> Error end. +group_path(Config) -> + case emqx_common_test_helpers:group_path(Config) of + [] -> + undefined; + Path -> + Path + end. + +source_connector_config_base() -> + #{ + <<"enable">> => true, + <<"description">> => <<"my connector">>, + <<"pool_size">> => 3, + <<"proto_ver">> => <<"v5">>, + <<"server">> => <<"127.0.0.1:1883">>, + <<"resource_opts">> => #{ + <<"health_check_interval">> => <<"15s">>, + <<"start_after_created">> => true, + <<"start_timeout">> => <<"5s">> + } + }. + +source_connector_create_config(Overrides0) -> + Overrides = emqx_utils_maps:binary_key_map(Overrides0), + Conf0 = maps:merge( + source_connector_config_base(), + #{ + <<"enable">> => true, + <<"type">> => ?SOURCE_TYPE, + <<"name">> => ?SOURCE_CONNECTOR_NAME + } + ), + maps:merge( + Conf0, + Overrides + ). + +source_config_base() -> + #{ + <<"enable">> => true, + <<"connector">> => ?SOURCE_CONNECTOR_NAME, + <<"parameters">> => + #{ + <<"remote">> => + #{ + <<"topic">> => <<"remote/topic">>, + <<"qos">> => 2 + } + }, + <<"resource_opts">> => #{ + <<"batch_size">> => 1, + <<"batch_time">> => <<"0ms">>, + <<"buffer_mode">> => <<"memory_only">>, + <<"buffer_seg_bytes">> => <<"10MB">>, + <<"health_check_interval">> => <<"15s">>, + <<"inflight_window">> => 100, + <<"max_buffer_bytes">> => <<"256MB">>, + <<"metrics_flush_interval">> => <<"1s">>, + <<"query_mode">> => <<"sync">>, + <<"request_ttl">> => <<"45s">>, + <<"resume_interval">> => <<"15s">>, + <<"worker_pool_size">> => <<"1">> + } + }. + +source_create_config(Overrides0) -> + Overrides = emqx_utils_maps:binary_key_map(Overrides0), + Conf0 = maps:merge( + source_config_base(), + #{ + <<"enable">> => true, + <<"type">> => ?SOURCE_TYPE + } + ), + maps:merge( + Conf0, + Overrides + ). + +source_update_config(Overrides0) -> + Overrides = emqx_utils_maps:binary_key_map(Overrides0), + maps:merge( + source_config_base(), + Overrides + ). + +get_common_values(Kind, FnName) -> + case Kind of + actions -> + #{ + api_root_key => ?ACTIONS_ROOT, + type => ?ACTION_TYPE, + default_connector_name => ?ACTION_CONNECTOR_NAME, + create_config_fn => + fun(Overrides) -> + Name = maps:get(name, Overrides, FnName), + ConnectorName = maps:get(connector, Overrides, ?ACTION_CONNECTOR_NAME), + ?KAFKA_BRIDGE(Name, ConnectorName) + end, + update_config_fn => + fun(Overrides) -> + Name = maps:get(name, Overrides, FnName), + ConnectorName = maps:get(connector, Overrides, ?ACTION_CONNECTOR_NAME), + ?KAFKA_BRIDGE_UPDATE(Name, ConnectorName) + end, + create_connector_config_fn => + fun(Overrides) -> + ConnectorName = maps:get(name, Overrides, ?ACTION_CONNECTOR_NAME), + ?ACTIONS_CONNECTOR(ConnectorName) + end + }; + sources -> + #{ + api_root_key => ?SOURCES_ROOT, + type => ?SOURCE_TYPE, + default_connector_name => ?SOURCE_CONNECTOR_NAME, + create_config_fn => fun(Overrides0) -> + Overrides = + case Overrides0 of + #{name := _} -> Overrides0; + _ -> Overrides0#{name => FnName} + end, + source_create_config(Overrides) + end, + update_config_fn => fun source_update_config/1, + create_connector_config_fn => fun source_connector_create_config/1 + } + end. + %%------------------------------------------------------------------------------ %% Testcases %%------------------------------------------------------------------------------ @@ -404,76 +541,95 @@ json(B) when is_binary(B) -> t_bridges_lifecycle(matrix) -> [ [single, actions], - [cluster, actions] + [single, sources], + [cluster, actions], + [cluster, sources] ]; t_bridges_lifecycle(Config) -> + [_SingleOrCluster, Kind | _] = group_path(Config), + FnName = atom_to_binary(?FUNCTION_NAME), + #{ + api_root_key := APIRootKey, + type := Type, + default_connector_name := DefaultConnectorName, + create_config_fn := CreateConfigFn, + update_config_fn := UpdateConfigFn, + create_connector_config_fn := CreateConnectorConfigFn + } = get_common_values(Kind, FnName), %% assert we there's no bridges at first - {ok, 200, []} = request_json(get, uri([?ACTIONS_ROOT]), Config), + {ok, 200, []} = request_json(get, uri([APIRootKey]), Config), - {ok, 404, _} = request(get, uri([?ACTIONS_ROOT, "foo"]), Config), - {ok, 404, _} = request(get, uri([?ACTIONS_ROOT, "kafka_producer:foo"]), Config), + {ok, 404, _} = request(get, uri([APIRootKey, "foo"]), Config), + {ok, 404, _} = request(get, uri([APIRootKey, "kafka_producer:foo"]), Config), %% need a var for patterns below - BridgeName = ?BRIDGE_NAME, + BridgeName = FnName, + CreateRes = request_json( + post, + uri([APIRootKey]), + CreateConfigFn(#{}), + Config + ), ?assertMatch( {ok, 201, #{ - <<"type">> := ?ACTION_TYPE, + <<"type">> := Type, <<"name">> := BridgeName, <<"enable">> := true, <<"status">> := <<"connected">>, <<"node_status">> := [_ | _], - <<"connector">> := ?ACTION_CONNECTOR_NAME, + <<"connector">> := DefaultConnectorName, <<"parameters">> := #{}, - <<"local_topic">> := _, <<"resource_opts">> := _ }}, - request_json( - post, - uri([?ACTIONS_ROOT]), - ?KAFKA_BRIDGE(?BRIDGE_NAME), - Config - ) + CreateRes, + #{name => BridgeName, type => Type, connector => DefaultConnectorName} ), + case Kind of + actions -> + ?assertMatch({ok, 201, #{<<"local_topic">> := _}}, CreateRes); + sources -> + ok + end, %% list all bridges, assert bridge is in it ?assertMatch( {ok, 200, [ #{ - <<"type">> := ?ACTION_TYPE, + <<"type">> := Type, <<"name">> := BridgeName, <<"enable">> := true, <<"status">> := _, <<"node_status">> := [_ | _] } ]}, - request_json(get, uri([?ACTIONS_ROOT]), Config) + request_json(get, uri([APIRootKey]), Config) ), %% list all bridges, assert bridge is in it ?assertMatch( {ok, 200, [ #{ - <<"type">> := ?ACTION_TYPE, + <<"type">> := Type, <<"name">> := BridgeName, <<"enable">> := true, <<"status">> := _, <<"node_status">> := [_ | _] } ]}, - request_json(get, uri([?ACTIONS_ROOT]), Config) + request_json(get, uri([APIRootKey]), Config) ), %% get the bridge by id - BridgeID = emqx_bridge_resource:bridge_id(?ACTION_TYPE, ?BRIDGE_NAME), + BridgeID = emqx_bridge_resource:bridge_id(Type, ?BRIDGE_NAME), ?assertMatch( {ok, 200, #{ - <<"type">> := ?ACTION_TYPE, + <<"type">> := Type, <<"name">> := BridgeName, <<"enable">> := true, <<"status">> := _, <<"node_status">> := [_ | _] }}, - request_json(get, uri([?ACTIONS_ROOT, BridgeID]), Config) + request_json(get, uri([APIRootKey, BridgeID]), Config) ), ?assertMatch( @@ -481,14 +637,19 @@ t_bridges_lifecycle(Config) -> <<"code">> := <<"BAD_REQUEST">>, <<"message">> := _ }}, - request_json(post, uri([?ACTIONS_ROOT, BridgeID, "brababbel"]), Config) + request_json(post, uri([APIRootKey, BridgeID, "brababbel"]), Config) ), %% update bridge config - {ok, 201, _} = request(post, uri(["connectors"]), ?ACTIONS_CONNECTOR(<<"foobla">>), Config), + {ok, 201, _} = request( + post, + uri(["connectors"]), + CreateConnectorConfigFn(#{name => <<"foobla">>}), + Config + ), ?assertMatch( {ok, 200, #{ - <<"type">> := ?ACTION_TYPE, + <<"type">> := Type, <<"name">> := BridgeName, <<"connector">> := <<"foobla">>, <<"enable">> := true, @@ -497,8 +658,8 @@ t_bridges_lifecycle(Config) -> }}, request_json( put, - uri([?ACTIONS_ROOT, BridgeID]), - ?KAFKA_BRIDGE_UPDATE(?BRIDGE_NAME, <<"foobla">>), + uri([APIRootKey, BridgeID]), + UpdateConfigFn(#{connector => <<"foobla">>}), Config ) ), @@ -510,8 +671,8 @@ t_bridges_lifecycle(Config) -> }} = request_json( put, - uri([?ACTIONS_ROOT, BridgeID]), - ?KAFKA_BRIDGE_UPDATE(?BRIDGE_NAME, <<"does_not_exist">>), + uri([APIRootKey, BridgeID]), + UpdateConfigFn(#{connector => <<"does_not_exist">>}), Config ), ?assertMatch( @@ -546,8 +707,8 @@ t_bridges_lifecycle(Config) -> }} = request_json( put, - uri([?ACTIONS_ROOT, BridgeID]), - ?KAFKA_BRIDGE_UPDATE(?BRIDGE_NAME, <<"foobla2">>), + uri([APIRootKey, BridgeID]), + UpdateConfigFn(#{connector => <<"foobla2">>}), Config ), ?assertMatch( @@ -556,8 +717,8 @@ t_bridges_lifecycle(Config) -> ), %% delete the bridge - {ok, 204, <<>>} = request(delete, uri([?ACTIONS_ROOT, BridgeID]), Config), - {ok, 200, []} = request_json(get, uri([?ACTIONS_ROOT]), Config), + {ok, 204, <<>>} = request(delete, uri([APIRootKey, BridgeID]), Config), + {ok, 200, []} = request_json(get, uri([APIRootKey]), Config), %% try create with unknown connector name {ok, 400, #{ @@ -566,8 +727,8 @@ t_bridges_lifecycle(Config) -> }} = request_json( post, - uri([?ACTIONS_ROOT]), - ?KAFKA_BRIDGE(?BRIDGE_NAME, <<"does_not_exist">>), + uri([APIRootKey]), + CreateConfigFn(#{connector => <<"does_not_exist">>}), Config ), ?assertMatch( @@ -582,8 +743,8 @@ t_bridges_lifecycle(Config) -> }} = request_json( post, - uri([?ACTIONS_ROOT]), - ?KAFKA_BRIDGE(?BRIDGE_NAME, <<"foobla2">>), + uri([APIRootKey]), + CreateConfigFn(#{connector => <<"foobla2">>}), Config ), ?assertMatch( @@ -592,7 +753,7 @@ t_bridges_lifecycle(Config) -> ), %% make sure nothing has been created above - {ok, 200, []} = request_json(get, uri([?ACTIONS_ROOT]), Config), + {ok, 200, []} = request_json(get, uri([APIRootKey]), Config), %% update a deleted bridge returns an error ?assertMatch( @@ -602,8 +763,8 @@ t_bridges_lifecycle(Config) -> }}, request_json( put, - uri([?ACTIONS_ROOT, BridgeID]), - ?KAFKA_BRIDGE_UPDATE(?BRIDGE_NAME), + uri([APIRootKey, BridgeID]), + UpdateConfigFn(#{}), Config ) ), @@ -614,7 +775,7 @@ t_bridges_lifecycle(Config) -> <<"code">> := <<"NOT_FOUND">>, <<"message">> := _ }}, - request_json(delete, uri([?ACTIONS_ROOT, BridgeID]), Config) + request_json(delete, uri([APIRootKey, BridgeID]), Config) ), %% try delete unknown bridge id @@ -623,12 +784,14 @@ t_bridges_lifecycle(Config) -> <<"code">> := <<"NOT_FOUND">>, <<"message">> := <<"Invalid bridge ID", _/binary>> }}, - request_json(delete, uri([?ACTIONS_ROOT, "foo"]), Config) + request_json(delete, uri([APIRootKey, "foo"]), Config) ), %% Try create bridge with bad characters as name - {ok, 400, _} = request(post, uri([?ACTIONS_ROOT]), ?KAFKA_BRIDGE(<<"隋达"/utf8>>), Config), - {ok, 400, _} = request(post, uri([?ACTIONS_ROOT]), ?KAFKA_BRIDGE(<<"a.b">>), Config), + {ok, 400, _} = request( + post, uri([APIRootKey]), CreateConfigFn(#{name => <<"隋达"/utf8>>}), Config + ), + {ok, 400, _} = request(post, uri([APIRootKey]), CreateConfigFn(#{name => <<"a.b">>}), Config), ok. t_broken_bridge_config(matrix) ->