From 9a6901987f5c857bfbfb2042d998daffa5ffc999 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Mon, 28 Nov 2022 22:22:20 +0100 Subject: [PATCH] feat: add mqtt bridge config upgrade converter --- apps/emqx_bridge/src/emqx_bridge.erl | 8 +- .../src/schema/emqx_bridge_mqtt_config.erl | 116 ++++++++++++++++++ .../src/schema/emqx_bridge_schema.erl | 5 +- .../src/mqtt/emqx_connector_mqtt_schema.erl | 1 + 4 files changed, 128 insertions(+), 2 deletions(-) create mode 100644 apps/emqx_bridge/src/schema/emqx_bridge_mqtt_config.erl diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index 3aff30859..44028e900 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -213,6 +213,7 @@ lookup(Id) -> lookup(Type, Name) -> RawConf = emqx:get_raw_config([bridges, Type, Name], #{}), lookup(Type, Name, RawConf). + lookup(Type, Name, RawConf) -> case emqx_resource:get_instance(emqx_bridge_resource:resource_id(Type, Name)) of {error, not_found} -> @@ -222,10 +223,15 @@ lookup(Type, Name, RawConf) -> type => Type, name => Name, resource_data => Data, - raw_config => RawConf + raw_config => maybe_upgrade(Type, RawConf) }} end. +maybe_upgrade(mqtt, Config) -> + emqx_bridge_mqtt_config:maybe_upgrade(Config); +maybe_upgrade(_Other, Config) -> + Config. + disable_enable(Action, BridgeType, BridgeName) when Action =:= disable; Action =:= enable -> diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_mqtt_config.erl b/apps/emqx_bridge/src/schema/emqx_bridge_mqtt_config.erl new file mode 100644 index 000000000..2e4c17a25 --- /dev/null +++ b/apps/emqx_bridge/src/schema/emqx_bridge_mqtt_config.erl @@ -0,0 +1,116 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_mqtt_config). + +-export([ + upgrade_pre_ee/1, + maybe_upgrade/1 +]). + +upgrade_pre_ee(undefined) -> + undefined; +upgrade_pre_ee(Conf0) when is_map(Conf0) -> + maps:from_list(upgrade_pre_ee(maps:to_list(Conf0))); +upgrade_pre_ee([]) -> + []; +upgrade_pre_ee([{Name, Config} | Bridges]) -> + [{Name, maybe_upgrade(Config)} | upgrade_pre_ee(Bridges)]. + +maybe_upgrade(#{<<"connector">> := _} = Config0) -> + Config1 = up(Config0), + Config = lists:map(fun binary_key/1, Config1), + maps:from_list(Config); +maybe_upgrade(NewVersion) -> + NewVersion. + +binary_key({K, V}) -> + {atom_to_binary(K, utf8), V}. + +up(#{<<"connector">> := Connector} = Config) -> + Cn = fun(Key0, Default) -> + Key = atom_to_binary(Key0, utf8), + {Key0, maps:get(Key, Connector, Default)} + end, + Direction = + case maps:get(<<"direction">>, Config) of + <<"egress">> -> + {egress, egress(Config)}; + <<"ingress">> -> + {ingress, ingress(Config)} + end, + Enable = maps:get(<<"enable">>, Config, true), + [ + Cn(bridge_mode, false), + Cn(username, <<>>), + Cn(password, <<>>), + Cn(clean_start, true), + Cn(keepalive, <<"60s">>), + Cn(mode, <<"cluster_shareload">>), + Cn(proto_ver, <<"v4">>), + Cn(server, undefined), + Cn(retry_interval, <<"15s">>), + Cn(reconnect_interval, <<"15s">>), + Cn(ssl, default_ssl()), + {enable, Enable}, + {resource_opts, default_resource_opts()}, + Direction + ]. + +default_ssl() -> + #{ + <<"enable">> => false, + <<"verify">> => <<"verify_peer">> + }. + +default_resource_opts() -> + #{ + <<"async_inflight_window">> => 100, + <<"auto_restart_interval">> => <<"60s">>, + <<"enable_queue">> => false, + <<"health_check_interval">> => <<"15s">>, + <<"max_queue_bytes">> => <<"1GB">>, + <<"query_mode">> => <<"sync">>, + <<"worker_pool_size">> => 16 + }. + +egress(Config) -> + % <<"local">> % the old version has no 'local' config for egress + #{ + <<"remote">> => + #{ + <<"topic">> => maps:get(<<"remote_topic">>, Config), + <<"qos">> => maps:get(<<"remote_qos">>, Config), + <<"retain">> => maps:get(<<"retain">>, Config), + <<"payload">> => maps:get(<<"payload">>, Config) + } + }. + +ingress(Config) -> + #{ + <<"remote">> => + #{ + <<"qos">> => maps:get(<<"remote_qos">>, Config), + <<"topic">> => maps:get(<<"remote_topic">>, Config) + }, + <<"local">> => + #{ + <<"payload">> => maps:get(<<"payload">>, Config), + <<"qos">> => maps:get(<<"local_qos">>, Config), + <<"retain">> => maps:get(<<"retain">>, Config, false) + %% <<"topic">> % th old version has no local topic for ingress + } + }. diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl index 8bfc1c78a..31fc3bcc1 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl @@ -101,7 +101,10 @@ fields(bridges) -> {mqtt, mk( hoconsc:map(name, ref(emqx_bridge_mqtt_schema, "config")), - #{desc => ?DESC("bridges_mqtt")} + #{ + desc => ?DESC("bridges_mqtt"), + converter => fun emqx_bridge_mqtt_config:upgrade_pre_ee/1 + } )} ] ++ ee_fields_bridges(); fields("metrics") -> diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl index d77859dd7..50f5ee5df 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl @@ -109,6 +109,7 @@ fields("server_configs") -> binary(), #{ format => <<"password">>, + sensitive => true, desc => ?DESC("password") } )},