Merge pull request #12199 from zhongwencool/configs-put-bridge-v1-api
fix: support bridge v1 conf hocon-format in the put configs API.
This commit is contained in:
commit
cc15e3a03b
|
@ -94,6 +94,7 @@
|
||||||
|
|
||||||
-export([ensure_atom_conf_path/2]).
|
-export([ensure_atom_conf_path/2]).
|
||||||
-export([load_config_files/2]).
|
-export([load_config_files/2]).
|
||||||
|
-export([upgrade_raw_conf/2]).
|
||||||
|
|
||||||
-ifdef(TEST).
|
-ifdef(TEST).
|
||||||
-export([erase_all/0, backup_and_write/2]).
|
-export([erase_all/0, backup_and_write/2]).
|
||||||
|
|
|
@ -233,7 +233,10 @@ load_config(Bin, Opts) when is_binary(Bin) ->
|
||||||
{error, Reason}
|
{error, Reason}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
load_config_from_raw(RawConf, Opts) ->
|
load_config_from_raw(RawConf0, Opts) ->
|
||||||
|
SchemaMod = emqx_conf:schema_module(),
|
||||||
|
RawConf1 = emqx_config:upgrade_raw_conf(SchemaMod, RawConf0),
|
||||||
|
RawConf = emqx_config:fill_defaults(RawConf1),
|
||||||
case check_config(RawConf) of
|
case check_config(RawConf) of
|
||||||
ok ->
|
ok ->
|
||||||
Error =
|
Error =
|
||||||
|
@ -452,8 +455,21 @@ sorted_fold(Func, Conf) ->
|
||||||
Error -> {error, Error}
|
Error -> {error, Error}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
to_sorted_list(Conf) ->
|
to_sorted_list(Conf0) ->
|
||||||
lists:keysort(1, maps:to_list(Conf)).
|
%% connectors > actions/bridges > rule_engine
|
||||||
|
Keys = [<<"connectors">>, <<"actions">>, <<"bridges">>, <<"rule_engine">>],
|
||||||
|
{HighPriorities, Conf1} = split_high_priority_conf(Keys, Conf0, []),
|
||||||
|
HighPriorities ++ lists:keysort(1, maps:to_list(Conf1)).
|
||||||
|
|
||||||
|
split_high_priority_conf([], Conf0, Acc) ->
|
||||||
|
{lists:reverse(Acc), Conf0};
|
||||||
|
split_high_priority_conf([Key | Keys], Conf0, Acc) ->
|
||||||
|
case maps:take(Key, Conf0) of
|
||||||
|
error ->
|
||||||
|
split_high_priority_conf(Keys, Conf0, Acc);
|
||||||
|
{Value, Conf1} ->
|
||||||
|
split_high_priority_conf(Keys, Conf1, [{Key, Value} | Acc])
|
||||||
|
end.
|
||||||
|
|
||||||
merge_conf(Key, NewConf) ->
|
merge_conf(Key, NewConf) ->
|
||||||
OldConf = emqx_conf:get_raw([Key]),
|
OldConf = emqx_conf:get_raw([Key]),
|
||||||
|
|
|
@ -40,7 +40,7 @@ t_load_config(Config) ->
|
||||||
ConfBin = hocon_pp:do(#{<<"authorization">> => #{<<"sources">> => []}}, #{}),
|
ConfBin = hocon_pp:do(#{<<"authorization">> => #{<<"sources">> => []}}, #{}),
|
||||||
ConfFile = prepare_conf_file(?FUNCTION_NAME, ConfBin, Config),
|
ConfFile = prepare_conf_file(?FUNCTION_NAME, ConfBin, Config),
|
||||||
ok = emqx_conf_cli:conf(["load", "--replace", ConfFile]),
|
ok = emqx_conf_cli:conf(["load", "--replace", ConfFile]),
|
||||||
?assertEqual(#{<<"sources">> => []}, emqx_conf:get_raw([Authz])),
|
?assertMatch(#{<<"sources">> := []}, emqx_conf:get_raw([Authz])),
|
||||||
|
|
||||||
ConfBin0 = hocon_pp:do(#{<<"authorization">> => Conf#{<<"sources">> => []}}, #{}),
|
ConfBin0 = hocon_pp:do(#{<<"authorization">> => Conf#{<<"sources">> => []}}, #{}),
|
||||||
ConfFile0 = prepare_conf_file(?FUNCTION_NAME, ConfBin0, Config),
|
ConfFile0 = prepare_conf_file(?FUNCTION_NAME, ConfBin0, Config),
|
||||||
|
@ -73,6 +73,10 @@ t_conflict_mix_conf(Config) ->
|
||||||
AuthNInit = emqx_conf:get_raw([authentication]),
|
AuthNInit = emqx_conf:get_raw([authentication]),
|
||||||
Redis = #{
|
Redis = #{
|
||||||
<<"backend">> => <<"redis">>,
|
<<"backend">> => <<"redis">>,
|
||||||
|
<<"database">> => 0,
|
||||||
|
<<"password_hash_algorithm">> =>
|
||||||
|
#{<<"name">> => <<"sha256">>, <<"salt_position">> => <<"prefix">>},
|
||||||
|
<<"pool_size">> => 8,
|
||||||
<<"cmd">> => <<"HMGET mqtt_user:${username} password_hash salt">>,
|
<<"cmd">> => <<"HMGET mqtt_user:${username} password_hash salt">>,
|
||||||
<<"enable">> => false,
|
<<"enable">> => false,
|
||||||
<<"mechanism">> => <<"password_based">>,
|
<<"mechanism">> => <<"password_based">>,
|
||||||
|
@ -85,10 +89,15 @@ t_conflict_mix_conf(Config) ->
|
||||||
ConfFile = prepare_conf_file(?FUNCTION_NAME, ConfBin, Config),
|
ConfFile = prepare_conf_file(?FUNCTION_NAME, ConfBin, Config),
|
||||||
%% init with redis sources
|
%% init with redis sources
|
||||||
ok = emqx_conf_cli:conf(["load", "--replace", ConfFile]),
|
ok = emqx_conf_cli:conf(["load", "--replace", ConfFile]),
|
||||||
?assertMatch([Redis], emqx_conf:get_raw([authentication])),
|
[RedisRaw] = emqx_conf:get_raw([authentication]),
|
||||||
|
?assertEqual(
|
||||||
|
maps:to_list(Redis),
|
||||||
|
maps:to_list(maps:remove(<<"ssl">>, RedisRaw)),
|
||||||
|
{Redis, RedisRaw}
|
||||||
|
),
|
||||||
%% change redis type from single to cluster
|
%% change redis type from single to cluster
|
||||||
%% the server field will become servers field
|
%% the server field will become servers field
|
||||||
RedisCluster = maps:remove(<<"server">>, Redis#{
|
RedisCluster = maps:without([<<"server">>, <<"database">>], Redis#{
|
||||||
<<"redis_type">> => cluster,
|
<<"redis_type">> => cluster,
|
||||||
<<"servers">> => [<<"127.0.0.1:6379">>]
|
<<"servers">> => [<<"127.0.0.1:6379">>]
|
||||||
}),
|
}),
|
||||||
|
|
|
@ -813,6 +813,9 @@ to_schema(Body) ->
|
||||||
post => #{requestBody => Body, responses => #{200 => <<"ok">>}}
|
post => #{requestBody => Body, responses => #{200 => <<"ok">>}}
|
||||||
}.
|
}.
|
||||||
|
|
||||||
|
%% Don't warning hocon callback namespace/0 undef.
|
||||||
|
namespace() -> atom_to_list(?MODULE).
|
||||||
|
|
||||||
fields(good_ref) ->
|
fields(good_ref) ->
|
||||||
[
|
[
|
||||||
{'webhook-host', mk(emqx_schema:ip_port(), #{default => <<"127.0.0.1:80">>})},
|
{'webhook-host', mk(emqx_schema:ip_port(), #{default => <<"127.0.0.1:80">>})},
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
-compile(nowarn_export_all).
|
-compile(nowarn_export_all).
|
||||||
|
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
-include_lib("common_test/include/ct.hrl").
|
||||||
|
|
||||||
all() ->
|
all() ->
|
||||||
emqx_common_test_helpers:all(?MODULE).
|
emqx_common_test_helpers:all(?MODULE).
|
||||||
|
@ -32,11 +33,15 @@ end_per_suite(_) ->
|
||||||
|
|
||||||
init_per_testcase(TestCase = t_configs_node, Config) ->
|
init_per_testcase(TestCase = t_configs_node, Config) ->
|
||||||
?MODULE:TestCase({'init', Config});
|
?MODULE:TestCase({'init', Config});
|
||||||
|
init_per_testcase(TestCase = t_create_webhook_v1_bridges_api, Config) ->
|
||||||
|
?MODULE:TestCase({'init', Config});
|
||||||
init_per_testcase(_TestCase, Config) ->
|
init_per_testcase(_TestCase, Config) ->
|
||||||
Config.
|
Config.
|
||||||
|
|
||||||
end_per_testcase(TestCase = t_configs_node, Config) ->
|
end_per_testcase(TestCase = t_configs_node, Config) ->
|
||||||
?MODULE:TestCase({'end', Config});
|
?MODULE:TestCase({'end', Config});
|
||||||
|
end_per_testcase(TestCase = t_create_webhook_v1_bridges_api, Config) ->
|
||||||
|
?MODULE:TestCase({'end', Config});
|
||||||
end_per_testcase(_TestCase, Config) ->
|
end_per_testcase(_TestCase, Config) ->
|
||||||
Config.
|
Config.
|
||||||
|
|
||||||
|
@ -372,6 +377,100 @@ t_get_configs_in_different_accept(_Config) ->
|
||||||
%% returns error if it set to other type
|
%% returns error if it set to other type
|
||||||
?assertMatch({400, "application/json", _}, Request(<<"application/xml">>)).
|
?assertMatch({400, "application/json", _}, Request(<<"application/xml">>)).
|
||||||
|
|
||||||
|
t_create_webhook_v1_bridges_api({'init', Config}) ->
|
||||||
|
application:ensure_all_started(emqx_connector),
|
||||||
|
application:ensure_all_started(emqx_bridge),
|
||||||
|
Config;
|
||||||
|
t_create_webhook_v1_bridges_api({'end', _}) ->
|
||||||
|
application:stop(emqx_bridge),
|
||||||
|
application:stop(emqx_connector),
|
||||||
|
ok;
|
||||||
|
t_create_webhook_v1_bridges_api(Config) ->
|
||||||
|
WebHookFile = filename:join(?config(data_dir, Config), "webhook_v1.conf"),
|
||||||
|
?assertMatch({ok, _}, hocon:files([WebHookFile])),
|
||||||
|
{ok, WebHookBin} = file:read_file(WebHookFile),
|
||||||
|
?assertEqual([], update_configs_with_binary(WebHookBin)),
|
||||||
|
Actions =
|
||||||
|
#{
|
||||||
|
<<"http">> =>
|
||||||
|
#{
|
||||||
|
<<"webhook_name">> =>
|
||||||
|
#{
|
||||||
|
<<"connector">> => <<"connector_webhook_name">>,
|
||||||
|
<<"description">> => <<>>,
|
||||||
|
<<"enable">> => true,
|
||||||
|
<<"parameters">> =>
|
||||||
|
#{
|
||||||
|
<<"body">> => <<"{\"value\": \"${value}\"}">>,
|
||||||
|
<<"headers">> => #{},
|
||||||
|
<<"max_retries">> => 3,
|
||||||
|
<<"method">> => <<"post">>,
|
||||||
|
<<"path">> => <<>>
|
||||||
|
},
|
||||||
|
<<"resource_opts">> =>
|
||||||
|
#{
|
||||||
|
<<"health_check_interval">> => <<"15s">>,
|
||||||
|
<<"inflight_window">> => 100,
|
||||||
|
<<"max_buffer_bytes">> => <<"256MB">>,
|
||||||
|
<<"query_mode">> => <<"async">>,
|
||||||
|
<<"request_ttl">> => <<"45s">>,
|
||||||
|
<<"worker_pool_size">> => 4
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
?assertEqual(Actions, emqx_conf:get_raw([<<"actions">>])),
|
||||||
|
Connectors =
|
||||||
|
#{
|
||||||
|
<<"http">> =>
|
||||||
|
#{
|
||||||
|
<<"connector_webhook_name">> =>
|
||||||
|
#{
|
||||||
|
<<"connect_timeout">> => <<"15s">>,
|
||||||
|
<<"description">> => <<>>,
|
||||||
|
<<"enable">> => true,
|
||||||
|
<<"enable_pipelining">> => 100,
|
||||||
|
<<"headers">> =>
|
||||||
|
#{
|
||||||
|
<<"Authorization">> => <<"Bearer redacted">>,
|
||||||
|
<<"content-type">> => <<"application/json">>
|
||||||
|
},
|
||||||
|
<<"pool_size">> => 4,
|
||||||
|
<<"pool_type">> => <<"random">>,
|
||||||
|
<<"resource_opts">> =>
|
||||||
|
#{
|
||||||
|
<<"health_check_interval">> => <<"15s">>,
|
||||||
|
<<"start_after_created">> => true,
|
||||||
|
<<"start_timeout">> => <<"5s">>
|
||||||
|
},
|
||||||
|
<<"ssl">> =>
|
||||||
|
#{
|
||||||
|
<<"ciphers">> => [],
|
||||||
|
<<"depth">> => 10,
|
||||||
|
<<"enable">> => true,
|
||||||
|
<<"hibernate_after">> => <<"5s">>,
|
||||||
|
<<"log_level">> => <<"notice">>,
|
||||||
|
<<"reuse_sessions">> => true,
|
||||||
|
<<"secure_renegotiate">> => true,
|
||||||
|
<<"user_lookup_fun">> =>
|
||||||
|
<<"emqx_tls_psk:lookup">>,
|
||||||
|
<<"verify">> => <<"verify_none">>,
|
||||||
|
<<"versions">> =>
|
||||||
|
[
|
||||||
|
<<"tlsv1.3">>,
|
||||||
|
<<"tlsv1.2">>,
|
||||||
|
<<"tlsv1.1">>,
|
||||||
|
<<"tlsv1">>
|
||||||
|
]
|
||||||
|
},
|
||||||
|
<<"url">> => <<"https://127.0.0.1:18083">>
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
?assertEqual(Connectors, emqx_conf:get_raw([<<"connectors">>])),
|
||||||
|
?assertEqual(#{<<"webhook">> => #{}}, emqx_conf:get_raw([<<"bridges">>])),
|
||||||
|
ok.
|
||||||
|
|
||||||
%% Helpers
|
%% Helpers
|
||||||
|
|
||||||
get_config(Name) ->
|
get_config(Name) ->
|
||||||
|
|
|
@ -0,0 +1,36 @@
|
||||||
|
bridges {
|
||||||
|
webhook {
|
||||||
|
webhook_name {
|
||||||
|
body = "{\"value\": \"${value}\"}"
|
||||||
|
connect_timeout = "15s"
|
||||||
|
enable = true
|
||||||
|
enable_pipelining = 100
|
||||||
|
headers {Authorization = "Bearer redacted", "content-type" = "application/json"}
|
||||||
|
max_retries = 3
|
||||||
|
method = "post"
|
||||||
|
pool_size = 4
|
||||||
|
pool_type = "random"
|
||||||
|
request_timeout = "15s"
|
||||||
|
resource_opts {
|
||||||
|
async_inflight_window = 100
|
||||||
|
auto_restart_interval = "60s"
|
||||||
|
enable_queue = false
|
||||||
|
health_check_interval = "15s"
|
||||||
|
max_queue_bytes = "1GB"
|
||||||
|
query_mode = "async"
|
||||||
|
worker_pool_size = 4
|
||||||
|
}
|
||||||
|
ssl {
|
||||||
|
ciphers = []
|
||||||
|
depth = 10
|
||||||
|
enable = true
|
||||||
|
reuse_sessions = true
|
||||||
|
secure_renegotiate = true
|
||||||
|
user_lookup_fun = "emqx_tls_psk:lookup"
|
||||||
|
verify = "verify_none"
|
||||||
|
versions = ["tlsv1.3", "tlsv1.2", "tlsv1.1", "tlsv1"]
|
||||||
|
}
|
||||||
|
url = "https://127.0.0.1:18083"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue