emqx/apps/emqx_bridge/test/emqx_bridge_compatible_conf...

334 lines
9.3 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_bridge_compatible_config_tests).
-include_lib("eunit/include/eunit.hrl").
empty_config_test() ->
Conf1 = #{<<"bridges">> => #{}},
Conf2 = #{<<"bridges">> => #{<<"webhook">> => #{}}},
?assertEqual(Conf1, check(Conf1)),
?assertEqual(#{<<"bridges">> => #{<<"webhook">> => #{}}}, check(Conf2)),
ok.
%% ensure webhook config can be checked
webhook_config_test() ->
Conf1 = parse(webhook_v5011_hocon()),
Conf2 = parse(full_webhook_v5011_hocon()),
Conf3 = parse(full_webhook_v5019_hocon()),
?assertMatch(
#{
<<"bridges">> := #{
<<"webhook">> := #{
<<"the_name">> :=
#{
<<"method">> := get,
<<"body">> := <<"${payload}">>
}
}
}
},
check(Conf1)
),
?assertMatch(
#{
<<"bridges">> := #{
<<"webhook">> := #{
<<"the_name">> :=
#{
<<"method">> := get,
<<"body">> := <<"${payload}">>
}
}
}
},
check(Conf2)
),
#{
<<"bridges">> := #{
<<"webhook">> := #{
<<"the_name">> :=
#{
<<"method">> := get,
<<"resource_opts">> := ResourceOpts,
<<"body">> := <<"${payload}">>
}
}
}
} = check(Conf3),
?assertMatch(#{<<"request_ttl">> := infinity}, ResourceOpts),
ok.
up(#{<<"bridges">> := Bridges0} = Conf0) ->
Bridges = up(Bridges0),
Conf0#{<<"bridges">> := Bridges};
up(#{<<"mqtt">> := MqttBridges0} = Bridges) ->
MqttBridges = emqx_bridge_compatible_config:upgrade_pre_ee(
MqttBridges0, fun emqx_bridge_compatible_config:maybe_upgrade/1
),
Bridges#{<<"mqtt">> := MqttBridges};
up(#{<<"webhook">> := WebhookBridges0} = Bridges) ->
WebhookBridges = emqx_bridge_compatible_config:upgrade_pre_ee(
WebhookBridges0, fun emqx_bridge_compatible_config:http_maybe_upgrade/1
),
Bridges#{<<"webhook">> := WebhookBridges}.
parse(HOCON) ->
{ok, Conf} = hocon:binary(HOCON),
Conf.
mqtt_config_test_() ->
Conf0 = mqtt_v5011_hocon(),
Conf1 = mqtt_v5011_full_hocon(),
[
{Tag, fun() ->
Parsed = parse(Conf),
Upgraded = up(Parsed),
Checked = check(Upgraded),
assert_upgraded(Checked)
end}
|| {Tag, Conf} <- [{"minimum", Conf0}, {"full", Conf1}]
].
assert_upgraded(#{<<"bridges">> := Bridges}) ->
assert_upgraded(Bridges);
assert_upgraded(#{<<"mqtt">> := Mqtt}) ->
assert_upgraded(Mqtt);
assert_upgraded(#{<<"bridge_one">> := Map}) ->
assert_upgraded1(Map);
assert_upgraded(#{<<"bridge_two">> := Map}) ->
assert_upgraded1(Map).
assert_upgraded1(Map) ->
?assertNot(maps:is_key(<<"connector">>, Map)),
?assertNot(maps:is_key(<<"direction">>, Map)),
?assert(maps:is_key(<<"server">>, Map)),
?assert(maps:is_key(<<"ssl">>, Map)).
check(Conf) when is_map(Conf) ->
hocon_tconf:check_plain(emqx_bridge_schema, Conf, #{required => false}).
%% erlfmt-ignore
%% this is config generated from v5.0.11
webhook_v5011_hocon() ->
"
bridges{
webhook {
the_name{
body = \"${payload}\"
connect_timeout = \"5s\"
enable_pipelining = 100
headers {\"content-type\" = \"application/json\"}
max_retries = 3
method = \"get\"
pool_size = 4
request_timeout = \"15s\"
ssl {enable = false, verify = \"verify_peer\"}
url = \"http://localhost:8080\"
}
}
}
".
full_webhook_v5011_hocon() ->
""
"\n"
"bridges{\n"
" webhook {\n"
" the_name{\n"
" body = \"${payload}\"\n"
" connect_timeout = \"5s\"\n"
" direction = \"egress\"\n"
" enable_pipelining = 100\n"
" headers {\"content-type\" = \"application/json\"}\n"
" max_retries = 3\n"
" method = \"get\"\n"
" pool_size = 4\n"
" pool_type = \"random\"\n"
" request_timeout = \"5s\"\n"
" ssl {\n"
" ciphers = \"\"\n"
" depth = 10\n"
" enable = false\n"
" reuse_sessions = true\n"
" secure_renegotiate = true\n"
" user_lookup_fun = \"emqx_tls_psk:lookup\"\n"
" verify = \"verify_peer\"\n"
" versions = [\"tlsv1.3\", \"tlsv1.2\", \"tlsv1.1\", \"tlsv1\"]\n"
" }\n"
" url = \"http://localhost:8080\"\n"
" }\n"
" }\n"
"}\n"
"".
%% does not contain direction
full_webhook_v5019_hocon() ->
""
"\n"
"bridges{\n"
" webhook {\n"
" the_name{\n"
" body = \"${payload}\"\n"
" connect_timeout = \"5s\"\n"
" enable_pipelining = 100\n"
" headers {\"content-type\" = \"application/json\"}\n"
" max_retries = 3\n"
" method = \"get\"\n"
" pool_size = 4\n"
" pool_type = \"random\"\n"
" request_timeout = \"1m\"\n"
" resource_opts = {\n"
" request_timeout = \"infinity\"\n"
" }\n"
" ssl {\n"
" ciphers = \"\"\n"
" depth = 10\n"
" enable = false\n"
" reuse_sessions = true\n"
" secure_renegotiate = true\n"
" user_lookup_fun = \"emqx_tls_psk:lookup\"\n"
" verify = \"verify_peer\"\n"
" versions = [\"tlsv1.3\", \"tlsv1.2\", \"tlsv1.1\", \"tlsv1\"]\n"
" }\n"
" url = \"http://localhost:8080\"\n"
" }\n"
" }\n"
"}\n"
"".
%% erlfmt-ignore
%% this is a generated from v5.0.11
mqtt_v5011_hocon() ->
"
bridges {
mqtt {
bridge_one {
connector {
bridge_mode = false
clean_start = true
keepalive = \"60s\"
mode = cluster_shareload
proto_ver = \"v4\"
reconnect_interval = \"15s\"
server = \"localhost:1883\"
ssl {enable = false, verify = \"verify_peer\"}
}
direction = egress
enable = true
payload = \"${payload}\"
remote_qos = 1
remote_topic = \"tttttttttt\"
retain = false
}
bridge_two {
connector {
bridge_mode = false
clean_start = true
keepalive = \"60s\"
mode = \"cluster_shareload\"
proto_ver = \"v4\"
reconnect_interval = \"15s\"
server = \"localhost:1883\"
ssl {enable = false, verify = \"verify_peer\"}
}
direction = ingress
enable = true
local_qos = 1
payload = \"${payload}\"
remote_qos = 1
remote_topic = \"tttttttt/#\"
retain = false
}
}
}
".
%% erlfmt-ignore
%% a more complete version
mqtt_v5011_full_hocon() ->
"
bridges {
mqtt {
bridge_one {
connector {
bridge_mode = false
clean_start = true
keepalive = \"60s\"
max_inflight = 32
mode = \"cluster_shareload\"
password = \"\"
proto_ver = \"v5\"
replayq {offload = false, seg_bytes = \"100MB\"}
retry_interval = \"12s\"
server = \"localhost:1883\"
ssl {
ciphers = \"\"
depth = 10
enable = false
reuse_sessions = true
secure_renegotiate = true
user_lookup_fun = \"emqx_tls_psk:lookup\"
verify = \"verify_peer\"
versions = [\"tlsv1.3\", \"tlsv1.2\", \"tlsv1.1\", \"tlsv1\"]
}
username = \"\"
}
direction = \"ingress\"
enable = true
local_qos = 1
payload = \"${payload}\"
remote_qos = 1
remote_topic = \"tttt/a\"
retain = false
}
bridge_two {
connector {
bridge_mode = false
clean_start = true
keepalive = \"60s\"
max_inflight = 32
mode = \"cluster_shareload\"
password = \"\"
proto_ver = \"v4\"
replayq {offload = false, seg_bytes = \"100MB\"}
retry_interval = \"44s\"
server = \"localhost:1883\"
ssl {
ciphers = \"\"
depth = 10
enable = false
reuse_sessions = true
secure_renegotiate = true
user_lookup_fun = \"emqx_tls_psk:lookup\"
verify = verify_peer
versions = [\"tlsv1.3\", \"tlsv1.2\", \"tlsv1.1\", \"tlsv1\"]
}
username = \"\"
}
direction = egress
enable = true
payload = \"${payload.x}\"
remote_qos = 1
remote_topic = \"remotetopic/1\"
retain = false
}
}
}
".