Merge pull request #9463 from zmstone/1128-fix-upgrade-pre-ee-confg

1128 fix upgrade pre ee confg
This commit is contained in:
Zaiming (Stone) Shi 2022-12-01 21:59:39 +01:00 committed by GitHub
commit d45845bc53
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 411 additions and 21 deletions

View File

@ -29,7 +29,7 @@
{esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.4"}}}, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.4"}}},
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.13.6"}}}, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.13.6"}}},
{gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}}, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}},
{hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.30.0"}}}, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.31.2"}}},
{pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}},
{recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}}, {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}},
{snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.0"}}} {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.0"}}}

View File

@ -213,6 +213,7 @@ lookup(Id) ->
lookup(Type, Name) -> lookup(Type, Name) ->
RawConf = emqx:get_raw_config([bridges, Type, Name], #{}), RawConf = emqx:get_raw_config([bridges, Type, Name], #{}),
lookup(Type, Name, RawConf). lookup(Type, Name, RawConf).
lookup(Type, Name, RawConf) -> lookup(Type, Name, RawConf) ->
case emqx_resource:get_instance(emqx_bridge_resource:resource_id(Type, Name)) of case emqx_resource:get_instance(emqx_bridge_resource:resource_id(Type, Name)) of
{error, not_found} -> {error, not_found} ->
@ -222,10 +223,15 @@ lookup(Type, Name, RawConf) ->
type => Type, type => Type,
name => Name, name => Name,
resource_data => Data, resource_data => Data,
raw_config => RawConf raw_config => maybe_upgrade(Type, RawConf)
}} }}
end. end.
maybe_upgrade(mqtt, Config) ->
emqx_bridge_mqtt_config:maybe_upgrade(Config);
maybe_upgrade(_Other, Config) ->
Config.
disable_enable(Action, BridgeType, BridgeName) when disable_enable(Action, BridgeType, BridgeName) when
Action =:= disable; Action =:= enable Action =:= disable; Action =:= enable
-> ->

View File

@ -0,0 +1,118 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% @doc This module was created to convert old version (from v5.0.0 to v5.0.11)
%% mqtt connector configs to newer version (developed for enterprise edition).
-module(emqx_bridge_mqtt_config).
-export([
upgrade_pre_ee/1,
maybe_upgrade/1
]).
upgrade_pre_ee(undefined) ->
undefined;
upgrade_pre_ee(Conf0) when is_map(Conf0) ->
maps:from_list(upgrade_pre_ee(maps:to_list(Conf0)));
upgrade_pre_ee([]) ->
[];
upgrade_pre_ee([{Name, Config} | Bridges]) ->
[{Name, maybe_upgrade(Config)} | upgrade_pre_ee(Bridges)].
maybe_upgrade(#{<<"connector">> := _} = Config0) ->
Config1 = up(Config0),
Config = lists:map(fun binary_key/1, Config1),
maps:from_list(Config);
maybe_upgrade(NewVersion) ->
NewVersion.
binary_key({K, V}) ->
{atom_to_binary(K, utf8), V}.
up(#{<<"connector">> := Connector} = Config) ->
Cn = fun(Key0, Default) ->
Key = atom_to_binary(Key0, utf8),
{Key0, maps:get(Key, Connector, Default)}
end,
Direction =
case maps:get(<<"direction">>, Config) of
<<"egress">> ->
{egress, egress(Config)};
<<"ingress">> ->
{ingress, ingress(Config)}
end,
Enable = maps:get(<<"enable">>, Config, true),
[
Cn(bridge_mode, false),
Cn(username, <<>>),
Cn(password, <<>>),
Cn(clean_start, true),
Cn(keepalive, <<"60s">>),
Cn(mode, <<"cluster_shareload">>),
Cn(proto_ver, <<"v4">>),
Cn(server, undefined),
Cn(retry_interval, <<"15s">>),
Cn(reconnect_interval, <<"15s">>),
Cn(ssl, default_ssl()),
{enable, Enable},
{resource_opts, default_resource_opts()},
Direction
].
default_ssl() ->
#{
<<"enable">> => false,
<<"verify">> => <<"verify_peer">>
}.
default_resource_opts() ->
#{
<<"async_inflight_window">> => 100,
<<"auto_restart_interval">> => <<"60s">>,
<<"enable_queue">> => false,
<<"health_check_interval">> => <<"15s">>,
<<"max_queue_bytes">> => <<"1GB">>,
<<"query_mode">> => <<"sync">>,
<<"worker_pool_size">> => 16
}.
egress(Config) ->
% <<"local">> % the old version has no 'local' config for egress
#{
<<"remote">> =>
#{
<<"topic">> => maps:get(<<"remote_topic">>, Config),
<<"qos">> => maps:get(<<"remote_qos">>, Config),
<<"retain">> => maps:get(<<"retain">>, Config),
<<"payload">> => maps:get(<<"payload">>, Config)
}
}.
ingress(Config) ->
#{
<<"remote">> =>
#{
<<"qos">> => maps:get(<<"remote_qos">>, Config),
<<"topic">> => maps:get(<<"remote_topic">>, Config)
},
<<"local">> =>
#{
<<"payload">> => maps:get(<<"payload">>, Config),
<<"qos">> => maps:get(<<"local_qos">>, Config),
<<"retain">> => maps:get(<<"retain">>, Config, false)
%% <<"topic">> % th old version has no local topic for ingress
}
}.

