fix(kafka): don't return `parameters` from `/bridges` API

Fixes https://emqx.atlassian.net/browse/EMQX-11412
This commit is contained in:
Thales Macedo Garitezi 2023-11-17 10:26:20 -03:00
parent b0d670aaa9
commit b3dffa4390
7 changed files with 93 additions and 18 deletions

View File

@ -312,6 +312,25 @@ create_rule_and_action_http(BridgeType, RuleTopic, Config, Opts) ->
Error
end.
api_spec_schemas(Root) ->
Method = get,
Path = emqx_mgmt_api_test_util:api_path(["schemas", Root]),
Params = [],
AuthHeader = [],
Opts = #{return_all => true},
case emqx_mgmt_api_test_util:request_api(Method, Path, "", AuthHeader, Params, Opts) of
{ok, {{_, 200, _}, _, Res0}} ->
#{<<"components">> := #{<<"schemas">> := Schemas}} =
emqx_utils_json:decode(Res0, [return_maps]),
Schemas
end.
bridges_api_spec_schemas() ->
api_spec_schemas("bridges").
actions_api_spec_schemas() ->
api_spec_schemas("actions").
%%------------------------------------------------------------------------------
%% Testcases
%%------------------------------------------------------------------------------

View File

@ -126,7 +126,7 @@ fields(action) ->
fields(actions) ->
Fields =
override(
emqx_bridge_kafka:producer_opts(),
emqx_bridge_kafka:producer_opts(action),
bridge_v2_overrides()
) ++
[

View File

@ -272,6 +272,22 @@ make_message() ->
timestamp => Time
}.
bridge_api_spec_props_for_get() ->
#{
<<"bridge_azure_event_hub.get_producer">> :=
#{<<"properties">> := Props}
} =
emqx_bridge_v2_testlib:bridges_api_spec_schemas(),
Props.
action_api_spec_props_for_get() ->
#{
<<"bridge_azure_event_hub.get_bridge_v2">> :=
#{<<"properties">> := Props}
} =
emqx_bridge_v2_testlib:actions_api_spec_schemas(),
Props.
%%------------------------------------------------------------------------------
%% Testcases
%%------------------------------------------------------------------------------
@ -341,3 +357,14 @@ t_same_name_azure_kafka_bridges(Config) ->
end
),
ok.
t_parameters_key_api_spec(_Config) ->
BridgeProps = bridge_api_spec_props_for_get(),
?assert(is_map_key(<<"kafka">>, BridgeProps), #{bridge_props => BridgeProps}),
?assertNot(is_map_key(<<"parameters">>, BridgeProps), #{bridge_props => BridgeProps}),
ActionProps = action_api_spec_props_for_get(),
?assertNot(is_map_key(<<"kafka">>, ActionProps), #{action_props => ActionProps}),
?assert(is_map_key(<<"parameters">>, ActionProps), #{action_props => ActionProps}),
ok.

View File

@ -29,7 +29,7 @@
desc/1,
host_opts/0,
ssl_client_opts_fields/0,
producer_opts/0
producer_opts/1
]).
-export([
@ -261,7 +261,7 @@ fields("config_producer") ->
fields("config_consumer") ->
fields(kafka_consumer);
fields(kafka_producer) ->
connector_config_fields() ++ producer_opts();
connector_config_fields() ++ producer_opts(v1);
fields(kafka_producer_action) ->
[
{enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
@ -270,7 +270,7 @@ fields(kafka_producer_action) ->
desc => ?DESC(emqx_connector_schema, "connector_field"), required => true
})},
{description, emqx_schema:description_schema()}
] ++ producer_opts();
] ++ producer_opts(action);
fields(kafka_consumer) ->
connector_config_fields() ++ fields(consumer_opts);
fields(ssl_client_opts) ->
@ -601,25 +601,28 @@ connector_config_fields() ->
{ssl, mk(ref(ssl_client_opts), #{})}
].
producer_opts() ->
producer_opts(ActionOrBridgeV1) ->
[
%% Note: there's an implicit convention in `emqx_bridge' that,
%% for egress bridges with this config, the published messages
%% will be forwarded to such bridges.
{local_topic, mk(binary(), #{required => false, desc => ?DESC(mqtt_topic)})},
parameters_field(),
parameters_field(ActionOrBridgeV1),
{resource_opts, mk(ref(resource_opts), #{default => #{}, desc => ?DESC(resource_opts)})}
].
%% Since e5.3.1, we want to rename the field 'kafka' to 'parameters'
%% Hoever we need to keep it backward compatible for generated schema json (version 0.1.0)
%% since schema is data for the 'schemas' API.
parameters_field() ->
parameters_field(ActionOrBridgeV1) ->
OverriddenV1 = <<"0.1.0">> =:= get(emqx_bridge_schema_version),
{Name, Alias} =
case get(emqx_bridge_schema_version) of
<<"0.1.0">> ->
case {OverriddenV1, ActionOrBridgeV1} of
{true, _} ->
{kafka, parameters};
_ ->
{_, v1} ->
{kafka, parameters};
{_, action} ->
{parameters, kafka}
end,
{Name,

View File

@ -32,9 +32,8 @@ bridge_v1_config_to_action_config(BridgeV1Conf, ConnectorName) ->
Config0 = emqx_action_info:transform_bridge_v1_config_to_action_config(
BridgeV1Conf, ConnectorName, schema_module(), kafka_producer
),
KafkaMap = emqx_utils_maps:deep_get([<<"parameters">>, <<"kafka">>], Config0, #{}),
Config1 = emqx_utils_maps:deep_remove([<<"parameters">>, <<"kafka">>], Config0),
Config2 = emqx_utils_maps:deep_merge(Config1, #{<<"parameters">> => KafkaMap}),
KafkaMap = maps:get(<<"kafka">>, BridgeV1Conf, #{}),
Config2 = emqx_utils_maps:deep_merge(Config0, #{<<"parameters">> => KafkaMap}),
maps:with(producer_action_field_keys(), Config2).
%%------------------------------------------------------------------------------------------

View File

@ -25,7 +25,7 @@ kafka_producer_test() ->
<<"kafka_producer">> :=
#{
<<"myproducer">> :=
#{<<"parameters">> := #{}}
#{<<"kafka">> := #{}}
}
}
},
@ -52,7 +52,7 @@ kafka_producer_test() ->
#{
<<"myproducer">> :=
#{
<<"parameters">> := #{},
<<"kafka">> := #{},
<<"local_topic">> := <<"mqtt/local">>
}
}
@ -68,7 +68,7 @@ kafka_producer_test() ->
#{
<<"myproducer">> :=
#{
<<"parameters">> := #{},
<<"kafka">> := #{},
<<"local_topic">> := <<"mqtt/local">>
}
}
@ -166,7 +166,7 @@ message_key_dispatch_validations_test() ->
?assertThrow(
{_, [
#{
path := "bridges.kafka_producer.myproducer.parameters",
path := "bridges.kafka_producer.myproducer.kafka",
reason := "Message key cannot be empty when `key_dispatch` strategy is used"
}
]},
@ -175,7 +175,7 @@ message_key_dispatch_validations_test() ->
?assertThrow(
{_, [
#{
path := "bridges.kafka_producer.myproducer.parameters",
path := "bridges.kafka_producer.myproducer.kafka",
reason := "Message key cannot be empty when `key_dispatch` strategy is used"
}
]},

View File

@ -182,6 +182,22 @@ create_action(Name, Config) ->
on_exit(fun() -> emqx_bridge_v2:remove(?TYPE, Name) end),
Res.
bridge_api_spec_props_for_get() ->
#{
<<"bridge_kafka.get_producer">> :=
#{<<"properties">> := Props}
} =
emqx_bridge_v2_testlib:bridges_api_spec_schemas(),
Props.
action_api_spec_props_for_get() ->
#{
<<"bridge_kafka.get_bridge_v2">> :=
#{<<"properties">> := Props}
} =
emqx_bridge_v2_testlib:actions_api_spec_schemas(),
Props.
%%------------------------------------------------------------------------------
%% Testcases
%%------------------------------------------------------------------------------
@ -342,3 +358,14 @@ t_bad_url(_Config) ->
),
?assertMatch({ok, #{status := connecting}}, emqx_bridge_v2:lookup(?TYPE, ActionName)),
ok.
t_parameters_key_api_spec(_Config) ->
BridgeProps = bridge_api_spec_props_for_get(),
?assert(is_map_key(<<"kafka">>, BridgeProps), #{bridge_props => BridgeProps}),
?assertNot(is_map_key(<<"parameters">>, BridgeProps), #{bridge_props => BridgeProps}),
ActionProps = action_api_spec_props_for_get(),
?assertNot(is_map_key(<<"kafka">>, ActionProps), #{action_props => ActionProps}),
?assert(is_map_key(<<"parameters">>, ActionProps), #{action_props => ActionProps}),
ok.