Merge pull request #12181 from thalesmg/fix-fill-default-v1-api-return-r54-20231215

fix(bridge_v1_api): fill defaults for v2 raw configs and fix redis connector schema
This commit is contained in:
Thales Macedo Garitezi 2023-12-18 11:34:07 -03:00 committed by GitHub
commit 3f06ebcaf7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 124 additions and 60 deletions

View File

@ -620,8 +620,8 @@ lookup_from_all_nodes(BridgeType, BridgeName, SuccCode) ->
?INTERNAL_ERROR(Reason) ?INTERNAL_ERROR(Reason)
end. end.
lookup_from_local_node(BridgeType, BridgeName) -> lookup_from_local_node(ActionType, ActionName) ->
case emqx_bridge:lookup(BridgeType, BridgeName) of case emqx_bridge:lookup(ActionType, ActionName) of
{ok, Res} -> {ok, format_resource(Res, node())}; {ok, Res} -> {ok, format_resource(Res, node())};
Error -> Error Error -> Error
end. end.
@ -895,25 +895,19 @@ aggregate_metrics(
format_resource( format_resource(
#{ #{
type := Type, type := ActionType,
name := BridgeName, name := BridgeName,
raw_config := RawConf, raw_config := RawConf,
resource_data := ResourceData resource_data := ResourceData
}, },
Node Node
) -> ) ->
RawConfFull = BridgeV1Type = downgrade_type(ActionType, emqx_bridge_lib:get_conf(ActionType, BridgeName)),
case emqx_bridge_v2:is_bridge_v2_type(Type) of RawConfFull = fill_defaults(BridgeV1Type, RawConf),
true ->
%% The defaults are already filled in
RawConf;
false ->
fill_defaults(Type, RawConf)
end,
redact( redact(
maps:merge( maps:merge(
RawConfFull#{ RawConfFull#{
type => downgrade_type(Type, emqx_bridge_lib:get_conf(Type, BridgeName)), type => BridgeV1Type,
name => maps:get(<<"name">>, RawConf, BridgeName), name => maps:get(<<"name">>, RawConf, BridgeName),
node => Node node => Node
}, },

View File

@ -235,11 +235,10 @@ mongodb_structs() ->
kafka_structs() -> kafka_structs() ->
[ [
{kafka_producer, {kafka,
mk( mk(
hoconsc:map(name, ref(emqx_bridge_kafka, kafka_producer)), hoconsc:map(name, ref(emqx_bridge_kafka, kafka_producer)),
#{ #{
aliases => [kafka],
desc => <<"Kafka Producer Bridge Config">>, desc => <<"Kafka Producer Bridge Config">>,
required => false, required => false,
converter => fun kafka_producer_converter/2 converter => fun kafka_producer_converter/2

View File

@ -37,7 +37,7 @@
]). ]).
%% for testing only %% for testing only
-export([enterprise_api_schemas/1]). -export([enterprise_api_schemas/1, enterprise_fields_bridges/0]).
%%====================================================================================== %%======================================================================================
%% Hocon Schema Definitions %% Hocon Schema Definitions
@ -175,11 +175,10 @@ roots() -> [{bridges, ?HOCON(?R_REF(bridges), #{importance => ?IMPORTANCE_LOW})}
fields(bridges) -> fields(bridges) ->
[ [
{http, {webhook,
mk( mk(
hoconsc:map(name, ref(emqx_bridge_http_schema, "config")), hoconsc:map(name, ref(emqx_bridge_http_schema, "config")),
#{ #{
aliases => [webhook],
desc => ?DESC("bridges_webhook"), desc => ?DESC("bridges_webhook"),
required => false, required => false,
converter => fun http_bridge_converter/2 converter => fun http_bridge_converter/2
@ -198,7 +197,7 @@ fields(bridges) ->
end end
} }
)} )}
] ++ enterprise_fields_bridges(); ] ++ ?MODULE:enterprise_fields_bridges();
fields("metrics") -> fields("metrics") ->
[ [
{"dropped", mk(integer(), #{desc => ?DESC("metric_dropped")})}, {"dropped", mk(integer(), #{desc => ?DESC("metric_dropped")})},

View File

@ -21,7 +21,7 @@ empty_config_test() ->
Conf1 = #{<<"bridges">> => #{}}, Conf1 = #{<<"bridges">> => #{}},
Conf2 = #{<<"bridges">> => #{<<"webhook">> => #{}}}, Conf2 = #{<<"bridges">> => #{<<"webhook">> => #{}}},
?assertEqual(Conf1, check(Conf1)), ?assertEqual(Conf1, check(Conf1)),
?assertEqual(#{<<"bridges">> => #{<<"http">> => #{}}}, check(Conf2)), ?assertEqual(#{<<"bridges">> => #{<<"webhook">> => #{}}}, check(Conf2)),
ok. ok.
%% ensure webhook config can be checked %% ensure webhook config can be checked
@ -33,7 +33,7 @@ webhook_config_test() ->
?assertMatch( ?assertMatch(
#{ #{
<<"bridges">> := #{ <<"bridges">> := #{
<<"http">> := #{ <<"webhook">> := #{
<<"the_name">> := <<"the_name">> :=
#{ #{
<<"method">> := get, <<"method">> := get,
@ -48,7 +48,7 @@ webhook_config_test() ->
?assertMatch( ?assertMatch(
#{ #{
<<"bridges">> := #{ <<"bridges">> := #{
<<"http">> := #{ <<"webhook">> := #{
<<"the_name">> := <<"the_name">> :=
#{ #{
<<"method">> := get, <<"method">> := get,
@ -61,7 +61,7 @@ webhook_config_test() ->
), ),
#{ #{
<<"bridges">> := #{ <<"bridges">> := #{
<<"http">> := #{ <<"webhook">> := #{
<<"the_name">> := <<"the_name">> :=
#{ #{
<<"method">> := get, <<"method">> := get,

View File

@ -106,7 +106,9 @@ setup_mocks() ->
emqx_bridge_v2_schema, emqx_bridge_v2_schema,
registered_api_schemas, registered_api_schemas,
1, 1,
fun(Method) -> [{bridge_type_bin(), hoconsc:ref(?MODULE, "api_" ++ Method)}] end fun(Method) ->
[{bridge_type_bin(), hoconsc:ref(?MODULE, "api_v2_" ++ Method)}]
end
), ),
catch meck:new(emqx_bridge_schema, MeckOpts), catch meck:new(emqx_bridge_schema, MeckOpts),
@ -114,7 +116,24 @@ setup_mocks() ->
emqx_bridge_schema, emqx_bridge_schema,
enterprise_api_schemas, enterprise_api_schemas,
1, 1,
fun(Method) -> [{bridge_type_bin(), hoconsc:ref(?MODULE, "api_" ++ Method)}] end fun(Method) ->
[{bridge_type_bin(), hoconsc:ref(?MODULE, "api_v1_" ++ Method)}]
end
),
meck:expect(
emqx_bridge_schema,
enterprise_fields_bridges,
0,
fun() ->
[
{
bridge_type_bin(),
hoconsc:mk(
hoconsc:map(name, hoconsc:ref(?MODULE, v1_bridge)), #{}
)
}
]
end
), ),
ok. ok.
@ -156,7 +175,7 @@ fields("connector") ->
{on_start_fun, hoconsc:mk(binary(), #{})}, {on_start_fun, hoconsc:mk(binary(), #{})},
{ssl, hoconsc:ref(ssl)} {ssl, hoconsc:ref(ssl)}
]; ];
fields("api_post") -> fields("api_v2_post") ->
[ [
{connector, hoconsc:mk(binary(), #{})}, {connector, hoconsc:mk(binary(), #{})},
{name, hoconsc:mk(binary(), #{})}, {name, hoconsc:mk(binary(), #{})},
@ -164,6 +183,20 @@ fields("api_post") ->
{send_to, hoconsc:mk(atom(), #{})} {send_to, hoconsc:mk(atom(), #{})}
| fields("connector") | fields("connector")
]; ];
fields("api_v1_post") ->
ConnectorFields = proplists:delete(resource_opts, fields("connector")),
[
{connector, hoconsc:mk(binary(), #{})},
{name, hoconsc:mk(binary(), #{})},
{type, hoconsc:mk(bridge_type(), #{})},
{send_to, hoconsc:mk(atom(), #{})},
{resource_opts, hoconsc:mk(hoconsc:ref(?MODULE, v1_resource_opts), #{})}
| ConnectorFields
];
fields(v1_bridge) ->
lists:foldl(fun proplists:delete/2, fields("api_v1_post"), [name, type]);
fields(v1_resource_opts) ->
emqx_resource_schema:create_opts(_Overrides = []);
fields(ssl) -> fields(ssl) ->
emqx_schema:client_ssl_opts_schema(#{required => false}). emqx_schema:client_ssl_opts_schema(#{required => false}).
@ -333,9 +366,11 @@ get_connector_http(Name) ->
create_bridge_http_api_v1(Opts) -> create_bridge_http_api_v1(Opts) ->
Name = maps:get(name, Opts), Name = maps:get(name, Opts),
Overrides = maps:get(overrides, Opts, #{}), Overrides = maps:get(overrides, Opts, #{}),
OverrideFn = maps:get(override_fn, Opts, fun(X) -> X end),
BridgeConfig0 = emqx_utils_maps:deep_merge(bridge_config(), Overrides), BridgeConfig0 = emqx_utils_maps:deep_merge(bridge_config(), Overrides),
BridgeConfig = maps:without([<<"connector">>], BridgeConfig0), BridgeConfig = maps:without([<<"connector">>], BridgeConfig0),
Params = BridgeConfig#{<<"type">> => bridge_type_bin(), <<"name">> => Name}, Params0 = BridgeConfig#{<<"type">> => bridge_type_bin(), <<"name">> => Name},
Params = OverrideFn(Params0),
Path = emqx_mgmt_api_test_util:api_path(["bridges"]), Path = emqx_mgmt_api_test_util:api_path(["bridges"]),
ct:pal("creating bridge (http v1): ~p", [Params]), ct:pal("creating bridge (http v1): ~p", [Params]),
Res = request(post, Path, Params), Res = request(post, Path, Params),
@ -919,3 +954,29 @@ t_obfuscated_secrets_probe(_Config) ->
), ),
ok. ok.
t_v1_api_fill_defaults(_Config) ->
%% Ensure only one sub-field is used, but we get back the defaults filled in.
BridgeName = ?FUNCTION_NAME,
OverrideFn = fun(Params) ->
ResourceOpts = #{<<"resume_interval">> => 100},
maps:put(<<"resource_opts">>, ResourceOpts, Params)
end,
?assertMatch(
{ok,
{{_, 201, _}, _, #{
<<"resource_opts">> :=
#{
<<"resume_interval">> := _,
<<"query_mode">> := _,
<<"inflight_window">> := _,
<<"start_timeout">> := _,
<<"start_after_created">> := _,
<<"max_buffer_bytes">> := _,
<<"batch_size">> := _
}
}}},
create_bridge_http_api_v1(#{name => BridgeName, override_fn => OverrideFn})
),
ok.

View File

@ -22,7 +22,7 @@ kafka_producer_test() ->
#{ #{
<<"bridges">> := <<"bridges">> :=
#{ #{
<<"kafka_producer">> := <<"kafka">> :=
#{ #{
<<"myproducer">> := <<"myproducer">> :=
#{<<"kafka">> := #{}} #{<<"kafka">> := #{}}
@ -35,7 +35,7 @@ kafka_producer_test() ->
#{ #{
<<"bridges">> := <<"bridges">> :=
#{ #{
<<"kafka_producer">> := <<"kafka">> :=
#{ #{
<<"myproducer">> := <<"myproducer">> :=
#{<<"local_topic">> := _} #{<<"local_topic">> := _}
@ -48,7 +48,7 @@ kafka_producer_test() ->
#{ #{
<<"bridges">> := <<"bridges">> :=
#{ #{
<<"kafka_producer">> := <<"kafka">> :=
#{ #{
<<"myproducer">> := <<"myproducer">> :=
#{ #{
@ -64,7 +64,7 @@ kafka_producer_test() ->
#{ #{
<<"bridges">> := <<"bridges">> :=
#{ #{
<<"kafka_producer">> := <<"kafka">> :=
#{ #{
<<"myproducer">> := <<"myproducer">> :=
#{ #{
@ -166,7 +166,7 @@ message_key_dispatch_validations_test() ->
?assertThrow( ?assertThrow(
{_, [ {_, [
#{ #{
path := "bridges.kafka_producer.myproducer.kafka", path := "bridges.kafka.myproducer.kafka",
reason := "Message key cannot be empty when `key_dispatch` strategy is used" reason := "Message key cannot be empty when `key_dispatch` strategy is used"
} }
]}, ]},
@ -175,7 +175,7 @@ message_key_dispatch_validations_test() ->
?assertThrow( ?assertThrow(
{_, [ {_, [
#{ #{
path := "bridges.kafka_producer.myproducer.kafka", path := "bridges.kafka.myproducer.kafka",
reason := "Message key cannot be empty when `key_dispatch` strategy is used" reason := "Message key cannot be empty when `key_dispatch` strategy is used"
} }
]}, ]},

View File

@ -122,7 +122,9 @@ fields("get_cluster") ->
method_fields(get, redis_cluster); method_fields(get, redis_cluster);
%% old bridge v1 schema %% old bridge v1 schema
fields(Type) when fields(Type) when
Type == redis_single orelse Type == redis_sentinel orelse Type == redis_cluster Type == redis_single;
Type == redis_sentinel;
Type == redis_cluster
-> ->
redis_bridge_common_fields(Type) ++ redis_bridge_common_fields(Type) ++
connector_fields(Type); connector_fields(Type);

View File

@ -29,14 +29,12 @@ connector_type_name() -> redis.
schema_module() -> ?SCHEMA_MODULE. schema_module() -> ?SCHEMA_MODULE.
connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) -> connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) ->
fix_v1_type( maps:merge(
maps:merge( maps:without(
maps:without( [<<"connector">>],
[<<"connector">>], map_unindent(<<"parameters">>, ActionConfig)
map_unindent(<<"parameters">>, ActionConfig) ),
), map_unindent(<<"parameters">>, ConnectorConfig)
map_unindent(<<"parameters">>, ConnectorConfig)
)
). ).
bridge_v1_config_to_action_config(BridgeV1Config, ConnectorName) -> bridge_v1_config_to_action_config(BridgeV1Config, ConnectorName) ->
@ -77,9 +75,6 @@ bridge_v1_type_name() ->
bridge_v1_type_name_fun({#{<<"parameters">> := #{<<"redis_type">> := Type}}, _}) -> bridge_v1_type_name_fun({#{<<"parameters">> := #{<<"redis_type">> := Type}}, _}) ->
v1_type(Type). v1_type(Type).
fix_v1_type(#{<<"redis_type">> := RedisType} = Conf) ->
Conf#{<<"type">> => v1_type(RedisType)}.
v1_type(<<"single">>) -> redis_single; v1_type(<<"single">>) -> redis_single;
v1_type(<<"sentinel">>) -> redis_sentinel; v1_type(<<"sentinel">>) -> redis_sentinel;
v1_type(<<"cluster">>) -> redis_cluster. v1_type(<<"cluster">>) -> redis_cluster.

View File

@ -82,9 +82,13 @@ on_start(InstId, Config) ->
end. end.
on_stop(InstId, #{conn_st := RedisConnSt}) -> on_stop(InstId, #{conn_st := RedisConnSt}) ->
emqx_redis:on_stop(InstId, RedisConnSt); Res = emqx_redis:on_stop(InstId, RedisConnSt),
?tp(redis_bridge_stopped, #{instance_id => InstId}),
Res;
on_stop(InstId, undefined = _State) -> on_stop(InstId, undefined = _State) ->
emqx_redis:on_stop(InstId, undefined). Res = emqx_redis:on_stop(InstId, undefined),
?tp(redis_bridge_stopped, #{instance_id => InstId}),
Res.
on_get_status(InstId, #{conn_st := RedisConnSt}) -> on_get_status(InstId, #{conn_st := RedisConnSt}) ->
emqx_redis:on_get_status(InstId, RedisConnSt). emqx_redis:on_get_status(InstId, RedisConnSt).

View File

@ -50,7 +50,6 @@ fields("config_connector") ->
#{required => true, desc => ?DESC(redis_parameters)} #{required => true, desc => ?DESC(redis_parameters)}
)} )}
] ++ ] ++
emqx_redis:redis_fields() ++
emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts) ++ emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts) ++
emqx_connector_schema_lib:ssl_fields(); emqx_connector_schema_lib:ssl_fields();
fields(connector_resource_opts) -> fields(connector_resource_opts) ->

View File

@ -59,7 +59,11 @@ all() -> [{group, transports}, {group, rest}].
suite() -> [{timetrap, {minutes, 20}}]. suite() -> [{timetrap, {minutes, 20}}].
groups() -> groups() ->
ResourceSpecificTCs = [t_create_delete_bridge], ResourceSpecificTCs = [
t_create_delete_bridge,
t_create_via_http,
t_start_stop
],
TCs = emqx_common_test_helpers:all(?MODULE) -- ResourceSpecificTCs, TCs = emqx_common_test_helpers:all(?MODULE) -- ResourceSpecificTCs,
TypeGroups = [ TypeGroups = [
{group, redis_single}, {group, redis_single},
@ -130,10 +134,13 @@ wait_for_ci_redis(Checks, Config) ->
emqx_resource, emqx_resource,
emqx_connector, emqx_connector,
emqx_bridge, emqx_bridge,
emqx_rule_engine emqx_rule_engine,
emqx_management,
{emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}
], ],
#{work_dir => emqx_cth_suite:work_dir(Config)} #{work_dir => emqx_cth_suite:work_dir(Config)}
), ),
{ok, _Api} = emqx_common_test_http:create_default_app(),
[ [
{apps, Apps}, {apps, Apps},
{proxy_host, ProxyHost}, {proxy_host, ProxyHost},
@ -177,9 +184,8 @@ init_per_testcase(Testcase, Config0) ->
IsBatch = (BatchMode =:= batch_on), IsBatch = (BatchMode =:= batch_on),
BridgeConfig0 = maps:merge(RedisConnConfig, ?COMMON_REDIS_OPTS), BridgeConfig0 = maps:merge(RedisConnConfig, ?COMMON_REDIS_OPTS),
BridgeConfig1 = BridgeConfig0#{<<"resource_opts">> => ResourceConfig}, BridgeConfig1 = BridgeConfig0#{<<"resource_opts">> => ResourceConfig},
BridgeType = list_to_atom(atom_to_list(RedisType) ++ "_producer"),
[ [
{bridge_type, BridgeType}, {bridge_type, RedisType},
{bridge_config, BridgeConfig1}, {bridge_config, BridgeConfig1},
{is_batch, IsBatch} {is_batch, IsBatch}
| Config | Config
@ -425,6 +431,14 @@ t_create_disconnected(Config) ->
), ),
ok = emqx_bridge:remove(Type, Name). ok = emqx_bridge:remove(Type, Name).
t_create_via_http(Config) ->
ok = emqx_bridge_testlib:t_create_via_http(Config),
ok.
t_start_stop(Config) ->
ok = emqx_bridge_testlib:t_start_stop(Config, redis_bridge_stopped),
ok.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Helper functions %% Helper functions
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------

View File

@ -62,25 +62,22 @@ roots() ->
fields(redis_single) -> fields(redis_single) ->
fields(redis_single_connector) ++ fields(redis_single_connector) ++
redis_fields() ++
emqx_connector_schema_lib:ssl_fields(); emqx_connector_schema_lib:ssl_fields();
fields(redis_single_connector) -> fields(redis_single_connector) ->
[ [
{server, server()}, {server, server()},
redis_type(single) redis_type(single)
]; ] ++ redis_fields();
fields(redis_cluster) -> fields(redis_cluster) ->
fields(redis_cluster_connector) ++ fields(redis_cluster_connector) ++
lists:keydelete(database, 1, redis_fields()) ++
emqx_connector_schema_lib:ssl_fields(); emqx_connector_schema_lib:ssl_fields();
fields(redis_cluster_connector) -> fields(redis_cluster_connector) ->
[ [
{servers, servers()}, {servers, servers()},
redis_type(cluster) redis_type(cluster)
]; ] ++ lists:keydelete(database, 1, redis_fields());
fields(redis_sentinel) -> fields(redis_sentinel) ->
fields(redis_sentinel_connector) ++ fields(redis_sentinel_connector) ++
redis_fields() ++
emqx_connector_schema_lib:ssl_fields(); emqx_connector_schema_lib:ssl_fields();
fields(redis_sentinel_connector) -> fields(redis_sentinel_connector) ->
[ [
@ -91,7 +88,7 @@ fields(redis_sentinel_connector) ->
required => true, required => true,
desc => ?DESC("sentinel_desc") desc => ?DESC("sentinel_desc")
}} }}
]. ] ++ redis_fields().
server() -> server() ->
Meta = #{desc => ?DESC("server")}, Meta = #{desc => ?DESC("server")},

View File

@ -80,7 +80,7 @@ worker_pool_size_test_() ->
Conf = emqx_utils_maps:deep_put( Conf = emqx_utils_maps:deep_put(
[ [
<<"bridges">>, <<"bridges">>,
<<"http">>, <<"webhook">>,
<<"simple">>, <<"simple">>,
<<"resource_opts">>, <<"resource_opts">>,
<<"worker_pool_size">> <<"worker_pool_size">>
@ -88,7 +88,7 @@ worker_pool_size_test_() ->
BaseConf, BaseConf,
WorkerPoolSize WorkerPoolSize
), ),
#{<<"bridges">> := #{<<"http">> := #{<<"simple">> := CheckedConf}}} = check(Conf), #{<<"bridges">> := #{<<"webhook">> := #{<<"simple">> := CheckedConf}}} = check(Conf),
#{<<"resource_opts">> := #{<<"worker_pool_size">> := WPS}} = CheckedConf, #{<<"resource_opts">> := #{<<"worker_pool_size">> := WPS}} = CheckedConf,
WPS WPS
end, end,
@ -117,7 +117,7 @@ worker_pool_size_test_() ->
%%=========================================================================== %%===========================================================================
parse_and_check_webhook_bridge(Hocon) -> parse_and_check_webhook_bridge(Hocon) ->
#{<<"bridges">> := #{<<"http">> := #{<<"simple">> := Conf}}} = check(parse(Hocon)), #{<<"bridges">> := #{<<"webhook">> := #{<<"simple">> := Conf}}} = check(parse(Hocon)),
Conf. Conf.
parse(Hocon) -> parse(Hocon) ->