View File

@ -96,12 +96,19 @@ fields(bridges) ->
{webhook, {webhook,
mk( mk(
hoconsc:map(name, ref(emqx_bridge_webhook_schema, "config")), hoconsc:map(name, ref(emqx_bridge_webhook_schema, "config")),
#{desc => ?DESC("bridges_webhook")} #{
desc => ?DESC("bridges_webhook"),
required => false
}
)}, )},
{mqtt, {mqtt,
mk( mk(
hoconsc:map(name, ref(emqx_bridge_mqtt_schema, "config")), hoconsc:map(name, ref(emqx_bridge_mqtt_schema, "config")),
#{desc => ?DESC("bridges_mqtt")} #{
desc => ?DESC("bridges_mqtt"),
required => false,
converter => fun emqx_bridge_mqtt_config:upgrade_pre_ee/1
}
)} )}
] ++ ee_fields_bridges(); ] ++ ee_fields_bridges();
fields("metrics") -> fields("metrics") ->

View File

@ -69,7 +69,10 @@ request_config() ->
{local_topic, {local_topic,
mk( mk(
binary(), binary(),
#{desc => ?DESC("config_local_topic")} #{
desc => ?DESC("config_local_topic"),
required => false
}
)}, )},
{method, {method,
mk( mk(

View File

@ -0,0 +1,229 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_bridge_mqtt_config_tests).
-include_lib("eunit/include/eunit.hrl").
empty_config_test() ->
Conf1 = #{<<"bridges">> => #{}},
Conf2 = #{<<"bridges">> => #{<<"webhook">> => #{}}},
?assertEqual(Conf1, check(Conf1)),
?assertEqual(Conf2, check(Conf2)),
ok.
%% ensure webhook config can be checked
webhook_config_test() ->
Conf = parse(webhook_v5011_hocon()),
?assertMatch(
#{
<<"bridges">> :=
#{
<<"webhook">> := #{
<<"the_name">> :=
#{
<<"method">> := get,
<<"body">> := <<"${payload}">>
}
}
}
},
check(Conf)
),
ok.
up(#{<<"bridges">> := Bridges0} = Conf0) ->
Bridges = up(Bridges0),
Conf0#{<<"bridges">> := Bridges};
up(#{<<"mqtt">> := MqttBridges0} = Bridges) ->
MqttBridges = emqx_bridge_mqtt_config:upgrade_pre_ee(MqttBridges0),
Bridges#{<<"mqtt">> := MqttBridges}.
parse(HOCON) ->
{ok, Conf} = hocon:binary(HOCON),
Conf.
mqtt_config_test_() ->
Conf0 = mqtt_v5011_hocon(),
Conf1 = mqtt_v5011_full_hocon(),
[
{Tag, fun() ->
Parsed = parse(Conf),
Upgraded = up(Parsed),
Checked = check(Upgraded),
assert_upgraded(Checked)
end}
|| {Tag, Conf} <- [{"minimum", Conf0}, {"full", Conf1}]
].
assert_upgraded(#{<<"bridges">> := Bridges}) ->
assert_upgraded(Bridges);
assert_upgraded(#{<<"mqtt">> := Mqtt}) ->
assert_upgraded(Mqtt);
assert_upgraded(#{<<"bridge_one">> := Map}) ->
assert_upgraded1(Map);
assert_upgraded(#{<<"bridge_two">> := Map}) ->
assert_upgraded1(Map).
assert_upgraded1(Map) ->
?assertNot(maps:is_key(<<"connector">>, Map)),
?assertNot(maps:is_key(<<"direction">>, Map)),
?assert(maps:is_key(<<"server">>, Map)),
?assert(maps:is_key(<<"ssl">>, Map)).
check(Conf) when is_map(Conf) ->
hocon_tconf:check_plain(emqx_bridge_schema, Conf).
%% erlfmt-ignore
%% this is config generated from v5.0.11
webhook_v5011_hocon() ->
"""
bridges{
webhook {
the_name{
body = \"${payload}\"
connect_timeout = \"5s\"
enable_pipelining = 100
headers {\"content-type\" = \"application/json\"}
max_retries = 3
method = \"get\"
pool_size = 4
request_timeout = \"5s\"
ssl {enable = false, verify = \"verify_peer\"}
url = \"http://localhost:8080\"
}
}
}
""".
%% erlfmt-ignore
%% this is a generated from v5.0.11
mqtt_v5011_hocon() ->
"""
bridges {
mqtt {
bridge_one {
connector {
bridge_mode = false
clean_start = true
keepalive = \"60s\"
mode = cluster_shareload
proto_ver = \"v4\"
server = \"localhost:1883\"
ssl {enable = false, verify = \"verify_peer\"}
}
direction = egress
enable = true
payload = \"${payload}\"
remote_qos = 1
remote_topic = \"tttttttttt\"
retain = false
}
bridge_two {
connector {
bridge_mode = false
clean_start = true
keepalive = \"60s\"
mode = \"cluster_shareload\"
proto_ver = \"v4\"
server = \"localhost:1883\"
ssl {enable = false, verify = \"verify_peer\"}
}
direction = ingress
enable = true
local_qos = 1
payload = \"${payload}\"
remote_qos = 1
remote_topic = \"tttttttt/#\"
retain = false
}
}
}
""".
%% erlfmt-ignore
%% a more complete version
mqtt_v5011_full_hocon() ->
"""
bridges {
mqtt {
bridge_one {
connector {
bridge_mode = false
clean_start = true
keepalive = \"60s\"
max_inflight = 32
mode = \"cluster_shareload\"
password = \"\"
proto_ver = \"v5\"
reconnect_interval = \"15s\"
replayq {offload = false, seg_bytes = \"100MB\"}
retry_interval = \"12s\"
server = \"localhost:1883\"
ssl {
ciphers = \"\"
depth = 10
enable = false
reuse_sessions = true
secure_renegotiate = true
user_lookup_fun = \"emqx_tls_psk:lookup\"
verify = \"verify_peer\"
versions = [\"tlsv1.3\", \"tlsv1.2\", \"tlsv1.1\", \"tlsv1\"]
}
username = \"\"
}
direction = \"ingress\"
enable = true
local_qos = 1
payload = \"${payload}\"
remote_qos = 1
remote_topic = \"tttt/a\"
retain = false
}
bridge_two {
connector {
bridge_mode = false
clean_start = true
keepalive = \"60s\"
max_inflight = 32
mode = \"cluster_shareload\"
password = \"\"
proto_ver = \"v4\"
reconnect_interval = \"15s\"
replayq {offload = false, seg_bytes = \"100MB\"}
retry_interval = \"44s\"
server = \"localhost:1883\"
ssl {
ciphers = \"\"
depth = 10
enable = false
reuse_sessions = true
secure_renegotiate = true
user_lookup_fun = \"emqx_tls_psk:lookup\"
verify = verify_peer
versions = [\"tlsv1.3\", \"tlsv1.2\", \"tlsv1.1\", \"tlsv1\"]
}
username = \"\"
}
direction = egress
enable = true
payload = \"${payload.x}\"
remote_qos = 1
remote_topic = \"remotetopic/1\"
retain = false
}
}
}
""".

View File

@ -42,17 +42,17 @@ fields("config") ->
[ [
{"ingress", {"ingress",
mk( mk(
hoconsc:union([none, ref(?MODULE, "ingress")]), ref(?MODULE, "ingress"),
#{ #{
default => undefined, required => {false, recursively},
desc => ?DESC("ingress_desc") desc => ?DESC("ingress_desc")
} }
)}, )},
{"egress", {"egress",
mk( mk(
hoconsc:union([none, ref(?MODULE, "egress")]), ref(?MODULE, "egress"),
#{ #{
default => undefined, required => {false, recursively},
desc => ?DESC("egress_desc") desc => ?DESC("egress_desc")
} }
)} )}
@ -109,6 +109,7 @@ fields("server_configs") ->
binary(), binary(),
#{ #{
format => <<"password">>, format => <<"password">>,
sensitive => true,
desc => ?DESC("password") desc => ?DESC("password")
} }
)}, )},
@ -146,7 +147,10 @@ fields("ingress") ->
{"local", {"local",
mk( mk(
ref(?MODULE, "ingress_local"), ref(?MODULE, "ingress_local"),
#{desc => ?DESC(emqx_connector_mqtt_schema, "ingress_local")} #{
desc => ?DESC(emqx_connector_mqtt_schema, "ingress_local"),
is_required => false
}
)} )}
]; ];
fields("ingress_remote") -> fields("ingress_remote") ->
@ -176,7 +180,8 @@ fields("ingress_local") ->
binary(), binary(),
#{ #{
validator => fun emqx_schema:non_empty_string/1, validator => fun emqx_schema:non_empty_string/1,
desc => ?DESC("ingress_local_topic") desc => ?DESC("ingress_local_topic"),
required => false
} }
)}, )},
{qos, {qos,
@ -209,12 +214,18 @@ fields("egress") ->
{"local", {"local",
mk( mk(
ref(?MODULE, "egress_local"), ref(?MODULE, "egress_local"),
#{desc => ?DESC(emqx_connector_mqtt_schema, "egress_local")} #{
desc => ?DESC(emqx_connector_mqtt_schema, "egress_local"),
required => false
}
)}, )},
{"remote", {"remote",
mk( mk(
ref(?MODULE, "egress_remote"), ref(?MODULE, "egress_remote"),
#{desc => ?DESC(emqx_connector_mqtt_schema, "egress_remote")} #{
desc => ?DESC(emqx_connector_mqtt_schema, "egress_remote"),
required => true
}
)} )}
]; ];
fields("egress_local") -> fields("egress_local") ->
@ -224,6 +235,7 @@ fields("egress_local") ->
binary(), binary(),
#{ #{
desc => ?DESC("egress_local_topic"), desc => ?DESC("egress_local_topic"),
required => false,
validator => fun emqx_schema:non_empty_string/1 validator => fun emqx_schema:non_empty_string/1
} }
)} )}

