fix(redis_bridge): fix connector schema and action info transformations
This commit is contained in:
parent
432ddc5a3b
commit
c29ada4666
|
@ -620,8 +620,8 @@ lookup_from_all_nodes(BridgeType, BridgeName, SuccCode) ->
|
||||||
?INTERNAL_ERROR(Reason)
|
?INTERNAL_ERROR(Reason)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
lookup_from_local_node(BridgeType, BridgeName) ->
|
lookup_from_local_node(ActionType, ActionName) ->
|
||||||
case emqx_bridge:lookup(BridgeType, BridgeName) of
|
case emqx_bridge:lookup(ActionType, ActionName) of
|
||||||
{ok, Res} -> {ok, format_resource(Res, node())};
|
{ok, Res} -> {ok, format_resource(Res, node())};
|
||||||
Error -> Error
|
Error -> Error
|
||||||
end.
|
end.
|
||||||
|
|
|
@ -122,7 +122,9 @@ fields("get_cluster") ->
|
||||||
method_fields(get, redis_cluster);
|
method_fields(get, redis_cluster);
|
||||||
%% old bridge v1 schema
|
%% old bridge v1 schema
|
||||||
fields(Type) when
|
fields(Type) when
|
||||||
Type == redis_single orelse Type == redis_sentinel orelse Type == redis_cluster
|
Type == redis_single;
|
||||||
|
Type == redis_sentinel;
|
||||||
|
Type == redis_cluster
|
||||||
->
|
->
|
||||||
redis_bridge_common_fields(Type) ++
|
redis_bridge_common_fields(Type) ++
|
||||||
connector_fields(Type);
|
connector_fields(Type);
|
||||||
|
|
|
@ -29,14 +29,12 @@ connector_type_name() -> redis.
|
||||||
schema_module() -> ?SCHEMA_MODULE.
|
schema_module() -> ?SCHEMA_MODULE.
|
||||||
|
|
||||||
connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) ->
|
connector_action_config_to_bridge_v1_config(ConnectorConfig, ActionConfig) ->
|
||||||
fix_v1_type(
|
|
||||||
maps:merge(
|
maps:merge(
|
||||||
maps:without(
|
maps:without(
|
||||||
[<<"connector">>],
|
[<<"connector">>],
|
||||||
map_unindent(<<"parameters">>, ActionConfig)
|
map_unindent(<<"parameters">>, ActionConfig)
|
||||||
),
|
),
|
||||||
map_unindent(<<"parameters">>, ConnectorConfig)
|
map_unindent(<<"parameters">>, ConnectorConfig)
|
||||||
)
|
|
||||||
).
|
).
|
||||||
|
|
||||||
bridge_v1_config_to_action_config(BridgeV1Config, ConnectorName) ->
|
bridge_v1_config_to_action_config(BridgeV1Config, ConnectorName) ->
|
||||||
|
@ -77,9 +75,6 @@ bridge_v1_type_name() ->
|
||||||
bridge_v1_type_name_fun({#{<<"parameters">> := #{<<"redis_type">> := Type}}, _}) ->
|
bridge_v1_type_name_fun({#{<<"parameters">> := #{<<"redis_type">> := Type}}, _}) ->
|
||||||
v1_type(Type).
|
v1_type(Type).
|
||||||
|
|
||||||
fix_v1_type(#{<<"redis_type">> := RedisType} = Conf) ->
|
|
||||||
Conf#{<<"type">> => v1_type(RedisType)}.
|
|
||||||
|
|
||||||
v1_type(<<"single">>) -> redis_single;
|
v1_type(<<"single">>) -> redis_single;
|
||||||
v1_type(<<"sentinel">>) -> redis_sentinel;
|
v1_type(<<"sentinel">>) -> redis_sentinel;
|
||||||
v1_type(<<"cluster">>) -> redis_cluster.
|
v1_type(<<"cluster">>) -> redis_cluster.
|
||||||
|
|
|
@ -82,9 +82,13 @@ on_start(InstId, Config) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
on_stop(InstId, #{conn_st := RedisConnSt}) ->
|
on_stop(InstId, #{conn_st := RedisConnSt}) ->
|
||||||
emqx_redis:on_stop(InstId, RedisConnSt);
|
Res = emqx_redis:on_stop(InstId, RedisConnSt),
|
||||||
|
?tp(redis_bridge_stopped, #{instance_id => InstId}),
|
||||||
|
Res;
|
||||||
on_stop(InstId, undefined = _State) ->
|
on_stop(InstId, undefined = _State) ->
|
||||||
emqx_redis:on_stop(InstId, undefined).
|
Res = emqx_redis:on_stop(InstId, undefined),
|
||||||
|
?tp(redis_bridge_stopped, #{instance_id => InstId}),
|
||||||
|
Res.
|
||||||
|
|
||||||
on_get_status(InstId, #{conn_st := RedisConnSt}) ->
|
on_get_status(InstId, #{conn_st := RedisConnSt}) ->
|
||||||
emqx_redis:on_get_status(InstId, RedisConnSt).
|
emqx_redis:on_get_status(InstId, RedisConnSt).
|
||||||
|
|
|
@ -50,7 +50,6 @@ fields("config_connector") ->
|
||||||
#{required => true, desc => ?DESC(redis_parameters)}
|
#{required => true, desc => ?DESC(redis_parameters)}
|
||||||
)}
|
)}
|
||||||
] ++
|
] ++
|
||||||
emqx_redis:redis_fields() ++
|
|
||||||
emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts) ++
|
emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts) ++
|
||||||
emqx_connector_schema_lib:ssl_fields();
|
emqx_connector_schema_lib:ssl_fields();
|
||||||
fields(connector_resource_opts) ->
|
fields(connector_resource_opts) ->
|
||||||
|
|
|
@ -59,7 +59,11 @@ all() -> [{group, transports}, {group, rest}].
|
||||||
suite() -> [{timetrap, {minutes, 20}}].
|
suite() -> [{timetrap, {minutes, 20}}].
|
||||||
|
|
||||||
groups() ->
|
groups() ->
|
||||||
ResourceSpecificTCs = [t_create_delete_bridge],
|
ResourceSpecificTCs = [
|
||||||
|
t_create_delete_bridge,
|
||||||
|
t_create_via_http,
|
||||||
|
t_start_stop
|
||||||
|
],
|
||||||
TCs = emqx_common_test_helpers:all(?MODULE) -- ResourceSpecificTCs,
|
TCs = emqx_common_test_helpers:all(?MODULE) -- ResourceSpecificTCs,
|
||||||
TypeGroups = [
|
TypeGroups = [
|
||||||
{group, redis_single},
|
{group, redis_single},
|
||||||
|
@ -130,10 +134,13 @@ wait_for_ci_redis(Checks, Config) ->
|
||||||
emqx_resource,
|
emqx_resource,
|
||||||
emqx_connector,
|
emqx_connector,
|
||||||
emqx_bridge,
|
emqx_bridge,
|
||||||
emqx_rule_engine
|
emqx_rule_engine,
|
||||||
|
emqx_management,
|
||||||
|
{emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}
|
||||||
],
|
],
|
||||||
#{work_dir => emqx_cth_suite:work_dir(Config)}
|
#{work_dir => emqx_cth_suite:work_dir(Config)}
|
||||||
),
|
),
|
||||||
|
{ok, _Api} = emqx_common_test_http:create_default_app(),
|
||||||
[
|
[
|
||||||
{apps, Apps},
|
{apps, Apps},
|
||||||
{proxy_host, ProxyHost},
|
{proxy_host, ProxyHost},
|
||||||
|
@ -177,9 +184,8 @@ init_per_testcase(Testcase, Config0) ->
|
||||||
IsBatch = (BatchMode =:= batch_on),
|
IsBatch = (BatchMode =:= batch_on),
|
||||||
BridgeConfig0 = maps:merge(RedisConnConfig, ?COMMON_REDIS_OPTS),
|
BridgeConfig0 = maps:merge(RedisConnConfig, ?COMMON_REDIS_OPTS),
|
||||||
BridgeConfig1 = BridgeConfig0#{<<"resource_opts">> => ResourceConfig},
|
BridgeConfig1 = BridgeConfig0#{<<"resource_opts">> => ResourceConfig},
|
||||||
BridgeType = list_to_atom(atom_to_list(RedisType) ++ "_producer"),
|
|
||||||
[
|
[
|
||||||
{bridge_type, BridgeType},
|
{bridge_type, RedisType},
|
||||||
{bridge_config, BridgeConfig1},
|
{bridge_config, BridgeConfig1},
|
||||||
{is_batch, IsBatch}
|
{is_batch, IsBatch}
|
||||||
| Config
|
| Config
|
||||||
|
@ -425,6 +431,14 @@ t_create_disconnected(Config) ->
|
||||||
),
|
),
|
||||||
ok = emqx_bridge:remove(Type, Name).
|
ok = emqx_bridge:remove(Type, Name).
|
||||||
|
|
||||||
|
t_create_via_http(Config) ->
|
||||||
|
ok = emqx_bridge_testlib:t_create_via_http(Config),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_start_stop(Config) ->
|
||||||
|
ok = emqx_bridge_testlib:t_start_stop(Config, redis_bridge_stopped),
|
||||||
|
ok.
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% Helper functions
|
%% Helper functions
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
|
@ -62,25 +62,22 @@ roots() ->
|
||||||
|
|
||||||
fields(redis_single) ->
|
fields(redis_single) ->
|
||||||
fields(redis_single_connector) ++
|
fields(redis_single_connector) ++
|
||||||
redis_fields() ++
|
|
||||||
emqx_connector_schema_lib:ssl_fields();
|
emqx_connector_schema_lib:ssl_fields();
|
||||||
fields(redis_single_connector) ->
|
fields(redis_single_connector) ->
|
||||||
[
|
[
|
||||||
{server, server()},
|
{server, server()},
|
||||||
redis_type(single)
|
redis_type(single)
|
||||||
];
|
] ++ redis_fields();
|
||||||
fields(redis_cluster) ->
|
fields(redis_cluster) ->
|
||||||
fields(redis_cluster_connector) ++
|
fields(redis_cluster_connector) ++
|
||||||
lists:keydelete(database, 1, redis_fields()) ++
|
|
||||||
emqx_connector_schema_lib:ssl_fields();
|
emqx_connector_schema_lib:ssl_fields();
|
||||||
fields(redis_cluster_connector) ->
|
fields(redis_cluster_connector) ->
|
||||||
[
|
[
|
||||||
{servers, servers()},
|
{servers, servers()},
|
||||||
redis_type(cluster)
|
redis_type(cluster)
|
||||||
];
|
] ++ lists:keydelete(database, 1, redis_fields());
|
||||||
fields(redis_sentinel) ->
|
fields(redis_sentinel) ->
|
||||||
fields(redis_sentinel_connector) ++
|
fields(redis_sentinel_connector) ++
|
||||||
redis_fields() ++
|
|
||||||
emqx_connector_schema_lib:ssl_fields();
|
emqx_connector_schema_lib:ssl_fields();
|
||||||
fields(redis_sentinel_connector) ->
|
fields(redis_sentinel_connector) ->
|
||||||
[
|
[
|
||||||
|
@ -91,7 +88,7 @@ fields(redis_sentinel_connector) ->
|
||||||
required => true,
|
required => true,
|
||||||
desc => ?DESC("sentinel_desc")
|
desc => ?DESC("sentinel_desc")
|
||||||
}}
|
}}
|
||||||
].
|
] ++ redis_fields().
|
||||||
|
|
||||||
server() ->
|
server() ->
|
||||||
Meta = #{desc => ?DESC("server")},
|
Meta = #{desc => ?DESC("server")},
|
||||||
|
|
Loading…
Reference in New Issue