fix(bridges): fix a compatible problem for old webhook bridge config which created before the v5.0.12

This commit is contained in:
firest 2023-01-17 16:29:08 +08:00
parent 050d245fa5
commit c3d5c25c26
6 changed files with 118 additions and 28 deletions

View File

@ -10,7 +10,16 @@ emqx_bridge_webhook_schema {
zh: "启用/禁用 Bridge"
}
}
config_direction {
desc {
en: """Deprecated, The direction of this bridge, MUST be 'egress'"""
zh: """已废弃Bridge 的方向,必须是 egress"""
}
label: {
en: "Bridge Direction"
zh: "Bridge 方向"
}
}
config_url {
desc {
en: """

View File

@ -232,7 +232,9 @@ lookup(Type, Name, RawConf) ->
end.
maybe_upgrade(mqtt, Config) ->
emqx_bridge_mqtt_config:maybe_upgrade(Config);
emqx_bridge_compatible_config:maybe_upgrade(Config);
maybe_upgrade(webhook, Config) ->
emqx_bridge_compatible_config:webhook_maybe_upgrade(Config);
maybe_upgrade(_Other, Config) ->
Config.

View File

@ -15,22 +15,23 @@
%%--------------------------------------------------------------------
%% @doc This module was created to convert old version (from v5.0.0 to v5.0.11)
%% mqtt connector configs to newer version (developed for enterprise edition).
-module(emqx_bridge_mqtt_config).
%% mqtt/webhook connector configs to newer version (developed for enterprise edition).
-module(emqx_bridge_compatible_config).
-export([
upgrade_pre_ee/1,
maybe_upgrade/1
upgrade_pre_ee/2,
maybe_upgrade/1,
webhook_maybe_upgrade/1
]).
upgrade_pre_ee(undefined) ->
upgrade_pre_ee(undefined, _UpgradeFunc) ->
undefined;
upgrade_pre_ee(Conf0) when is_map(Conf0) ->
maps:from_list(upgrade_pre_ee(maps:to_list(Conf0)));
upgrade_pre_ee([]) ->
upgrade_pre_ee(Conf0, UpgradeFunc) when is_map(Conf0) ->
maps:from_list(upgrade_pre_ee(maps:to_list(Conf0), UpgradeFunc));
upgrade_pre_ee([], _UpgradeFunc) ->
[];
upgrade_pre_ee([{Name, Config} | Bridges]) ->
[{Name, maybe_upgrade(Config)} | upgrade_pre_ee(Bridges)].
upgrade_pre_ee([{Name, Config} | Bridges], UpgradeFunc) ->
[{Name, UpgradeFunc(Config)} | upgrade_pre_ee(Bridges, UpgradeFunc)].
maybe_upgrade(#{<<"connector">> := _} = Config0) ->
Config1 = up(Config0),
@ -39,6 +40,12 @@ maybe_upgrade(#{<<"connector">> := _} = Config0) ->
maybe_upgrade(NewVersion) ->
NewVersion.
webhook_maybe_upgrade(#{<<"direction">> := _} = Config0) ->
Config1 = maps:remove(<<"direction">>, Config0),
Config1#{<<"resource_opts">> => default_resource_opts()};
webhook_maybe_upgrade(NewVersion) ->
NewVersion.
binary_key({K, V}) ->
{atom_to_binary(K, utf8), V}.

View File

@ -121,7 +121,12 @@ fields(bridges) ->
hoconsc:map(name, ref(emqx_bridge_webhook_schema, "config")),
#{
desc => ?DESC("bridges_webhook"),
required => false
required => false,
converter => fun(X, _HoconOpts) ->
emqx_bridge_compatible_config:upgrade_pre_ee(
X, fun emqx_bridge_compatible_config:webhook_maybe_upgrade/1
)
end
}
)},
{mqtt,
@ -131,7 +136,9 @@ fields(bridges) ->
desc => ?DESC("bridges_mqtt"),
required => false,
converter => fun(X, _HoconOpts) ->
emqx_bridge_mqtt_config:upgrade_pre_ee(X)
emqx_bridge_compatible_config:upgrade_pre_ee(
X, fun emqx_bridge_compatible_config:maybe_upgrade/1
)
end
}
)}

View File

@ -81,6 +81,15 @@ request_config() ->
desc => ?DESC("config_url")
}
)},
{direction,
mk(
egress,
#{
desc => ?DESC("config_direction"),
required => {false, recursively},
deprecated => {since, "5.0.12"}
}
)},
{local_topic,
mk(
binary(),

View File

@ -13,7 +13,7 @@
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_bridge_mqtt_config_tests).
-module(emqx_bridge_compatible_config_tests).
-include_lib("eunit/include/eunit.hrl").
@ -26,30 +26,54 @@ empty_config_test() ->
%% ensure webhook config can be checked
webhook_config_test() ->
Conf = parse(webhook_v5011_hocon()),
Conf1 = parse(webhook_v5011_hocon()),
Conf2 = parse(full_webhook_v5011_hocon()),
?assertMatch(
#{
<<"bridges">> :=
#{
<<"webhook">> := #{
<<"the_name">> :=
#{
<<"method">> := get,
<<"body">> := <<"${payload}">>
}
}
<<"bridges">> := #{
<<"webhook">> := #{
<<"the_name">> :=
#{
<<"method">> := get,
<<"body">> := <<"${payload}">>
}
}
}
},
check(Conf)
check(Conf1)
),
?assertMatch(
#{
<<"bridges">> := #{
<<"webhook">> := #{
<<"the_name">> :=
#{
<<"method">> := get,
<<"body">> := <<"${payload}">>
}
}
}
},
check(Conf2)
),
ok.
up(#{<<"bridges">> := Bridges0} = Conf0) ->
Bridges = up(Bridges0),
Conf0#{<<"bridges">> := Bridges};
up(#{<<"mqtt">> := MqttBridges0} = Bridges) ->
MqttBridges = emqx_bridge_mqtt_config:upgrade_pre_ee(MqttBridges0),
Bridges#{<<"mqtt">> := MqttBridges}.
MqttBridges = emqx_bridge_compatible_config:upgrade_pre_ee(
MqttBridges0, fun emqx_bridge_compatible_config:maybe_upgrade/1
),
Bridges#{<<"mqtt">> := MqttBridges};
up(#{<<"webhook">> := WebhookBridges0} = Bridges) ->
WebhookBridges = emqx_bridge_compatible_config:upgrade_pre_ee(
WebhookBridges0, fun emqx_bridge_compatible_config:webhook_maybe_upgrade/1
),
Bridges#{<<"webhook">> := WebhookBridges}.
parse(HOCON) ->
{ok, Conf} = hocon:binary(HOCON),
@ -108,6 +132,38 @@ bridges{
}
""".
full_webhook_v5011_hocon() ->
""
"\n"
"bridges{\n"
" webhook {\n"
" the_name{\n"
" body = \"${payload}\"\n"
" connect_timeout = \"5s\"\n"
" direction = \"egress\"\n"
" enable_pipelining = 100\n"
" headers {\"content-type\" = \"application/json\"}\n"
" max_retries = 3\n"
" method = \"get\"\n"
" pool_size = 4\n"
" pool_type = \"random\"\n"
" request_timeout = \"5s\"\n"
" ssl {\n"
" ciphers = \"\"\n"
" depth = 10\n"
" enable = false\n"
" reuse_sessions = true\n"
" secure_renegotiate = true\n"
" user_lookup_fun = \"emqx_tls_psk:lookup\"\n"
" verify = \"verify_peer\"\n"
" versions = [\"tlsv1.3\", \"tlsv1.2\", \"tlsv1.1\", \"tlsv1\"]\n"
" }\n"
" url = \"http://localhost:8080\"\n"
" }\n"
" }\n"
"}\n"
"".
%% erlfmt-ignore
%% this is a generated from v5.0.11
mqtt_v5011_hocon() ->