Merge pull request #11050 from emqx/release-51

chore: sync release-51 back to master
This commit is contained in:
zhongwencool 2023-06-15 09:45:31 +08:00 committed by GitHub
commit e42cc58694
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
87 changed files with 1924 additions and 732 deletions

View File

@ -194,6 +194,8 @@ jobs:
- 5.1-0
elixir:
- 1.14.5
with_elixir:
- 'no'
exclude:
- arch: arm64
build_machine: ubuntu-22.04
@ -207,7 +209,7 @@ jobs:
build_machine: ubuntu-22.04
builder: 5.1-0
elixir: 1.14.5
release_with: elixir
with_elixir: 'yes'
- profile: emqx
otp: 25.3.2-1
arch: amd64
@ -215,7 +217,7 @@ jobs:
build_machine: ubuntu-22.04
builder: 5.1-0
elixir: 1.14.5
release_with: elixir
with_elixir: 'yes'
defaults:
run:
@ -245,12 +247,9 @@ jobs:
fi
echo "pwd is $PWD"
PKGTYPES="tgz pkg"
IS_ELIXIR="no"
WITH_ELIXIR=${{ matrix.release_with }}
if [ "${WITH_ELIXIR:-}" == 'elixir' ]; then
IS_ELIXIR=${{ matrix.with_elixir }}
if [ "${IS_ELIXIR:-}" == 'yes' ]; then
PKGTYPES="tgz"
# set Elixir build flag
IS_ELIXIR="yes"
fi
for PKGTYPE in ${PKGTYPES};
do

View File

@ -16,7 +16,7 @@ endif
# Dashbord version
# from https://github.com/emqx/emqx-dashboard5
export EMQX_DASHBOARD_VERSION ?= v1.2.6-beta.1
export EMQX_EE_DASHBOARD_VERSION ?= e1.1.0-beta.3
export EMQX_EE_DASHBOARD_VERSION ?= e1.1.0-beta.5
# `:=` should be used here, otherwise the `$(shell ...)` will be executed every time when the variable is used
# In make 4.4+, for backward-compatibility the value from the original environment is used.

View File

@ -47,5 +47,6 @@
-define(CMD_MOVE_REAR, rear).
-define(CMD_MOVE_BEFORE(Before), {before, Before}).
-define(CMD_MOVE_AFTER(After), {'after', After}).
-define(CMD_MERGE, merge).
-endif.

View File

