diff --git a/apps/emqx/rebar.config b/apps/emqx/rebar.config index e50264693..d13fda30a 100644 --- a/apps/emqx/rebar.config +++ b/apps/emqx/rebar.config @@ -29,7 +29,7 @@ {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.4"}}}, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.13.6"}}}, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}}, - {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.30.0"}}}, + {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.31.2"}}}, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}, {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}}, {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.0"}}} diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index 3aff30859..44028e900 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -213,6 +213,7 @@ lookup(Id) -> lookup(Type, Name) -> RawConf = emqx:get_raw_config([bridges, Type, Name], #{}), lookup(Type, Name, RawConf). + lookup(Type, Name, RawConf) -> case emqx_resource:get_instance(emqx_bridge_resource:resource_id(Type, Name)) of {error, not_found} -> @@ -222,10 +223,15 @@ lookup(Type, Name, RawConf) -> type => Type, name => Name, resource_data => Data, - raw_config => RawConf + raw_config => maybe_upgrade(Type, RawConf) }} end. +maybe_upgrade(mqtt, Config) -> + emqx_bridge_mqtt_config:maybe_upgrade(Config); +maybe_upgrade(_Other, Config) -> + Config. + disable_enable(Action, BridgeType, BridgeName) when Action =:= disable; Action =:= enable -> diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_mqtt_config.erl b/apps/emqx_bridge/src/schema/emqx_bridge_mqtt_config.erl new file mode 100644 index 000000000..997337c9d --- /dev/null +++ b/apps/emqx_bridge/src/schema/emqx_bridge_mqtt_config.erl @@ -0,0 +1,118 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +%% @doc This module was created to convert old version (from v5.0.0 to v5.0.11) +%% mqtt connector configs to newer version (developed for enterprise edition). +-module(emqx_bridge_mqtt_config). + +-export([ + upgrade_pre_ee/1, + maybe_upgrade/1 +]). + +upgrade_pre_ee(undefined) -> + undefined; +upgrade_pre_ee(Conf0) when is_map(Conf0) -> + maps:from_list(upgrade_pre_ee(maps:to_list(Conf0))); +upgrade_pre_ee([]) -> + []; +upgrade_pre_ee([{Name, Config} | Bridges]) -> + [{Name, maybe_upgrade(Config)} | upgrade_pre_ee(Bridges)]. + +maybe_upgrade(#{<<"connector">> := _} = Config0) -> + Config1 = up(Config0), + Config = lists:map(fun binary_key/1, Config1), + maps:from_list(Config); +maybe_upgrade(NewVersion) -> + NewVersion. + +binary_key({K, V}) -> + {atom_to_binary(K, utf8), V}. + +up(#{<<"connector">> := Connector} = Config) -> + Cn = fun(Key0, Default) -> + Key = atom_to_binary(Key0, utf8), + {Key0, maps:get(Key, Connector, Default)} + end, + Direction = + case maps:get(<<"direction">>, Config) of + <<"egress">> -> + {egress, egress(Config)}; + <<"ingress">> -> + {ingress, ingress(Config)} + end, + Enable = maps:get(<<"enable">>, Config, true), + [ + Cn(bridge_mode, false), + Cn(username, <<>>), + Cn(password, <<>>), + Cn(clean_start, true), + Cn(keepalive, <<"60s">>), + Cn(mode, <<"cluster_shareload">>), + Cn(proto_ver, <<"v4">>), + Cn(server, undefined), + Cn(retry_interval, <<"15s">>), + Cn(reconnect_interval, <<"15s">>), + Cn(ssl, default_ssl()), + {enable, Enable}, + {resource_opts, default_resource_opts()}, + Direction + ]. + +default_ssl() -> + #{ + <<"enable">> => false, + <<"verify">> => <<"verify_peer">> + }. + +default_resource_opts() -> + #{ + <<"async_inflight_window">> => 100, + <<"auto_restart_interval">> => <<"60s">>, + <<"enable_queue">> => false, + <<"health_check_interval">> => <<"15s">>, + <<"max_queue_bytes">> => <<"1GB">>, + <<"query_mode">> => <<"sync">>, + <<"worker_pool_size">> => 16 + }. + +egress(Config) -> + % <<"local">> % the old version has no 'local' config for egress + #{ + <<"remote">> => + #{ + <<"topic">> => maps:get(<<"remote_topic">>, Config), + <<"qos">> => maps:get(<<"remote_qos">>, Config), + <<"retain">> => maps:get(<<"retain">>, Config), + <<"payload">> => maps:get(<<"payload">>, Config) + } + }. + +ingress(Config) -> + #{ + <<"remote">> => + #{ + <<"qos">> => maps:get(<<"remote_qos">>, Config), + <<"topic">> => maps:get(<<"remote_topic">>, Config) + }, + <<"local">> => + #{ + <<"payload">> => maps:get(<<"payload">>, Config), + <<"qos">> => maps:get(<<"local_qos">>, Config), + <<"retain">> => maps:get(<<"retain">>, Config, false) + %% <<"topic">> % th old version has no local topic for ingress + } + }. diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl index 8bfc1c78a..756a8347d 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_schema.erl @@ -96,12 +96,19 @@ fields(bridges) -> {webhook, mk( hoconsc:map(name, ref(emqx_bridge_webhook_schema, "config")), - #{desc => ?DESC("bridges_webhook")} + #{ + desc => ?DESC("bridges_webhook"), + required => false + } )}, {mqtt, mk( hoconsc:map(name, ref(emqx_bridge_mqtt_schema, "config")), - #{desc => ?DESC("bridges_mqtt")} + #{ + desc => ?DESC("bridges_mqtt"), + required => false, + converter => fun emqx_bridge_mqtt_config:upgrade_pre_ee/1 + } )} ] ++ ee_fields_bridges(); fields("metrics") -> diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_webhook_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_webhook_schema.erl index 0f692f195..d270fc91e 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_webhook_schema.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_webhook_schema.erl @@ -69,7 +69,10 @@ request_config() -> {local_topic, mk( binary(), - #{desc => ?DESC("config_local_topic")} + #{ + desc => ?DESC("config_local_topic"), + required => false + } )}, {method, mk( diff --git a/apps/emqx_bridge/test/emqx_bridge_mqtt_config_tests.erl b/apps/emqx_bridge/test/emqx_bridge_mqtt_config_tests.erl new file mode 100644 index 000000000..fa3fff7d9 --- /dev/null +++ b/apps/emqx_bridge/test/emqx_bridge_mqtt_config_tests.erl @@ -0,0 +1,229 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_mqtt_config_tests). + +-include_lib("eunit/include/eunit.hrl"). + +empty_config_test() -> + Conf1 = #{<<"bridges">> => #{}}, + Conf2 = #{<<"bridges">> => #{<<"webhook">> => #{}}}, + ?assertEqual(Conf1, check(Conf1)), + ?assertEqual(Conf2, check(Conf2)), + ok. + +%% ensure webhook config can be checked +webhook_config_test() -> + Conf = parse(webhook_v5011_hocon()), + ?assertMatch( + #{ + <<"bridges">> := + #{ + <<"webhook">> := #{ + <<"the_name">> := + #{ + <<"method">> := get, + <<"body">> := <<"${payload}">> + } + } + } + }, + check(Conf) + ), + ok. + +up(#{<<"bridges">> := Bridges0} = Conf0) -> + Bridges = up(Bridges0), + Conf0#{<<"bridges">> := Bridges}; +up(#{<<"mqtt">> := MqttBridges0} = Bridges) -> + MqttBridges = emqx_bridge_mqtt_config:upgrade_pre_ee(MqttBridges0), + Bridges#{<<"mqtt">> := MqttBridges}. + +parse(HOCON) -> + {ok, Conf} = hocon:binary(HOCON), + Conf. + +mqtt_config_test_() -> + Conf0 = mqtt_v5011_hocon(), + Conf1 = mqtt_v5011_full_hocon(), + [ + {Tag, fun() -> + Parsed = parse(Conf), + Upgraded = up(Parsed), + Checked = check(Upgraded), + assert_upgraded(Checked) + end} + || {Tag, Conf} <- [{"minimum", Conf0}, {"full", Conf1}] + ]. + +assert_upgraded(#{<<"bridges">> := Bridges}) -> + assert_upgraded(Bridges); +assert_upgraded(#{<<"mqtt">> := Mqtt}) -> + assert_upgraded(Mqtt); +assert_upgraded(#{<<"bridge_one">> := Map}) -> + assert_upgraded1(Map); +assert_upgraded(#{<<"bridge_two">> := Map}) -> + assert_upgraded1(Map). + +assert_upgraded1(Map) -> + ?assertNot(maps:is_key(<<"connector">>, Map)), + ?assertNot(maps:is_key(<<"direction">>, Map)), + ?assert(maps:is_key(<<"server">>, Map)), + ?assert(maps:is_key(<<"ssl">>, Map)). + +check(Conf) when is_map(Conf) -> + hocon_tconf:check_plain(emqx_bridge_schema, Conf). + +%% erlfmt-ignore +%% this is config generated from v5.0.11 +webhook_v5011_hocon() -> +""" +bridges{ + webhook { + the_name{ + body = \"${payload}\" + connect_timeout = \"5s\" + enable_pipelining = 100 + headers {\"content-type\" = \"application/json\"} + max_retries = 3 + method = \"get\" + pool_size = 4 + request_timeout = \"5s\" + ssl {enable = false, verify = \"verify_peer\"} + url = \"http://localhost:8080\" + } + } +} +""". + +%% erlfmt-ignore +%% this is a generated from v5.0.11 +mqtt_v5011_hocon() -> +""" +bridges { + mqtt { + bridge_one { + connector { + bridge_mode = false + clean_start = true + keepalive = \"60s\" + mode = cluster_shareload + proto_ver = \"v4\" + server = \"localhost:1883\" + ssl {enable = false, verify = \"verify_peer\"} + } + direction = egress + enable = true + payload = \"${payload}\" + remote_qos = 1 + remote_topic = \"tttttttttt\" + retain = false + } + bridge_two { + connector { + bridge_mode = false + clean_start = true + keepalive = \"60s\" + mode = \"cluster_shareload\" + proto_ver = \"v4\" + server = \"localhost:1883\" + ssl {enable = false, verify = \"verify_peer\"} + } + direction = ingress + enable = true + local_qos = 1 + payload = \"${payload}\" + remote_qos = 1 + remote_topic = \"tttttttt/#\" + retain = false + } + } +} +""". + +%% erlfmt-ignore +%% a more complete version +mqtt_v5011_full_hocon() -> +""" +bridges { + mqtt { + bridge_one { + connector { + bridge_mode = false + clean_start = true + keepalive = \"60s\" + max_inflight = 32 + mode = \"cluster_shareload\" + password = \"\" + proto_ver = \"v5\" + reconnect_interval = \"15s\" + replayq {offload = false, seg_bytes = \"100MB\"} + retry_interval = \"12s\" + server = \"localhost:1883\" + ssl { + ciphers = \"\" + depth = 10 + enable = false + reuse_sessions = true + secure_renegotiate = true + user_lookup_fun = \"emqx_tls_psk:lookup\" + verify = \"verify_peer\" + versions = [\"tlsv1.3\", \"tlsv1.2\", \"tlsv1.1\", \"tlsv1\"] + } + username = \"\" + } + direction = \"ingress\" + enable = true + local_qos = 1 + payload = \"${payload}\" + remote_qos = 1 + remote_topic = \"tttt/a\" + retain = false + } + bridge_two { + connector { + bridge_mode = false + clean_start = true + keepalive = \"60s\" + max_inflight = 32 + mode = \"cluster_shareload\" + password = \"\" + proto_ver = \"v4\" + reconnect_interval = \"15s\" + replayq {offload = false, seg_bytes = \"100MB\"} + retry_interval = \"44s\" + server = \"localhost:1883\" + ssl { + ciphers = \"\" + depth = 10 + enable = false + reuse_sessions = true + secure_renegotiate = true + user_lookup_fun = \"emqx_tls_psk:lookup\" + verify = verify_peer + versions = [\"tlsv1.3\", \"tlsv1.2\", \"tlsv1.1\", \"tlsv1\"] + } + username = \"\" + } + direction = egress + enable = true + payload = \"${payload.x}\" + remote_qos = 1 + remote_topic = \"remotetopic/1\" + retain = false + } + } +} +""". diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl index d77859dd7..93bd846e4 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_schema.erl @@ -42,17 +42,17 @@ fields("config") -> [ {"ingress", mk( - hoconsc:union([none, ref(?MODULE, "ingress")]), + ref(?MODULE, "ingress"), #{ - default => undefined, + required => {false, recursively}, desc => ?DESC("ingress_desc") } )}, {"egress", mk( - hoconsc:union([none, ref(?MODULE, "egress")]), + ref(?MODULE, "egress"), #{ - default => undefined, + required => {false, recursively}, desc => ?DESC("egress_desc") } )} @@ -109,6 +109,7 @@ fields("server_configs") -> binary(), #{ format => <<"password">>, + sensitive => true, desc => ?DESC("password") } )}, @@ -146,7 +147,10 @@ fields("ingress") -> {"local", mk( ref(?MODULE, "ingress_local"), - #{desc => ?DESC(emqx_connector_mqtt_schema, "ingress_local")} + #{ + desc => ?DESC(emqx_connector_mqtt_schema, "ingress_local"), + is_required => false + } )} ]; fields("ingress_remote") -> @@ -176,7 +180,8 @@ fields("ingress_local") -> binary(), #{ validator => fun emqx_schema:non_empty_string/1, - desc => ?DESC("ingress_local_topic") + desc => ?DESC("ingress_local_topic"), + required => false } )}, {qos, @@ -209,12 +214,18 @@ fields("egress") -> {"local", mk( ref(?MODULE, "egress_local"), - #{desc => ?DESC(emqx_connector_mqtt_schema, "egress_local")} + #{ + desc => ?DESC(emqx_connector_mqtt_schema, "egress_local"), + required => false + } )}, {"remote", mk( ref(?MODULE, "egress_remote"), - #{desc => ?DESC(emqx_connector_mqtt_schema, "egress_remote")} + #{ + desc => ?DESC(emqx_connector_mqtt_schema, "egress_remote"), + required => true + } )} ]; fields("egress_local") -> @@ -224,6 +235,7 @@ fields("egress_local") -> binary(), #{ desc => ?DESC("egress_local_topic"), + required => false, validator => fun emqx_schema:non_empty_string/1 } )} diff --git a/lib-ee/emqx_ee_bridge/rebar.config b/lib-ee/emqx_ee_bridge/rebar.config index bfd1c957e..9119b052d 100644 --- a/lib-ee/emqx_ee_bridge/rebar.config +++ b/lib-ee/emqx_ee_bridge/rebar.config @@ -1,5 +1,5 @@ {erl_opts, [debug_info]}. -{deps, [ {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.30.0"}}} +{deps, [ {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.31.2"}}} , {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.7.0"}}} , {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.0"}}} , {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.0-rc1"}}} diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl index 96efee066..e0d362f5e 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl @@ -62,17 +62,26 @@ fields(bridges) -> {kafka, mk( hoconsc:map(name, ref(emqx_ee_bridge_kafka, "config")), - #{desc => <<"EMQX Enterprise Config">>} + #{ + desc => <<"Kafka Bridge Config">>, + required => false + } )}, {hstreamdb, mk( hoconsc:map(name, ref(emqx_ee_bridge_hstreamdb, "config")), - #{desc => <<"EMQX Enterprise Config">>} + #{ + desc => <<"HStreamDB Bridge Config">>, + required => false + } )}, {mysql, mk( hoconsc:map(name, ref(emqx_ee_bridge_mysql, "config")), - #{desc => <<"EMQX Enterprise Config">>} + #{ + desc => <<"MySQL Bridge Config">>, + required => false + } )} ] ++ mongodb_structs() ++ influxdb_structs(). @@ -81,7 +90,10 @@ mongodb_structs() -> {Type, mk( hoconsc:map(name, ref(emqx_ee_bridge_mongodb, Type)), - #{desc => <<"EMQX Enterprise Config">>} + #{ + desc => <<"MongoDB Bridge Config">>, + required => false + } )} || Type <- [mongodb_rs, mongodb_sharded, mongodb_single] ]. @@ -91,7 +103,10 @@ influxdb_structs() -> {Protocol, mk( hoconsc:map(name, ref(emqx_ee_bridge_influxdb, Protocol)), - #{desc => <<"EMQX Enterprise Config">>} + #{ + desc => <<"InfluxDB Bridge Config">>, + required => false + } )} || Protocol <- [ %% influxdb_udp, diff --git a/mix.exs b/mix.exs index c7d9ae856..a3c18842c 100644 --- a/mix.exs +++ b/mix.exs @@ -67,7 +67,7 @@ defmodule EMQXUmbrella.MixProject do # in conflict by emqtt and hocon {:getopt, "1.0.2", override: true}, {:snabbkaffe, github: "kafka4beam/snabbkaffe", tag: "1.0.0", override: true}, - {:hocon, github: "emqx/hocon", tag: "0.30.0", override: true}, + {:hocon, github: "emqx/hocon", tag: "0.31.2", override: true}, {:emqx_http_lib, github: "emqx/emqx_http_lib", tag: "0.5.1", override: true}, {:esasl, github: "emqx/esasl", tag: "0.2.0"}, {:jose, github: "potatosalad/erlang-jose", tag: "1.11.2"}, diff --git a/rebar.config b/rebar.config index 369da655a..687f49cea 100644 --- a/rebar.config +++ b/rebar.config @@ -67,7 +67,7 @@ , {system_monitor, {git, "https://github.com/ieQu1/system_monitor", {tag, "3.0.3"}}} , {getopt, "1.0.2"} , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.0"}}} - , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.30.0"}}} + , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.31.2"}}} , {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.1"}}} , {esasl, {git, "https://github.com/emqx/esasl", {tag, "0.2.0"}}} , {jose, {git, "https://github.com/potatosalad/erlang-jose", {tag, "1.11.2"}}}