fix: almost working bridge v1 compatibility layer for HTTP API
Most operation for the bridge V1 HTTP API compatibility layer are now working. This has been tested by creating/deleting/updating Kafka bridge through HTTP API, sending message to it, and resetting and checking metrics. The start, stop, restart, enable, disable operations still need to be fixed.
This commit is contained in:
parent
f8d330c2f3
commit
d61d80f338
|
@ -330,7 +330,13 @@ lookup(Type, Name, RawConf) ->
|
||||||
get_metrics(Type, Name) ->
|
get_metrics(Type, Name) ->
|
||||||
case emqx_bridge_v2:is_bridge_v2_type(Type) of
|
case emqx_bridge_v2:is_bridge_v2_type(Type) of
|
||||||
true ->
|
true ->
|
||||||
emqx_bridge_v2:get_metrics(Type, Name);
|
case emqx_bridge_v2:is_valid_bridge_v1(Type, Name) of
|
||||||
|
true ->
|
||||||
|
BridgeV2Type = emqx_bridge_v2:bridge_v2_type_to_connector_type(Type),
|
||||||
|
emqx_bridge_v2:get_metrics(BridgeV2Type, Name);
|
||||||
|
false ->
|
||||||
|
{error, not_bridge_v1_compatible}
|
||||||
|
end;
|
||||||
false ->
|
false ->
|
||||||
emqx_resource:get_metrics(emqx_bridge_resource:resource_id(Type, Name))
|
emqx_resource:get_metrics(emqx_bridge_resource:resource_id(Type, Name))
|
||||||
end.
|
end.
|
||||||
|
@ -347,7 +353,12 @@ disable_enable(Action, BridgeType, BridgeName) when
|
||||||
->
|
->
|
||||||
case emqx_bridge_v2:is_bridge_v2_type(BridgeType) of
|
case emqx_bridge_v2:is_bridge_v2_type(BridgeType) of
|
||||||
true ->
|
true ->
|
||||||
emqx_bridge_v2:disable_enable(Action, BridgeType, BridgeName);
|
case emqx_bridge_v2:is_valid_bridge_v1(BridgeType, BridgeName) of
|
||||||
|
true ->
|
||||||
|
do_disable_enable_bridge_v2_compatible(Action, BridgeType, BridgeName);
|
||||||
|
false ->
|
||||||
|
{error, not_bridge_v1_compatible}
|
||||||
|
end;
|
||||||
false ->
|
false ->
|
||||||
emqx_conf:update(
|
emqx_conf:update(
|
||||||
config_key_path() ++ [BridgeType, BridgeName],
|
config_key_path() ++ [BridgeType, BridgeName],
|
||||||
|
@ -356,6 +367,15 @@ disable_enable(Action, BridgeType, BridgeName) when
|
||||||
)
|
)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
do_disable_enable_bridge_v2_compatible(enable, BridgeType, BridgeName) ->
|
||||||
|
BridgeV2Type = emqx_bridge_v2:bridge_v1_type_to_bridge_v2_type(BridgeType),
|
||||||
|
_ = emqx_connector:disable_enable(enable, BridgeType, BridgeName),
|
||||||
|
emqx_bridge_v2:disable_enable(enable, BridgeV2Type, BridgeName);
|
||||||
|
do_disable_enable_bridge_v2_compatible(disable, BridgeType, BridgeName) ->
|
||||||
|
BridgeV2Type = emqx_bridge_v2:bridge_v1_type_to_bridge_v2_type(BridgeType),
|
||||||
|
_ = emqx_bridge_v2:disable_enable(disable, BridgeV2Type, BridgeName),
|
||||||
|
emqx_connector:disable_enable(disable, BridgeType, BridgeName).
|
||||||
|
|
||||||
create(BridgeType, BridgeName, RawConf) ->
|
create(BridgeType, BridgeName, RawConf) ->
|
||||||
?SLOG(debug, #{
|
?SLOG(debug, #{
|
||||||
bridge_action => create,
|
bridge_action => create,
|
||||||
|
|
|
@ -460,6 +460,8 @@ schema("/bridges_probe") ->
|
||||||
case emqx_bridge:lookup(BridgeType, BridgeName) of
|
case emqx_bridge:lookup(BridgeType, BridgeName) of
|
||||||
{ok, _} ->
|
{ok, _} ->
|
||||||
?BAD_REQUEST('ALREADY_EXISTS', <<"bridge already exists">>);
|
?BAD_REQUEST('ALREADY_EXISTS', <<"bridge already exists">>);
|
||||||
|
{error, not_bridge_v1_compatible} ->
|
||||||
|
?BAD_REQUEST('ALREADY_EXISTS', non_compat_bridge_msg());
|
||||||
{error, not_found} ->
|
{error, not_found} ->
|
||||||
Conf = filter_out_request_body(Conf0),
|
Conf = filter_out_request_body(Conf0),
|
||||||
create_bridge(BridgeType, BridgeName, Conf)
|
create_bridge(BridgeType, BridgeName, Conf)
|
||||||
|
@ -487,11 +489,12 @@ schema("/bridges_probe") ->
|
||||||
case emqx_bridge:lookup(BridgeType, BridgeName) of
|
case emqx_bridge:lookup(BridgeType, BridgeName) of
|
||||||
{ok, #{raw_config := RawConf}} ->
|
{ok, #{raw_config := RawConf}} ->
|
||||||
%% TODO will the maybe_upgrade step done by emqx_bridge:lookup cause any problems
|
%% TODO will the maybe_upgrade step done by emqx_bridge:lookup cause any problems
|
||||||
%%RawConf = emqx:get_raw_config([bridges, BridgeType, BridgeName], #{}),
|
|
||||||
Conf = deobfuscate(Conf1, RawConf),
|
Conf = deobfuscate(Conf1, RawConf),
|
||||||
update_bridge(BridgeType, BridgeName, Conf);
|
update_bridge(BridgeType, BridgeName, Conf);
|
||||||
{error, not_found} ->
|
{error, not_found} ->
|
||||||
?BRIDGE_NOT_FOUND(BridgeType, BridgeName)
|
?BRIDGE_NOT_FOUND(BridgeType, BridgeName);
|
||||||
|
{error, not_bridge_v1_compatible} ->
|
||||||
|
?BAD_REQUEST('ALREADY_EXISTS', non_compat_bridge_msg())
|
||||||
end
|
end
|
||||||
);
|
);
|
||||||
'/bridges/:id'(delete, #{bindings := #{id := Id}, query_string := Qs}) ->
|
'/bridges/:id'(delete, #{bindings := #{id := Id}, query_string := Qs}) ->
|
||||||
|
@ -519,7 +522,9 @@ schema("/bridges_probe") ->
|
||||||
?INTERNAL_ERROR(Reason)
|
?INTERNAL_ERROR(Reason)
|
||||||
end;
|
end;
|
||||||
{error, not_found} ->
|
{error, not_found} ->
|
||||||
?BRIDGE_NOT_FOUND(BridgeType, BridgeName)
|
?BRIDGE_NOT_FOUND(BridgeType, BridgeName);
|
||||||
|
{error, not_bridge_v1_compatible} ->
|
||||||
|
?BAD_REQUEST('ALREADY_EXISTS', non_compat_bridge_msg())
|
||||||
end
|
end
|
||||||
).
|
).
|
||||||
|
|
||||||
|
@ -529,7 +534,12 @@ schema("/bridges_probe") ->
|
||||||
'/bridges/:id/metrics/reset'(put, #{bindings := #{id := Id}}) ->
|
'/bridges/:id/metrics/reset'(put, #{bindings := #{id := Id}}) ->
|
||||||
?TRY_PARSE_ID(
|
?TRY_PARSE_ID(
|
||||||
Id,
|
Id,
|
||||||
begin
|
case emqx_bridge_v2:is_bridge_v2_type(BridgeType) of
|
||||||
|
true ->
|
||||||
|
BridgeV2Type = emqx_bridge_v2:bridge_v2_type_to_connector_type(BridgeType),
|
||||||
|
ok = emqx_bridge_v2:reset_metrics(BridgeV2Type, BridgeName),
|
||||||
|
?NO_CONTENT;
|
||||||
|
false ->
|
||||||
ok = emqx_bridge_resource:reset_metrics(
|
ok = emqx_bridge_resource:reset_metrics(
|
||||||
emqx_bridge_resource:resource_id(BridgeType, BridgeName)
|
emqx_bridge_resource:resource_id(BridgeType, BridgeName)
|
||||||
),
|
),
|
||||||
|
@ -592,6 +602,8 @@ lookup_from_all_nodes(BridgeType, BridgeName, SuccCode) ->
|
||||||
{SuccCode, format_bridge_info([R || {ok, R} <- Results])};
|
{SuccCode, format_bridge_info([R || {ok, R} <- Results])};
|
||||||
{ok, [{error, not_found} | _]} ->
|
{ok, [{error, not_found} | _]} ->
|
||||||
?BRIDGE_NOT_FOUND(BridgeType, BridgeName);
|
?BRIDGE_NOT_FOUND(BridgeType, BridgeName);
|
||||||
|
{ok, [{error, not_bridge_v1_compatible} | _]} ->
|
||||||
|
?NOT_FOUND(non_compat_bridge_msg());
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
?INTERNAL_ERROR(Reason)
|
?INTERNAL_ERROR(Reason)
|
||||||
end.
|
end.
|
||||||
|
@ -606,7 +618,17 @@ create_bridge(BridgeType, BridgeName, Conf) ->
|
||||||
create_or_update_bridge(BridgeType, BridgeName, Conf, 201).
|
create_or_update_bridge(BridgeType, BridgeName, Conf, 201).
|
||||||
|
|
||||||
update_bridge(BridgeType, BridgeName, Conf) ->
|
update_bridge(BridgeType, BridgeName, Conf) ->
|
||||||
create_or_update_bridge(BridgeType, BridgeName, Conf, 200).
|
case emqx_bridge_v2:is_bridge_v2_type(BridgeType) of
|
||||||
|
true ->
|
||||||
|
case emqx_bridge_v2:is_valid_bridge_v1(BridgeType, BridgeName) of
|
||||||
|
true ->
|
||||||
|
create_or_update_bridge(BridgeType, BridgeName, Conf, 200);
|
||||||
|
false ->
|
||||||
|
?NOT_FOUND(non_compat_bridge_msg())
|
||||||
|
end;
|
||||||
|
false ->
|
||||||
|
create_or_update_bridge(BridgeType, BridgeName, Conf, 200)
|
||||||
|
end.
|
||||||
|
|
||||||
create_or_update_bridge(BridgeType, BridgeName, Conf, HttpStatusCode) ->
|
create_or_update_bridge(BridgeType, BridgeName, Conf, HttpStatusCode) ->
|
||||||
case emqx_bridge:create(BridgeType, BridgeName, Conf) of
|
case emqx_bridge:create(BridgeType, BridgeName, Conf) of
|
||||||
|
@ -1119,3 +1141,6 @@ map_to_json(M0) ->
|
||||||
M2 = maps:without([value, <<"value">>], M1),
|
M2 = maps:without([value, <<"value">>], M1),
|
||||||
emqx_utils_json:encode(M2)
|
emqx_utils_json:encode(M2)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
non_compat_bridge_msg() ->
|
||||||
|
<<"bridge already exists as non Bridge V1 compatible Bridge V2 bridge">>.
|
||||||
|
|
|
@ -172,7 +172,13 @@ reset_metrics(ResourceId) ->
|
||||||
false ->
|
false ->
|
||||||
emqx_resource:reset_metrics(ResourceId);
|
emqx_resource:reset_metrics(ResourceId);
|
||||||
true ->
|
true ->
|
||||||
emqx_bridge_v2:reset_metrics(Type, Name)
|
case emqx_bridge_v2:is_valid_bridge_v1(Type, Name) of
|
||||||
|
true ->
|
||||||
|
BridgeV2Type = emqx_bridge_v2:bridge_v2_type_to_connector_type(Type),
|
||||||
|
emqx_bridge_v2:reset_metrics(BridgeV2Type, Name);
|
||||||
|
false ->
|
||||||
|
{error, not_bridge_v1_compatible}
|
||||||
|
end
|
||||||
end.
|
end.
|
||||||
|
|
||||||
restart(Type, Name) ->
|
restart(Type, Name) ->
|
||||||
|
|
|
@ -68,7 +68,6 @@
|
||||||
|
|
||||||
%% Exported for tests
|
%% Exported for tests
|
||||||
-export([
|
-export([
|
||||||
bridge_v2_type_to_connector_type/1,
|
|
||||||
id/2,
|
id/2,
|
||||||
id/3,
|
id/3,
|
||||||
is_valid_bridge_v1/2
|
is_valid_bridge_v1/2
|
||||||
|
@ -84,6 +83,7 @@
|
||||||
%% Compatibility API
|
%% Compatibility API
|
||||||
|
|
||||||
-export([
|
-export([
|
||||||
|
bridge_v2_type_to_connector_type/1,
|
||||||
is_bridge_v2_type/1,
|
is_bridge_v2_type/1,
|
||||||
lookup_and_transform_to_bridge_v1/2,
|
lookup_and_transform_to_bridge_v1/2,
|
||||||
list_and_transform_to_bridge_v1/0,
|
list_and_transform_to_bridge_v1/0,
|
||||||
|
@ -100,17 +100,47 @@
|
||||||
%%====================================================================
|
%%====================================================================
|
||||||
|
|
||||||
load() ->
|
load() ->
|
||||||
|
load_bridges(),
|
||||||
load_message_publish_hook(),
|
load_message_publish_hook(),
|
||||||
ok = emqx_config_handler:add_handler(config_key_path_leaf(), emqx_bridge_v2),
|
ok = emqx_config_handler:add_handler(config_key_path_leaf(), emqx_bridge_v2),
|
||||||
ok = emqx_config_handler:add_handler(config_key_path(), emqx_bridge_v2),
|
ok = emqx_config_handler:add_handler(config_key_path(), emqx_bridge_v2),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
load_bridges() ->
|
||||||
|
Bridges = emqx:get_config([?ROOT_KEY], #{}),
|
||||||
|
lists:foreach(
|
||||||
|
fun({Type, Bridge}) ->
|
||||||
|
lists:foreach(
|
||||||
|
fun({Name, BridgeConf}) ->
|
||||||
|
install_bridge_v2(Type, Name, BridgeConf)
|
||||||
|
end,
|
||||||
|
maps:to_list(Bridge)
|
||||||
|
)
|
||||||
|
end,
|
||||||
|
maps:to_list(Bridges)
|
||||||
|
).
|
||||||
|
|
||||||
unload() ->
|
unload() ->
|
||||||
|
unload_bridges(),
|
||||||
unload_message_publish_hook(),
|
unload_message_publish_hook(),
|
||||||
emqx_conf:remove_handler(config_key_path()),
|
emqx_conf:remove_handler(config_key_path()),
|
||||||
emqx_conf:remove_handler(config_key_path_leaf()),
|
emqx_conf:remove_handler(config_key_path_leaf()),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
unload_bridges() ->
|
||||||
|
Bridges = emqx:get_config([?ROOT_KEY], #{}),
|
||||||
|
lists:foreach(
|
||||||
|
fun({Type, Bridge}) ->
|
||||||
|
lists:foreach(
|
||||||
|
fun({Name, BridgeConf}) ->
|
||||||
|
uninstall_bridge_v2(Type, Name, BridgeConf)
|
||||||
|
end,
|
||||||
|
maps:to_list(Bridge)
|
||||||
|
)
|
||||||
|
end,
|
||||||
|
maps:to_list(Bridges)
|
||||||
|
).
|
||||||
|
|
||||||
%%====================================================================
|
%%====================================================================
|
||||||
%% CRUD API
|
%% CRUD API
|
||||||
%%====================================================================
|
%%====================================================================
|
||||||
|
@ -131,13 +161,26 @@ lookup(Type, Name) ->
|
||||||
{ok, _, Data} ->
|
{ok, _, Data} ->
|
||||||
Data;
|
Data;
|
||||||
{error, not_found} ->
|
{error, not_found} ->
|
||||||
undefined
|
#{}
|
||||||
|
end,
|
||||||
|
%% Find the Bridge V2 status from the InstanceData
|
||||||
|
Channels = maps:get(added_channels, InstanceData, #{}),
|
||||||
|
BridgeV2Id = id(Type, Name, BridgeConnector),
|
||||||
|
ChannelStatus = maps:get(BridgeV2Id, Channels, undefined),
|
||||||
|
DisplayBridgeV2Status =
|
||||||
|
case ChannelStatus of
|
||||||
|
{error, undefined} -> <<"Unknown reason">>;
|
||||||
|
{error, Reason} -> emqx_utils:readable_error_msg(Reason);
|
||||||
|
connected -> <<"connected">>;
|
||||||
|
connecting -> <<"connecting">>;
|
||||||
|
Error -> emqx_utils:readable_error_msg(Error)
|
||||||
end,
|
end,
|
||||||
{ok, #{
|
{ok, #{
|
||||||
type => Type,
|
type => Type,
|
||||||
name => Name,
|
name => Name,
|
||||||
raw_config => RawConf,
|
raw_config => RawConf,
|
||||||
resource_data => InstanceData
|
resource_data => InstanceData,
|
||||||
|
status => DisplayBridgeV2Status
|
||||||
}}
|
}}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
@ -178,6 +221,8 @@ check_deps_and_remove(BridgeType, BridgeName, RemoveDeps) ->
|
||||||
case emqx_rule_engine:get_rule_ids_by_action(BridgeId) of
|
case emqx_rule_engine:get_rule_ids_by_action(BridgeId) of
|
||||||
[] ->
|
[] ->
|
||||||
remove(BridgeType, BridgeName);
|
remove(BridgeType, BridgeName);
|
||||||
|
_RuleIds when RemoveDeps =:= ignore_deps ->
|
||||||
|
remove(BridgeType, BridgeName);
|
||||||
RuleIds when RemoveDeps =:= false ->
|
RuleIds when RemoveDeps =:= false ->
|
||||||
{error, {rules_deps_on_this_bridge, RuleIds}};
|
{error, {rules_deps_on_this_bridge, RuleIds}};
|
||||||
RuleIds when RemoveDeps =:= true ->
|
RuleIds when RemoveDeps =:= true ->
|
||||||
|
@ -599,7 +644,10 @@ get_channels_for_connector(ConnectorId) ->
|
||||||
get_channels_for_connector(ConnectorName, BridgeV2Type) ->
|
get_channels_for_connector(ConnectorName, BridgeV2Type) ->
|
||||||
BridgeV2s = emqx:get_config([?ROOT_KEY, BridgeV2Type], #{}),
|
BridgeV2s = emqx:get_config([?ROOT_KEY, BridgeV2Type], #{}),
|
||||||
[
|
[
|
||||||
{id(BridgeV2Type, Name, ConnectorName), Conf}
|
{id(BridgeV2Type, Name, ConnectorName), Conf#{
|
||||||
|
bridge_name => bin(Name),
|
||||||
|
bridge_type => bin(BridgeV2Type)
|
||||||
|
}}
|
||||||
|| {Name, Conf} <- maps:to_list(BridgeV2s),
|
|| {Name, Conf} <- maps:to_list(BridgeV2s),
|
||||||
bin(ConnectorName) =:= maps:get(connector, Conf, no_name)
|
bin(ConnectorName) =:= maps:get(connector, Conf, no_name)
|
||||||
].
|
].
|
||||||
|
@ -824,7 +872,8 @@ is_valid_bridge_v1(BridgeV1Type, BridgeName) ->
|
||||||
BridgeV2Type = ?MODULE:bridge_v1_type_to_bridge_v2_type(BridgeV1Type),
|
BridgeV2Type = ?MODULE:bridge_v1_type_to_bridge_v2_type(BridgeV1Type),
|
||||||
case lookup_raw_conf(BridgeV2Type, BridgeName) of
|
case lookup_raw_conf(BridgeV2Type, BridgeName) of
|
||||||
{error, _} ->
|
{error, _} ->
|
||||||
false;
|
%% If the bridge v2 does not exist, it is a valid bridge v1
|
||||||
|
true;
|
||||||
#{connector := ConnectorName} ->
|
#{connector := ConnectorName} ->
|
||||||
ConnectorType = ?MODULE:bridge_v2_type_to_connector_type(BridgeV2Type),
|
ConnectorType = ?MODULE:bridge_v2_type_to_connector_type(BridgeV2Type),
|
||||||
ConnectorResourceId = emqx_connector_resource:resource_id(ConnectorType, ConnectorName),
|
ConnectorResourceId = emqx_connector_resource:resource_id(ConnectorType, ConnectorName),
|
||||||
|
@ -854,25 +903,37 @@ is_bridge_v2_type(_) ->
|
||||||
false.
|
false.
|
||||||
|
|
||||||
list_and_transform_to_bridge_v1() ->
|
list_and_transform_to_bridge_v1() ->
|
||||||
list_with_lookup_fun(fun lookup_and_transform_to_bridge_v1/2).
|
Bridges = list_with_lookup_fun(fun lookup_and_transform_to_bridge_v1/2),
|
||||||
|
[B || B <- Bridges, B =/= not_bridge_v1_compatible_error()].
|
||||||
|
|
||||||
lookup_and_transform_to_bridge_v1(Type, Name) ->
|
lookup_and_transform_to_bridge_v1(BridgeV1Type, Name) ->
|
||||||
|
case is_valid_bridge_v1(BridgeV1Type, Name) of
|
||||||
|
true ->
|
||||||
|
Type = ?MODULE:bridge_v1_type_to_bridge_v2_type(BridgeV1Type),
|
||||||
case lookup(Type, Name) of
|
case lookup(Type, Name) of
|
||||||
{ok, #{raw_config := #{<<"connector">> := ConnectorName}} = BridgeV2} ->
|
{ok, #{raw_config := #{<<"connector">> := ConnectorName}} = BridgeV2} ->
|
||||||
ConnectorType = ?MODULE:bridge_v2_type_to_connector_type(Type),
|
ConnectorType = ?MODULE:bridge_v2_type_to_connector_type(Type),
|
||||||
case emqx_connector:lookup(ConnectorType, ConnectorName) of
|
case emqx_connector:lookup(ConnectorType, ConnectorName) of
|
||||||
{ok, Connector} ->
|
{ok, Connector} ->
|
||||||
lookup_and_transform_to_bridge_v1_helper(
|
lookup_and_transform_to_bridge_v1_helper(
|
||||||
Type, BridgeV2, ConnectorType, Connector
|
BridgeV1Type, Name, Type, BridgeV2, ConnectorType, Connector
|
||||||
);
|
);
|
||||||
Error ->
|
Error ->
|
||||||
Error
|
Error
|
||||||
end;
|
end;
|
||||||
Error ->
|
Error ->
|
||||||
Error
|
Error
|
||||||
|
end;
|
||||||
|
false ->
|
||||||
|
not_bridge_v1_compatible_error()
|
||||||
end.
|
end.
|
||||||
|
|
||||||
lookup_and_transform_to_bridge_v1_helper(BridgeV2Type, BridgeV2, ConnectorType, Connector) ->
|
not_bridge_v1_compatible_error() ->
|
||||||
|
{error, not_bridge_v1_compatible}.
|
||||||
|
|
||||||
|
lookup_and_transform_to_bridge_v1_helper(
|
||||||
|
BridgeV1Type, BridgeName, BridgeV2Type, BridgeV2, ConnectorType, Connector
|
||||||
|
) ->
|
||||||
ConnectorRawConfig1 = maps:get(raw_config, Connector),
|
ConnectorRawConfig1 = maps:get(raw_config, Connector),
|
||||||
ConnectorRawConfig2 = fill_defaults(
|
ConnectorRawConfig2 = fill_defaults(
|
||||||
ConnectorType,
|
ConnectorType,
|
||||||
|
@ -890,7 +951,29 @@ lookup_and_transform_to_bridge_v1_helper(BridgeV2Type, BridgeV2, ConnectorType,
|
||||||
BridgeV1Config1 = maps:remove(<<"connector">>, BridgeV2RawConfig2),
|
BridgeV1Config1 = maps:remove(<<"connector">>, BridgeV2RawConfig2),
|
||||||
BridgeV1Config2 = maps:merge(BridgeV1Config1, ConnectorRawConfig2),
|
BridgeV1Config2 = maps:merge(BridgeV1Config1, ConnectorRawConfig2),
|
||||||
BridgeV1 = maps:put(raw_config, BridgeV1Config2, BridgeV2),
|
BridgeV1 = maps:put(raw_config, BridgeV1Config2, BridgeV2),
|
||||||
{ok, BridgeV1}.
|
BridgeV1_1 = maps:remove(status, BridgeV1),
|
||||||
|
BridgeV2Status = maps:get(status, BridgeV2, undefined),
|
||||||
|
ResourceData1 = maps:get(resource_data, BridgeV1_1, #{}),
|
||||||
|
%% Replace id in resouce data
|
||||||
|
BridgeV1Id = <<"bridge:", (bin(BridgeV1Type))/binary, ":", (bin(BridgeName))/binary>>,
|
||||||
|
ResourceData2 = maps:put(id, BridgeV1Id, ResourceData1),
|
||||||
|
ConnectorStatus = maps:get(status, ResourceData2, undefined),
|
||||||
|
case ConnectorStatus of
|
||||||
|
connected ->
|
||||||
|
case BridgeV2Status of
|
||||||
|
<<"connected">> ->
|
||||||
|
%% No need to modify the status
|
||||||
|
{ok, BridgeV1_1#{resource_data => ResourceData2}};
|
||||||
|
NotConnected ->
|
||||||
|
ResourceData3 = maps:put(status, connecting, ResourceData2),
|
||||||
|
ResourceData4 = maps:put(error, NotConnected, ResourceData3),
|
||||||
|
BridgeV1_2 = maps:put(resource_data, ResourceData4, BridgeV1_1),
|
||||||
|
{ok, BridgeV1_2}
|
||||||
|
end;
|
||||||
|
_ ->
|
||||||
|
%% No need to modify the status
|
||||||
|
{ok, BridgeV1_1}
|
||||||
|
end.
|
||||||
|
|
||||||
lookup_raw_conf(Type, Name) ->
|
lookup_raw_conf(Type, Name) ->
|
||||||
case emqx:get_config([?ROOT_KEY, Type, Name], not_found) of
|
case emqx:get_config([?ROOT_KEY, Type, Name], not_found) of
|
||||||
|
@ -901,6 +984,25 @@ lookup_raw_conf(Type, Name) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
split_bridge_v1_config_and_create(BridgeV1Type, BridgeName, RawConf) ->
|
split_bridge_v1_config_and_create(BridgeV1Type, BridgeName, RawConf) ->
|
||||||
|
BridgeV2Type = ?MODULE:bridge_v1_type_to_bridge_v2_type(BridgeV1Type),
|
||||||
|
%% Check if the bridge v2 exists
|
||||||
|
case lookup_raw_conf(BridgeV2Type, BridgeName) of
|
||||||
|
{error, _} ->
|
||||||
|
%% If the bridge v2 does not exist, it is a valid bridge v1
|
||||||
|
split_bridge_v1_config_and_create_helper(BridgeV1Type, BridgeName, RawConf);
|
||||||
|
_Conf ->
|
||||||
|
case is_valid_bridge_v1(BridgeV1Type, BridgeName) of
|
||||||
|
true ->
|
||||||
|
%% Remove and create if update operation
|
||||||
|
bridge_v1_check_deps_and_remove(BridgeV1Type, BridgeName, ignore_deps),
|
||||||
|
split_bridge_v1_config_and_create_helper(BridgeV1Type, BridgeName, RawConf);
|
||||||
|
false ->
|
||||||
|
%% If the bridge v2 exists, it is not a valid bridge v1
|
||||||
|
{error, non_compatible_bridge_v2_exists}
|
||||||
|
end
|
||||||
|
end.
|
||||||
|
|
||||||
|
split_bridge_v1_config_and_create_helper(BridgeV1Type, BridgeName, RawConf) ->
|
||||||
#{
|
#{
|
||||||
connector_type := ConnectorType,
|
connector_type := ConnectorType,
|
||||||
connector_name := NewConnectorName,
|
connector_name := NewConnectorName,
|
||||||
|
@ -922,7 +1024,6 @@ split_bridge_v1_config_and_create(BridgeV1Type, BridgeName, RawConf) ->
|
||||||
{ok, _} ->
|
{ok, _} ->
|
||||||
Error;
|
Error;
|
||||||
Error ->
|
Error ->
|
||||||
%% TODO log error
|
|
||||||
?SLOG(warning, #{
|
?SLOG(warning, #{
|
||||||
message =>
|
message =>
|
||||||
<<"Failed to remove connector after bridge creation failed">>,
|
<<"Failed to remove connector after bridge creation failed">>,
|
||||||
|
@ -938,17 +1039,17 @@ split_bridge_v1_config_and_create(BridgeV1Type, BridgeName, RawConf) ->
|
||||||
Error
|
Error
|
||||||
end.
|
end.
|
||||||
|
|
||||||
split_and_validate_bridge_v1_config(BridgeType, BridgeName, RawConf) ->
|
split_and_validate_bridge_v1_config(BridgeV1Type, BridgeName, RawConf) ->
|
||||||
%% Create fake global config for the transformation and then call
|
%% Create fake global config for the transformation and then call
|
||||||
%% emqx_connector_schema:transform_bridges_v1_to_connectors_and_bridges_v2/1
|
%% emqx_connector_schema:transform_bridges_v1_to_connectors_and_bridges_v2/1
|
||||||
|
BridgeV2Type = ?MODULE:bridge_v1_type_to_bridge_v2_type(BridgeV1Type),
|
||||||
ConnectorType = ?MODULE:bridge_v2_type_to_connector_type(BridgeType),
|
ConnectorType = ?MODULE:bridge_v2_type_to_connector_type(BridgeV2Type),
|
||||||
%% Needed so name confligts will ba avoided
|
%% Needed so name confligts will ba avoided
|
||||||
CurrentConnectorsConfig = emqx:get_raw_config([connectors], #{}),
|
CurrentConnectorsConfig = emqx:get_raw_config([connectors], #{}),
|
||||||
FakeGlobalConfig = #{
|
FakeGlobalConfig = #{
|
||||||
<<"connectors">> => CurrentConnectorsConfig,
|
<<"connectors">> => CurrentConnectorsConfig,
|
||||||
<<"bridges">> => #{
|
<<"bridges">> => #{
|
||||||
bin(BridgeType) => #{
|
bin(BridgeV1Type) => #{
|
||||||
bin(BridgeName) => RawConf
|
bin(BridgeName) => RawConf
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -960,7 +1061,7 @@ split_and_validate_bridge_v1_config(BridgeType, BridgeName, RawConf) ->
|
||||||
emqx_utils_maps:deep_get(
|
emqx_utils_maps:deep_get(
|
||||||
[
|
[
|
||||||
bin(?ROOT_KEY),
|
bin(?ROOT_KEY),
|
||||||
bin(BridgeType),
|
bin(BridgeV2Type),
|
||||||
bin(BridgeName)
|
bin(BridgeName)
|
||||||
],
|
],
|
||||||
Output
|
Output
|
||||||
|
@ -1004,7 +1105,7 @@ split_and_validate_bridge_v1_config(BridgeType, BridgeName, RawConf) ->
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
<<"bridges_v2">> => #{
|
<<"bridges_v2">> => #{
|
||||||
bin(BridgeType) => #{
|
bin(BridgeV2Type) => #{
|
||||||
bin(BridgeName) => NewBridgeV2RawConf
|
bin(BridgeName) => NewBridgeV2RawConf
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1021,7 +1122,7 @@ split_and_validate_bridge_v1_config(BridgeType, BridgeName, RawConf) ->
|
||||||
connector_type => ConnectorType,
|
connector_type => ConnectorType,
|
||||||
connector_name => NewConnectorName,
|
connector_name => NewConnectorName,
|
||||||
connector_conf => NewConnectorRawConf,
|
connector_conf => NewConnectorRawConf,
|
||||||
bridge_v2_type => BridgeType,
|
bridge_v2_type => BridgeV2Type,
|
||||||
bridge_v2_name => BridgeName,
|
bridge_v2_name => BridgeName,
|
||||||
bridge_v2_conf => NewBridgeV2RawConf
|
bridge_v2_conf => NewBridgeV2RawConf
|
||||||
}
|
}
|
||||||
|
@ -1038,18 +1139,19 @@ bridge_v1_create_dry_run(BridgeType, RawConfig0) ->
|
||||||
connector_type := _ConnectorType,
|
connector_type := _ConnectorType,
|
||||||
connector_name := _NewConnectorName,
|
connector_name := _NewConnectorName,
|
||||||
connector_conf := ConnectorRawConf,
|
connector_conf := ConnectorRawConf,
|
||||||
bridge_v2_type := BridgeType,
|
bridge_v2_type := BridgeV2Type,
|
||||||
bridge_v2_name := _BridgeName,
|
bridge_v2_name := _BridgeName,
|
||||||
bridge_v2_conf := BridgeV2RawConf
|
bridge_v2_conf := BridgeV2RawConf
|
||||||
} = split_and_validate_bridge_v1_config(BridgeType, TmpName, RawConf),
|
} = split_and_validate_bridge_v1_config(BridgeType, TmpName, RawConf),
|
||||||
create_dry_run_helper(BridgeType, ConnectorRawConf, BridgeV2RawConf).
|
create_dry_run_helper(BridgeV2Type, ConnectorRawConf, BridgeV2RawConf).
|
||||||
|
|
||||||
bridge_v1_check_deps_and_remove(BridgeType, BridgeName, RemoveDeps) ->
|
bridge_v1_check_deps_and_remove(BridgeV1Type, BridgeName, RemoveDeps) ->
|
||||||
|
BridgeV2Type = ?MODULE:bridge_v1_type_to_bridge_v2_type(BridgeV1Type),
|
||||||
bridge_v1_check_deps_and_remove(
|
bridge_v1_check_deps_and_remove(
|
||||||
BridgeType,
|
BridgeV2Type,
|
||||||
BridgeName,
|
BridgeName,
|
||||||
RemoveDeps,
|
RemoveDeps,
|
||||||
lookup_raw_conf(BridgeType, BridgeName)
|
lookup_raw_conf(BridgeV2Type, BridgeName)
|
||||||
).
|
).
|
||||||
|
|
||||||
bridge_v1_check_deps_and_remove(
|
bridge_v1_check_deps_and_remove(
|
||||||
|
|
|
@ -258,7 +258,8 @@ t_is_valid_bridge_v1(_) ->
|
||||||
{ok, _} = emqx_bridge_v2:remove(bridge_type(), my_test_bridge),
|
{ok, _} = emqx_bridge_v2:remove(bridge_type(), my_test_bridge),
|
||||||
true = emqx_bridge_v2:is_valid_bridge_v1(bridge_v1_type, my_test_bridge_2),
|
true = emqx_bridge_v2:is_valid_bridge_v1(bridge_v1_type, my_test_bridge_2),
|
||||||
{ok, _} = emqx_bridge_v2:remove(bridge_type(), my_test_bridge_2),
|
{ok, _} = emqx_bridge_v2:remove(bridge_type(), my_test_bridge_2),
|
||||||
false = emqx_bridge_v2:is_valid_bridge_v1(bridge_v1_type, my_test_bridge),
|
%% Non existing bridge is a valid Bridge V1
|
||||||
|
true = emqx_bridge_v2:is_valid_bridge_v1(bridge_v1_type, my_test_bridge),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
t_manual_health_check(_) ->
|
t_manual_health_check(_) ->
|
||||||
|
@ -599,7 +600,7 @@ t_remove_single_connector_being_referenced_without_active_channels(_Config) ->
|
||||||
?assertMatch({ok, _}, emqx_connector:remove(con_type(), con_name())),
|
?assertMatch({ok, _}, emqx_connector:remove(con_type(), con_name())),
|
||||||
%% we no longer have connector data if this happens...
|
%% we no longer have connector data if this happens...
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
{ok, #{resource_data := undefined}},
|
{ok, #{resource_data := #{}}},
|
||||||
emqx_bridge_v2:lookup(bridge_type(), BridgeName)
|
emqx_bridge_v2:lookup(bridge_type(), BridgeName)
|
||||||
),
|
),
|
||||||
ok
|
ok
|
||||||
|
@ -638,7 +639,7 @@ t_remove_multiple_connectors_being_referenced_without_channels(_Config) ->
|
||||||
),
|
),
|
||||||
%% we no longer have connector data if this happens...
|
%% we no longer have connector data if this happens...
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
{ok, #{resource_data := undefined}},
|
{ok, #{resource_data := #{}}},
|
||||||
emqx_bridge_v2:lookup(bridge_type(), BridgeName)
|
emqx_bridge_v2:lookup(bridge_type(), BridgeName)
|
||||||
),
|
),
|
||||||
ok
|
ok
|
||||||
|
|
Loading…
Reference in New Issue