@ -32,10 +32,10 @@
%% `apps/emqx/src/bpapi/README.md'
%% Opensource edition
-define(EMQX_RELEASE_CE, "5.1.0-alpha.4").
-define(EMQX_RELEASE_CE, "5.1.0-alpha.5").
%% Enterprise edition
-define(EMQX_RELEASE_EE, "5.1.0-alpha.4").
-define(EMQX_RELEASE_EE, "5.1.0-alpha.6").
%% The HTTP API version
-define(EMQX_API_VERSION, "5.0").

View File

@ -19,5 +19,6 @@
-define(TOMBSTONE_TYPE, marked_for_deletion).
-define(TOMBSTONE_VALUE, <<"marked_for_deletion">>).
-define(TOMBSTONE_CONFIG_CHANGE_REQ, mark_it_for_deletion).
-define(CONFIG_NOT_FOUND_MAGIC, '$0tFound').
-endif.

View File

@ -25,11 +25,11 @@
{emqx_utils, {path, "../emqx_utils"}},
{lc, {git, "https://github.com/emqx/lc.git", {tag, "0.3.2"}}},
{gproc, {git, "https://github.com/emqx/gproc", {tag, "0.9.0.1"}}},
{cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.0"}}},
{cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}},
{esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.6"}}},
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.2"}}},
{gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}},
{hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.7"}}},
{hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.8"}}},
{emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.2"}}},
{pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}},
{recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}},

View File

@ -55,7 +55,9 @@
{create_authenticator, chain_name(), map()}
| {delete_authenticator, chain_name(), authenticator_id()}
| {update_authenticator, chain_name(), authenticator_id(), map()}
| {move_authenticator, chain_name(), authenticator_id(), position()}.
| {move_authenticator, chain_name(), authenticator_id(), position()}
| {merge_authenticators, map()}
| map().
%%------------------------------------------------------------------------------
%% Callbacks of config handler
@ -128,6 +130,9 @@ do_pre_config_update(_, {move_authenticator, _ChainName, AuthenticatorID, Positi
end
end
end;
do_pre_config_update(Paths, {merge_authenticators, NewConfig}, OldConfig) ->
MergeConfig = merge_authenticators(OldConfig, NewConfig),
do_pre_config_update(Paths, MergeConfig, OldConfig);
do_pre_config_update(_, OldConfig, OldConfig) ->
{ok, OldConfig};
do_pre_config_update(Paths, NewConfig, _OldConfig) ->
@ -327,3 +332,77 @@ chain_name([authentication]) ->
?GLOBAL;
chain_name([listeners, Type, Name, authentication]) ->
binary_to_existing_atom(<<(atom_to_binary(Type))/binary, ":", (atom_to_binary(Name))/binary>>).
merge_authenticators(OriginConf0, NewConf0) ->
{OriginConf1, NewConf1} =
lists:foldl(
fun(Origin, {OriginAcc, NewAcc}) ->
AuthenticatorID = authenticator_id(Origin),
case split_by_id(AuthenticatorID, NewAcc) of
{error, _} ->
{[Origin | OriginAcc], NewAcc};
{ok, BeforeFound, [Found | AfterFound]} ->
Merged = emqx_utils_maps:deep_merge(Origin, Found),
{[Merged | OriginAcc], BeforeFound ++ AfterFound}
end
end,
{[], NewConf0},
OriginConf0
),
lists:reverse(OriginConf1) ++ NewConf1.
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-compile(nowarn_export_all).
-compile(export_all).
merge_authenticators_test() ->
?assertEqual([], merge_authenticators([], [])),
Http = #{
<<"mechanism">> => <<"password_based">>, <<"backend">> => <<"http">>, <<"enable">> => true
},
Jwt = #{<<"mechanism">> => <<"jwt">>, <<"enable">> => true},
BuildIn = #{
<<"mechanism">> => <<"password_based">>,
<<"backend">> => <<"built_in_database">>,
<<"enable">> => true
},
Mongodb = #{
<<"mechanism">> => <<"password_based">>,
<<"backend">> => <<"mongodb">>,
<<"enable">> => true
},
Redis = #{
<<"mechanism">> => <<"password_based">>, <<"backend">> => <<"redis">>, <<"enable">> => true
},
BuildInDisable = BuildIn#{<<"enable">> => false},
MongodbDisable = Mongodb#{<<"enable">> => false},
RedisDisable = Redis#{<<"enable">> => false},
%% add
?assertEqual([Http], merge_authenticators([], [Http])),
?assertEqual([Http, Jwt, BuildIn], merge_authenticators([Http], [Jwt, BuildIn])),
%% merge
?assertEqual(
[BuildInDisable, MongodbDisable],
merge_authenticators([BuildIn, Mongodb], [BuildInDisable, MongodbDisable])
),
?assertEqual(
[BuildInDisable, Jwt],
merge_authenticators([BuildIn, Jwt], [BuildInDisable])
),
?assertEqual(
[BuildInDisable, Jwt, Mongodb],
merge_authenticators([BuildIn, Jwt], [Mongodb, BuildInDisable])
),
%% position changed
?assertEqual(
[BuildInDisable, Jwt, Mongodb, RedisDisable, Http],
merge_authenticators([BuildIn, Jwt, Mongodb, Redis], [RedisDisable, BuildInDisable, Http])
),
ok.
-endif.

View File

@ -18,6 +18,8 @@
-compile({no_auto_import, [get/0, get/1, put/2, erase/1]}).
-elvis([{elvis_style, god_modules, disable}]).
-include("logger.hrl").
-include("emqx.hrl").
-include("emqx_schema.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-export([
@ -33,7 +35,9 @@
save_configs/5,
save_to_app_env/1,
save_to_config_map/2,
save_to_override_conf/3
save_to_override_conf/3,
config_files/0,
include_dirs/0
]).
-export([merge_envs/2]).
@ -89,6 +93,7 @@
]).
-export([ensure_atom_conf_path/2]).
-export([load_config_files/2]).
-ifdef(TEST).
-export([erase_all/0, backup_and_write/2]).
@ -104,7 +109,6 @@
-define(ZONE_CONF_PATH(ZONE, PATH), [zones, ZONE | PATH]).
-define(LISTENER_CONF_PATH(TYPE, LISTENER, PATH), [listeners, TYPE, LISTENER | PATH]).
-define(CONFIG_NOT_FOUND_MAGIC, '$0tFound').
-define(MAX_KEEP_BACKUP_CONFIGS, 10).
-export_type([
@ -311,8 +315,7 @@ put_raw(KeyPath0, Config) ->
%% Load/Update configs From/To files
%%============================================================================
init_load(SchemaMod) ->
ConfFiles = application:get_env(emqx, config_files, []),
init_load(SchemaMod, ConfFiles).
init_load(SchemaMod, config_files()).
%% @doc Initial load of the given config files.
%% NOTE: The order of the files is significant, configs from files ordered
@ -977,3 +980,6 @@ put_config_post_change_actions(?PERSIS_KEY(?CONF, zones), _Zones) ->
ok;
put_config_post_change_actions(_Key, _NewValue) ->
ok.
config_files() ->
application:get_env(emqx, config_files, []).

View File

@ -44,11 +44,12 @@
code_change/3
]).
-define(MOD, {mod}).
-export([schema/2]).
-define(MOD, '$mod').
-define(WKEY, '?').
-type handler_name() :: module().
-type handlers() :: #{emqx_config:config_key() => handlers(), ?MOD => handler_name()}.
-optional_callbacks([
pre_config_update/3,
@ -67,10 +68,7 @@
) ->
ok | {ok, Result :: any()} | {error, Reason :: term()}.
-type state() :: #{
handlers := handlers(),
atom() => term()
}.
-type state() :: #{handlers := any()}.
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, {}, []).

View File

@ -116,15 +116,19 @@ format_raw_listeners({Type0, Conf}) ->
fun
({LName, LConf0}) when is_map(LConf0) ->
Bind = parse_bind(LConf0),
MaxConn = maps:get(<<"max_connections">>, LConf0, default_max_conn()),
Running = is_running(Type, listener_id(Type, LName), LConf0#{bind => Bind}),
LConf1 = maps:remove(<<"authentication">>, LConf0),
LConf1 = maps:without([<<"authentication">>, <<"zone">>], LConf0),
LConf2 = maps:put(<<"running">>, Running, LConf1),
CurrConn =
case Running of
true -> current_conns(Type, LName, Bind);
false -> 0
end,
LConf = maps:put(<<"current_connections">>, CurrConn, LConf2),
LConf = maps:merge(LConf2, #{
<<"current_connections">> => CurrConn,
<<"max_connections">> => ensure_max_conns(MaxConn)
}),
{true, {Type0, LName, LConf}};
({_LName, _MarkDel}) ->
false
@ -417,14 +421,11 @@ do_start_listener(quic, ListenerName, #{bind := Bind} = Opts) ->
case [A || {quicer, _, _} = A <- application:which_applications()] of
[_] ->
DefAcceptors = erlang:system_info(schedulers_online) * 8,
SSLOpts = maps:merge(
maps:with([certfile, keyfile], Opts),
maps:get(ssl_options, Opts, #{})
),
SSLOpts = maps:get(ssl_options, Opts, #{}),
ListenOpts =
[
{certfile, str(maps:get(certfile, SSLOpts))},
{keyfile, str(maps:get(keyfile, SSLOpts))},
{certfile, emqx_schema:naive_env_interpolation(maps:get(certfile, SSLOpts))},
{keyfile, emqx_schema:naive_env_interpolation(maps:get(keyfile, SSLOpts))},
{alpn, ["mqtt"]},
{conn_acceptors, lists:max([DefAcceptors, maps:get(acceptors, Opts, 0)])},
{keep_alive_interval_ms, maps:get(keep_alive_interval, Opts, 0)},
@ -434,8 +435,10 @@ do_start_listener(quic, ListenerName, #{bind := Bind} = Opts) ->
{verify, maps:get(verify, SSLOpts, verify_none)}
] ++
case maps:get(cacertfile, SSLOpts, undefined) of
undefined -> [];
CaCertFile -> [{cacertfile, str(CaCertFile)}]
undefined ->
[];
CaCertFile ->
[{cacertfile, emqx_schema:naive_env_interpolation(CaCertFile)}]
end ++
case maps:get(password, SSLOpts, undefined) of
undefined -> [];
@ -992,3 +995,7 @@ unregister_ocsp_stapling_refresh(Type, Name) ->
default_max_conn() ->
<<"infinity">>.
ensure_max_conns(<<"infinity">>) -> <<"infinity">>;
ensure_max_conns(MaxConn) when is_binary(MaxConn) -> binary_to_integer(MaxConn);
ensure_max_conns(MaxConn) -> MaxConn.

View File

@ -209,7 +209,7 @@ roots(high) ->
map("name", ref("zone")),
#{
desc => ?DESC(zones),
importance => ?IMPORTANCE_LOW
importance => ?IMPORTANCE_HIDDEN
}
)},
{?EMQX_AUTHENTICATION_CONFIG_ROOT_NAME, authentication(global)},
@ -226,7 +226,10 @@ roots(medium) ->
{"broker",
sc(
ref("broker"),
#{desc => ?DESC(broker)}
#{
desc => ?DESC(broker),
importance => ?IMPORTANCE_HIDDEN
}
)},
{"sys_topics",
sc(
@ -439,251 +442,7 @@ fields("authz_cache") ->
)}
];
fields("mqtt") ->
[
{"idle_timeout",
sc(
hoconsc:union([infinity, duration()]),
#{
default => <<"15s">>,
desc => ?DESC(mqtt_idle_timeout)
}
)},
{"max_packet_size",
sc(
bytesize(),
#{
default => <<"1MB">>,
desc => ?DESC(mqtt_max_packet_size)
}
)},
{"max_clientid_len",
sc(
range(23, 65535),
#{
default => 65535,
desc => ?DESC(mqtt_max_clientid_len)
}
)},
{"max_topic_levels",
sc(
range(1, 65535),
#{
default => 128,
desc => ?DESC(mqtt_max_topic_levels)
}
)},
{"max_qos_allowed",
sc(
qos(),
#{
default => 2,
desc => ?DESC(mqtt_max_qos_allowed)
}
)},
{"max_topic_alias",
sc(
range(0, 65535),
#{
default => 65535,
desc => ?DESC(mqtt_max_topic_alias)
}
)},
{"retain_available",
sc(
boolean(),
#{
default => true,
desc => ?DESC(mqtt_retain_available)
}
)},
{"wildcard_subscription",
sc(
boolean(),
#{
default => true,
desc => ?DESC(mqtt_wildcard_subscription)
}
)},
{"shared_subscription",
sc(
boolean(),
#{
default => true,
desc => ?DESC(mqtt_shared_subscription)
}
)},
{"exclusive_subscription",
sc(
boolean(),
#{
default => false,
desc => ?DESC(mqtt_exclusive_subscription)
}
)},
{"ignore_loop_deliver",
sc(
boolean(),
#{
default => false,
desc => ?DESC(mqtt_ignore_loop_deliver)
}
)},
{"strict_mode",
sc(
boolean(),
#{
default => false,
desc => ?DESC(mqtt_strict_mode)
}
)},
{"response_information",
sc(
string(),
#{
default => <<"">>,
desc => ?DESC(mqtt_response_information)
}
)},
{"server_keepalive",
sc(
hoconsc:union([integer(), disabled]),
#{
default => disabled,
desc => ?DESC(mqtt_server_keepalive)
}
)},
{"keepalive_backoff",
sc(
number(),
#{
default => ?DEFAULT_BACKOFF,
%% Must add required => false, zone schema has no default.
required => false,
importance => ?IMPORTANCE_HIDDEN
}
)},
{"keepalive_multiplier",
sc(
number(),
#{
default => ?DEFAULT_MULTIPLIER,
validator => fun ?MODULE:validate_keepalive_multiplier/1,
desc => ?DESC(mqtt_keepalive_multiplier)
}
)},
{"max_subscriptions",
sc(
hoconsc:union([range(1, inf), infinity]),
#{
default => infinity,
desc => ?DESC(mqtt_max_subscriptions)
}
)},
{"upgrade_qos",
sc(
boolean(),
#{
default => false,
desc => ?DESC(mqtt_upgrade_qos)
}
)},
{"max_inflight",
sc(
range(1, 65535),
#{
default => 32,
desc => ?DESC(mqtt_max_inflight)
}
)},
{"retry_interval",
sc(
duration(),
#{
default => <<"30s">>,
desc => ?DESC(mqtt_retry_interval)
}
)},
{"max_awaiting_rel",
sc(
hoconsc:union([integer(), infinity]),
#{
default => 100,
desc => ?DESC(mqtt_max_awaiting_rel)
}
)},
{"await_rel_timeout",
sc(
duration(),
#{
default => <<"300s">>,
desc => ?DESC(mqtt_await_rel_timeout)
}
)},
{"session_expiry_interval",
sc(
duration(),
#{
default => <<"2h">>,
desc => ?DESC(mqtt_session_expiry_interval)
}
)},
{"max_mqueue_len",
sc(
hoconsc:union([non_neg_integer(), infinity]),
#{
default => 1000,
desc => ?DESC(mqtt_max_mqueue_len)
}
)},
{"mqueue_priorities",
sc(
hoconsc:union([disabled, map()]),
#{
default => disabled,
desc => ?DESC(mqtt_mqueue_priorities)
}
)},
{"mqueue_default_priority",
sc(
hoconsc:enum([highest, lowest]),
#{
default => lowest,
desc => ?DESC(mqtt_mqueue_default_priority)
}
)},
{"mqueue_store_qos0",
sc(
boolean(),
#{
default => true,
desc => ?DESC(mqtt_mqueue_store_qos0)
}
)},
{"use_username_as_clientid",
sc(
boolean(),
#{
default => false,
desc => ?DESC(mqtt_use_username_as_clientid)
}
)},
{"peer_cert_as_username",
sc(
hoconsc:enum([disabled, cn, dn, crt, pem, md5]),
#{
default => disabled,
desc => ?DESC(mqtt_peer_cert_as_username)
}
)},
{"peer_cert_as_clientid",
sc(
hoconsc:enum([disabled, cn, dn, crt, pem, md5]),
#{
default => disabled,
desc => ?DESC(mqtt_peer_cert_as_clientid)
}
)}
];
mqtt_general() ++ mqtt_session();
fields("zone") ->
emqx_zone_schema:zones_without_default();
fields("flapping_detect") ->
@ -980,7 +739,7 @@ fields("mqtt_quic_listener") ->
sc(
string(),
#{
%% TODO: deprecated => {since, "5.1.0"}
deprecated => {since, "5.1.0"},
desc => ?DESC(fields_mqtt_quic_listener_certfile),
importance => ?IMPORTANCE_HIDDEN
}
@ -989,7 +748,7 @@ fields("mqtt_quic_listener") ->
sc(
string(),
#{
%% TODO: deprecated => {since, "5.1.0"}
deprecated => {since, "5.1.0"},
desc => ?DESC(fields_mqtt_quic_listener_keyfile),
importance => ?IMPORTANCE_HIDDEN
}
@ -1068,7 +827,7 @@ fields("mqtt_quic_listener") ->
#{
default => 0,
desc => ?DESC(fields_mqtt_quic_listener_idle_timeout),
%% TODO: deprecated => {since, "5.1.0"}
deprecated => {since, "5.1.0"},
%% deprecated, use idle_timeout_ms instead
importance => ?IMPORTANCE_HIDDEN
}
@ -1085,7 +844,7 @@ fields("mqtt_quic_listener") ->
#{
default => <<"10s">>,
desc => ?DESC(fields_mqtt_quic_listener_handshake_idle_timeout),
%% TODO: deprecated => {since, "5.1.0"}
deprecated => {since, "5.1.0"},
%% use handshake_idle_timeout_ms
importance => ?IMPORTANCE_HIDDEN
}
@ -1437,7 +1196,9 @@ fields("listener_quic_ssl_opts") ->
true ->
{Name, Schema};
false ->
{Name, Schema#{deprecated => {since, "5.0.20"}}}
{Name, Schema#{
deprecated => {since, "5.0.20"}, importance => ?IMPORTANCE_HIDDEN
}}
end
end,
Schema1
@ -1561,22 +1322,7 @@ fields("broker") ->
desc => ?DESC(broker_session_locking_strategy)
}
)},
{"shared_subscription_strategy",
sc(
hoconsc:enum([
random,
round_robin,
round_robin_per_group,
sticky,
local,
hash_topic,
hash_clientid
]),
#{
default => round_robin,
desc => ?DESC(broker_shared_subscription_strategy)
}
)},
shared_subscription_strategy(),
{"shared_dispatch_ack_enabled",
sc(
boolean(),
@ -2048,7 +1794,8 @@ base_listener(Bind) ->
atom(),
#{
desc => ?DESC(base_listener_zone),
default => 'default'
default => 'default',
importance => ?IMPORTANCE_HIDDEN
}
)},
{"limiter",
@ -2932,10 +2679,30 @@ validate_ciphers(Ciphers) ->
validate_tls_versions(Collection, Versions) ->
AvailableVersions = available_tls_vsns(Collection),
case lists:filter(fun(V) -> not lists:member(V, AvailableVersions) end, Versions) of
[] -> ok;
[] -> validate_tls_version_gap(Versions);
Vs -> {error, {unsupported_tls_versions, Vs}}
end.
%% See also `validate_version_gap/1` in OTP ssl.erl,
%% e.g: https://github.com/emqx/otp/blob/emqx-OTP-25.1.2/lib/ssl/src/ssl.erl#L2566.
%% Do not allow configuration of TLS 1.3 with a gap where TLS 1.2 is not supported
%% as that configuration can trigger the built in version downgrade protection
%% mechanism and the handshake can fail with an Illegal Parameter alert.
validate_tls_version_gap(Versions) ->
case lists:member('tlsv1.3', Versions) of
true when length(Versions) >= 2 ->
case lists:member('tlsv1.2', Versions) of
true ->
ok;
false ->
{error,
"Using multiple versions that include tlsv1.3 but "
"exclude tlsv1.2 is not allowed"}
end;
_ ->
ok
end.
validations() ->
[
{check_process_watermark, fun check_process_watermark/1},
@ -3562,3 +3329,283 @@ flapping_detect_converter(Conf = #{<<"window_time">> := <<"disable">>}, _Opts) -
Conf#{<<"window_time">> => ?DEFAULT_WINDOW_TIME, <<"enable">> => false};
flapping_detect_converter(Conf, _Opts) ->
Conf.
mqtt_general() ->
[
{"idle_timeout",
sc(
hoconsc:union([infinity, duration()]),
#{
default => <<"15s">>,
desc => ?DESC(mqtt_idle_timeout)
}
)},
{"max_packet_size",
sc(
bytesize(),
#{
default => <<"1MB">>,
desc => ?DESC(mqtt_max_packet_size)
}
)},
{"max_clientid_len",
sc(
range(23, 65535),
#{
default => 65535,
desc => ?DESC(mqtt_max_clientid_len)
}
)},
{"max_topic_levels",
sc(
range(1, 65535),
#{
default => 128,
desc => ?DESC(mqtt_max_topic_levels)
}
)},
{"max_topic_alias",
sc(
range(0, 65535),
#{
default => 65535,
desc => ?DESC(mqtt_max_topic_alias)
}
)},
{"retain_available",
sc(
boolean(),
#{
default => true,
desc => ?DESC(mqtt_retain_available)
}
)},
{"wildcard_subscription",
sc(
boolean(),
#{
default => true,
desc => ?DESC(mqtt_wildcard_subscription)
}
)},
{"shared_subscription",
sc(
boolean(),
#{
default => true,
desc => ?DESC(mqtt_shared_subscription)
}
)},
shared_subscription_strategy(),
{"exclusive_subscription",
sc(
boolean(),
#{
default => false,
desc => ?DESC(mqtt_exclusive_subscription)
}
)},
{"ignore_loop_deliver",
sc(
boolean(),
#{
default => false,
desc => ?DESC(mqtt_ignore_loop_deliver)
}
)},
{"strict_mode",
sc(
boolean(),
#{
default => false,
desc => ?DESC(mqtt_strict_mode)
}
)},
{"response_information",
sc(
string(),
#{
default => <<"">>,
desc => ?DESC(mqtt_response_information)
}
)},
{"server_keepalive",
sc(
hoconsc:union([pos_integer(), disabled]),
#{
default => disabled,
desc => ?DESC(mqtt_server_keepalive)
}
)},
{"keepalive_backoff",
sc(
number(),
#{
default => ?DEFAULT_BACKOFF,
%% Must add required => false, zone schema has no default.
required => false,
importance => ?IMPORTANCE_HIDDEN
}
)},
{"keepalive_multiplier",
sc(
number(),
#{
default => ?DEFAULT_MULTIPLIER,
validator => fun ?MODULE:validate_keepalive_multiplier/1,
desc => ?DESC(mqtt_keepalive_multiplier)
}
)},
{"retry_interval",
sc(
duration(),
#{
default => <<"30s">>,
desc => ?DESC(mqtt_retry_interval)
}
)},
{"use_username_as_clientid",
sc(
boolean(),
#{
default => false,
desc => ?DESC(mqtt_use_username_as_clientid)
}
)},
{"peer_cert_as_username",
sc(
hoconsc:enum([disabled, cn, dn, crt, pem, md5]),
#{
default => disabled,
desc => ?DESC(mqtt_peer_cert_as_username)
}
)},
{"peer_cert_as_clientid",
sc(
hoconsc:enum([disabled, cn, dn, crt, pem, md5]),
#{
default => disabled,
desc => ?DESC(mqtt_peer_cert_as_clientid)
}
)}
].
%% All session's importance should be lower than general part to organize document.
mqtt_session() ->
[
{"session_expiry_interval",
sc(
duration(),
#{
default => <<"2h">>,
desc => ?DESC(mqtt_session_expiry_interval),
importance => ?IMPORTANCE_LOW
}
)},
{"max_awaiting_rel",
sc(
hoconsc:union([non_neg_integer(), infinity]),
#{
default => 100,
desc => ?DESC(mqtt_max_awaiting_rel),
importance => ?IMPORTANCE_LOW
}
)},
{"max_qos_allowed",
sc(
qos(),
#{
default => 2,
desc => ?DESC(mqtt_max_qos_allowed),
importance => ?IMPORTANCE_LOW
}
)},
{"mqueue_priorities",
sc(
hoconsc:union([disabled, map()]),
#{
default => disabled,
desc => ?DESC(mqtt_mqueue_priorities),
importance => ?IMPORTANCE_LOW
}
)},
{"mqueue_default_priority",
sc(
hoconsc:enum([highest, lowest]),
#{
default => lowest,
desc => ?DESC(mqtt_mqueue_default_priority),
importance => ?IMPORTANCE_LOW
}
)},
{"mqueue_store_qos0",
sc(
boolean(),
#{
default => true,
desc => ?DESC(mqtt_mqueue_store_qos0),
importance => ?IMPORTANCE_LOW
}
)},
{"max_mqueue_len",
sc(
hoconsc:union([non_neg_integer(), infinity]),
#{
default => 1000,
desc => ?DESC(mqtt_max_mqueue_len),
importance => ?IMPORTANCE_LOW
}
)},
{"max_inflight",
sc(
range(1, 65535),
#{
default => 32,
desc => ?DESC(mqtt_max_inflight),
importance => ?IMPORTANCE_LOW
}
)},
{"max_subscriptions",
sc(
hoconsc:union([range(1, inf), infinity]),
#{
default => infinity,
desc => ?DESC(mqtt_max_subscriptions),
importance => ?IMPORTANCE_LOW
}
)},
{"upgrade_qos",
sc(
boolean(),
#{
default => false,
desc => ?DESC(mqtt_upgrade_qos),
importance => ?IMPORTANCE_LOW
}
)},
{"await_rel_timeout",
sc(
duration(),
#{
default => <<"300s">>,
desc => ?DESC(mqtt_await_rel_timeout),
importance => ?IMPORTANCE_LOW
}
)}
].
shared_subscription_strategy() ->
{"shared_subscription_strategy",
sc(
hoconsc:enum([
random,
round_robin,
round_robin_per_group,
sticky,
local,
hash_topic,
hash_clientid
]),
#{
default => round_robin,
desc => ?DESC(broker_shared_subscription_strategy)
}
)}.

View File

@ -18,6 +18,7 @@
-behaviour(gen_server).
-include("emqx_schema.hrl").
-include("emqx.hrl").
-include("emqx_mqtt.hrl").
-include("logger.hrl").
@ -158,16 +159,14 @@ dispatch(Group, Topic, Delivery = #delivery{message = Msg}, FailedSubs) ->
-spec strategy(emqx_types:group()) -> strategy().
strategy(Group) ->
try
emqx:get_config([
broker,
shared_subscription_group,
binary_to_existing_atom(Group),
strategy
])
try binary_to_existing_atom(Group) of
GroupAtom ->
Key = [broker, shared_subscription_group, GroupAtom, strategy],
case emqx:get_config(Key, ?CONFIG_NOT_FOUND_MAGIC) of
?CONFIG_NOT_FOUND_MAGIC -> get_default_shared_subscription_strategy();
Strategy -> Strategy
end
catch
error:{config_not_found, _} ->
get_default_shared_subscription_strategy();
error:badarg ->
get_default_shared_subscription_strategy()
end.
@ -190,7 +189,7 @@ do_dispatch(SubPid, _Group, Topic, Msg, retry) ->
do_dispatch(SubPid, Group, Topic, Msg, fresh) ->
case ack_enabled() of
true ->
%% TODO: delete this clase after 5.1.0
%% TODO: delete this case after 5.1.0
do_dispatch_with_ack(SubPid, Group, Topic, Msg);
false ->
send(SubPid, Topic, {deliver, Topic, Msg})
@ -240,7 +239,7 @@ with_redispatch_to(#message{qos = ?QOS_0} = Msg, _Group, _Topic) ->
with_redispatch_to(Msg, Group, Topic) ->
emqx_message:set_headers(#{redispatch_to => ?REDISPATCH_TO(Group, Topic)}, Msg).
%% @hidden Redispatch is neede only for the messages with redispatch_to header added.
%% @hidden Redispatch is needed only for the messages with redispatch_to header added.
is_redispatch_needed(#message{} = Msg) ->
case get_redispatch_to(Msg) of
?REDISPATCH_TO(_, _) ->
@ -555,4 +554,4 @@ delete_route_if_needed({Group, Topic} = GroupTopic) ->
end).
get_default_shared_subscription_strategy() ->
emqx:get_config([broker, shared_subscription_strategy]).
emqx:get_config([mqtt, shared_subscription_strategy]).

View File

@ -478,7 +478,7 @@ to_server_opts(Type, Opts) ->
Versions = integral_versions(Type, maps:get(versions, Opts, undefined)),
Ciphers = integral_ciphers(Versions, maps:get(ciphers, Opts, undefined)),
Path = fun(Key) -> resolve_cert_path_for_read_strict(maps:get(Key, Opts, undefined)) end,
filter(
ensure_valid_options(
maps:to_list(Opts#{
keyfile => Path(keyfile),
certfile => Path(certfile),
@ -511,7 +511,7 @@ to_client_opts(Type, Opts) ->
SNI = ensure_sni(Get(server_name_indication)),
Versions = integral_versions(Type, Get(versions)),
Ciphers = integral_ciphers(Versions, Get(ciphers)),
filter(
ensure_valid_options(
[
{keyfile, KeyFile},
{certfile, CertFile},
@ -556,33 +556,72 @@ resolve_cert_path_for_read_strict(Path) ->
resolve_cert_path_for_read(Path) ->
emqx_schema:naive_env_interpolation(Path).
filter([], _) ->
[];
filter([{_, undefined} | T], Versions) ->
filter(T, Versions);
filter([{_, ""} | T], Versions) ->
filter(T, Versions);
filter([{K, V} | T], Versions) ->
ensure_valid_options(Options, Versions) ->
ensure_valid_options(Options, Versions, []).
ensure_valid_options([], _, Acc) ->
lists:reverse(Acc);
ensure_valid_options([{_, undefined} | T], Versions, Acc) ->
ensure_valid_options(T, Versions, Acc);
ensure_valid_options([{_, ""} | T], Versions, Acc) ->
ensure_valid_options(T, Versions, Acc);
ensure_valid_options([{K, V} | T], Versions, Acc) ->
case tls_option_compatible_versions(K) of
all ->
[{K, V} | filter(T, Versions)];
ensure_valid_options(T, Versions, [{K, V} | Acc]);
CompatibleVersions ->
case CompatibleVersions -- (CompatibleVersions -- Versions) of
[] ->
filter(T, Versions);
_ ->
[{K, V} | filter(T, Versions)]
Enabled = sets:from_list(Versions),
Compatible = sets:from_list(CompatibleVersions),
case sets:size(sets:intersection(Enabled, Compatible)) > 0 of
true ->
ensure_valid_options(T, Versions, [{K, V} | Acc]);
false ->
?SLOG(warning, #{
msg => "drop_incompatible_tls_option", option => K, versions => Versions
}),
ensure_valid_options(T, Versions, Acc)
end
end.
%% see otp/lib/ssl/src/ssl.erl, `assert_option_dependency/4`
tls_option_compatible_versions(beast_mitigation) ->
[dtlsv1, 'tlsv1'];
tls_option_compatible_versions(padding_check) ->
[dtlsv1, 'tlsv1'];
tls_option_compatible_versions(client_renegotiation) ->
[dtlsv1, 'dtlsv1.2', 'tlsv1', 'tlsv1.1', 'tlsv1.2'];
tls_option_compatible_versions(reuse_session) ->
[dtlsv1, 'dtlsv1.2', 'tlsv1', 'tlsv1.1', 'tlsv1.2'];
tls_option_compatible_versions(reuse_sessions) ->
[dtlsv1, 'dtlsv1.2', 'tlsv1', 'tlsv1.1', 'tlsv1.2'];
tls_option_compatible_versions(secure_renegotiate) ->
[dtlsv1, 'dtlsv1.2', 'tlsv1', 'tlsv1.1', 'tlsv1.2'];
tls_option_compatible_versions(next_protocol_advertised) ->
[dtlsv1, 'dtlsv1.2', 'tlsv1', 'tlsv1.1', 'tlsv1.2'];
tls_option_compatible_versions(client_preferred_next_protocols) ->
[dtlsv1, 'dtlsv1.2', 'tlsv1', 'tlsv1.1', 'tlsv1.2'];
tls_option_compatible_versions(psk_identity) ->
[dtlsv1, 'dtlsv1.2', 'tlsv1', 'tlsv1.1', 'tlsv1.2'];
tls_option_compatible_versions(srp_identity) ->
[dtlsv1, 'dtlsv1.2', 'tlsv1', 'tlsv1.1', 'tlsv1.2'];
tls_option_compatible_versions(user_lookup_fun) ->
[dtlsv1, 'dtlsv1.2', 'tlsv1', 'tlsv1.1', 'tlsv1.2'];
tls_option_compatible_versions(client_renegotiation) ->
[dtlsv1, 'dtlsv1.2', 'tlsv1', 'tlsv1.1', 'tlsv1.2'];
tls_option_compatible_versions(early_data) ->
['tlsv1.3'];
tls_option_compatible_versions(certificate_authorities) ->
['tlsv1.3'];
tls_option_compatible_versions(cookie) ->
['tlsv1.3'];
tls_option_compatible_versions(key_update_at) ->
['tlsv1.3'];
tls_option_compatible_versions(anti_replay) ->
['tlsv1.3'];
tls_option_compatible_versions(session_tickets) ->
['tlsv1.3'];
tls_option_compatible_versions(supported_groups) ->
['tlsv1.3'];
tls_option_compatible_versions(use_ticket) ->
['tlsv1.3'];
tls_option_compatible_versions(_) ->
all.

View File

@ -84,6 +84,8 @@
%% Toxiproxy API
-export([
with_failure/5,
enable_failure/4,
heal_failure/4,
reset_proxy/2
]).
@ -286,9 +288,9 @@ perform_sanity_checks(_App) ->
ok.
ensure_config_handler(Module, ConfigPath) ->
#{handlers := Handlers} = sys:get_state(emqx_config_handler),
#{handlers := Handlers} = emqx_config_handler:info(),
case emqx_utils_maps:deep_get(ConfigPath, Handlers, not_found) of
#{{mod} := Module} -> ok;
#{'$mod' := Module} -> ok;
NotFound -> error({config_handler_missing, ConfigPath, Module, NotFound})
end,
ok.

View File

@ -63,12 +63,12 @@ t_fill_default_values(C) when is_list(C) ->
<<"enable_session_registry">> := true,
<<"perf">> :=
#{
<<"route_lock_type">> := key,
<<"route_lock_type">> := <<"key">>,
<<"trie_compaction">> := true
},
<<"route_batch_clean">> := false,
<<"session_locking_strategy">> := quorum,
<<"shared_subscription_strategy">> := round_robin
<<"session_locking_strategy">> := <<"quorum">>,
<<"shared_subscription_strategy">> := <<"round_robin">>
}
},
WithDefaults
@ -440,6 +440,7 @@ zone_global_defaults() ->
server_keepalive => disabled,
session_expiry_interval => 7200000,
shared_subscription => true,
shared_subscription_strategy => round_robin,
strict_mode => false,
upgrade_qos => false,
use_username_as_clientid => false,

View File

@ -19,7 +19,7 @@
-compile(export_all).
-compile(nowarn_export_all).
-define(MOD, {mod}).
-define(MOD, '$mod').
-define(WKEY, '?').
-define(CLUSTER_CONF, "/tmp/cluster.conf").
@ -99,7 +99,7 @@ t_conflict_handler(_Config) ->
%% override
ok = emqx_config_handler:add_handler([sysmon], emqx_config_logger),
?assertMatch(
#{handlers := #{sysmon := #{{mod} := emqx_config_logger}}},
#{handlers := #{sysmon := #{?MOD := emqx_config_logger}}},
emqx_config_handler:info()
),
ok.

View File

@ -1104,14 +1104,9 @@ do_t_validations(_Config) ->
emqx_utils_json:decode(ResRaw1, [return_maps]),
?assertMatch(
#{
<<"mismatches">> :=
#{
<<"listeners:ssl_not_required_bind">> :=
#{
<<"kind">> := <<"validation_error">>,
<<"reason">> :=
<<"verify must be verify_peer when CRL check is enabled">>
}
}
},
emqx_utils_json:decode(MsgRaw1, [return_maps])
),

View File

@ -912,14 +912,9 @@ do_t_validations(_Config) ->
emqx_utils_json:decode(ResRaw1, [return_maps]),
?assertMatch(
#{
<<"mismatches">> :=
#{
<<"listeners:ssl_not_required_bind">> :=
#{
<<"kind">> := <<"validation_error">>,
<<"reason">> :=
<<"The responder URL is required for OCSP stapling">>
}
}
},
emqx_utils_json:decode(MsgRaw1, [return_maps])
),
@ -942,14 +937,9 @@ do_t_validations(_Config) ->
emqx_utils_json:decode(ResRaw2, [return_maps]),
?assertMatch(
#{
<<"mismatches">> :=
#{
<<"listeners:ssl_not_required_bind">> :=
#{
<<"kind">> := <<"validation_error">>,
<<"reason">> :=
<<"The issuer PEM path is required for OCSP stapling">>
}
}
},
emqx_utils_json:decode(MsgRaw2, [return_maps])
),

View File

@ -94,6 +94,18 @@ ssl_opts_tls_psk_test() ->
Checked = validate(Sc, #{<<"versions">> => [<<"tlsv1.2">>]}),
?assertMatch(#{versions := ['tlsv1.2']}, Checked).
ssl_opts_version_gap_test_() ->
Sc = emqx_schema:server_ssl_opts_schema(#{}, false),
RanchSc = emqx_schema:server_ssl_opts_schema(#{}, true),
Reason = "Using multiple versions that include tlsv1.3 but exclude tlsv1.2 is not allowed",
[
?_assertThrow(
{_, [#{kind := validation_error, reason := Reason}]},
validate(S, #{<<"versions">> => [<<"tlsv1.1">>, <<"tlsv1.3">>]})
)
|| S <- [Sc, RanchSc]
].
bad_cipher_test() ->
Sc = emqx_schema:server_ssl_opts_schema(#{}, false),
Reason = {bad_ciphers, ["foo"]},

View File

@ -769,12 +769,12 @@ t_qos1_random_dispatch_if_all_members_are_down(Config) when is_list(Config) ->
%% Expected behaviour:
%% the messages sent to client1's inflight and mq are re-dispatched after client1 is down
t_dispatch_qos2({init, Config}) when is_list(Config) ->
ok = ensure_config(round_robin, _AckEnabled = false),
emqx_config:put_zone_conf(default, [mqtt, max_inflight], 1),
Config;
t_dispatch_qos2({'end', Config}) when is_list(Config) ->
emqx_config:put_zone_conf(default, [mqtt, max_inflight], 0);
t_dispatch_qos2(Config) when is_list(Config) ->
ok = ensure_config(round_robin, _AckEnabled = false),
Topic = <<"foo/bar/1">>,
ClientId1 = <<"ClientId1">>,
ClientId2 = <<"ClientId2">>,
@ -923,12 +923,12 @@ t_session_takeover(Config) when is_list(Config) ->
ok.
t_session_kicked({init, Config}) when is_list(Config) ->
ok = ensure_config(round_robin, _AckEnabled = false),
emqx_config:put_zone_conf(default, [mqtt, max_inflight], 1),
Config;
t_session_kicked({'end', Config}) when is_list(Config) ->
emqx_config:put_zone_conf(default, [mqtt, max_inflight], 0);
t_session_kicked(Config) when is_list(Config) ->
ok = ensure_config(round_robin, _AckEnabled = false),
Topic = <<"foo/bar/1">>,
ClientId1 = <<"ClientId1">>,
ClientId2 = <<"ClientId2">>,
@ -1019,12 +1019,12 @@ ensure_config(Strategy) ->
ensure_config(Strategy, _AckEnabled = true).
ensure_config(Strategy, AckEnabled) ->
emqx_config:put([broker, shared_subscription_strategy], Strategy),
emqx_config:put([mqtt, shared_subscription_strategy], Strategy),
emqx_config:put([broker, shared_dispatch_ack_enabled], AckEnabled),
ok.
ensure_node_config(Node, Strategy) ->
rpc:call(Node, emqx_config, force_put, [[broker, shared_subscription_strategy], Strategy]).
rpc:call(Node, emqx_config, force_put, [[mqtt, shared_subscription_strategy], Strategy]).
ensure_group_config(Group2Strategy) ->
lists:foreach(

View File

@ -26,10 +26,7 @@
get_enabled_authns/0
]).
%% Data backup
-export([
import_config/1
]).
-export([merge_config/1, merge_config_local/2, import_config/1]).
-include("emqx_authn.hrl").
@ -162,3 +159,9 @@ authn_list(Authn) when is_list(Authn) ->
Authn;
authn_list(Authn) when is_map(Authn) ->
[Authn].
merge_config(AuthNs) ->
emqx_authn_api:update_config([?CONF_NS_ATOM], {merge_authenticators, AuthNs}).
merge_config_local(AuthNs, Opts) ->
emqx:update_config([?CONF_NS_ATOM], {merge_authenticators, AuthNs}, Opts).

View File

@ -89,6 +89,8 @@
param_listener_id/0
]).
-export([update_config/2]).
-elvis([{elvis_style, god_modules, disable}]).
api_spec() ->
@ -101,14 +103,15 @@ paths() ->
"/authentication/:id/status",
"/authentication/:id/position/:position",
"/authentication/:id/users",
"/authentication/:id/users/:user_id",
"/authentication/:id/users/:user_id"
"/listeners/:listener_id/authentication",
"/listeners/:listener_id/authentication/:id",
"/listeners/:listener_id/authentication/:id/status",
"/listeners/:listener_id/authentication/:id/position/:position",
"/listeners/:listener_id/authentication/:id/users",
"/listeners/:listener_id/authentication/:id/users/:user_id"
%% hide listener authn api since 5.1.0
%% "/listeners/:listener_id/authentication",
%% "/listeners/:listener_id/authentication/:id",
%% "/listeners/:listener_id/authentication/:id/status",
%% "/listeners/:listener_id/authentication/:id/position/:position",
%% "/listeners/:listener_id/authentication/:id/users",
%% "/listeners/:listener_id/authentication/:id/users/:user_id"
].
roots() ->

View File

@ -48,8 +48,9 @@ api_spec() ->
paths() ->
[
"/authentication/:id/import_users",
"/listeners/:listener_id/authentication/:id/import_users"
"/authentication/:id/import_users"
%% hide the deprecated api since 5.1.0
%% "/listeners/:listener_id/authentication/:id/import_users"
].
schema("/authentication/:id/import_users") ->

View File

@ -451,7 +451,7 @@ request_for_log(Credential, #{url := Url, method := Method} = State) ->
base_url => Url,
path_query => PathQuery,
headers => Headers,
mody => Body
body => Body
}
end.

View File

@ -120,23 +120,23 @@ t_authenticator_position(_) ->
t_authenticator_import_users(_) ->
test_authenticator_import_users([]).
t_listener_authenticators(_) ->
test_authenticators(["listeners", ?TCP_DEFAULT]).
%t_listener_authenticators(_) ->
% test_authenticators(["listeners", ?TCP_DEFAULT]).
t_listener_authenticator(_) ->
test_authenticator(["listeners", ?TCP_DEFAULT]).
%t_listener_authenticator(_) ->
% test_authenticator(["listeners", ?TCP_DEFAULT]).
t_listener_authenticator_users(_) ->
test_authenticator_users(["listeners", ?TCP_DEFAULT]).
%t_listener_authenticator_users(_) ->
% test_authenticator_users(["listeners", ?TCP_DEFAULT]).
t_listener_authenticator_user(_) ->
test_authenticator_user(["listeners", ?TCP_DEFAULT]).
%t_listener_authenticator_user(_) ->
% test_authenticator_user(["listeners", ?TCP_DEFAULT]).
t_listener_authenticator_position(_) ->
test_authenticator_position(["listeners", ?TCP_DEFAULT]).
%t_listener_authenticator_position(_) ->
% test_authenticator_position(["listeners", ?TCP_DEFAULT]).
t_listener_authenticator_import_users(_) ->
test_authenticator_import_users(["listeners", ?TCP_DEFAULT]).
%t_listener_authenticator_import_users(_) ->
% test_authenticator_import_users(["listeners", ?TCP_DEFAULT]).
t_aggregate_metrics(_) ->
Metrics = #{
@ -683,7 +683,9 @@ test_authenticator_import_users(PathPrefix) ->
{filename, "user-credentials.csv", CSVData}
]).
t_switch_to_global_chain(_) ->
%% listener authn api is not supported since 5.1.0
%% Don't support listener switch to global chain.
ignore_switch_to_global_chain(_) ->
{ok, 200, _} = request(
post,
uri([?CONF_NS]),

View File

@ -75,7 +75,6 @@ listener_mqtt_tcp_conf(Port, EnableAuthn) ->
PortS = integer_to_binary(Port),
#{
<<"acceptors">> => 16,
<<"zone">> => <<"default">>,
<<"access_rules">> => ["allow all"],
<<"bind">> => <<"0.0.0.0:", PortS/binary>>,
<<"max_connections">> => 1024000,

View File

@ -1,5 +1 @@
authorization {
deny_action = ignore
no_match = allow
cache = { enable = true }
}

View File

@ -37,6 +37,7 @@
-define(CMD_PREPEND, prepend).
-define(CMD_APPEND, append).
-define(CMD_MOVE, move).
-define(CMD_MERGE, merge).
-define(CMD_MOVE_FRONT, front).
-define(CMD_MOVE_REAR, rear).

View File

@ -24,11 +24,6 @@
-include_lib("emqx/include/emqx_hooks.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-ifdef(TEST).
-compile(export_all).
-compile(nowarn_export_all).
-endif.
-export([
register_metrics/0,
init/0,
@ -37,6 +32,8 @@
lookup/1,
move/2,
update/2,
merge/1,
merge_local/2,
authorize/5,
%% for telemetry information
get_enabled_authzs/0
@ -128,6 +125,12 @@ lookup(Type) ->
{Source, _Front, _Rear} = take(Type),
Source.
merge(NewConf) ->
emqx_authz_utils:update_config(?ROOT_KEY, {?CMD_MERGE, NewConf}).
merge_local(NewConf, Opts) ->
emqx:update_config(?ROOT_KEY, {?CMD_MERGE, NewConf}, Opts).
move(Type, ?CMD_MOVE_BEFORE(Before)) ->
emqx_authz_utils:update_config(
?CONF_KEY_PATH, {?CMD_MOVE, type(Type), ?CMD_MOVE_BEFORE(type(Before))}
@ -158,18 +161,25 @@ pre_config_update(Path, Cmd, Sources) ->
do_pre_config_update(?CONF_KEY_PATH, Cmd, Sources) ->
do_pre_config_update(Cmd, Sources);
do_pre_config_update(?ROOT_KEY, {?CMD_MERGE, NewConf}, OldConf) ->
do_pre_config_merge(NewConf, OldConf);
do_pre_config_update(?ROOT_KEY, NewConf, OldConf) ->
do_pre_config_replace(NewConf, OldConf).
do_pre_config_merge(NewConf, OldConf) ->
MergeConf = emqx_utils_maps:deep_merge(OldConf, NewConf),
NewSources = merge_sources(OldConf, NewConf),
do_pre_config_replace(MergeConf#{<<"sources">> => NewSources}, OldConf).
%% override the entire config when updating the root key
%% emqx_conf:update(?ROOT_KEY, Conf);
do_pre_config_replace(Conf, Conf) ->
Conf;
do_pre_config_replace(NewConf, OldConf) ->
#{<<"sources">> := NewSources} = NewConf,
#{<<"sources">> := OldSources} = OldConf,
NewSources1 = do_pre_config_update({?CMD_REPLACE, NewSources}, OldSources),
NewConf#{<<"sources">> := NewSources1}.
NewSources = get_sources(NewConf),
OldSources = get_sources(OldConf),
ReplaceSources = do_pre_config_update({?CMD_REPLACE, NewSources}, OldSources),
NewConf#{<<"sources">> => ReplaceSources}.
do_pre_config_update({?CMD_MOVE, _, _} = Cmd, Sources) ->
do_move(Cmd, Sources);
@ -465,8 +475,8 @@ get_enabled_authzs() ->
%%------------------------------------------------------------------------------
import_config(#{?CONF_NS_BINARY := AuthzConf}) ->
Sources = maps:get(<<"sources">>, AuthzConf, []),
OldSources = emqx:get_raw_config(?CONF_KEY_PATH, []),
Sources = get_sources(AuthzConf),
OldSources = emqx:get_raw_config(?CONF_KEY_PATH, [emqx_authz_schema:default_authz()]),
MergedSources = emqx_utils:merge_lists(OldSources, Sources, fun type/1),
MergedAuthzConf = AuthzConf#{<<"sources">> => MergedSources},
case emqx_conf:update([?CONF_NS_ATOM], MergedAuthzConf, #{override_to => cluster}) of
@ -526,12 +536,12 @@ take(Type) -> take(Type, lookup()).
%% Take the source of give type, the sources list is split into two parts
%% front part and rear part.
take(Type, Sources) ->
{Front, Rear} = lists:splitwith(fun(T) -> type(T) =/= type(Type) end, Sources),
case Rear =:= [] of
true ->
Expect = type(Type),
case lists:splitwith(fun(T) -> type(T) =/= Expect end, Sources) of
{_Front, []} ->
throw({not_found_source, Type});
_ ->
{hd(Rear), Front, tl(Rear)}
{Front, [Found | Rear]} ->
{Found, Front, Rear}
end.
find_action_in_hooks() ->
@ -628,3 +638,80 @@ check_acl_file_rules(Path, Rules) ->
after
_ = file:delete(TmpPath)
end.
merge_sources(OriginConf, NewConf) ->
{OriginSource, NewSources} =
lists:foldl(
fun(Old = #{<<"type">> := Type}, {OriginAcc, NewAcc}) ->
case type_take(Type, NewAcc) of
not_found ->
{[Old | OriginAcc], NewAcc};
{New, NewAcc1} ->
MergeSource = emqx_utils_maps:deep_merge(Old, New),
{[MergeSource | OriginAcc], NewAcc1}
end
end,
{[], get_sources(NewConf)},
get_sources(OriginConf)
),
lists:reverse(OriginSource) ++ NewSources.
get_sources(Conf) ->
Default = [emqx_authz_schema:default_authz()],
maps:get(<<"sources">>, Conf, Default).
type_take(Type, Sources) ->
try take(Type, Sources) of
{Found, Front, Rear} -> {Found, Front ++ Rear}
catch
throw:{not_found_source, Type} -> not_found
end.
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-compile(nowarn_export_all).
-compile(export_all).
merge_sources_test() ->
Default = [emqx_authz_schema:default_authz()],
Http = #{<<"type">> => <<"http">>, <<"enable">> => true},
Mysql = #{<<"type">> => <<"mysql">>, <<"enable">> => true},
Mongo = #{<<"type">> => <<"mongodb">>, <<"enable">> => true},
Redis = #{<<"type">> => <<"redis">>, <<"enable">> => true},
Postgresql = #{<<"type">> => <<"postgresql">>, <<"enable">> => true},
HttpDisable = Http#{<<"enable">> => false},
MysqlDisable = Mysql#{<<"enable">> => false},
MongoDisable = Mongo#{<<"enable">> => false},
%% has default source
?assertEqual(Default, merge_sources(#{}, #{})),
?assertEqual([], merge_sources(#{<<"sources">> => []}, #{<<"sources">> => []})),
?assertEqual(Default, merge_sources(#{}, #{<<"sources">> => []})),
%% add
?assertEqual(
[Http, Mysql, Mongo, Redis, Postgresql],
merge_sources(
#{<<"sources">> => [Http, Mysql]},
#{<<"sources">> => [Mongo, Redis, Postgresql]}
)
),
%% replace
?assertEqual(
[HttpDisable, MysqlDisable],
merge_sources(
#{<<"sources">> => [Http, Mysql]},
#{<<"sources">> => [HttpDisable, MysqlDisable]}
)
),
%% add + replace + change position
?assertEqual(
[HttpDisable, Mysql, MongoDisable, Redis],
merge_sources(
#{<<"sources">> => [Http, Mysql, Mongo]},
#{<<"sources">> => [MongoDisable, HttpDisable, Redis]}
)
),
ok.
-endif.

View File

@ -42,7 +42,8 @@
-export([
headers_no_content_type/1,
headers/1
headers/1,
default_authz/0
]).
%%--------------------------------------------------------------------

View File

@ -169,7 +169,7 @@ init_node(Type) ->
ok = emqx_dashboard_desc_cache:init(),
ok = emqx_config:put(
[dashboard, listeners],
#{http => #{enable => true, bind => 18083, proxy_header => false}}
#{http => #{bind => 18083, proxy_header => false}}
),
ok = emqx_dashboard:start_listeners(),
ready = emqx_dashboard_listener:regenerate_minirest_dispatch(),

View File

@ -1,7 +1,7 @@
{erl_opts, [debug_info]}.
{deps, [
{influxdb, {git, "https://github.com/emqx/influxdb-client-erl", {tag, "1.1.9"}}},
{influxdb, {git, "https://github.com/emqx/influxdb-client-erl", {tag, "1.1.10"}}},
{emqx_connector, {path, "../../apps/emqx_connector"}},
{emqx_resource, {path, "../../apps/emqx_resource"}},
{emqx_bridge, {path, "../../apps/emqx_bridge"}}

View File

@ -150,7 +150,7 @@ on_batch_query_async(
end.
on_get_status(_InstId, #{client := Client}) ->
case influxdb:is_alive(Client) of
case influxdb:is_alive(Client) andalso ok =:= influxdb:check_auth(Client) of
true ->
connected;
false ->
@ -262,6 +262,8 @@ do_start_client(
{ok, Client} ->
case influxdb:is_alive(Client, true) of
true ->
case influxdb:check_auth(Client) of
ok ->
State = #{
client => Client,
write_syntax => to_config(Lines, Precision)
@ -273,6 +275,19 @@ do_start_client(
state => redact_auth(State)
}),
{ok, State};
Error ->
?tp(influxdb_connector_start_failed, #{error => auth_error}),
?SLOG(warning, #{
msg => "failed_to_start_influxdb_connector",
error => Error,
connector => InstId,
client => redact_auth(Client),
reason => auth_error
}),
%% no leak
_ = influxdb:stop_client(Client),
{error, influxdb_client_auth_error}
end;
{false, Reason} ->
?tp(influxdb_connector_start_failed, #{
error => influxdb_client_not_alive, reason => Reason
@ -388,6 +403,14 @@ do_query(InstId, Client, Points) ->
connector => InstId,
points => Points
});
{error, {401, _, _}} ->
?tp(influxdb_connector_do_query_failure, #{error => <<"authorization failure">>}),
?SLOG(error, #{
msg => "influxdb_authorization_failed",
client => redact_auth(Client),
connector => InstId
}),
{error, {unrecoverable_error, <<"authorization failure">>}};
{error, Reason} = Err ->
?tp(influxdb_connector_do_query_failure, #{error => Reason}),
?SLOG(error, #{
@ -421,6 +444,10 @@ reply_callback(ReplyFunAndArgs, {error, Reason} = Error) ->
Result = {error, {recoverable_error, Reason}},
emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result)
end;
reply_callback(ReplyFunAndArgs, {ok, 401, _, _}) ->
?tp(influxdb_connector_do_query_failure, #{error => <<"authorization failure">>}),
Result = {error, {unrecoverable_error, <<"authorization failure">>}},
emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result);
reply_callback(ReplyFunAndArgs, Result) ->
emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result).

View File

@ -1058,3 +1058,131 @@ t_missing_field(Config) ->
end
),
ok.
t_authentication_error(Config0) ->
InfluxDBType = ?config(influxdb_type, Config0),
InfluxConfig0 = proplists:get_value(influxdb_config, Config0),
InfluxConfig =
case InfluxDBType of
apiv1 -> InfluxConfig0#{<<"password">> => <<"wrong_password">>};
apiv2 -> InfluxConfig0#{<<"token">> => <<"wrong_token">>}
end,
Config = lists:keyreplace(influxdb_config, 1, Config0, {influxdb_config, InfluxConfig}),
?check_trace(
begin
?wait_async_action(
create_bridge(Config),
#{?snk_kind := influxdb_connector_start_failed},
10_000
)
end,
fun(Trace) ->
?assertMatch(
[#{error := auth_error} | _],
?of_kind(influxdb_connector_start_failed, Trace)
),
ok
end
),
ok.
t_authentication_error_on_get_status(Config0) ->
ResourceId = resource_id(Config0),
% Fake initialization to simulate credential update after bridge was created.
emqx_common_test_helpers:with_mock(
influxdb,
check_auth,
fun(_) ->
ok
end,
fun() ->
InfluxDBType = ?config(influxdb_type, Config0),
InfluxConfig0 = proplists:get_value(influxdb_config, Config0),
InfluxConfig =
case InfluxDBType of
apiv1 -> InfluxConfig0#{<<"password">> => <<"wrong_password">>};
apiv2 -> InfluxConfig0#{<<"token">> => <<"wrong_token">>}
end,
Config = lists:keyreplace(influxdb_config, 1, Config0, {influxdb_config, InfluxConfig}),
{ok, _} = create_bridge(Config),
?retry(
_Sleep = 1_000,
_Attempts = 10,
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
)
end
),
% Now back to wrong credentials
?assertEqual({ok, disconnected}, emqx_resource_manager:health_check(ResourceId)),
ok.
t_authentication_error_on_send_message(Config0) ->
ResourceId = resource_id(Config0),
QueryMode = proplists:get_value(query_mode, Config0, sync),
InfluxDBType = ?config(influxdb_type, Config0),
InfluxConfig0 = proplists:get_value(influxdb_config, Config0),
InfluxConfig =
case InfluxDBType of
apiv1 -> InfluxConfig0#{<<"password">> => <<"wrong_password">>};
apiv2 -> InfluxConfig0#{<<"token">> => <<"wrong_token">>}
end,
Config = lists:keyreplace(influxdb_config, 1, Config0, {influxdb_config, InfluxConfig}),
% Fake initialization to simulate credential update after bridge was created.
emqx_common_test_helpers:with_mock(
influxdb,
check_auth,
fun(_) ->
ok
end,
fun() ->
{ok, _} = create_bridge(Config),
?retry(
_Sleep = 1_000,
_Attempts = 10,
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
)
end
),
% Now back to wrong credentials
ClientId = emqx_guid:to_hexstr(emqx_guid:gen()),
Payload = #{
int_key => -123,
bool => true,
float_key => 24.5,
uint_key => 123
},
SentData = #{
<<"clientid">> => ClientId,
<<"topic">> => atom_to_binary(?FUNCTION_NAME),
<<"timestamp">> => erlang:system_time(millisecond),
<<"payload">> => Payload
},
case QueryMode of
sync ->
?assertMatch(
{error, {unrecoverable_error, <<"authorization failure">>}},
send_message(Config, SentData)
);
async ->
?check_trace(
begin
?wait_async_action(
?assertEqual(ok, send_message(Config, SentData)),
#{?snk_kind := handle_async_reply},
1_000
)
end,
fun(Trace) ->
?assertMatch(
[#{error := <<"authorization failure">>} | _],
?of_kind(influxdb_connector_do_query_failure, Trace)
),
ok
end
)
end,
ok.

View File

@ -291,12 +291,15 @@ on_kafka_ack(_Partition, buffer_overflow_discarded, _Callback) ->
%% do not apply the callback (which is basically to bump success or fail counter)
ok.
%% Note: since wolff client has its own replayq that is not managed by
%% `emqx_resource_buffer_worker', we must avoid returning `disconnected' here. Otherwise,
%% `emqx_resource_manager' will kill the wolff producers and messages might be lost.
on_get_status(_InstId, #{client_id := ClientId, kafka_topic := KafkaTopic}) ->
case wolff_client_sup:find_client(ClientId) of
{ok, Pid} ->
do_get_status(Pid, KafkaTopic);
{error, _Reason} ->
disconnected
connecting
end.
do_get_status(Client, KafkaTopic) ->
@ -315,10 +318,10 @@ do_get_status(Client, KafkaTopic) ->
true ->
connected;
false ->
disconnected
connecting
end;
{error, _} ->
disconnected
connecting
end.
ssl(#{enable := true} = SSL) ->

View File

@ -132,7 +132,7 @@ t_query_mode(CtConfig) ->
begin
publish_with_config_template_parameters(CtConfig1, #{"query_mode" => "sync"})
end,
fun(RunStageResult, Trace) ->
fun(Trace) ->
%% We should have a sync Snabbkaffe trace
?assertMatch([_], ?of_kind(emqx_bridge_kafka_impl_producer_sync_query, Trace))
end
@ -141,7 +141,7 @@ t_query_mode(CtConfig) ->
begin
publish_with_config_template_parameters(CtConfig1, #{"query_mode" => "async"})
end,
fun(RunStageResult, Trace) ->
fun(Trace) ->
%% We should have a sync Snabbkaffe trace
?assertMatch([_], ?of_kind(emqx_bridge_kafka_impl_producer_async_query, Trace))
end

View File

@ -2,7 +2,7 @@
{erl_opts, [debug_info]}.
{deps, [
{pulsar, {git, "https://github.com/emqx/pulsar-client-erl.git", {tag, "0.8.2"}}},
{pulsar, {git, "https://github.com/emqx/pulsar-client-erl.git", {tag, "0.8.3"}}},
{emqx_connector, {path, "../../apps/emqx_connector"}},
{emqx_resource, {path, "../../apps/emqx_resource"}},
{emqx_bridge, {path, "../../apps/emqx_bridge"}}

View File

@ -64,6 +64,8 @@
-define(pulsar_client_id, pulsar_client_id).
-define(pulsar_producers, pulsar_producers).
-define(HEALTH_CHECK_RETRY_TIMEOUT, 4_000).
%%-------------------------------------------------------------------------------------
%% `emqx_resource' API
%%-------------------------------------------------------------------------------------
@ -143,7 +145,10 @@ on_stop(InstanceId, _State) ->
ok
end.
-spec on_get_status(resource_id(), state()) -> connected | disconnected.
%% Note: since Pulsar client has its own replayq that is not managed by
%% `emqx_resource_buffer_worker', we must avoid returning `disconnected' here. Otherwise,
%% `emqx_resource_manager' will kill the Pulsar producers and messages might be lost.
-spec on_get_status(resource_id(), state()) -> connected | connecting.
on_get_status(_InstanceId, State = #{}) ->
#{
pulsar_client_id := ClientId,
@ -155,15 +160,15 @@ on_get_status(_InstanceId, State = #{}) ->
true ->
get_producer_status(Producers);
false ->
disconnected
connecting
catch
error:timeout ->
disconnected;
connecting;
exit:{noproc, _} ->
disconnected
connecting
end;
{error, _} ->
disconnected
connecting
end;
on_get_status(_InstanceId, _State) ->
%% If a health check happens just after a concurrent request to
@ -440,9 +445,18 @@ render(Message, Template) ->
emqx_placeholder:proc_tmpl(Template, Message, Opts).
get_producer_status(Producers) ->
do_get_producer_status(Producers, 0).
do_get_producer_status(_Producers, TimeSpent) when TimeSpent > ?HEALTH_CHECK_RETRY_TIMEOUT ->
connecting;
do_get_producer_status(Producers, TimeSpent) ->
case pulsar_producers:all_connected(Producers) of
true -> connected;
false -> connecting
true ->
connected;
false ->
Sleep = 200,
timer:sleep(Sleep),
do_get_producer_status(Producers, TimeSpent + Sleep)
end.
partition_strategy(key_dispatch) -> first_key_dispatch;

View File

@ -45,6 +45,7 @@ only_once_tests() ->
t_send_when_timeout,
t_failure_to_start_producer,
t_producer_process_crash,
t_resilience,
t_resource_manager_crash_after_producers_started,
t_resource_manager_crash_before_producers_started
].
@ -733,13 +734,6 @@ t_start_stop(Config) ->
),
%% Check that the bridge probe API doesn't leak atoms.
redbug:start(
[
"emqx_resource_manager:health_check_interval -> return",
"emqx_resource_manager:with_health_check -> return"
],
[{msgs, 100}, {time, 30_000}]
),
ProbeRes0 = probe_bridge_api(
Config,
#{<<"resource_opts">> => #{<<"health_check_interval">> => <<"1s">>}}
@ -795,7 +789,11 @@ t_on_get_status(Config) ->
),
emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
ct:sleep(500),
?assertEqual({ok, disconnected}, emqx_resource_manager:health_check(ResourceId))
?retry(
_Sleep = 1_000,
_Attempts = 20,
?assertEqual({ok, connecting}, emqx_resource_manager:health_check(ResourceId))
)
end),
%% Check that it recovers itself.
?retry(
@ -1154,3 +1152,86 @@ do_t_cluster(Config) ->
[]
),
ok.
t_resilience(Config) ->
ProxyPort = ?config(proxy_port, Config),
ProxyHost = ?config(proxy_host, Config),
ProxyName = ?config(proxy_name, Config),
ResourceId = resource_id(Config),
?check_trace(
begin
{ok, _} = create_bridge(Config),
{ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config),
on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
?retry(
_Sleep0 = 1_000,
_Attempts0 = 20,
?assertEqual(
{ok, connected},
emqx_resource_manager:health_check(ResourceId)
)
),
{ok, C} = emqtt:start_link(),
{ok, _} = emqtt:connect(C),
ProduceInterval = 100,
TestPid = self(),
StartSequentialProducer =
fun Go(SeqNo0) ->
receive
stop -> TestPid ! {done, SeqNo0}
after 0 ->
SeqNo = SeqNo0 + 1,
emqtt:publish(C, ?RULE_TOPIC_BIN, integer_to_binary(SeqNo)),
SeqNo rem 10 =:= 0 andalso (TestPid ! {sent, SeqNo}),
timer:sleep(ProduceInterval),
Go(SeqNo)
end
end,
SequentialProducer = spawn_link(fun() -> StartSequentialProducer(0) end),
ct:sleep(2 * ProduceInterval),
{ok, _} = emqx_common_test_helpers:enable_failure(
down, ProxyName, ProxyHost, ProxyPort
),
?retry(
_Sleep1 = 1_000,
_Attempts1 = 20,
?assertNotEqual(
{ok, connected},
emqx_resource_manager:health_check(ResourceId)
)
),
%% Note: we don't check for timeouts here because:
%% a) If we do trigger auto reconnect, that means that the producers were
%% killed and the `receive_consumed' below will fail.
%% b) If there's a timeout, that's the correct path; we just need to give the
%% resource manager a chance to do so.
?block_until(#{?snk_kind := resource_auto_reconnect}, 5_000),
{ok, _} = emqx_common_test_helpers:heal_failure(down, ProxyName, ProxyHost, ProxyPort),
?retry(
_Sleep2 = 1_000,
_Attempts2 = 20,
?assertEqual(
{ok, connected},
emqx_resource_manager:health_check(ResourceId)
)
),
SequentialProducer ! stop,
NumProduced =
receive
{done, SeqNo} -> SeqNo
after 1_000 -> ct:fail("producer didn't stop!")
end,
Consumed = lists:flatmap(
fun(_) -> receive_consumed(5_000) end, lists:seq(1, NumProduced)
),
?assertEqual(NumProduced, length(Consumed)),
ExpectedPayloads = lists:map(fun integer_to_binary/1, lists:seq(1, NumProduced)),
?assertEqual(
ExpectedPayloads, lists:map(fun(#{<<"payload">> := P}) -> P end, Consumed)
),
ok
end,
[]
),
ok.

View File

@ -34,4 +34,6 @@
tnx_id :: pos_integer() | '$1'
}).
-define(READONLY_KEYS, [cluster, rpc, node]).
-endif.