View File

@ -1,5 +1,5 @@
{erl_opts, [debug_info]}. {erl_opts, [debug_info]}.
{deps, [ {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.30.0"}}} {deps, [ {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.31.2"}}}
, {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.7.0"}}} , {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.7.0"}}}
, {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.0"}}} , {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.0"}}}
, {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.0-rc1"}}} , {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.0-rc1"}}}

View File

@ -62,17 +62,26 @@ fields(bridges) ->
{kafka, {kafka,
mk( mk(
hoconsc:map(name, ref(emqx_ee_bridge_kafka, "config")), hoconsc:map(name, ref(emqx_ee_bridge_kafka, "config")),
#{desc => <<"EMQX Enterprise Config">>} #{
desc => <<"Kafka Bridge Config">>,
required => false
}
)}, )},
{hstreamdb, {hstreamdb,
mk( mk(
hoconsc:map(name, ref(emqx_ee_bridge_hstreamdb, "config")), hoconsc:map(name, ref(emqx_ee_bridge_hstreamdb, "config")),
#{desc => <<"EMQX Enterprise Config">>} #{
desc => <<"HStreamDB Bridge Config">>,
required => false
}
)}, )},
{mysql, {mysql,
mk( mk(
hoconsc:map(name, ref(emqx_ee_bridge_mysql, "config")), hoconsc:map(name, ref(emqx_ee_bridge_mysql, "config")),
#{desc => <<"EMQX Enterprise Config">>} #{
desc => <<"MySQL Bridge Config">>,
required => false
}
)} )}
] ++ mongodb_structs() ++ influxdb_structs(). ] ++ mongodb_structs() ++ influxdb_structs().
@ -81,7 +90,10 @@ mongodb_structs() ->
{Type, {Type,
mk( mk(
hoconsc:map(name, ref(emqx_ee_bridge_mongodb, Type)), hoconsc:map(name, ref(emqx_ee_bridge_mongodb, Type)),
#{desc => <<"EMQX Enterprise Config">>} #{
desc => <<"MongoDB Bridge Config">>,
required => false
}
)} )}
|| Type <- [mongodb_rs, mongodb_sharded, mongodb_single] || Type <- [mongodb_rs, mongodb_sharded, mongodb_single]
]. ].
@ -91,7 +103,10 @@ influxdb_structs() ->
{Protocol, {Protocol,
mk( mk(
hoconsc:map(name, ref(emqx_ee_bridge_influxdb, Protocol)), hoconsc:map(name, ref(emqx_ee_bridge_influxdb, Protocol)),
#{desc => <<"EMQX Enterprise Config">>} #{
desc => <<"InfluxDB Bridge Config">>,
required => false
}
)} )}
|| Protocol <- [ || Protocol <- [
%% influxdb_udp, %% influxdb_udp,

View File

@ -67,7 +67,7 @@ defmodule EMQXUmbrella.MixProject do
# in conflict by emqtt and hocon # in conflict by emqtt and hocon
{:getopt, "1.0.2", override: true}, {:getopt, "1.0.2", override: true},
{:snabbkaffe, github: "kafka4beam/snabbkaffe", tag: "1.0.0", override: true}, {:snabbkaffe, github: "kafka4beam/snabbkaffe", tag: "1.0.0", override: true},
{:hocon, github: "emqx/hocon", tag: "0.30.0", override: true}, {:hocon, github: "emqx/hocon", tag: "0.31.2", override: true},
{:emqx_http_lib, github: "emqx/emqx_http_lib", tag: "0.5.1", override: true}, {:emqx_http_lib, github: "emqx/emqx_http_lib", tag: "0.5.1", override: true},
{:esasl, github: "emqx/esasl", tag: "0.2.0"}, {:esasl, github: "emqx/esasl", tag: "0.2.0"},
{:jose, github: "potatosalad/erlang-jose", tag: "1.11.2"}, {:jose, github: "potatosalad/erlang-jose", tag: "1.11.2"},

View File

@ -67,7 +67,7 @@
, {system_monitor, {git, "https://github.com/ieQu1/system_monitor", {tag, "3.0.3"}}} , {system_monitor, {git, "https://github.com/ieQu1/system_monitor", {tag, "3.0.3"}}}
, {getopt, "1.0.2"} , {getopt, "1.0.2"}
, {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.0"}}} , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.0"}}}
, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.30.0"}}} , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.31.2"}}}
, {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.1"}}} , {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.1"}}}
, {esasl, {git, "https://github.com/emqx/esasl", {tag, "0.2.0"}}} , {esasl, {git, "https://github.com/emqx/esasl", {tag, "0.2.0"}}}
, {jose, {git, "https://github.com/potatosalad/erlang-jose", {tag, "1.11.2"}}} , {jose, {git, "https://github.com/potatosalad/erlang-jose", {tag, "1.11.2"}}}