feat: started to make bridge_v2 compatible with old style bridges

This commit is contained in:
Kjell Winblad 2023-10-02 17:22:48 +02:00 committed by Zaiming (Stone) Shi
parent bc6e1da2fb
commit 5374d35be3
6 changed files with 114 additions and 18 deletions

View File

@ -279,6 +279,7 @@ post_config_update([?ROOT_KEY], _Req, NewConf, OldConf, _AppEnv) ->
Result. Result.
list() -> list() ->
% OldStyleBridges =
maps:fold( maps:fold(
fun(Type, NameAndConf, Bridges) -> fun(Type, NameAndConf, Bridges) ->
maps:fold( maps:fold(
@ -295,14 +296,20 @@ list() ->
[], [],
emqx:get_raw_config([bridges], #{}) emqx:get_raw_config([bridges], #{})
). ).
%%BridgeV2Bridges = emqx_bridge_v2:list().
lookup(Id) -> lookup(Id) ->
{Type, Name} = emqx_bridge_resource:parse_bridge_id(Id), {Type, Name} = emqx_bridge_resource:parse_bridge_id(Id),
lookup(Type, Name). lookup(Type, Name).
lookup(Type, Name) -> lookup(Type, Name) ->
RawConf = emqx:get_raw_config([bridges, Type, Name], #{}), case emqx_bridge_v2:is_bridge_v2_type(Type) of
lookup(Type, Name, RawConf). true ->
emqx_bridge_v2:lookup_and_transform_to_bridge_v1(Type, Name);
false ->
RawConf = emqx:get_raw_config([bridges, Type, Name], #{}),
lookup(Type, Name, RawConf)
end.
lookup(Type, Name, RawConf) -> lookup(Type, Name, RawConf) ->
case emqx_resource:get_instance(emqx_bridge_resource:resource_id(Type, Name)) of case emqx_resource:get_instance(emqx_bridge_resource:resource_id(Type, Name)) of

View File

@ -837,7 +837,14 @@ format_resource(
}, },
Node Node
) -> ) ->
RawConfFull = fill_defaults(Type, RawConf), RawConfFull =
case emqx_bridge_v2:is_bridge_v2_type(Type) of
true ->
%% The defaults are already filled in
RawConf;
false ->
fill_defaults(Type, RawConf)
end,
redact( redact(
maps:merge( maps:merge(
RawConfFull#{ RawConfFull#{

View File

@ -38,6 +38,12 @@
get_channels_for_connector/1 get_channels_for_connector/1
]). ]).
%% Compatibility API
-export([
lookup_and_transform_to_bridge_v1/2
]).
%% CRUD API %% CRUD API
-export([ -export([
@ -183,7 +189,7 @@ get_query_mode(BridgeV2Type, Config) ->
emqx_resource:query_mode(ResourceType, Config, CreationOpts). emqx_resource:query_mode(ResourceType, Config, CreationOpts).
send_message(BridgeType, BridgeName, Message, QueryOpts0) -> send_message(BridgeType, BridgeName, Message, QueryOpts0) ->
case lookup(BridgeType, BridgeName) of case lookup_raw_conf(BridgeType, BridgeName) of
#{enable := true} = Config -> #{enable := true} = Config ->
do_send_msg_with_enabled_config(BridgeType, BridgeName, Message, QueryOpts0, Config); do_send_msg_with_enabled_config(BridgeType, BridgeName, Message, QueryOpts0, Config);
#{enable := false} -> #{enable := false} ->
@ -193,7 +199,7 @@ send_message(BridgeType, BridgeName, Message, QueryOpts0) ->
end. end.
health_check(BridgeType, BridgeName) -> health_check(BridgeType, BridgeName) ->
case lookup(BridgeType, BridgeName) of case lookup_raw_conf(BridgeType, BridgeName) of
#{ #{
enable := true, enable := true,
connector := ConnectorName connector := ConnectorName
@ -267,7 +273,7 @@ parse_id(Id) ->
end. end.
id(BridgeType, BridgeName) -> id(BridgeType, BridgeName) ->
case lookup(BridgeType, BridgeName) of case lookup_raw_conf(BridgeType, BridgeName) of
#{connector := ConnectorName} -> #{connector := ConnectorName} ->
id(BridgeType, BridgeName, ConnectorName); id(BridgeType, BridgeName, ConnectorName);
Error -> Error ->
@ -279,6 +285,8 @@ id(BridgeType, BridgeName, ConnectorName) ->
<<"bridge_v2:", (bin(BridgeType))/binary, ":", (bin(BridgeName))/binary, ":connector:", <<"bridge_v2:", (bin(BridgeType))/binary, ":", (bin(BridgeName))/binary, ":connector:",
(bin(ConnectorType))/binary, ":", (bin(ConnectorName))/binary>>. (bin(ConnectorType))/binary, ":", (bin(ConnectorName))/binary>>.
bridge_v2_type_to_connector_type(Bin) when is_binary(Bin) ->
bridge_v2_type_to_connector_type(binary_to_existing_atom(Bin));
bridge_v2_type_to_connector_type(kafka) -> bridge_v2_type_to_connector_type(kafka) ->
kafka. kafka.
@ -306,13 +314,13 @@ list() ->
maps:fold( maps:fold(
fun(Type, NameAndConf, Bridges) -> fun(Type, NameAndConf, Bridges) ->
maps:fold( maps:fold(
fun(Name, RawConf, Acc) -> fun(Name, _RawConf, Acc) ->
[ [
#{ begin
type => Type, {ok, BridgeInfo} =
name => Name, lookup(Type, Name),
raw_config => RawConf BridgeInfo
} end
| Acc | Acc
] ]
end, end,
@ -329,6 +337,66 @@ lookup(Id) ->
lookup(Type, Name). lookup(Type, Name).
lookup(Type, Name) -> lookup(Type, Name) ->
case emqx:get_config([?ROOT_KEY, Type, Name], not_found) of
not_found ->
{error, bridge_not_found};
#{connector := BridgeConnector} = RawConf ->
ConnectorId = emqx_connector_resource:resource_id(
bridge_v2_type_to_connector_type(Type), BridgeConnector
),
InstanceData =
case emqx_resource:get_instance(ConnectorId) of
{error, not_found} ->
%% TODO should we throw an error here (this should not happen)?
{error, not_found};
{ok, _, Data} ->
Data
end,
{ok, #{
type => Type,
name => Name,
raw_config => RawConf,
resource_data => InstanceData
}}
end.
lookup_and_transform_to_bridge_v1(Type, Name) ->
case lookup(Type, Name) of
{ok, #{raw_config := #{connector := ConnectorName}} = BridgeV2} ->
ConnectorType = bridge_v2_type_to_connector_type(Type),
case emqx_connector:lookup(ConnectorType, ConnectorName) of
{ok, Connector} ->
lookup_and_transform_to_bridge_v1_helper(
Type, BridgeV2, ConnectorType, Connector
);
Error ->
Error
end;
Error ->
Error
end.
lookup_and_transform_to_bridge_v1_helper(BridgeV2Type, BridgeV2, ConnectorType, Connector) ->
ConnectorRawConfig1 = maps:get(raw_config, Connector),
ConnectorRawConfig2 = fill_defaults(
ConnectorType,
ConnectorRawConfig1,
<<"connectors">>,
emqx_connector_schema
),
BridgeV2RawConfig1 = maps:get(raw_config, BridgeV2),
BridgeV2RawConfig2 = fill_defaults(
BridgeV2Type,
BridgeV2RawConfig1,
<<"bridges_v2">>,
emqx_bridge_v2_schema
),
BridgeV1Config1 = maps:remove(connector, BridgeV2RawConfig2),
BridgeV1Config2 = maps:merge(BridgeV1Config1, ConnectorRawConfig2),
BridgeV1 = maps:put(raw_config, BridgeV1Config2, BridgeV2),
{ok, BridgeV1}.
lookup_raw_conf(Type, Name) ->
case emqx:get_config([?ROOT_KEY, Type, Name], not_found) of case emqx:get_config([?ROOT_KEY, Type, Name], not_found) of
not_found -> not_found ->
{error, bridge_not_found}; {error, bridge_not_found};
@ -482,3 +550,17 @@ is_bridge_v2_installed_in_connector_state(Tag, State) when is_map(State) ->
maps:is_key(Tag, BridgeV2s); maps:is_key(Tag, BridgeV2s);
is_bridge_v2_installed_in_connector_state(_Tag, _State) -> is_bridge_v2_installed_in_connector_state(_Tag, _State) ->
false. false.
fill_defaults(Type, RawConf, TopLevelConf, SchemaModule) ->
PackedConf = pack_bridge_conf(Type, RawConf, TopLevelConf),
FullConf = emqx_config:fill_defaults(SchemaModule, PackedConf, #{}),
unpack_bridge_conf(Type, FullConf, TopLevelConf).
pack_bridge_conf(Type, RawConf, TopLevelConf) ->
#{TopLevelConf => #{bin(Type) => #{<<"foo">> => RawConf}}}.
unpack_bridge_conf(Type, PackedConf, TopLevelConf) ->
TypeBin = bin(Type),
#{TopLevelConf := Bridges} = PackedConf,
#{<<"foo">> := RawConf} = maps:get(TypeBin, Bridges),
RawConf.

View File

@ -1,7 +1,7 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved. %% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-module(emqx_action_enterprise). -module(emqx_bridge_v2_enterprise).
-if(?EMQX_RELEASE_EDITION == ee). -if(?EMQX_RELEASE_EDITION == ee).

View File

@ -13,7 +13,7 @@
%% See the License for the specific language governing permissions and %% See the License for the specific language governing permissions and
%% limitations under the License. %% limitations under the License.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-module(emqx_action_schema). -module(emqx_bridge_v2_schema).
-include_lib("typerefl/include/types.hrl"). -include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl"). -include_lib("hocon/include/hoconsc.hrl").
@ -28,10 +28,10 @@
enterprise_fields_actions() -> enterprise_fields_actions() ->
%% We *must* do this to ensure the module is really loaded, especially when we use %% We *must* do this to ensure the module is really loaded, especially when we use
%% `call_hocon' from `nodetool' to generate initial configurations. %% `call_hocon' from `nodetool' to generate initial configurations.
_ = emqx_action_enterprise:module_info(), _ = emqx_bridge_v2_enterprise:module_info(),
case erlang:function_exported(emqx_action_enterprise, fields, 1) of case erlang:function_exported(emqx_bridge_v2_enterprise, fields, 1) of
true -> true ->
emqx_action_enterprise:fields(bridges_v2); emqx_bridge_v2_enterprise:fields(bridges_v2);
false -> false ->
[] []
end. end.

View File

@ -54,7 +54,7 @@
-define(MERGED_CONFIGS, [ -define(MERGED_CONFIGS, [
emqx_bridge_schema, emqx_bridge_schema,
emqx_connector_schema, emqx_connector_schema,
emqx_action_schema, emqx_bridge_v2_schema,
emqx_retainer_schema, emqx_retainer_schema,
emqx_authn_schema, emqx_authn_schema,
emqx_authz_schema, emqx_authz_schema,