View File

@ -19,6 +19,7 @@
-include_lib("emqx/include/logger.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-include_lib("emqx/include/emqx_schema.hrl").
-include("emqx_conf.hrl").
-export([add_handler/2, remove_handler/1]).
-export([get/1, get/2, get_raw/1, get_raw/2, get_all/1]).
@ -30,6 +31,7 @@
-export([dump_schema/2]).
-export([schema_module/0]).
-export([gen_example_conf/2]).
-export([check_config/2]).
%% TODO: move to emqx_dashboard when we stop building api schema at build time
-export([
@ -213,6 +215,15 @@ schema_module() ->
Value -> list_to_existing_atom(Value)
end.
check_config(Mod, Raw) ->
try
{_AppEnvs, CheckedConf} = emqx_config:check_config(Mod, Raw),
{ok, CheckedConf}
catch
throw:Error ->
{error, Error}
end.
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------

View File

@ -15,6 +15,10 @@
%%--------------------------------------------------------------------
-module(emqx_conf_cli).
-include("emqx_conf.hrl").
-include_lib("emqx/include/emqx_access_control.hrl").
-include_lib("emqx/include/emqx_authentication.hrl").
-export([
load/0,
admins/1,
@ -27,6 +31,7 @@
%% kept cluster_call for compatibility
-define(CLUSTER_CALL, cluster_call).
-define(CONF, conf).
-define(UPDATE_READONLY_KEYS_PROHIBITED, "update_readonly_keys_prohibited").
load() ->
emqx_ctl:register_command(?CLUSTER_CALL, {?MODULE, admins}, [hidden]),
@ -42,10 +47,20 @@ conf(["show"]) ->
print_hocon(get_config());
conf(["show", Key]) ->
print_hocon(get_config(Key));
conf(["load", "--replace", Path]) ->
load_config(Path, replace);
conf(["load", "--merge", Path]) ->
load_config(Path, merge);
conf(["load", Path]) ->
load_config(Path);
load_config(Path, merge);
conf(["cluster_sync" | Args]) ->
admins(Args);
conf(["reload", "--merge"]) ->
reload_etc_conf_on_local_node(merge);
conf(["reload", "--replace"]) ->
reload_etc_conf_on_local_node(replace);
conf(["reload"]) ->
conf(["reload", "--merge"]);
conf(_) ->
emqx_ctl:usage(usage_conf() ++ usage_sync()).
@ -87,18 +102,21 @@ admins(_) ->
usage_conf() ->
[
%% TODO add reload
%{"conf reload", "reload etc/emqx.conf on local node"},
{"conf show_keys", "Print all config keys"},
{"conf reload --replace|--merge", "reload etc/emqx.conf on local node"},
{"", "The new configuration values will be overlaid on the existing values by default."},
{"", "use the --replace flag to replace existing values with the new ones instead."},
{"----------------------------------", "------------"},
{"conf show_keys", "print all the currently used configuration keys."},
{"conf show [<key>]",
"Print in-use configs (including default values) under the given key. "
"Print ALL keys if key is not provided"},
{"conf load <path>",
"Load a HOCON format config file."
"The config is overlay on top of the existing configs. "
"The current node will initiate a cluster wide config change "
"transaction to sync the changes to other nodes in the cluster. "
"NOTE: do not make runtime config changes during rolling upgrade."}
"Print in-use configs (including default values) under the given key."},
{"", "Print ALL keys if key is not provided"},
{"conf load --replace|--merge <path>", "Load a HOCON format config file."},
{"", "The new configuration values will be overlaid on the existing values by default."},
{"", "use the --replace flag to replace existing values with the new ones instead."},
{"", "The current node will initiate a cluster wide config change"},
{"", "transaction to sync the changes to other nodes in the cluster. "},
{"", "NOTE: do not make runtime config changes during rolling upgrade."},
{"----------------------------------", "------------"}
].
usage_sync() ->
@ -133,53 +151,210 @@ status() ->
emqx_ctl:print("-----------------------------------------------\n").
print_keys(Config) ->
print(lists:sort(maps:keys(Config))).
Keys = lists:sort(maps:keys(Config)),
emqx_ctl:print("~1p~n", [[binary_to_existing_atom(K) || K <- Keys]]).
print(Json) ->
emqx_ctl:print("~ts~n", [emqx_logger_jsonfmt:best_effort_json(Json)]).
print_hocon(Hocon) ->
emqx_ctl:print("~ts~n", [hocon_pp:do(Hocon, #{})]).
print_hocon(Hocon) when is_map(Hocon) ->
emqx_ctl:print("~ts~n", [hocon_pp:do(Hocon, #{})]);
print_hocon({error, Error}) ->
emqx_ctl:warning("~ts~n", [Error]).
get_config() ->
drop_hidden_roots(emqx_config:fill_defaults(emqx:get_raw_config([]))).
AllConf = emqx_config:fill_defaults(emqx:get_raw_config([])),
drop_hidden_roots(AllConf).
drop_hidden_roots(Conf) ->
Hidden = hidden_roots(),
maps:without(Hidden, Conf).
lists:foldl(fun(K, Acc) -> maps:remove(K, Acc) end, Conf, hidden_roots()).
hidden_roots() ->
SchemaModule = emqx_conf:schema_module(),
Roots = hocon_schema:roots(SchemaModule),
lists:filtermap(
fun({BinName, {_RefName, Schema}}) ->
case hocon_schema:field_schema(Schema, importance) =/= ?IMPORTANCE_HIDDEN of
true ->
false;
false ->
{true, BinName}
end
end,
Roots
).
[<<"trace">>, <<"stats">>, <<"broker">>, <<"persistent_session_store">>].
get_config(Key) ->
emqx_config:fill_defaults(#{Key => emqx:get_raw_config([Key])}).
case emqx:get_raw_config([Key], undefined) of
undefined -> {error, "key_not_found"};
Value -> emqx_config:fill_defaults(#{Key => Value})
end.
-define(OPTIONS, #{rawconf_with_defaults => true, override_to => cluster}).
load_config(Path) ->
load_config(Path, ReplaceOrMerge) ->
case hocon:files([Path]) of
{ok, Conf} ->
maps:foreach(
fun(Key, Value) ->
case emqx_conf:update([Key], Value, ?OPTIONS) of
{ok, _} -> emqx_ctl:print("load ~ts ok~n", [Key]);
{error, Reason} -> emqx_ctl:print("load ~ts failed: ~p~n", [Key, Reason])
end
end,
Conf
{ok, RawConf} when RawConf =:= #{} ->
emqx_ctl:warning("load ~ts is empty~n", [Path]),
{error, empty_hocon_file};
{ok, RawConf} ->
case check_config(RawConf) of
ok ->
lists:foreach(
fun({K, V}) -> update_config_cluster(K, V, ReplaceOrMerge) end,
to_sorted_list(RawConf)
);
{error, ?UPDATE_READONLY_KEYS_PROHIBITED = Reason} ->
emqx_ctl:warning("load ~ts failed~n~ts~n", [Path, Reason]),
emqx_ctl:warning(
"Maybe try `emqx_ctl conf reload` to reload etc/emqx.conf on local node~n"
),
{error, Reason};
{error, Errors} ->
emqx_ctl:warning("load ~ts schema check failed~n", [Path]),
lists:foreach(
fun({Key, Error}) ->
emqx_ctl:warning("~ts: ~p~n", [Key, Error])
end,
Errors
),
{error, Errors}
end;
{error, Reason} ->
emqx_ctl:print("load ~ts failed~n~p~n", [Path, Reason]),
emqx_ctl:warning("load ~ts failed~n~p~n", [Path, Reason]),
{error, bad_hocon_file}
end.
update_config_cluster(?EMQX_AUTHORIZATION_CONFIG_ROOT_NAME_BINARY = Key, Conf, merge) ->
check_res(Key, emqx_authz:merge(Conf));
update_config_cluster(?EMQX_AUTHENTICATION_CONFIG_ROOT_NAME_BINARY = Key, Conf, merge) ->
check_res(Key, emqx_authn:merge_config(Conf));
update_config_cluster(Key, NewConf, merge) ->
Merged = merge_conf(Key, NewConf),
check_res(Key, emqx_conf:update([Key], Merged, ?OPTIONS));
update_config_cluster(Key, Value, replace) ->
check_res(Key, emqx_conf:update([Key], Value, ?OPTIONS)).
-define(LOCAL_OPTIONS, #{rawconf_with_defaults => true, persistent => false}).
update_config_local(?EMQX_AUTHORIZATION_CONFIG_ROOT_NAME_BINARY = Key, Conf, merge) ->
check_res(node(), Key, emqx_authz:merge_local(Conf, ?LOCAL_OPTIONS));
update_config_local(?EMQX_AUTHENTICATION_CONFIG_ROOT_NAME_BINARY = Key, Conf, merge) ->
check_res(node(), Key, emqx_authn:merge_config_local(Conf, ?LOCAL_OPTIONS));
update_config_local(Key, NewConf, merge) ->
Merged = merge_conf(Key, NewConf),
check_res(node(), Key, emqx:update_config([Key], Merged, ?LOCAL_OPTIONS));
update_config_local(Key, Value, replace) ->
check_res(node(), Key, emqx:update_config([Key], Value, ?LOCAL_OPTIONS)).
check_res(Key, Res) -> check_res(cluster, Key, Res).
check_res(Mode, Key, {ok, _} = Res) ->
emqx_ctl:print("load ~ts in ~p ok~n", [Key, Mode]),
Res;
check_res(_Mode, Key, {error, Reason} = Res) ->
emqx_ctl:warning("load ~ts failed~n~p~n", [Key, Reason]),
Res.
check_config(Conf) ->
case check_keys_is_not_readonly(Conf) of
ok -> check_config_schema(Conf);
Error -> Error
end.
check_keys_is_not_readonly(Conf) ->
Keys = maps:keys(Conf),
ReadOnlyKeys = [atom_to_binary(K) || K <- ?READONLY_KEYS],
case ReadOnlyKeys -- Keys of
ReadOnlyKeys -> ok;
_ -> {error, ?UPDATE_READONLY_KEYS_PROHIBITED}
end.
check_config_schema(Conf) ->
SchemaMod = emqx_conf:schema_module(),
Fold = fun({Key, Value}, Acc) ->
Schema = emqx_config_handler:schema(SchemaMod, [Key]),
case emqx_conf:check_config(Schema, #{Key => Value}) of
{ok, _} -> Acc;
{error, Reason} -> [{Key, Reason} | Acc]
end
end,
sorted_fold(Fold, Conf).
%% @doc Reload etc/emqx.conf to runtime config except for the readonly config
-spec reload_etc_conf_on_local_node(replace | merge) -> ok | {error, term()}.
reload_etc_conf_on_local_node(ReplaceOrMerge) ->
case load_etc_config_file() of
{ok, RawConf} ->
case filter_readonly_config(RawConf) of
{ok, Reloaded} ->
reload_config(Reloaded, ReplaceOrMerge);
{error, Error} ->
emqx_ctl:warning("check config failed~n~p~n", [Error]),
{error, Error}
end;
{error, Error} ->
emqx_ctl:warning("bad_hocon_file~n ~p~n", [Error]),
{error, bad_hocon_file}
end.
%% @doc Merge etc/emqx.conf on top of cluster.hocon.
%% For example:
%% `authorization.sources` will be merged into cluster.hocon when updated via dashboard,
%% but `authorization.sources` in not in the default emqx.conf file.
%% To make sure all root keys in emqx.conf has a fully merged value.
load_etc_config_file() ->
ConfFiles = emqx_config:config_files(),
Opts = #{format => map, include_dirs => emqx_config:include_dirs()},
case hocon:files(ConfFiles, Opts) of
{ok, RawConf} ->
HasDeprecatedFile = emqx_config:has_deprecated_file(),
%% Merge etc.conf on top of cluster.hocon,
%% Don't use map deep_merge, use hocon files merge instead.
%% In order to have a chance to delete. (e.g. zones.zone1.mqtt = null)
Keys = maps:keys(RawConf),
MergedRaw = emqx_config:load_config_files(HasDeprecatedFile, ConfFiles),
{ok, maps:with(Keys, MergedRaw)};
{error, Error} ->
?SLOG(error, #{
msg => "failed_to_read_etc_config",
files => ConfFiles,
error => Error
}),
{error, Error}
end.
filter_readonly_config(Raw) ->
SchemaMod = emqx_conf:schema_module(),
RawDefault = emqx_config:fill_defaults(Raw),
case emqx_conf:check_config(SchemaMod, RawDefault) of
{ok, _CheckedConf} ->
ReadOnlyKeys = [atom_to_binary(K) || K <- ?READONLY_KEYS],
{ok, maps:without(ReadOnlyKeys, Raw)};
{error, Error} ->
?SLOG(error, #{
msg => "bad_etc_config_schema_found",
error => Error
}),
{error, Error}
end.
reload_config(AllConf, ReplaceOrMerge) ->
Fold = fun({Key, Conf}, Acc) ->
case update_config_local(Key, Conf, ReplaceOrMerge) of
{ok, _} ->
Acc;
Error ->
?SLOG(error, #{
msg => "failed_to_reload_etc_config",
key => Key,
value => Conf,
error => Error
}),
[{Key, Error} | Acc]
end
end,
sorted_fold(Fold, AllConf).
sorted_fold(Func, Conf) ->
case lists:foldl(Func, [], to_sorted_list(Conf)) of
[] -> ok;
Error -> {error, Error}
end.
to_sorted_list(Conf) ->
lists:keysort(1, maps:to_list(Conf)).
merge_conf(Key, NewConf) ->
OldConf = emqx_conf:get_raw([Key]),
do_merge_conf(OldConf, NewConf).
do_merge_conf(OldConf = #{}, NewConf = #{}) ->
emqx_utils_maps:deep_merge(OldConf, NewConf);
do_merge_conf(_OldConf, NewConf) ->
NewConf.

View File

@ -0,0 +1,139 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_conf_cli_SUITE).
-compile(nowarn_export_all).
-compile(export_all).
-include_lib("eunit/include/eunit.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include("emqx_conf.hrl").
-import(emqx_config_SUITE, [prepare_conf_file/3]).
all() ->
emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
emqx_mgmt_api_test_util:init_suite([emqx_conf, emqx_authz]),
Config.
end_per_suite(_Config) ->
emqx_mgmt_api_test_util:end_suite([emqx_conf, emqx_authz]).
t_load_config(Config) ->
Authz = authorization,
Conf = emqx_conf:get_raw([Authz]),
%% set sources to []
ConfBin = hocon_pp:do(#{<<"authorization">> => #{<<"sources">> => []}}, #{}),
ConfFile = prepare_conf_file(?FUNCTION_NAME, ConfBin, Config),
ok = emqx_conf_cli:conf(["load", "--replace", ConfFile]),
?assertEqual(#{<<"sources">> => []}, emqx_conf:get_raw([Authz])),
ConfBin0 = hocon_pp:do(#{<<"authorization">> => Conf#{<<"sources">> => []}}, #{}),
ConfFile0 = prepare_conf_file(?FUNCTION_NAME, ConfBin0, Config),
ok = emqx_conf_cli:conf(["load", "--replace", ConfFile0]),
?assertEqual(Conf#{<<"sources">> => []}, emqx_conf:get_raw([Authz])),
%% remove sources, it will reset to default file source.
ConfBin1 = hocon_pp:do(#{<<"authorization">> => maps:remove(<<"sources">>, Conf)}, #{}),
ConfFile1 = prepare_conf_file(?FUNCTION_NAME, ConfBin1, Config),
ok = emqx_conf_cli:conf(["load", "--replace", ConfFile1]),
Default = [emqx_authz_schema:default_authz()],
?assertEqual(Conf#{<<"sources">> => Default}, emqx_conf:get_raw([Authz])),
%% reset
ConfBin2 = hocon_pp:do(#{<<"authorization">> => Conf}, #{}),
ConfFile2 = prepare_conf_file(?FUNCTION_NAME, ConfBin2, Config),
ok = emqx_conf_cli:conf(["load", "--replace", ConfFile2]),
?assertEqual(
Conf#{<<"sources">> => [emqx_authz_schema:default_authz()]},
emqx_conf:get_raw([Authz])
),
?assertEqual({error, empty_hocon_file}, emqx_conf_cli:conf(["load", "non-exist-file"])),
ok.
t_load_readonly(Config) ->
Base0 = base_conf(),
Base1 = Base0#{<<"mqtt">> => emqx_conf:get_raw([mqtt])},
lists:foreach(
fun(Key) ->
KeyBin = atom_to_binary(Key),
Conf = emqx_conf:get_raw([Key]),
ConfBin0 = hocon_pp:do(Base1#{KeyBin => Conf}, #{}),
ConfFile0 = prepare_conf_file(?FUNCTION_NAME, ConfBin0, Config),
?assertEqual(
{error, "update_readonly_keys_prohibited"},
emqx_conf_cli:conf(["load", ConfFile0])
),
%% reload etc/emqx.conf changed readonly keys
ConfBin1 = hocon_pp:do(Base1#{KeyBin => changed(Key)}, #{}),
ConfFile1 = prepare_conf_file(?FUNCTION_NAME, ConfBin1, Config),
application:set_env(emqx, config_files, [ConfFile1]),
?assertMatch(ok, emqx_conf_cli:conf(["reload"])),
%% Don't update readonly key
?assertEqual(Conf, emqx_conf:get_raw([Key]))
end,
?READONLY_KEYS
),
ok.
t_error_schema_check(Config) ->
Base = #{
%% bad multiplier
<<"mqtt">> => #{<<"keepalive_multiplier">> => -1},
<<"zones">> => #{<<"my-zone">> => #{<<"mqtt">> => #{<<"keepalive_multiplier">> => 10}}}
},
ConfBin0 = hocon_pp:do(Base, #{}),
ConfFile0 = prepare_conf_file(?FUNCTION_NAME, ConfBin0, Config),
?assertMatch({error, _}, emqx_conf_cli:conf(["load", ConfFile0])),
%% zones is not updated because of error
?assertEqual(#{}, emqx_config:get_raw([zones])),
ok.
t_reload_etc_emqx_conf_not_persistent(Config) ->
Mqtt = emqx_conf:get_raw([mqtt]),
Base = base_conf(),
Conf = Base#{<<"mqtt">> => Mqtt#{<<"keepalive_multiplier">> => 3}},
ConfBin = hocon_pp:do(Conf, #{}),
ConfFile = prepare_conf_file(?FUNCTION_NAME, ConfBin, Config),
application:set_env(emqx, config_files, [ConfFile]),
ok = emqx_conf_cli:conf(["reload"]),
?assertEqual(3, emqx:get_config([mqtt, keepalive_multiplier])),
?assertNotEqual(
3,
emqx_utils_maps:deep_get(
[<<"mqtt">>, <<"keepalive_multiplier">>],
emqx_config:read_override_conf(#{}),
undefined
)
),
ok.
base_conf() ->
#{
<<"cluster">> => emqx_conf:get_raw([cluster]),
<<"node">> => emqx_conf:get_raw([node])
}.
changed(cluster) ->
#{<<"name">> => <<"emqx-test">>};
changed(node) ->
#{
<<"name">> => <<"emqx-test@127.0.0.1">>,
<<"cookie">> => <<"gokdfkdkf1122">>,
<<"data_dir">> => <<"data">>
};
changed(rpc) ->
#{<<"mode">> => <<"sync">>}.

View File

@ -62,7 +62,7 @@ end_per_suite(_Config) ->
t_log_conf(_Conf) ->
FileExpect = #{
<<"enable">> => true,
<<"formatter">> => text,
<<"formatter">> => <<"text">>,
<<"level">> => <<"info">>,
<<"rotation_count">> => 10,
<<"rotation_size">> => <<"50MB">>,
@ -73,7 +73,7 @@ t_log_conf(_Conf) ->
<<"console">> =>
#{
<<"enable">> => true,
<<"formatter">> => text,
<<"formatter">> => <<"text">>,
<<"level">> => <<"debug">>,
<<"time_offset">> => <<"system">>
},

View File

@ -38,6 +38,8 @@
-export([
print/1,
print/2,
warning/1,
warning/2,
usage/1,
usage/2
]).
@ -180,6 +182,14 @@ print(Msg) ->
print(Format, Args) ->
io:format("~ts", [format(Format, Args)]).
-spec warning(io:format()) -> ok.
warning(Format) ->
warning(Format, []).
-spec warning(io:format(), [term()]) -> ok.
warning(Format, Args) ->
io:format("\e[31m~ts\e[0m", [format(Format, Args)]).
-spec usage([cmd_usage()]) -> ok.
usage(UsageList) ->
io:format(format_usage(UsageList)).

View File

@ -145,7 +145,9 @@ apps() ->
listeners(Listeners) ->
lists:filtermap(
fun
({Protocol, Conf = #{enable := true}}) ->
({_Protocol, #{bind := 0}}) ->
false;
({Protocol, Conf = #{}}) ->
{Conf1, Bind} = ip_port(Conf),
{true, {
listener_name(Protocol),
@ -153,9 +155,7 @@ listeners(Listeners) ->
Bind,
ranch_opts(Conf1),
proto_opts(Conf1)
}};
({_Protocol, #{enable := false}}) ->
false
}}
end,
maps:to_list(Listeners)
).
@ -182,7 +182,7 @@ ranch_opts(Options) ->
SocketOpts = maps:fold(
fun filter_false/3,
[],
maps:without([enable, inet6, ipv6_v6only, proxy_header | Keys], Options)
maps:without([inet6, ipv6_v6only, proxy_header | Keys], Options)
),
InetOpts =
case Options of

View File

@ -174,7 +174,9 @@ diff_listeners(Type, Stop, Start) -> {#{Type => Stop}, #{Type => Start}}.
-define(DIR, <<"dashboard">>).
ensure_ssl_cert(#{<<"listeners">> := #{<<"https">> := #{<<"enable">> := true}}} = Conf) ->
ensure_ssl_cert(#{<<"listeners">> := #{<<"https">> := #{<<"bind">> := Bind}}} = Conf) when
Bind =/= 0
->
Https = emqx_utils_maps:deep_get([<<"listeners">>, <<"https">>], Conf, undefined),
Opts = #{required_keys => [[<<"keyfile">>], [<<"certfile">>], [<<"cacertfile">>]]},
case emqx_tls_lib:ensure_ssl_files(?DIR, Https, Opts) of

View File

@ -249,7 +249,7 @@ merge_cluster_sampler_map(M1, M2) ->
(topics, Map) ->
Map#{topics => maps:get(topics, M1)};
(Key, Map) ->
Map#{Key => maps:get(Key, M1) + maps:get(Key, M2)}
Map#{Key => maps:get(Key, M1, 0) + maps:get(Key, M2, 0)}
end,
lists:foldl(Fun, #{}, ?SAMPLER_LIST).

View File

@ -119,7 +119,8 @@ common_listener_fields() ->
integer(),
#{
default => erlang:system_info(schedulers_online),
desc => ?DESC(num_acceptors)
desc => ?DESC(num_acceptors),
importance => ?IMPORTANCE_MEDIUM
}
)},
{"max_connections",
@ -127,7 +128,8 @@ common_listener_fields() ->
integer(),
#{
default => 512,
desc => ?DESC(max_connections)
desc => ?DESC(max_connections),
importance => ?IMPORTANCE_HIGH
}
)},
{"backlog",
@ -135,7 +137,8 @@ common_listener_fields() ->
integer(),
#{
default => 1024,
desc => ?DESC(backlog)
desc => ?DESC(backlog),
importance => ?IMPORTANCE_LOW
}
)},
{"send_timeout",
@ -143,7 +146,8 @@ common_listener_fields() ->
emqx_schema:duration(),
#{
default => <<"10s">>,
desc => ?DESC(send_timeout)
desc => ?DESC(send_timeout),
importance => ?IMPORTANCE_LOW
}
)},
{"inet6",
@ -151,7 +155,8 @@ common_listener_fields() ->
boolean(),
#{
default => false,
desc => ?DESC(inet6)
desc => ?DESC(inet6),
importance => ?IMPORTANCE_LOW
}
)},
{"ipv6_v6only",
@ -159,7 +164,8 @@ common_listener_fields() ->
boolean(),
#{
default => false,
desc => ?DESC(ipv6_v6only)
desc => ?DESC(ipv6_v6only),
importance => ?IMPORTANCE_LOW
}
)},
{"proxy_header",
@ -167,7 +173,8 @@ common_listener_fields() ->
boolean(),
#{
desc => ?DESC(proxy_header),
default => false
default => false,
importance => ?IMPORTANCE_MEDIUM
}
)}
].
@ -178,7 +185,9 @@ enable(Bool) ->
boolean(),
#{
default => Bool,
required => true,
required => false,
deprecated => {since, "5.1.0"},
importance => ?IMPORTANCE_HIDDEN,
desc => ?DESC(listener_enable)
}
)}.
@ -188,9 +197,10 @@ bind(Port) ->
?HOCON(
?UNION([non_neg_integer(), emqx_schema:ip_port()]),
#{
default => Port,
required => true,
default => 0,
required => false,
example => "0.0.0.0:" ++ integer_to_list(Port),
importance => ?IMPORTANCE_HIGH,
desc => ?DESC(bind)
}
)}.

View File

@ -48,7 +48,6 @@ set_default_config(DefaultUsername, HAProxyEnabled, Opts) ->
Config = #{
listeners => #{
http => #{
enable => true,
bind => maps:get(bind, Opts, 18083),
inet6 => false,
ipv6_v6only => false,

View File

@ -49,8 +49,8 @@ t_update_conf(_Config) ->
Conf = #{
dashboard => #{
listeners => #{
https => #{bind => 18084, enable => true},
http => #{bind => 18083, enable => true}
https => #{bind => 18084},
http => #{bind => 18083}
}
}
},
@ -68,7 +68,7 @@ t_update_conf(_Config) ->
?check_trace(
begin
Raw1 = emqx_utils_maps:deep_put(
[<<"listeners">>, <<"https">>, <<"enable">>], Raw, false
[<<"listeners">>, <<"https">>, <<"bind">>], Raw, 0
),
?assertMatch({ok, _}, emqx:update_config([<<"dashboard">>], Raw1)),
?assertEqual(Raw1, emqx:get_raw_config([<<"dashboard">>])),
@ -116,7 +116,7 @@ t_update_conf(_Config) ->
emqx_mgmt_api_test_util:end_suite([emqx_management]).
t_default_ssl_cert(_Config) ->
Conf = #{dashboard => #{listeners => #{https => #{bind => 18084, enable => true}}}},
Conf = #{dashboard => #{listeners => #{https => #{bind => 18084}}}},
validate_https(Conf, 512, default_ssl_cert(), verify_none),
ok.
@ -127,7 +127,6 @@ t_normal_ssl_cert(_Config) ->
listeners => #{
https => #{
bind => 18084,
enable => true,
cacertfile => naive_env_interpolation(<<"${EMQX_ETC_DIR}/certs/cacert.pem">>),
certfile => naive_env_interpolation(<<"${EMQX_ETC_DIR}/certs/cert.pem">>),
keyfile => naive_env_interpolation(<<"${EMQX_ETC_DIR}/certs/key.pem">>),
@ -149,7 +148,6 @@ t_verify_cacertfile(_Config) ->
listeners => #{
https => #{
bind => 18084,
enable => true,
cacertfile => <<"">>,
max_connections => MaxConnection
}
@ -180,7 +178,6 @@ t_bad_certfile(_Config) ->
listeners => #{
https => #{
bind => 18084,
enable => true,
certfile => <<"${EMQX_ETC_DIR}/certs/not_found_cert.pem">>
}
}

View File

@ -53,13 +53,16 @@ file_transfer {
local {
enable = true
exporter {
enable = true
s3 {
enable = true
host = "s3.us-east-1.amazonaws.com"
port = "443"
port = 443
access_key_id = "AKIA27EZDDM9XLINWXFE"
secret_access_key = "..."
bucket = "my-bucket"
transport_options = {
ssl { enable = true }
}
}
}
}

View File

@ -69,8 +69,9 @@
authenticate(_Ctx, ClientInfo0) ->
ClientInfo = ClientInfo0#{zone => default},
case emqx_access_control:authenticate(ClientInfo) of
{ok, _} ->
{ok, mountpoint(ClientInfo)};
{ok, AuthResult} ->
ClientInfo1 = merge_auth_result(ClientInfo, AuthResult),
{ok, eval_mountpoint(ClientInfo1)};
{error, Reason} ->
{error, Reason}
end.
@ -174,8 +175,12 @@ metrics_inc(_Ctx = #{gwname := GwName}, Name, Oct) ->
%% Internal funcs
%%--------------------------------------------------------------------
mountpoint(ClientInfo = #{mountpoint := undefined}) ->
eval_mountpoint(ClientInfo = #{mountpoint := undefined}) ->
ClientInfo;
mountpoint(ClientInfo = #{mountpoint := MountPoint}) ->
eval_mountpoint(ClientInfo = #{mountpoint := MountPoint}) ->
MountPoint1 = emqx_mountpoint:replvar(MountPoint, ClientInfo),
ClientInfo#{mountpoint := MountPoint1}.
merge_auth_result(ClientInfo, AuthResult) when is_map(ClientInfo) andalso is_map(AuthResult) ->
IsSuperuser = maps:get(is_superuser, AuthResult, false),
maps:merge(ClientInfo, AuthResult#{is_superuser => IsSuperuser}).

View File

@ -61,7 +61,7 @@ tags() ->
[<<"Gateway">>].
roots() ->
[{gateway, sc(ref(?MODULE, gateway), #{importance => ?IMPORTANCE_HIDDEN})}].
[{gateway, sc(ref(?MODULE, gateway), #{importance => ?IMPORTANCE_LOW})}].
fields(gateway) ->
lists:map(

View File

@ -532,6 +532,7 @@ default_subopts() ->
-spec find_gateway_definitions() -> list(gateway_def()).
find_gateway_definitions() ->
ensure_gateway_loaded(),
lists:flatten(
lists:map(
fun(App) ->
@ -617,3 +618,16 @@ plus_max_connections(infinity, _) ->
infinity;
plus_max_connections(A, B) when is_integer(A) andalso is_integer(B) ->
A + B.
%% we need to load all gateway applications before generate doc from cli
ensure_gateway_loaded() ->
lists:foreach(
fun application:load/1,
[
emqx_gateway_exproto,
emqx_gateway_stomp,
emqx_gateway_coap,
emqx_gateway_lwm2m,
emqx_gateway_mqttsn
]
).

View File

@ -36,8 +36,10 @@ init_per_suite(Conf) ->
fun
(#{clientid := bad_client}) ->
{error, bad_username_or_password};
(ClientInfo) ->
{ok, ClientInfo}
(#{clientid := admin}) ->
{ok, #{is_superuser => true}};
(_) ->
{ok, #{}}
end
),
Conf.
@ -56,15 +58,15 @@ t_authenticate(_) ->
mountpoint => undefined,
clientid => <<"user1">>
},
NInfo1 = zone(Info1),
?assertEqual({ok, NInfo1}, emqx_gateway_ctx:authenticate(Ctx, Info1)),
NInfo1 = default_result(Info1),
?assertMatch({ok, NInfo1}, emqx_gateway_ctx:authenticate(Ctx, Info1)),
Info2 = #{
mountpoint => <<"mqttsn/${clientid}/">>,
clientid => <<"user1">>
},
NInfo2 = zone(Info2#{mountpoint => <<"mqttsn/user1/">>}),
?assertEqual({ok, NInfo2}, emqx_gateway_ctx:authenticate(Ctx, Info2)),
NInfo2 = default_result(Info2#{mountpoint => <<"mqttsn/user1/">>}),
?assertMatch({ok, NInfo2}, emqx_gateway_ctx:authenticate(Ctx, Info2)),
Info3 = #{
mountpoint => <<"mqttsn/${clientid}/">>,
@ -72,6 +74,12 @@ t_authenticate(_) ->
},
{error, bad_username_or_password} =
emqx_gateway_ctx:authenticate(Ctx, Info3),
Info4 = #{
mountpoint => undefined,
clientid => admin
},
?assertMatch({ok, #{is_superuser := true}}, emqx_gateway_ctx:authenticate(Ctx, Info4)),
ok.
zone(Info) -> Info#{zone => default}.
default_result(Info) -> Info#{zone => default, is_superuser => false}.

View File

@ -277,6 +277,8 @@ message ClientInfo {
string username = 4;
// deprecated since v5.1.0
// the request value of `mountpoint` will be ignored after v5.1.0
string mountpoint = 5;
}

View File

@ -121,11 +121,11 @@ info(ctx, #channel{ctx = Ctx}) ->
stats(#channel{subscriptions = Subs}) ->
[
{subscriptions_cnt, maps:size(Subs)},
{subscriptions_max, 0},
{subscriptions_max, infinity},
{inflight_cnt, 0},
{inflight_max, 0},
{inflight_max, infinity},
{mqueue_len, 0},
{mqueue_max, 0},
{mqueue_max, infinity},
{mqueue_dropped, 0},
{next_pkt_id, 0},
{awaiting_rel_cnt, 0},
@ -164,7 +164,8 @@ init(
DefaultClientInfo = default_clientinfo(NConnInfo),
ClientInfo = DefaultClientInfo#{
listener => ListenerId,
enable_authn => EnableAuthn
enable_authn => EnableAuthn,
mountpoint => maps:get(mountpoint, Options, undefined)
},
Channel = #channel{
ctx = Ctx,
@ -758,7 +759,23 @@ enrich_conninfo(InClientInfo, ConnInfo) ->
maps:merge(ConnInfo, maps:with(Ks, InClientInfo)).
enrich_clientinfo(InClientInfo = #{proto_name := ProtoName}, ClientInfo) ->
Ks = [clientid, username, mountpoint],
Ks = [clientid, username],
case maps:get(mountpoint, InClientInfo, <<>>) of
<<>> ->
ok;
Mp ->
?SLOG(
warning,
#{
msg => "failed_to_override_mountpoint",
reason =>
"The mountpoint in AuthenticateRequest has been deprecated. "
"Please use the `gateway.exproto.mountpoint` configuration.",
requested_mountpoint => Mp,
configured_mountpoint => maps:get(mountpoint, ClientInfo)
}
)
end,
NClientInfo = maps:merge(ClientInfo, maps:with(Ks, InClientInfo)),
NClientInfo#{protocol => proto_name_to_protocol(ProtoName)}.

View File

@ -128,7 +128,7 @@ init_per_group(LisType, ServiceName, Scheme, Cfg) ->
Svrs = emqx_exproto_echo_svr:start(Scheme),
application:load(emqx_gateway_exproto),
emqx_common_test_helpers:start_apps(
[emqx_authn, emqx_gateway],
[emqx_conf, emqx_authn, emqx_gateway],
fun(App) ->
set_special_cfg(App, LisType, ServiceName, Scheme)
end
@ -143,7 +143,7 @@ init_per_group(LisType, ServiceName, Scheme, Cfg) ->
end_per_group(_, Cfg) ->
emqx_config:erase(gateway),
emqx_common_test_helpers:stop_apps([emqx_gateway, emqx_authn]),
emqx_common_test_helpers:stop_apps([emqx_gateway, emqx_authn, emqx_conf]),
emqx_exproto_echo_svr:stop(proplists:get_value(servers, Cfg)).
init_per_testcase(TestCase, Cfg) when
@ -166,6 +166,7 @@ set_special_cfg(emqx_gateway, LisType, ServiceName, Scheme) ->
#{
server => #{bind => 9100},
idle_timeout => 5000,
mountpoint => <<"ct/">>,
handler => #{
address => Addrs,
service_name => ServiceName,
@ -196,7 +197,8 @@ t_mountpoint_echo(Cfg) ->
proto_name => <<"demo">>,
proto_ver => <<"v0.1">>,
clientid => <<"test_client_1">>,
mountpoint => <<"ct/">>
%% deperated since v5.1.0, and this value will be ignored
mountpoint => <<"deperated/">>
},
Password = <<"123456">>,
@ -239,7 +241,7 @@ t_raw_publish(Cfg) ->
proto_name => <<"demo">>,
proto_ver => <<"v0.1">>,
clientid => <<"test_client_1">>,
mountpoint => <<"ct/">>
mountpoint => <<>>
},
Password = <<"123456">>,
@ -321,7 +323,7 @@ t_acl_deny(Cfg) ->
send(Sock, SubBin),
{ok, SubAckBin} = recv(Sock, 5000),
emqx:publish(emqx_message:make(<<"t/dn">>, <<"echo">>)),
emqx:publish(emqx_message:make(<<"ct/t/dn">>, <<"echo">>)),
PubBin = frame_publish(<<"t/dn">>, 0, <<"echo">>),
PubBinFailedAck = frame_puback(1),
@ -510,7 +512,7 @@ t_hook_message_delivered(Cfg) ->
emqx_hooks:add('message.delivered', {?MODULE, hook_fun5, []}, 1000),
emqx:publish(emqx_message:make(<<"t/dn">>, <<"1">>)),
emqx:publish(emqx_message:make(<<"ct/t/dn">>, <<"1">>)),
PubBin1 = frame_publish(<<"t/dn">>, 0, <<"2">>),
{ok, PubBin1} = recv(Sock, 5000),

View File

@ -448,7 +448,9 @@ handle_in(
Topic = header(<<"destination">>, Headers),
case emqx_gateway_ctx:authorize(Ctx, ClientInfo, publish, Topic) of
deny ->
handle_out(error, {receipt_id(Headers), "Authorization Deny"}, Channel);
ErrMsg = io_lib:format("Insufficient permissions for ~s", [Topic]),
ErrorFrame = error_frame(receipt_id(Headers), ErrMsg),
shutdown(acl_denied, ErrorFrame, Channel);
allow ->
case header(<<"transaction">>, Headers) of
undefined ->
@ -494,20 +496,25 @@ handle_in(
),
case do_subscribe(NTopicFilters, NChannel) of
[] ->
ErrMsg = "Permission denied",
handle_out(error, {receipt_id(Headers), ErrMsg}, Channel);
ErrMsg = io_lib:format(
"The client.subscribe hook blocked the ~s subscription request",
[TopicFilter]
),
ErrorFrame = error_frame(receipt_id(Headers), ErrMsg),
shutdown(normal, ErrorFrame, Channel);
[{MountedTopic, SubOpts} | _] ->
NSubs = [{SubId, MountedTopic, Ack, SubOpts} | Subs],
NChannel1 = NChannel#channel{subscriptions = NSubs},
handle_out_and_update(receipt, receipt_id(Headers), NChannel1)
end;
{error, ErrMsg, NChannel} ->
?SLOG(error, #{
msg => "failed_top_subscribe_topic",
topic => Topic,
reason => ErrMsg
}),
handle_out(error, {receipt_id(Headers), ErrMsg}, NChannel)
{error, subscription_id_inused, NChannel} ->
ErrMsg = io_lib:format("Subscription id ~w is in used", [SubId]),
ErrorFrame = error_frame(receipt_id(Headers), ErrMsg),
shutdown(subscription_id_inused, ErrorFrame, NChannel);
{error, acl_denied, NChannel} ->
ErrMsg = io_lib:format("Insufficient permissions for ~s", [Topic]),
ErrorFrame = error_frame(receipt_id(Headers), ErrMsg),
shutdown(acl_denied, ErrorFrame, NChannel)
end;
handle_in(
?PACKET(?CMD_UNSUBSCRIBE, Headers),
@ -691,7 +698,7 @@ check_subscribed_status(
{SubId, MountedTopic, _Ack, _} ->
ok;
{SubId, _OtherTopic, _Ack, _} ->
{error, "Conflict subscribe id"};
{error, subscription_id_inused};
false ->
ok
end.
@ -704,7 +711,7 @@ check_sub_acl(
}
) ->
case emqx_gateway_ctx:authorize(Ctx, ClientInfo, subscribe, ParsedTopic) of
deny -> {error, "ACL Deny"};
deny -> {error, acl_denied};
allow -> ok
end.
@ -987,7 +994,7 @@ handle_deliver(
Delivers,
Channel = #channel{
ctx = Ctx,
clientinfo = ClientInfo,
clientinfo = ClientInfo = #{mountpoint := Mountpoint},
subscriptions = Subs
}
) ->
@ -998,22 +1005,21 @@ handle_deliver(
fun({_, _, Message}, Acc) ->
Topic0 = emqx_message:topic(Message),
case lists:keyfind(Topic0, 2, Subs) of
{Id, Topic, Ack, _SubOpts} ->
%% XXX: refactor later
{Id, _Topic, Ack, _SubOpts} ->
Message1 = emqx_mountpoint:unmount(Mountpoint, Message),
metrics_inc('messages.delivered', Channel),
NMessage = run_hooks_without_metrics(
Ctx,
'message.delivered',
[ClientInfo],
Message
Message1
),
Topic = emqx_message:topic(NMessage),
Headers = emqx_message:get_headers(NMessage),
Payload = emqx_message:payload(NMessage),
Headers0 = [
{<<"subscription">>, Id},
{<<"message-id">>, next_msgid()},
{<<"destination">>, Topic},
{<<"destination">>, emqx_message:topic(NMessage)},
{<<"content-type">>, <<"text/plain">>}
],
Headers1 =

View File

@ -185,6 +185,8 @@ parse(headers, Bin, State) ->
parse(hdname, Bin, State);
parse(hdname, <<?LF, _Rest/binary>>, _State) ->
error(unexpected_linefeed);
parse(hdname, <<?COLON, $\s, Rest/binary>>, State = #parser_state{acc = Acc}) ->
parse(hdvalue, Rest, State#parser_state{hdname = Acc, acc = <<>>});
parse(hdname, <<?COLON, Rest/binary>>, State = #parser_state{acc = Acc}) ->
parse(hdvalue, Rest, State#parser_state{hdname = Acc, acc = <<>>});
parse(hdname, <<Ch:8, Rest/binary>>, State) ->

View File

@ -60,11 +60,11 @@ all() -> emqx_common_test_helpers:all(?MODULE).
init_per_suite(Cfg) ->
application:load(emqx_gateway_stomp),
ok = emqx_common_test_helpers:load_config(emqx_gateway_schema, ?CONF_DEFAULT),
emqx_mgmt_api_test_util:init_suite([emqx_authn, emqx_gateway]),
emqx_mgmt_api_test_util:init_suite([emqx_conf, emqx_authn, emqx_gateway]),
Cfg.
end_per_suite(_Cfg) ->
emqx_mgmt_api_test_util:end_suite([emqx_gateway, emqx_authn]),
emqx_mgmt_api_test_util:end_suite([emqx_gateway, emqx_authn, emqx_conf]),
ok.
default_config() ->
@ -73,73 +73,40 @@ default_config() ->
stomp_ver() ->
?STOMP_VER.
restart_stomp_with_mountpoint(Mountpoint) ->
Conf = emqx:get_raw_config([gateway, stomp]),
emqx_gateway_conf:update_gateway(
stomp,
Conf#{<<"mountpoint">> => Mountpoint}
).
%%--------------------------------------------------------------------
%% Test Cases
%%--------------------------------------------------------------------
t_connect(_) ->
%% Connect should be succeed
with_connection(fun(Sock) ->
gen_tcp:send(
Sock,
serialize(
<<"CONNECT">>,
[
{<<"accept-version">>, ?STOMP_VER},
{<<"host">>, <<"127.0.0.1:61613">>},
{<<"login">>, <<"guest">>},
{<<"passcode">>, <<"guest">>},
{<<"heart-beat">>, <<"1000,2000">>}
]
)
),
{ok, Data} = gen_tcp:recv(Sock, 0),
{ok,
Frame = #stomp_frame{
command = <<"CONNECTED">>,
headers = _,
body = _
},
_, _} = parse(Data),
<<"2000,1000">> = proplists:get_value(<<"heart-beat">>, Frame#stomp_frame.headers),
gen_tcp:send(
Sock,
serialize(
<<"DISCONNECT">>,
[{<<"receipt">>, <<"12345">>}]
)
%% Successful connect
ConnectSucced = fun(Sock) ->
ok = send_connection_frame(Sock, <<"guest">>, <<"guest">>, <<"1000,2000">>),
{ok, Frame} = recv_a_frame(Sock),
?assertMatch(<<"CONNECTED">>, Frame#stomp_frame.command),
?assertEqual(
<<"2000,1000">>, proplists:get_value(<<"heart-beat">>, Frame#stomp_frame.headers)
),
{ok, Data1} = gen_tcp:recv(Sock, 0),
{ok,
#stomp_frame{
ok = send_disconnect_frame(Sock, <<"12345">>),
?assertMatch(
{ok, #stomp_frame{
command = <<"RECEIPT">>,
headers = [{<<"receipt-id">>, <<"12345">>}],
body = _
},
_, _} = parse(Data1)
end),
%% Connect will be failed, because of bad login or passcode
%% FIXME: Waiting for authentication works
%with_connection(
% fun(Sock) ->
% gen_tcp:send(Sock, serialize(<<"CONNECT">>,
% [{<<"accept-version">>, ?STOMP_VER},
% {<<"host">>, <<"127.0.0.1:61613">>},
% {<<"login">>, <<"admin">>},
% {<<"passcode">>, <<"admin">>},
% {<<"heart-beat">>, <<"1000,2000">>}])),
% {ok, Data} = gen_tcp:recv(Sock, 0),
% {ok, Frame, _, _} = parse(Data),
% #stomp_frame{command = <<"ERROR">>,
% headers = _,
% body = <<"Login or passcode error!">>} = Frame
% end),
headers = [{<<"receipt-id">>, <<"12345">>}]
}},
recv_a_frame(Sock)
)
end,
with_connection(ConnectSucced),
%% Connect will be failed, because of bad version
with_connection(fun(Sock) ->
ProtocolError = fun(Sock) ->
gen_tcp:send(
Sock,
serialize(
@ -160,7 +127,8 @@ t_connect(_) ->
headers = _,
body = <<"Login Failed: Supported protocol versions < 1.2">>
} = Frame
end).
end,
with_connection(ProtocolError).
t_heartbeat(_) ->
%% Test heart beat
@ -755,8 +723,7 @@ t_frame_error_too_many_headers(_) ->
),
Assert =
fun(Sock) ->
{ok, Data} = gen_tcp:recv(Sock, 0),
{ok, ErrorFrame, _, _} = parse(Data),
{ok, ErrorFrame} = recv_a_frame(Sock),
?assertMatch(#stomp_frame{command = <<"ERROR">>}, ErrorFrame),
?assertMatch(
match, re:run(ErrorFrame#stomp_frame.body, "too_many_headers", [{capture, none}])
@ -777,8 +744,7 @@ t_frame_error_too_long_header(_) ->
),
Assert =
fun(Sock) ->
{ok, Data} = gen_tcp:recv(Sock, 0),
{ok, ErrorFrame, _, _} = parse(Data),
{ok, ErrorFrame} = recv_a_frame(Sock),
?assertMatch(#stomp_frame{command = <<"ERROR">>}, ErrorFrame),
?assertMatch(
match, re:run(ErrorFrame#stomp_frame.body, "too_long_header", [{capture, none}])
@ -796,8 +762,7 @@ t_frame_error_too_long_body(_) ->
),
Assert =
fun(Sock) ->
{ok, Data} = gen_tcp:recv(Sock, 0),
{ok, ErrorFrame, _, _} = parse(Data),
{ok, ErrorFrame} = recv_a_frame(Sock),
?assertMatch(#stomp_frame{command = <<"ERROR">>}, ErrorFrame),
?assertMatch(
match, re:run(ErrorFrame#stomp_frame.body, "too_long_body", [{capture, none}])
@ -808,54 +773,16 @@ t_frame_error_too_long_body(_) ->
test_frame_error(Frame, AssertFun) ->
with_connection(fun(Sock) ->
gen_tcp:send(
Sock,
serialize(
<<"CONNECT">>,
[
{<<"accept-version">>, ?STOMP_VER},
{<<"host">>, <<"127.0.0.1:61613">>},
{<<"login">>, <<"guest">>},
{<<"passcode">>, <<"guest">>},
{<<"heart-beat">>, <<"0,0">>}
]
)
),
{ok, Data} = gen_tcp:recv(Sock, 0),
{ok,
#stomp_frame{
command = <<"CONNECTED">>,
headers = _,
body = _
},
_, _} = parse(Data),
send_connection_frame(Sock, <<"guest">>, <<"guest">>),
?assertMatch({ok, #stomp_frame{command = <<"CONNECTED">>}}, recv_a_frame(Sock)),
gen_tcp:send(Sock, Frame),
AssertFun(Sock)
end).
t_rest_clienit_info(_) ->
with_connection(fun(Sock) ->
gen_tcp:send(
Sock,
serialize(
<<"CONNECT">>,
[
{<<"accept-version">>, ?STOMP_VER},
{<<"host">>, <<"127.0.0.1:61613">>},
{<<"login">>, <<"guest">>},
{<<"passcode">>, <<"guest">>},
{<<"heart-beat">>, <<"0,0">>}
]
)
),
{ok, Data} = gen_tcp:recv(Sock, 0),
{ok,
#stomp_frame{
command = <<"CONNECTED">>,
headers = _,
body = _
},
_, _} = parse(Data),
send_connection_frame(Sock, <<"guest">>, <<"guest">>),
?assertMatch({ok, #stomp_frame{command = <<"CONNECTED">>}}, recv_a_frame(Sock)),
%% client lists
{200, Clients} = request(get, "/gateways/stomp/clients"),
@ -909,18 +836,8 @@ t_rest_clienit_info(_) ->
%% sub & unsub
{200, []} = request(get, ClientPath ++ "/subscriptions"),
gen_tcp:send(
Sock,
serialize(
<<"SUBSCRIBE">>,
[
{<<"id">>, 0},
{<<"destination">>, <<"/queue/foo">>},
{<<"ack">>, <<"client">>}
]
)
),
timer:sleep(100),
ok = send_subscribe_frame(Sock, 0, <<"/queue/foo">>),
?assertMatch({ok, #stomp_frame{command = <<"RECEIPT">>}}, recv_a_frame(Sock)),
{200, Subs} = request(get, ClientPath ++ "/subscriptions"),
?assertEqual(1, length(Subs)),
@ -956,6 +873,141 @@ t_rest_clienit_info(_) ->
?assertEqual(0, length(maps:get(data, Clients2)))
end).
t_authn_superuser(_) ->
%% mock authn
meck:new(emqx_access_control, [passthrough]),
meck:expect(
emqx_access_control,
authenticate,
fun
(#{username := <<"admin">>}) ->
{ok, #{is_superuser => true}};
(#{username := <<"bad_user">>}) ->
{error, not_authorized};
(_) ->
{ok, #{is_superuser => false}}
end
),
%% mock authz
meck:expect(
emqx_access_control,
authorize,
fun
(_ClientInfo = #{is_superuser := true}, _PubSub, _Topic) ->
allow;
(_ClientInfo, _PubSub, _Topic) ->
deny
end
),
LoginFailure = fun(Sock) ->
ok = send_connection_frame(Sock, <<"bad_user">>, <<"public">>),
?assertMatch({ok, #stomp_frame{command = <<"ERROR">>}}, recv_a_frame(Sock)),
?assertMatch({error, closed}, recv_a_frame(Sock))
end,
PublishFailure = fun(Sock) ->
ok = send_connection_frame(Sock, <<"user1">>, <<"public">>),
?assertMatch({ok, #stomp_frame{command = <<"CONNECTED">>}}, recv_a_frame(Sock)),
ok = send_message_frame(Sock, <<"t/a">>, <<"hello">>),
?assertMatch({ok, #stomp_frame{command = <<"ERROR">>}}, recv_a_frame(Sock)),
?assertMatch({error, closed}, recv_a_frame(Sock))
end,
SubscribeFailed = fun(Sock) ->
ok = send_connection_frame(Sock, <<"user1">>, <<"public">>),
?assertMatch({ok, #stomp_frame{command = <<"CONNECTED">>}}, recv_a_frame(Sock)),
ok = send_subscribe_frame(Sock, 0, <<"t/a">>),
?assertMatch({ok, #stomp_frame{command = <<"ERROR">>}}, recv_a_frame(Sock)),
?assertMatch({error, closed}, recv_a_frame(Sock))
end,
LoginAsSuperUser = fun(Sock) ->
ok = send_connection_frame(Sock, <<"admin">>, <<"public">>),
?assertMatch({ok, #stomp_frame{command = <<"CONNECTED">>}}, recv_a_frame(Sock)),
ok = send_subscribe_frame(Sock, 0, <<"t/a">>),
?assertMatch({ok, #stomp_frame{command = <<"RECEIPT">>}}, recv_a_frame(Sock)),
ok = send_message_frame(Sock, <<"t/a">>, <<"hello">>),
?assertMatch({ok, #stomp_frame{command = <<"RECEIPT">>}}, recv_a_frame(Sock)),
?assertMatch(
{ok, #stomp_frame{
command = <<"MESSAGE">>,
body = <<"hello">>
}},
recv_a_frame(Sock)
),
ok = send_disconnect_frame(Sock)
end,
with_connection(LoginFailure),
with_connection(PublishFailure),
with_connection(SubscribeFailed),
with_connection(LoginAsSuperUser),
meck:unload(emqx_access_control).
t_mountpoint(_) ->
restart_stomp_with_mountpoint(<<"stomp/">>),
PubSub = fun(Sock) ->
ok = send_connection_frame(Sock, <<"user1">>, <<"public">>),
?assertMatch({ok, #stomp_frame{command = <<"CONNECTED">>}}, recv_a_frame(Sock)),
ok = send_subscribe_frame(Sock, 0, <<"t/a">>),
?assertMatch({ok, #stomp_frame{command = <<"RECEIPT">>}}, recv_a_frame(Sock)),
ok = send_message_frame(Sock, <<"t/a">>, <<"hello">>),
?assertMatch({ok, #stomp_frame{command = <<"RECEIPT">>}}, recv_a_frame(Sock)),
{ok, #stomp_frame{
command = <<"MESSAGE">>,
headers = Headers,
body = <<"hello">>
}} = recv_a_frame(Sock),
?assertEqual(<<"t/a">>, proplists:get_value(<<"destination">>, Headers)),
ok = send_disconnect_frame(Sock)
end,
PubToMqtt = fun(Sock) ->
ok = send_connection_frame(Sock, <<"user1">>, <<"public">>),
?assertMatch({ok, #stomp_frame{command = <<"CONNECTED">>}}, recv_a_frame(Sock)),
ok = emqx:subscribe(<<"stomp/t/a">>),
ok = send_message_frame(Sock, <<"t/a">>, <<"hello">>),
?assertMatch({ok, #stomp_frame{command = <<"RECEIPT">>}}, recv_a_frame(Sock)),
receive
{deliver, Topic, Msg} ->
?assertEqual(<<"stomp/t/a">>, Topic),
?assertEqual(<<"hello">>, emqx_message:payload(Msg))
after 100 ->
?assert(false, "waiting message timeout")
end,
ok = send_disconnect_frame(Sock)
end,
ReceiveMsgFromMqtt = fun(Sock) ->
ok = send_connection_frame(Sock, <<"user1">>, <<"public">>),
?assertMatch({ok, #stomp_frame{command = <<"CONNECTED">>}}, recv_a_frame(Sock)),
ok = send_subscribe_frame(Sock, 0, <<"t/a">>),
?assertMatch({ok, #stomp_frame{command = <<"RECEIPT">>}}, recv_a_frame(Sock)),
Msg = emqx_message:make(<<"stomp/t/a">>, <<"hello">>),
emqx:publish(Msg),
{ok, #stomp_frame{
command = <<"MESSAGE">>,
headers = Headers,
body = <<"hello">>
}} = recv_a_frame(Sock),
?assertEqual(<<"t/a">>, proplists:get_value(<<"destination">>, Headers)),
ok = send_disconnect_frame(Sock)
end,
with_connection(PubSub),
with_connection(PubToMqtt),
with_connection(ReceiveMsgFromMqtt),
restart_stomp_with_mountpoint(<<>>).
%% TODO: Mountpoint, AuthChain, Authorization + Mountpoint, ClientInfoOverride,
%% Listeners, Metrics, Stats, ClientInfo
%%
@ -963,6 +1015,9 @@ t_rest_clienit_info(_) ->
%%
%% TODO: RateLimit, OOM,
%%--------------------------------------------------------------------
%% helpers
with_connection(DoFun) ->
{ok, Sock} = gen_tcp:connect(
{127, 0, 0, 1},
@ -973,6 +1028,8 @@ with_connection(DoFun) ->
try
DoFun(Sock)
after
erase(parser),
erase(rest),
gen_tcp:close(Sock)
end.
@ -982,6 +1039,46 @@ serialize(Command, Headers) ->
serialize(Command, Headers, Body) ->
emqx_stomp_frame:serialize_pkt(emqx_stomp_frame:make(Command, Headers, Body), #{}).
recv_a_frame(Sock) ->
Parser =
case get(parser) of
undefined ->
ProtoEnv = #{
max_headers => 1024,
max_header_length => 10240,
max_body_length => 81920
},
emqx_stomp_frame:initial_parse_state(ProtoEnv);
P ->
P
end,
LastRest =
case get(rest) of
undefined -> <<>>;
R -> R
end,
case emqx_stomp_frame:parse(LastRest, Parser) of
{more, NParser} ->
case gen_tcp:recv(Sock, 0, 5000) of
{ok, Data} ->
put(parser, NParser),
put(rest, <<LastRest/binary, Data/binary>>),
recv_a_frame(Sock);
{error, _} = Err1 ->
erase(parser),
erase(rest),
Err1
end;
{ok, Frame, Rest, NParser} ->
put(parser, NParser),
put(rest, Rest),
{ok, Frame};
{error, _} = Err ->
erase(parser),
erase(rest),
Err
end.
parse(Data) ->
ProtoEnv = #{
max_headers => 1024,
@ -996,6 +1093,52 @@ get_field(command, #stomp_frame{command = Command}) ->
get_field(body, #stomp_frame{body = Body}) ->
Body.
send_connection_frame(Sock, Username, Password) ->
send_connection_frame(Sock, Username, Password, <<"0,0">>).
send_connection_frame(Sock, Username, Password, Heartbeat) ->
Headers =
case Username == undefined of
true -> [];
false -> [{<<"login">>, Username}]
end ++
case Password == undefined of
true -> [];
false -> [{<<"passcode">>, Password}]
end,
Headers1 = [
{<<"accept-version">>, ?STOMP_VER},
{<<"host">>, <<"127.0.0.1:61613">>},
{<<"heart-beat">>, Heartbeat}
| Headers
],
ok = gen_tcp:send(Sock, serialize(<<"CONNECT">>, Headers1)).
send_subscribe_frame(Sock, Id, Topic) ->
Headers =
[
{<<"id">>, Id},
{<<"receipt">>, Id},
{<<"destination">>, Topic},
{<<"ack">>, <<"auto">>}
],
ok = gen_tcp:send(Sock, serialize(<<"SUBSCRIBE">>, Headers)).
send_message_frame(Sock, Topic, Payload) ->
Headers =
[
{<<"destination">>, Topic},
{<<"receipt">>, <<"rp-", Topic/binary>>}
],
ok = gen_tcp:send(Sock, serialize(<<"SEND">>, Headers, Payload)).
send_disconnect_frame(Sock) ->
ok = gen_tcp:send(Sock, serialize(<<"DISCONNECT">>, [])).
send_disconnect_frame(Sock, ReceiptId) ->
Headers = [{<<"receipt">>, ReceiptId}],
ok = gen_tcp:send(Sock, serialize(<<"DISCONNECT">>, Headers)).
clients() ->
{200, Clients} = request(get, "/gateways/stomp/clients"),
maps:get(data, Clients).

View File

@ -63,7 +63,6 @@
-define(CLIENT_QSCHEMA, [
{<<"node">>, atom},
{<<"username">>, binary},
{<<"zone">>, atom},
{<<"ip_address">>, ip},
{<<"conn_state">>, atom},
{<<"clean_start">>, atom},
@ -122,11 +121,6 @@ schema("/clients") ->
required => false,
desc => <<"User name">>
})},
{zone,
hoconsc:mk(binary(), #{
in => query,
required => false
})},
{ip_address,
hoconsc:mk(binary(), #{
in => query,
@ -549,12 +543,7 @@ fields(client) ->
" Maximum number of subscriptions allowed by this client">>
})},
{username, hoconsc:mk(binary(), #{desc => <<"User name of client when connecting">>})},
{mountpoint, hoconsc:mk(binary(), #{desc => <<"Topic mountpoint">>})},
{zone,
hoconsc:mk(binary(), #{
desc =>
<<"Indicate the configuration group used by the client">>
})}
{mountpoint, hoconsc:mk(binary(), #{desc => <<"Topic mountpoint">>})}
];
fields(authz_cache) ->
[
@ -848,8 +837,6 @@ ms(clientid, X) ->
#{clientinfo => #{clientid => X}};
ms(username, X) ->
#{clientinfo => #{username => X}};
ms(zone, X) ->
#{clientinfo => #{zone => X}};
ms(conn_state, X) ->
#{conn_state => X};
ms(ip_address, X) ->
@ -930,6 +917,7 @@ format_channel_info(WhichNode, {_, ClientInfo0, ClientStats}) ->
sockname,
retry_interval,
upgrade_qos,
zone,
%% sessionID, defined in emqx_session.erl
id
],

View File

@ -43,9 +43,8 @@
<<"alarm">>,
<<"sys_topics">>,
<<"sysmon">>,
<<"log">>,
<<"persistent_session_store">>,
<<"zones">>
<<"log">>
%% <<"zones">>
]).
api_spec() ->

View File

@ -277,10 +277,39 @@ fields(Type) ->
listener_schema(Opts) ->
emqx_dashboard_swagger:schema_with_example(
?UNION(lists:map(fun(#{ref := Ref}) -> Ref end, listeners_info(Opts))),
hoconsc:union(listener_union_member_selector(Opts)),
tcp_schema_example()
).
listener_union_member_selector(Opts) ->
ListenersInfo = listeners_info(Opts),
Index = maps:from_list([
{iolist_to_binary(ListenerType), Ref}
|| #{listener_type := ListenerType, ref := Ref} <- ListenersInfo
]),
fun
(all_union_members) ->
maps:values(Index);
({value, V}) ->
case V of
#{<<"type">> := T} ->
case maps:get(T, Index, undefined) of
undefined ->
throw(#{
field_name => type,
reason => <<"unknown listener type">>
});
Ref ->
[Ref]
end;
_ ->
throw(#{
field_name => type,
reason => <<"unknown listener type">>
})
end
end.
create_listener_schema(Opts) ->
Schemas = [
?R_REF(Mod, {Type, with_name})
@ -311,6 +340,7 @@ listeners_info(Opts) ->
TypeAtom = list_to_existing_atom(ListenerType),
#{
ref => ?R_REF(Ref),
listener_type => ListenerType,
schema => [
{type, ?HOCON(?ENUM([TypeAtom]), #{desc => "Listener type", required => true})},
{running, ?HOCON(boolean(), #{desc => "Listener status", required => false})},
@ -795,8 +825,7 @@ tcp_schema_example() ->
send_timeout => <<"15s">>,
send_timeout_close => true
},
type => tcp,
zone => default
type => tcp
}.
create_listener(Body) ->

View File

@ -199,18 +199,19 @@ get_global_zone() ->
update_global_zone(Change) ->
update_config("global_zone", Change).
t_zones(_Config) ->
{ok, Zones} = get_config("zones"),
{ok, #{<<"mqtt">> := OldMqtt} = Zone1} = get_global_zone(),
Mqtt1 = maps:remove(<<"max_subscriptions">>, OldMqtt),
{ok, #{}} = update_config("zones", Zones#{<<"new_zone">> => Zone1#{<<"mqtt">> => Mqtt1}}),
NewMqtt = emqx_config:get_raw([zones, new_zone, mqtt]),
%% we remove max_subscription from global zone, so the new zone should not have it.
?assertEqual(Mqtt1, NewMqtt),
%% delete the new zones
{ok, #{}} = update_config("zones", Zones),
?assertEqual(undefined, emqx_config:get_raw([zones, new_zone], undefined)),
ok.
%% hide /configs/zones api in 5.1.0, so we comment this test.
%t_zones(_Config) ->
% {ok, Zones} = get_config("zones"),
% {ok, #{<<"mqtt">> := OldMqtt} = Zone1} = get_global_zone(),
% Mqtt1 = maps:remove(<<"max_subscriptions">>, OldMqtt),
% {ok, #{}} = update_config("zones", Zones#{<<"new_zone">> => Zone1#{<<"mqtt">> => Mqtt1}}),
% NewMqtt = emqx_config:get_raw([zones, new_zone, mqtt]),
% %% we remove max_subscription from global zone, so the new zone should not have it.
% ?assertEqual(Mqtt1, NewMqtt),
% %% delete the new zones
% {ok, #{}} = update_config("zones", Zones),
% ?assertEqual(undefined, emqx_config:get_raw([zones, new_zone], undefined)),
% ok.
t_dashboard(_Config) ->
{ok, Dashboard = #{<<"listeners">> := Listeners}} = get_config("dashboard"),

View File

@ -19,6 +19,7 @@
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-define(PORT(Base), (Base + ?LINE)).
-define(PORT, ?PORT(20000)).
@ -139,6 +140,16 @@ t_list_listeners(Config) when is_list(Config) ->
?assertMatch(#{<<"max_connections">> := <<"infinity">>}, Create),
?assert(is_running(NewListenerId)),
Update2 = request(put, NewPath, [], Create#{<<"max_connections">> => 100}),
?assertMatch(#{<<"max_connections">> := 100}, Update2),
Get2 = request(get, NewPath, [], []),
?assertMatch(#{<<"max_connections">> := 100}, Get2),
Update3 = request(put, NewPath, [], Create#{<<"max_connections">> => <<"123">>}),
?assertMatch(#{<<"max_connections">> := 123}, Update3),
Get3 = request(get, NewPath, [], []),
?assertMatch(#{<<"max_connections">> := 123}, Get3),
%% delete
?assertEqual([], delete(NewPath)),
?assertEqual({error, not_found}, is_running(NewListenerId)),
@ -404,6 +415,62 @@ t_action_listeners(Config) when is_list(Config) ->
action_listener(ID, "start", true),
action_listener(ID, "restart", true).
t_update_validation_error_message({init, Config}) ->
NewListenerId = <<"ssl:new", (integer_to_binary(?LINE))/binary>>,
NewPath = emqx_mgmt_api_test_util:api_path(["listeners", NewListenerId]),
ListenerId = "ssl:default",
OriginalPath = emqx_mgmt_api_test_util:api_path(["listeners", ListenerId]),
OriginalListener = request(get, OriginalPath, [], []),
[
{new_listener_id, NewListenerId},
{new_path, NewPath},
{original_listener, OriginalListener}
| Config
];
t_update_validation_error_message(Config) when is_list(Config) ->
NewListenerId = ?config(new_listener_id, Config),
NewPath = ?config(new_path, Config),
OriginalListener = ?config(original_listener, Config),
Port = integer_to_binary(?PORT),
NewListener = OriginalListener#{
<<"id">> := NewListenerId,
<<"bind">> => <<"0.0.0.0:", Port/binary>>
},
CreateResp = request(post, NewPath, [], NewListener),
?assertEqual(lists:sort(maps:keys(OriginalListener)), lists:sort(maps:keys(CreateResp))),
%% check that a validation error is user-friendly
WrongConf1a = emqx_utils_maps:deep_put(
[<<"ssl_options">>, <<"enable_crl_check">>],
CreateResp,
true
),
WrongConf1 = emqx_utils_maps:deep_put(
[<<"ssl_options">>, <<"verify">>],
WrongConf1a,
<<"verify_none">>
),
Result1 = request(put, NewPath, [], WrongConf1, #{return_all => true}),
?assertMatch({error, {{_, 400, _}, _Headers, _Body}}, Result1),
{error, {{_, _Code, _}, _Headers, Body1}} = Result1,
#{<<"message">> := RawMsg1} = emqx_utils_json:decode(Body1, [return_maps]),
Msg1 = emqx_utils_json:decode(RawMsg1, [return_maps]),
%% No confusing union type errors.
?assertNotMatch(#{<<"mismatches">> := _}, Msg1),
?assertMatch(
#{
<<"kind">> := <<"validation_error">>,
<<"reason">> := <<"verify must be verify_peer when CRL check is enabled">>,
<<"value">> := #{}
},
Msg1
),
ok;
t_update_validation_error_message({'end', Config}) ->
NewPath = ?config(new_path, Config),
?assertEqual([], delete(NewPath)),
ok.
action_listener(ID, Action, Running) ->
Path = emqx_mgmt_api_test_util:api_path(["listeners", ID, Action]),
{ok, _} = emqx_mgmt_api_test_util:request_api(post, Path),
@ -413,8 +480,11 @@ action_listener(ID, Action, Running) ->
listener_stats(Listener, Running).
request(Method, Url, QueryParams, Body) ->
request(Method, Url, QueryParams, Body, _Opts = #{}).
request(Method, Url, QueryParams, Body, Opts) ->
AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
case emqx_mgmt_api_test_util:request_api(Method, Url, QueryParams, AuthHeader, Body) of
case emqx_mgmt_api_test_util:request_api(Method, Url, QueryParams, AuthHeader, Body, Opts) of
{ok, Res} -> emqx_utils_json:decode(Res, [return_maps]);
Error -> Error
end.

View File

@ -38,7 +38,7 @@ namespace() -> rule_engine.
tags() ->
[<<"Rule Engine">>].
roots() -> [{"rule_engine", ?HOCON(?R_REF("rule_engine"), #{importance => ?IMPORTANCE_LOW})}].
roots() -> [{"rule_engine", ?HOCON(?R_REF("rule_engine"), #{importance => ?IMPORTANCE_HIDDEN})}].
fields("rule_engine") ->
rule_engine_settings() ++

View File

@ -23,7 +23,7 @@
namespace() -> "slow_subs".
roots() ->
[{"slow_subs", ?HOCON(?R_REF("slow_subs"), #{importance => ?IMPORTANCE_HIDDEN})}].
[{"slow_subs", ?HOCON(?R_REF("slow_subs"), #{importance => ?IMPORTANCE_LOW})}].
fields("slow_subs") ->
[

View File

@ -624,6 +624,9 @@ is_sensitive_key(<<"security_token">>) -> true;
is_sensitive_key(token) -> true;
is_sensitive_key("token") -> true;
is_sensitive_key(<<"token">>) -> true;
is_sensitive_key(jwt) -> true;
is_sensitive_key("jwt") -> true;
is_sensitive_key(<<"jwt">>) -> true;
is_sensitive_key(_) -> false.
redact(Term) ->

View File

@ -0,0 +1 @@
Hide the broker and move the `broker.shared_subscription_strategy` to `mqtt.shared_subscription_strategy` as it belongs to `mqtt`.

View File

@ -0,0 +1 @@
The listener's authentication and zone related apis have been officially removed in version `5.1.0`.

View File

@ -0,0 +1,5 @@
Fixed multiple issues with the Stomp gateway, including:
- Fixed an issue where `is_superuser` was not working correctly.
- Fixed an issue where the mountpoint was not being removed in message delivery.
- After a message or subscription request fails, the Stomp client should be disconnected
immediately after replying with an ERROR message.

View File

@ -0,0 +1,7 @@
Disallow using multiple TLS versions in the listener config that include tlsv1.3 but exclude tlsv1.2.
Using TLS configuration with such version gap caused connection errors.
Additionally, drop and log TLS options that are incompatible with the selected TLS version(s).
Note: any old listener configuration with the version gap described above will fail to load
after applying this fix and must be manually fixed.

View File

@ -0,0 +1 @@
Improved error messages when a validation error occurs while using the Listeners HTTP API.

View File

@ -0,0 +1,8 @@
Deprecates the `mountpoint` field in `AuthenticateRequest` in ExProto gateway.
This field was introduced in v4.x, but in fact, in 5.0 we have provided
`gateway.exproto.mountpoint` for configuration, so there is no need to override
it through the Authenticate request.
Additionally, updates the default value of `subscriptions_max`, `inflight_max`,
`mqueue_max` to `infinity`

View File

@ -0,0 +1 @@
Fix crash on `/api/listeners` when listener's max_connections is set to a string.

View File

@ -0,0 +1 @@
Improve log security for JWT, now it will be obfuscated before print.

View File

@ -0,0 +1 @@
Added a small improvement to reduce the chance of seeing the `connecting` state when creating/updating a Pulsar Producer bridge.

View File

@ -0,0 +1,3 @@
Fix QUIC listeners's default cert file paths.
Prior to this change, the default cert file paths are prefixed with environment variable `${EMQX_ETC_DIR}` which were not interpolated before used in QUIC listeners.

View File

@ -0,0 +1 @@
Fixed a `case_clause` error that could arise in race conditions in Pulsar Producer bridge.

View File

@ -0,0 +1 @@
Fixed credential validation when creating bridge and checking status for InfluxDB Bridges.

View File

@ -0,0 +1 @@
Fixed a health check issue for Pulsar Producer that could lead to loss of messages when the connection to Pulsar's brokers were down.

View File

@ -0,0 +1 @@
Fixed a health check issue for Kafka Producer that could lead to loss of messages when the connection to Kafka's brokers were down.

View File

@ -52,7 +52,7 @@ defmodule EMQXUmbrella.MixProject do
{:ehttpc, github: "emqx/ehttpc", tag: "0.4.10", override: true},
{:gproc, github: "emqx/gproc", tag: "0.9.0.1", override: true},
{:jiffy, github: "emqx/jiffy", tag: "1.0.5", override: true},
{:cowboy, github: "emqx/cowboy", tag: "2.9.0", override: true},
{:cowboy, github: "emqx/cowboy", tag: "2.9.2", override: true},
{:esockd, github: "emqx/esockd", tag: "5.9.6", override: true},
{:rocksdb, github: "emqx/erlang-rocksdb", tag: "1.7.2-emqx-11", override: true},
{:ekka, github: "emqx/ekka", tag: "0.15.2", override: true},
@ -72,7 +72,7 @@ defmodule EMQXUmbrella.MixProject do
# in conflict by emqtt and hocon
{:getopt, "1.0.2", override: true},
{:snabbkaffe, github: "kafka4beam/snabbkaffe", tag: "1.0.8", override: true},
{:hocon, github: "emqx/hocon", tag: "0.39.7", override: true},
{:hocon, github: "emqx/hocon", tag: "0.39.8", override: true},
{:emqx_http_lib, github: "emqx/emqx_http_lib", tag: "0.5.2", override: true},
{:esasl, github: "emqx/esasl", tag: "0.2.0"},
{:jose, github: "potatosalad/erlang-jose", tag: "1.11.2"},
@ -92,7 +92,7 @@ defmodule EMQXUmbrella.MixProject do
github: "ninenines/cowlib", ref: "c6553f8308a2ca5dcd69d845f0a7d098c40c3363", override: true},
# in conflict by cowboy_swagger and cowboy
{:ranch,
github: "ninenines/ranch", ref: "a692f44567034dacf5efcaa24a24183788594eb7", override: true},
github: "emqx/ranch", ref: "de8ba2a00817c0a6eb1b8f20d6fb3e44e2c9a5aa", override: true},
# in conflict by grpc and eetcd
{:gpb, "4.19.7", override: true, runtime: false},
{:hackney, github: "emqx/hackney", tag: "1.18.1-1", override: true}
@ -193,7 +193,7 @@ defmodule EMQXUmbrella.MixProject do
defp enterprise_deps(_profile_info = %{edition_type: :enterprise}) do
[
{:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.2.5"},
{:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.9", override: true},
{:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.10", override: true},
{:wolff, github: "kafka4beam/wolff", tag: "1.7.5"},
{:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.3", override: true},
{:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.0"},

View File

@ -59,7 +59,7 @@
, {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.4.10"}}}
, {gproc, {git, "https://github.com/emqx/gproc", {tag, "0.9.0.1"}}}
, {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.0"}}}
, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.2"}}}
, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.6"}}}
, {rocksdb, {git, "https://github.com/emqx/erlang-rocksdb", {tag, "1.7.2-emqx-11"}}}
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.15.2"}}}
@ -75,7 +75,7 @@
, {system_monitor, {git, "https://github.com/ieQu1/system_monitor", {tag, "3.0.3"}}}
, {getopt, "1.0.2"}
, {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.8"}}}
, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.7"}}}
, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.39.8"}}}
, {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.2"}}}
, {esasl, {git, "https://github.com/emqx/esasl", {tag, "0.2.0"}}}
, {jose, {git, "https://github.com/potatosalad/erlang-jose", {tag, "1.11.2"}}}

View File

@ -23,7 +23,7 @@ main(_) ->
merge(BaseConf, Cfgs) ->
Confs = [BaseConf | lists:map(fun read_conf/1, Cfgs)],
infix(lists:filter(fun(I) -> iolist_size(I) > 0 end, Confs), [io_lib:nl(), io_lib:nl()]).
infix(lists:filter(fun(I) -> iolist_size(I) > 0 end, Confs), [io_lib:nl()]).
read_conf(CfgFile) ->
case filelib:is_regular(CfgFile) of