Merge branch 'master' into EMQX-782
This commit is contained in:
commit
0312f07b11
|
@ -50,5 +50,11 @@ _upgrade_base/
|
|||
TAGS
|
||||
erlang_ls.config
|
||||
.els_cache/
|
||||
# VSCode files
|
||||
.vs/
|
||||
.vscode/
|
||||
# Emacs Backup files
|
||||
*~
|
||||
# Emacs temporary files
|
||||
.#*
|
||||
*#
|
||||
|
|
2
Makefile
2
Makefile
|
@ -5,7 +5,7 @@ BUILD = $(CURDIR)/build
|
|||
SCRIPTS = $(CURDIR)/scripts
|
||||
export PKG_VSN ?= $(shell $(CURDIR)/pkg-vsn.sh)
|
||||
export EMQX_DESC ?= EMQ X
|
||||
export EMQX_DASHBOARD_VERSION ?= v5.0.0-beta.16
|
||||
export EMQX_DASHBOARD_VERSION ?= v5.0.0-beta.17
|
||||
ifeq ($(OS),Windows_NT)
|
||||
export REBAR_COLOR=none
|
||||
endif
|
||||
|
|
|
@ -833,6 +833,39 @@ force_shutdown {
|
|||
max_heap_size = 32MB
|
||||
}
|
||||
|
||||
overload_protection {
|
||||
## React on system overload or not
|
||||
## @doc overload_protection.enable
|
||||
## ValueType: Boolean
|
||||
## Default: false
|
||||
enable = false
|
||||
|
||||
## Backoff delay in ms
|
||||
## @doc overload_protection.backoff_delay
|
||||
## ValueType: Integer
|
||||
## Range: (0, infinity)
|
||||
## Default: 1
|
||||
backoff_delay = 1
|
||||
|
||||
## Backoff GC enabled
|
||||
## @doc overload_protection.backoff_gc
|
||||
## ValueType: Boolean
|
||||
## Default: false
|
||||
backoff_gc = false
|
||||
|
||||
## Backoff hibernation enabled
|
||||
## @doc overload_protection.backoff_hibernation
|
||||
## ValueType: Boolean
|
||||
## Default: true
|
||||
backoff_hibernation = true
|
||||
|
||||
## Backoff hibernation enabled
|
||||
## @doc overload_protection.backoff_hibernation
|
||||
## ValueType: Boolean
|
||||
## Default: true
|
||||
backoff_new_conn = true
|
||||
}
|
||||
|
||||
force_gc {
|
||||
## Force the MQTT connection process GC after this number of
|
||||
## messages or bytes passed through.
|
||||
|
|
|
@ -9,27 +9,28 @@
|
|||
%% This rebar.config is necessary because the app may be used as a
|
||||
%% `git_subdir` dependency in other projects.
|
||||
{deps,
|
||||
[ {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}}
|
||||
[ {lc, {git, "https://github.com/qzhuyan/lc.git", {tag, "0.1.1"}}}
|
||||
, {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}}
|
||||
, {typerefl, {git, "https://github.com/k32/typerefl", {tag, "0.8.5"}}}
|
||||
, {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
|
||||
, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.8.3"}}}
|
||||
, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.8.3"}}}
|
||||
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.10.8"}}}
|
||||
, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.0"}}}
|
||||
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.10.9"}}}
|
||||
, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}}
|
||||
, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.19.5"}}}
|
||||
, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.19.6"}}}
|
||||
, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}
|
||||
, {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}}
|
||||
, {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.14.1"}}}
|
||||
]}.
|
||||
|
||||
{plugins, [rebar3_proper]}.
|
||||
{plugins, [{rebar3_proper, "0.12.1"}]}.
|
||||
{extra_src_dirs, [{"etc", [recursive]}]}.
|
||||
{profiles, [
|
||||
{test,
|
||||
[{deps,
|
||||
[ meck
|
||||
[ {meck, "0.9.2"}
|
||||
, {proper, "1.4.0"}
|
||||
, {bbmustache,"1.10.0"}
|
||||
, {emqx_ct_helpers, {git,"https://github.com/emqx/emqx-ct-helpers.git", {tag,"2.1.0"}}}
|
||||
, {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.4.3"}}}
|
||||
]},
|
||||
{extra_src_dirs, [{"test",[recursive]}]}
|
||||
|
|
|
@ -5,7 +5,7 @@
|
|||
{vsn, "5.0.0"}, % strict semver, bump manually!
|
||||
{modules, []},
|
||||
{registered, []},
|
||||
{applications, [kernel,stdlib,gproc,gen_rpc,esockd,cowboy,sasl,os_mon,jiffy]},
|
||||
{applications, [kernel,stdlib,gproc,gen_rpc,esockd,cowboy,sasl,os_mon,jiffy,lc]},
|
||||
{mod, {emqx_app,[]}},
|
||||
{env, []},
|
||||
{licenses, ["Apache-2.0"]},
|
||||
|
|
|
@ -81,7 +81,7 @@ set_debug_secret(PathToSecretFile) ->
|
|||
catch _ : _ -> error({badfile, PathToSecretFile})
|
||||
end;
|
||||
{error, Reason} ->
|
||||
?ULOG("Failed to read debug_info encryption key file ~s: ~p~n",
|
||||
?ULOG("Failed to read debug_info encryption key file ~ts: ~p~n",
|
||||
[PathToSecretFile, Reason]),
|
||||
error(Reason)
|
||||
end,
|
||||
|
|
|
@ -410,21 +410,23 @@ normalize(#deactivated_alarm{activate_at = ActivateAt,
|
|||
|
||||
normalize_message(Name, no_details) ->
|
||||
list_to_binary(io_lib:format("~p", [Name]));
|
||||
normalize_message(runq_overload, #{node := Node, runq_length := Len}) ->
|
||||
list_to_binary(io_lib:format("VM is overloaded on node: ~p: ~p", [Node, Len]));
|
||||
normalize_message(high_system_memory_usage, #{high_watermark := HighWatermark}) ->
|
||||
list_to_binary(io_lib:format("System memory usage is higher than ~p%", [HighWatermark]));
|
||||
normalize_message(high_process_memory_usage, #{high_watermark := HighWatermark}) ->
|
||||
list_to_binary(io_lib:format("Process memory usage is higher than ~p%", [HighWatermark]));
|
||||
normalize_message(high_cpu_usage, #{usage := Usage}) ->
|
||||
list_to_binary(io_lib:format("~s cpu usage", [Usage]));
|
||||
list_to_binary(io_lib:format("~ts cpu usage", [Usage]));
|
||||
normalize_message(too_many_processes, #{usage := Usage}) ->
|
||||
list_to_binary(io_lib:format("~s process usage", [Usage]));
|
||||
list_to_binary(io_lib:format("~ts process usage", [Usage]));
|
||||
normalize_message(cluster_rpc_apply_failed, #{tnx_id := TnxId}) ->
|
||||
list_to_binary(io_lib:format("cluster_rpc_apply_failed:~w", [TnxId]));
|
||||
normalize_message(partition, #{occurred := Node}) ->
|
||||
list_to_binary(io_lib:format("Partition occurs at node ~s", [Node]));
|
||||
list_to_binary(io_lib:format("Partition occurs at node ~ts", [Node]));
|
||||
normalize_message(<<"resource", _/binary>>, #{type := Type, id := ID}) ->
|
||||
list_to_binary(io_lib:format("Resource ~s(~s) is down", [Type, ID]));
|
||||
list_to_binary(io_lib:format("Resource ~ts(~ts) is down", [Type, ID]));
|
||||
normalize_message(<<"conn_congestion/", Info/binary>>, _) ->
|
||||
list_to_binary(io_lib:format("connection congested: ~s", [Info]));
|
||||
list_to_binary(io_lib:format("connection congested: ~ts", [Info]));
|
||||
normalize_message(_Name, _UnknownDetails) ->
|
||||
<<"Unknown alarm">>.
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
|
||||
-include("emqx.hrl").
|
||||
-include("logger.hrl").
|
||||
-include_lib("lc/include/lc.hrl").
|
||||
|
||||
|
||||
%% gen_event callbacks
|
||||
|
@ -74,6 +75,14 @@ handle_event({clear_alarm, process_memory_high_watermark}, State) ->
|
|||
emqx_alarm:deactivate(high_process_memory_usage),
|
||||
{ok, State};
|
||||
|
||||
handle_event({set_alarm, {?LC_ALARM_ID_RUNQ, Info}}, State) ->
|
||||
emqx_alarm:activate(runq_overload, Info),
|
||||
{ok, State};
|
||||
|
||||
handle_event({clear_alarm, ?LC_ALARM_ID_RUNQ}, State) ->
|
||||
emqx_alarm:deactivate(runq_overload),
|
||||
{ok, State};
|
||||
|
||||
handle_event(_, State) ->
|
||||
{ok, State}.
|
||||
|
||||
|
|
|
@ -57,8 +57,8 @@ run(#{subscribers := Subs,
|
|||
lists:foreach(fun(Pid) -> Pid ! start_subscribe end, SubsPids),
|
||||
collect_results(SubsPids, subscribe_time)
|
||||
end),
|
||||
io:format(user, "InsertTotalTime: ~s~n", [ns(T1)]),
|
||||
io:format(user, "InsertTimeAverage: ~s~n", [ns(SubsTime / Subs)]),
|
||||
io:format(user, "InsertTotalTime: ~ts~n", [ns(T1)]),
|
||||
io:format(user, "InsertTimeAverage: ~ts~n", [ns(SubsTime / Subs)]),
|
||||
io:format(user, "InsertRps: ~p~n", [rps(Subs * SubOps, T1)]),
|
||||
|
||||
io:format(user, "lookup ...~n", []),
|
||||
|
@ -67,8 +67,8 @@ run(#{subscribers := Subs,
|
|||
lists:foreach(fun(Pid) -> Pid ! start_lookup end, PubsPids),
|
||||
collect_results(PubsPids, lookup_time)
|
||||
end),
|
||||
io:format(user, "LookupTotalTime: ~s~n", [ns(T2)]),
|
||||
io:format(user, "LookupTimeAverage: ~s~n", [ns(PubsTime / Pubs)]),
|
||||
io:format(user, "LookupTotalTime: ~ts~n", [ns(T2)]),
|
||||
io:format(user, "LookupTimeAverage: ~ts~n", [ns(PubsTime / Pubs)]),
|
||||
io:format(user, "LookupRps: ~p~n", [rps(Pubs * PubOps, T2)]),
|
||||
|
||||
io:format(user, "mnesia table(s) RAM: ~p~n", [ram_bytes()]),
|
||||
|
@ -79,7 +79,7 @@ run(#{subscribers := Subs,
|
|||
lists:foreach(fun(Pid) -> Pid ! stop end, SubsPids),
|
||||
wait_until_empty()
|
||||
end),
|
||||
io:format(user, "TimeToUnsubscribeAll: ~s~n", [ns(T3)]).
|
||||
io:format(user, "TimeToUnsubscribeAll: ~ts~n", [ns(T3)]).
|
||||
|
||||
wait_until_empty() ->
|
||||
case emqx_trie:empty() of
|
||||
|
|
|
@ -291,7 +291,8 @@ handle_in(?CONNECT_PACKET(), Channel = #channel{conn_state = connecting}) ->
|
|||
handle_out(connack, ?RC_PROTOCOL_ERROR, Channel);
|
||||
|
||||
handle_in(?CONNECT_PACKET(ConnPkt), Channel) ->
|
||||
case pipeline([fun enrich_conninfo/2,
|
||||
case pipeline([fun overload_protection/2,
|
||||
fun enrich_conninfo/2,
|
||||
fun run_conn_hooks/2,
|
||||
fun check_connect/2,
|
||||
fun enrich_client/2,
|
||||
|
@ -1158,6 +1159,9 @@ run_terminate_hook(Reason, #channel{clientinfo = ClientInfo, session = Session})
|
|||
%%--------------------------------------------------------------------
|
||||
%% Internal functions
|
||||
%%--------------------------------------------------------------------
|
||||
overload_protection(_, #channel{clientinfo = #{zone := Zone}}) ->
|
||||
emqx_olp:backoff(Zone),
|
||||
ok.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Enrich MQTT Connect Info
|
||||
|
|
|
@ -41,13 +41,6 @@
|
|||
-define(MOD, {mod}).
|
||||
-define(WKEY, '?').
|
||||
|
||||
-define(ATOM_CONF_PATH(PATH, EXP, EXP_ON_FAIL),
|
||||
try [safe_atom(Key) || Key <- PATH] of
|
||||
AtomKeyPath -> EXP
|
||||
catch
|
||||
error:badarg -> EXP_ON_FAIL
|
||||
end).
|
||||
|
||||
-type handler_name() :: module().
|
||||
-type handlers() :: #{emqx_config:config_key() => handlers(), ?MOD => handler_name()}.
|
||||
|
||||
|
@ -76,8 +69,9 @@ stop() ->
|
|||
-spec update_config(module(), emqx_config:config_key_path(), emqx_config:update_args()) ->
|
||||
{ok, emqx_config:update_result()} | {error, emqx_config:update_error()}.
|
||||
update_config(SchemaModule, ConfKeyPath, UpdateArgs) ->
|
||||
?ATOM_CONF_PATH(ConfKeyPath, gen_server:call(?MODULE, {change_config, SchemaModule,
|
||||
AtomKeyPath, UpdateArgs}), {error, {not_found, ConfKeyPath}}).
|
||||
%% force covert the path to a list of atoms, as there maybe some wildcard names/ids in the path
|
||||
AtomKeyPath = [atom(Key) || Key <- ConfKeyPath],
|
||||
gen_server:call(?MODULE, {change_config, SchemaModule, AtomKeyPath, UpdateArgs}).
|
||||
|
||||
-spec add_handler(emqx_config:config_key_path(), handler_name()) -> ok.
|
||||
add_handler(ConfKeyPath, HandlerName) ->
|
||||
|
@ -310,9 +304,9 @@ bin_path(ConfKeyPath) -> [bin(Key) || Key <- ConfKeyPath].
|
|||
bin(A) when is_atom(A) -> atom_to_binary(A, utf8);
|
||||
bin(B) when is_binary(B) -> B.
|
||||
|
||||
safe_atom(Bin) when is_binary(Bin) ->
|
||||
binary_to_existing_atom(Bin, latin1);
|
||||
safe_atom(Str) when is_list(Str) ->
|
||||
list_to_existing_atom(Str);
|
||||
safe_atom(Atom) when is_atom(Atom) ->
|
||||
atom(Bin) when is_binary(Bin) ->
|
||||
binary_to_atom(Bin, utf8);
|
||||
atom(Str) when is_list(Str) ->
|
||||
list_to_atom(Str);
|
||||
atom(Atom) when is_atom(Atom) ->
|
||||
Atom.
|
||||
|
|
|
@ -22,7 +22,7 @@
|
|||
|
||||
-define(ALARM_CONN_CONGEST(Channel, Reason),
|
||||
list_to_binary(
|
||||
io_lib:format("~s/~s/~s",
|
||||
io_lib:format("~ts/~ts/~ts",
|
||||
[Reason, emqx_channel:info(clientid, Channel),
|
||||
maps:get(username, emqx_channel:info(clientinfo, Channel),
|
||||
<<"unknown_user">>)]))).
|
||||
|
|
|
@ -116,7 +116,7 @@
|
|||
-define(ENABLED(X), (X =/= undefined)).
|
||||
|
||||
-define(ALARM_TCP_CONGEST(Channel),
|
||||
list_to_binary(io_lib:format("mqtt_conn/congested/~s/~s",
|
||||
list_to_binary(io_lib:format("mqtt_conn/congested/~ts/~ts",
|
||||
[emqx_channel:info(clientid, Channel),
|
||||
emqx_channel:info(username, Channel)]))).
|
||||
|
||||
|
@ -317,13 +317,20 @@ exit_on_sock_error(Reason) ->
|
|||
%%--------------------------------------------------------------------
|
||||
%% Recv Loop
|
||||
|
||||
recvloop(Parent, State = #state{idle_timeout = IdleTimeout}) ->
|
||||
recvloop(Parent, State = #state{ idle_timeout = IdleTimeout
|
||||
, zone = Zone
|
||||
}) ->
|
||||
receive
|
||||
Msg ->
|
||||
handle_recv(Msg, Parent, State)
|
||||
after
|
||||
IdleTimeout + 100 ->
|
||||
case emqx_olp:backoff_hibernation(Zone) of
|
||||
true ->
|
||||
recvloop(Parent, State);
|
||||
false ->
|
||||
hibernate(Parent, cancel_stats_timer(State))
|
||||
end
|
||||
end.
|
||||
|
||||
handle_recv({system, From, Request}, Parent, State) ->
|
||||
|
@ -822,8 +829,10 @@ ensure_rate_limit(Stats, State = #state{limiter = Limiter}) ->
|
|||
%%--------------------------------------------------------------------
|
||||
%% Run GC and Check OOM
|
||||
|
||||
run_gc(Stats, State = #state{gc_state = GcSt}) ->
|
||||
case ?ENABLED(GcSt) andalso emqx_gc:run(Stats, GcSt) of
|
||||
run_gc(Stats, State = #state{gc_state = GcSt, zone = Zone}) ->
|
||||
case ?ENABLED(GcSt) andalso not emqx_olp:backoff_gc(Zone)
|
||||
andalso emqx_gc:run(Stats, GcSt)
|
||||
of
|
||||
false -> State;
|
||||
{_IsGC, GcSt1} ->
|
||||
State#state{gc_state = GcSt1}
|
||||
|
|
|
@ -128,7 +128,7 @@ help() ->
|
|||
[] ->
|
||||
print("No commands available.~n");
|
||||
Cmds ->
|
||||
print("Usage: ~s~n", [?MODULE]),
|
||||
print("Usage: ~ts~n", [?MODULE]),
|
||||
lists:foreach(fun({_, {Mod, Cmd}, _}) ->
|
||||
print("~110..-s~n", [""]), Mod:Cmd(usage)
|
||||
end, Cmds)
|
||||
|
@ -136,11 +136,11 @@ help() ->
|
|||
|
||||
-spec(print(io:format()) -> ok).
|
||||
print(Msg) ->
|
||||
io:format("~s", [format(Msg)]).
|
||||
io:format("~ts", [format(Msg)]).
|
||||
|
||||
-spec(print(io:format(), [term()]) -> ok).
|
||||
print(Format, Args) ->
|
||||
io:format("~s", [format(Format, Args)]).
|
||||
io:format("~ts", [format(Format, Args)]).
|
||||
|
||||
-spec(usage([cmd_usage()]) -> ok).
|
||||
usage(UsageList) ->
|
||||
|
@ -152,7 +152,7 @@ usage(CmdParams, Desc) ->
|
|||
|
||||
-spec(format(io:format()) -> string()).
|
||||
format(Msg) ->
|
||||
lists:flatten(io_lib:format("~s", [Msg])).
|
||||
lists:flatten(io_lib:format("~ts", [Msg])).
|
||||
|
||||
-spec(format(io:format(), [term()]) -> string()).
|
||||
format(Format, Args) ->
|
||||
|
@ -170,7 +170,7 @@ format_usage(CmdParams, Desc) ->
|
|||
CmdLines = split_cmd(CmdParams),
|
||||
DescLines = split_cmd(Desc),
|
||||
lists:foldl(fun({CmdStr, DescStr}, Usage) ->
|
||||
Usage ++ format("~-70s# ~s~n", [CmdStr, DescStr])
|
||||
Usage ++ format("~-70s# ~ts~n", [CmdStr, DescStr])
|
||||
end, "", zip_cmd(CmdLines, DescLines)).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
|
|
|
@ -131,15 +131,15 @@ start_listener(Type, ListenerName, #{bind := Bind} = Conf) ->
|
|||
case do_start_listener(Type, ListenerName, Conf) of
|
||||
{ok, {skipped, Reason}} when Reason =:= listener_disabled;
|
||||
Reason =:= quic_app_missing ->
|
||||
console_print("- Skip - starting listener ~s on ~s ~n due to ~p",
|
||||
console_print("- Skip - starting listener ~ts on ~ts ~n due to ~p",
|
||||
[listener_id(Type, ListenerName), format_addr(Bind), Reason]);
|
||||
{ok, _} ->
|
||||
console_print("Listener ~s on ~s started.~n",
|
||||
console_print("Listener ~ts on ~ts started.~n",
|
||||
[listener_id(Type, ListenerName), format_addr(Bind)]);
|
||||
{error, {already_started, Pid}} ->
|
||||
{error, {already_started, Pid}};
|
||||
{error, Reason} ->
|
||||
?ELOG("Failed to start listener ~s on ~s: ~0p~n",
|
||||
?ELOG("Failed to start listener ~ts on ~ts: ~0p~n",
|
||||
[listener_id(Type, ListenerName), format_addr(Bind), Reason]),
|
||||
error(Reason)
|
||||
end.
|
||||
|
@ -180,11 +180,11 @@ stop_listener(ListenerId) ->
|
|||
stop_listener(Type, ListenerName, #{bind := Bind} = Conf) ->
|
||||
case do_stop_listener(Type, ListenerName, Conf) of
|
||||
ok ->
|
||||
console_print("Listener ~s on ~s stopped.~n",
|
||||
console_print("Listener ~ts on ~ts stopped.~n",
|
||||
[listener_id(Type, ListenerName), format_addr(Bind)]),
|
||||
ok;
|
||||
{error, Reason} ->
|
||||
?ELOG("Failed to stop listener ~s on ~s: ~0p~n",
|
||||
?ELOG("Failed to stop listener ~ts on ~ts: ~0p~n",
|
||||
[listener_id(Type, ListenerName), format_addr(Bind), Reason]),
|
||||
{error, Reason}
|
||||
end.
|
||||
|
@ -289,7 +289,9 @@ esockd_opts(Type, Opts0) ->
|
|||
infinity -> Opts1;
|
||||
Rate -> Opts1#{max_conn_rate => Rate}
|
||||
end,
|
||||
Opts3 = Opts2#{access_rules => esockd_access_rules(maps:get(access_rules, Opts0, []))},
|
||||
Opts3 = Opts2#{ access_rules => esockd_access_rules(maps:get(access_rules, Opts0, []))
|
||||
, tune_fun => {emqx_olp, backoff_new_conn, [zone(Opts0)]}
|
||||
},
|
||||
maps:to_list(case Type of
|
||||
tcp -> Opts3#{tcp_options => tcp_opts(Opts0)};
|
||||
ssl -> Opts3#{ssl_options => ssl_opts(Opts0), tcp_options => tcp_opts(Opts0)}
|
||||
|
@ -342,9 +344,9 @@ merge_default(Options) ->
|
|||
format_addr(Port) when is_integer(Port) ->
|
||||
io_lib:format("0.0.0.0:~w", [Port]);
|
||||
format_addr({Addr, Port}) when is_list(Addr) ->
|
||||
io_lib:format("~s:~w", [Addr, Port]);
|
||||
io_lib:format("~ts:~w", [Addr, Port]);
|
||||
format_addr({Addr, Port}) when is_tuple(Addr) ->
|
||||
io_lib:format("~s:~w", [inet:ntoa(Addr), Port]).
|
||||
io_lib:format("~ts:~w", [inet:ntoa(Addr), Port]).
|
||||
|
||||
listener_id(Type, ListenerName) ->
|
||||
list_to_atom(lists:append([str(Type), ":", str(ListenerName)])).
|
||||
|
|
|
@ -32,7 +32,7 @@ enrich(Report, #{mfa := Mfa, line := Line}) ->
|
|||
enrich(Report, _) -> Report.
|
||||
|
||||
enrich_fmt({Fmt, Args}, #{mfa := Mfa, line := Line}) when is_list(Fmt) ->
|
||||
{Fmt ++ " mfa: ~s line: ~w", Args ++ [mfa(Mfa), Line]};
|
||||
{Fmt ++ " mfa: ~ts line: ~w", Args ++ [mfa(Mfa), Line]};
|
||||
enrich_fmt(Msg, _) ->
|
||||
Msg.
|
||||
|
||||
|
|
|
@ -184,6 +184,15 @@
|
|||
{counter, 'session.terminated'}
|
||||
]).
|
||||
|
||||
%% Overload protetion counters
|
||||
-define(OLP_METRICS,
|
||||
[{counter, 'olp.delay.ok'},
|
||||
{counter, 'olp.delay.timeout'},
|
||||
{counter, 'olp.hbn'},
|
||||
{counter, 'olp.gc'},
|
||||
{counter, 'olp.new_conn'}
|
||||
]).
|
||||
|
||||
-record(state, {next_idx = 1}).
|
||||
|
||||
-record(metric, {name, type, idx}).
|
||||
|
@ -430,7 +439,8 @@ init([]) ->
|
|||
?MESSAGE_METRICS,
|
||||
?DELIVERY_METRICS,
|
||||
?CLIENT_METRICS,
|
||||
?SESSION_METRICS
|
||||
?SESSION_METRICS,
|
||||
?OLP_METRICS
|
||||
]),
|
||||
% Store reserved indices
|
||||
ok = lists:foreach(fun({Type, Name}) ->
|
||||
|
@ -575,5 +585,11 @@ reserved_idx('session.takeovered') -> 222;
|
|||
reserved_idx('session.discarded') -> 223;
|
||||
reserved_idx('session.terminated') -> 224;
|
||||
|
||||
reserved_idx('olp.delay.ok') -> 300;
|
||||
reserved_idx('olp.delay.timeout') -> 301;
|
||||
reserved_idx('olp.hbn') -> 302;
|
||||
reserved_idx('olp.gc') -> 303;
|
||||
reserved_idx('olp.new_conn') -> 304;
|
||||
|
||||
reserved_idx(_) -> undefined.
|
||||
|
||||
|
|
|
@ -0,0 +1,136 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
-module(emqx_olp).
|
||||
|
||||
-include_lib("lc/include/lc.hrl").
|
||||
|
||||
-export([ is_overloaded/0
|
||||
, backoff/1
|
||||
, backoff_gc/1
|
||||
, backoff_hibernation/1
|
||||
, backoff_new_conn/1
|
||||
]).
|
||||
|
||||
|
||||
%% exports for O&M
|
||||
-export([ status/0
|
||||
, enable/0
|
||||
, disable/0
|
||||
]).
|
||||
|
||||
-type cfg_key() ::
|
||||
backoff_gc |
|
||||
backoff_hibernation |
|
||||
backoff_new_conn.
|
||||
|
||||
-type cnt_name() ::
|
||||
'olp.delay.ok' |
|
||||
'olp.delay.timeout' |
|
||||
'olp.hbn' |
|
||||
'olp.gc' |
|
||||
'olp.new_conn'.
|
||||
|
||||
-define(overload_protection, overload_protection).
|
||||
|
||||
%% @doc Light realtime check if system is overloaded.
|
||||
-spec is_overloaded() -> boolean().
|
||||
is_overloaded() ->
|
||||
load_ctl:is_overloaded().
|
||||
|
||||
%% @doc Backoff with a delay if the system is overloaded, for tasks that could be deferred.
|
||||
%% returns `false' if backoff didn't happen, the system is cool.
|
||||
%% returns `ok' if backoff is triggered and get unblocked when the system is cool.
|
||||
%% returns `timeout' if backoff is trigged but get unblocked due to timeout as configured.
|
||||
-spec backoff(Zone :: atom()) -> ok | false | timeout.
|
||||
backoff(Zone) ->
|
||||
case emqx_config:get_zone_conf(Zone, [?overload_protection]) of
|
||||
#{enable := true, backoff_delay := Delay} ->
|
||||
case load_ctl:maydelay(Delay) of
|
||||
false -> false;
|
||||
ok ->
|
||||
emqx_metrics:inc('olp.delay.ok'),
|
||||
ok;
|
||||
timeout ->
|
||||
emqx_metrics:inc('olp.delay.timeout'),
|
||||
timeout
|
||||
end;
|
||||
_ ->
|
||||
ok
|
||||
end.
|
||||
|
||||
%% @doc If forceful GC should be skipped when the system is overloaded.
|
||||
-spec backoff_gc(Zone :: atom()) -> boolean().
|
||||
backoff_gc(Zone) ->
|
||||
do_check(Zone, ?FUNCTION_NAME, 'olp.gc').
|
||||
|
||||
%% @doc If hibernation should be skipped when the system is overloaded.
|
||||
-spec backoff_hibernation(Zone :: atom()) -> boolean().
|
||||
backoff_hibernation(Zone) ->
|
||||
do_check(Zone, ?FUNCTION_NAME, 'olp.hbn').
|
||||
|
||||
%% @doc Returns {error, overloaded} if new connection should be
|
||||
%% closed when system is overloaded.
|
||||
-spec backoff_new_conn(Zone :: atom()) -> ok | {error, overloaded}.
|
||||
backoff_new_conn(Zone) ->
|
||||
case do_check(Zone, ?FUNCTION_NAME, 'olp.new_conn') of
|
||||
true ->
|
||||
{error, overloaded};
|
||||
false ->
|
||||
ok
|
||||
end.
|
||||
|
||||
-spec status() -> any().
|
||||
status() ->
|
||||
is_overloaded().
|
||||
|
||||
%% @doc turn off backgroud runq check.
|
||||
-spec disable() -> ok | {error, timeout}.
|
||||
disable() ->
|
||||
load_ctl:stop_runq_flagman(5000).
|
||||
|
||||
%% @doc turn on backgroud runq check.
|
||||
-spec enable() -> {ok, pid()} | {error, running | restarting | disabled}.
|
||||
enable() ->
|
||||
case load_ctl:restart_runq_flagman() of
|
||||
{error, disabled} ->
|
||||
OldCfg = load_ctl:get_config(),
|
||||
ok = load_ctl:put_config(OldCfg#{ ?RUNQ_MON_F0 => true }),
|
||||
load_ctl:restart_runq_flagman();
|
||||
Other ->
|
||||
Other
|
||||
end.
|
||||
|
||||
%%% Internals
|
||||
-spec do_check(Zone::atom(), cfg_key(), cnt_name()) -> boolean().
|
||||
do_check(Zone, Key, CntName) ->
|
||||
case load_ctl:is_overloaded() of
|
||||
true ->
|
||||
case emqx_config:get_zone_conf(Zone, [?overload_protection]) of
|
||||
#{enable := true, Key := true} ->
|
||||
emqx_metrics:inc(CntName),
|
||||
true;
|
||||
_ ->
|
||||
false
|
||||
end;
|
||||
false -> false
|
||||
end.
|
||||
|
||||
|
||||
%%%_* Emacs ====================================================================
|
||||
%%% Local Variables:
|
||||
%%% allout-layout: t
|
||||
%%% erlang-indent-level: 2
|
||||
%%% End:
|
|
@ -446,14 +446,14 @@ format_header(#mqtt_packet_header{type = Type,
|
|||
true -> <<>>;
|
||||
false -> [", ", S]
|
||||
end,
|
||||
io_lib:format("~s(Q~p, R~p, D~p~s)", [type_name(Type), QoS, i(Retain), i(Dup), S1]).
|
||||
io_lib:format("~ts(Q~p, R~p, D~p~ts)", [type_name(Type), QoS, i(Retain), i(Dup), S1]).
|
||||
|
||||
format_variable(undefined, _) ->
|
||||
undefined;
|
||||
format_variable(Variable, undefined) ->
|
||||
format_variable(Variable);
|
||||
format_variable(Variable, Payload) ->
|
||||
io_lib:format("~s, Payload=~0p", [format_variable(Variable), Payload]).
|
||||
io_lib:format("~ts, Payload=~0p", [format_variable(Variable), Payload]).
|
||||
|
||||
format_variable(#mqtt_packet_connect{
|
||||
proto_ver = ProtoVer,
|
||||
|
@ -468,10 +468,10 @@ format_variable(#mqtt_packet_connect{
|
|||
will_payload = WillPayload,
|
||||
username = Username,
|
||||
password = Password}) ->
|
||||
Format = "ClientId=~s, ProtoName=~s, ProtoVsn=~p, CleanStart=~s, KeepAlive=~p, Username=~s, Password=~s",
|
||||
Format = "ClientId=~ts, ProtoName=~ts, ProtoVsn=~p, CleanStart=~ts, KeepAlive=~p, Username=~ts, Password=~ts",
|
||||
Args = [ClientId, ProtoName, ProtoVer, CleanStart, KeepAlive, Username, format_password(Password)],
|
||||
{Format1, Args1} = if
|
||||
WillFlag -> {Format ++ ", Will(Q~p, R~p, Topic=~s, Payload=~0p)",
|
||||
WillFlag -> {Format ++ ", Will(Q~p, R~p, Topic=~ts, Payload=~0p)",
|
||||
Args ++ [WillQoS, i(WillRetain), WillTopic, WillPayload]};
|
||||
true -> {Format, Args}
|
||||
end,
|
||||
|
@ -487,7 +487,7 @@ format_variable(#mqtt_packet_connack{ack_flags = AckFlags,
|
|||
|
||||
format_variable(#mqtt_packet_publish{topic_name = TopicName,
|
||||
packet_id = PacketId}) ->
|
||||
io_lib:format("Topic=~s, PacketId=~p", [TopicName, PacketId]);
|
||||
io_lib:format("Topic=~ts, PacketId=~p", [TopicName, PacketId]);
|
||||
|
||||
format_variable(#mqtt_packet_puback{packet_id = PacketId,
|
||||
reason_code = ReasonCode}) ->
|
||||
|
|
|
@ -151,7 +151,7 @@ load_ext_plugin(PluginDir) ->
|
|||
% catch
|
||||
% throw : {conf_file_not_found, ConfFile} ->
|
||||
% %% this is maybe a dependency of an external plugin
|
||||
% ?LOG(debug, "config_load_error_ignored for app=~p, path=~s", [AppName, ConfFile]),
|
||||
% ?LOG(debug, "config_load_error_ignored for app=~p, path=~ts", [AppName, ConfFile]),
|
||||
% ok
|
||||
% end.
|
||||
|
||||
|
|
|
@ -35,6 +35,8 @@ init(ConnOpts) when is_map(ConnOpts) ->
|
|||
-spec new_conn(quicer:connection_handler(), cb_state()) -> {ok, cb_state()} | {error, any()}.
|
||||
new_conn(Conn, S) ->
|
||||
process_flag(trap_exit, true),
|
||||
case emqx_olp:is_overloaded() of
|
||||
false ->
|
||||
{ok, Pid} = emqx_connection:start_link(emqx_quic_stream, {self(), Conn}, S),
|
||||
receive
|
||||
{Pid, stream_acceptor_ready} ->
|
||||
|
@ -42,6 +44,10 @@ new_conn(Conn, S) ->
|
|||
{ok, S};
|
||||
{'EXIT', Pid, _Reason} ->
|
||||
{error, stream_accept_error}
|
||||
end;
|
||||
true ->
|
||||
emqx_metrics:inc('olp.new_conn'),
|
||||
{error, overloaded}
|
||||
end.
|
||||
|
||||
-spec connected(quicer:connection_handler(), cb_state()) -> {ok, cb_state()} | {error, any()}.
|
||||
|
|
|
@ -177,7 +177,7 @@ topics() ->
|
|||
-spec(print_routes(emqx_types:topic()) -> ok).
|
||||
print_routes(Topic) ->
|
||||
lists:foreach(fun(#route{topic = To, dest = Dest}) ->
|
||||
io:format("~s -> ~s~n", [To, Dest])
|
||||
io:format("~ts -> ~ts~n", [To, Dest])
|
||||
end, match_routes(Topic)).
|
||||
|
||||
call(Router, Msg) ->
|
||||
|
|
|
@ -72,6 +72,7 @@
|
|||
-export([namespace/0, roots/0, roots/1, fields/1]).
|
||||
-export([conf_get/2, conf_get/3, keys/2, filter/1]).
|
||||
-export([server_ssl_opts_schema/2, client_ssl_opts_schema/1, ciphers_schema/1, default_ciphers/1]).
|
||||
-export([sc/2, map/2]).
|
||||
|
||||
namespace() -> undefined.
|
||||
|
||||
|
@ -122,6 +123,9 @@ roots(medium) ->
|
|||
, {"force_shutdown",
|
||||
sc(ref("force_shutdown"),
|
||||
#{})}
|
||||
, {"overload_protection",
|
||||
sc(ref("overload_protection"),
|
||||
#{})}
|
||||
];
|
||||
roots(low) ->
|
||||
[ {"force_gc",
|
||||
|
@ -323,7 +327,9 @@ fields("mqtt") ->
|
|||
|
||||
fields("zone") ->
|
||||
Fields = ["mqtt", "stats", "flapping_detect", "force_shutdown",
|
||||
"conn_congestion", "rate_limit", "quota", "force_gc"],
|
||||
"conn_congestion", "rate_limit", "quota", "force_gc",
|
||||
"overload_protection"
|
||||
],
|
||||
[{F, ref(emqx_zone_schema, F)} || F <- Fields];
|
||||
|
||||
fields("rate_limit") ->
|
||||
|
@ -391,6 +397,35 @@ fields("force_shutdown") ->
|
|||
})}
|
||||
];
|
||||
|
||||
fields("overload_protection") ->
|
||||
[ {"enable",
|
||||
sc(boolean(),
|
||||
#{ desc => "React on system overload or not"
|
||||
, default => false
|
||||
})}
|
||||
, {"backoff_delay",
|
||||
sc(range(0, inf),
|
||||
#{ desc => "Some unimporant tasks could be delayed"
|
||||
"for execution, here set the delays in ms"
|
||||
, default => 1
|
||||
})}
|
||||
, {"backoff_gc",
|
||||
sc(boolean(),
|
||||
#{ desc => "Skip forceful GC if necessary"
|
||||
, default => false
|
||||
})}
|
||||
, {"backoff_hibernation",
|
||||
sc(boolean(),
|
||||
#{ desc => "Skip process hibernation if necessary"
|
||||
, default => true
|
||||
})}
|
||||
, {"backoff_new_conn",
|
||||
sc(boolean(),
|
||||
#{ desc => "Close new incoming connections if necessary"
|
||||
, default => true
|
||||
})}
|
||||
];
|
||||
|
||||
fields("conn_congestion") ->
|
||||
[ {"enable_alarm",
|
||||
sc(boolean(),
|
||||
|
@ -1280,7 +1315,7 @@ validate_heap_size(Siz) ->
|
|||
(1 bsl 27) - 1
|
||||
end,
|
||||
case Siz > MaxSiz of
|
||||
true -> error(io_lib:format("force_shutdown_policy: heap-size ~s is too large", [Siz]));
|
||||
true -> error(io_lib:format("force_shutdown_policy: heap-size ~ts is too large", [Siz]));
|
||||
false -> ok
|
||||
end.
|
||||
parse_user_lookup_fun(StrConf) ->
|
||||
|
|
|
@ -70,7 +70,7 @@ trace(publish, #message{from = From, topic = Topic, payload = Payload})
|
|||
when is_binary(From); is_atom(From) ->
|
||||
emqx_logger:info(#{topic => Topic,
|
||||
mfa => {?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY} },
|
||||
"PUBLISH to ~s: ~0p", [Topic, Payload]).
|
||||
"PUBLISH to ~ts: ~0p", [Topic, Payload]).
|
||||
|
||||
%% @doc Start to trace clientid or topic.
|
||||
-spec(start_trace(trace_who(), logger:level() | all, string()) -> ok | {error, term()}).
|
||||
|
@ -83,8 +83,8 @@ start_trace(Who, Level, LogFile) ->
|
|||
try logger:compare_levels(Level, PrimaryLevel) of
|
||||
lt ->
|
||||
{error,
|
||||
io_lib:format("Cannot trace at a log level (~s) "
|
||||
"lower than the primary log level (~s)",
|
||||
io_lib:format("Cannot trace at a log level (~ts) "
|
||||
"lower than the primary log level (~ts)",
|
||||
[Level, PrimaryLevel])};
|
||||
_GtOrEq ->
|
||||
install_trace_handler(Who, Level, LogFile)
|
||||
|
|
|
@ -33,7 +33,6 @@
|
|||
, process_gc_info_keys/0
|
||||
, get_process_gc_info/0
|
||||
, get_process_gc_info/1
|
||||
, get_process_group_leader_info/1
|
||||
, get_process_limit/0
|
||||
]).
|
||||
|
||||
|
@ -316,9 +315,6 @@ get_process_gc_info() ->
|
|||
get_process_gc_info(Pid) when is_pid(Pid) ->
|
||||
process_info(Pid, ?PROCESS_GC_KEYS).
|
||||
|
||||
get_process_group_leader_info(LeaderPid) when is_pid(LeaderPid) ->
|
||||
[{Key, Value}|| {Key, Value} <- process_info(LeaderPid), lists:member(Key, ?PROCESS_INFO_KEYS)].
|
||||
|
||||
get_process_limit() ->
|
||||
erlang:system_info(process_limit).
|
||||
|
||||
|
|
|
@ -23,14 +23,14 @@
|
|||
-include_lib("eunit/include/eunit.hrl").
|
||||
-include_lib("common_test/include/ct.hrl").
|
||||
|
||||
all() -> emqx_ct:all(?MODULE).
|
||||
all() -> emqx_common_test_helpers:all(?MODULE).
|
||||
|
||||
init_per_suite(Config) ->
|
||||
emqx_ct_helpers:start_apps([]),
|
||||
emqx_common_test_helpers:start_apps([]),
|
||||
Config.
|
||||
|
||||
end_per_suite(_Config) ->
|
||||
emqx_ct_helpers:stop_apps([]).
|
||||
emqx_common_test_helpers:stop_apps([]).
|
||||
|
||||
t_emqx_pubsub_api(_) ->
|
||||
true = emqx:is_running(node()),
|
||||
|
|
|
@ -22,15 +22,15 @@
|
|||
-include_lib("emqx/include/emqx_mqtt.hrl").
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
all() -> emqx_ct:all(?MODULE).
|
||||
all() -> emqx_common_test_helpers:all(?MODULE).
|
||||
|
||||
init_per_suite(Config) ->
|
||||
emqx_ct_helpers:boot_modules([router, broker]),
|
||||
emqx_ct_helpers:start_apps([]),
|
||||
emqx_common_test_helpers:boot_modules([router, broker]),
|
||||
emqx_common_test_helpers:start_apps([]),
|
||||
Config.
|
||||
|
||||
end_per_suite(_Config) ->
|
||||
emqx_ct_helpers:stop_apps([]).
|
||||
emqx_common_test_helpers:stop_apps([]).
|
||||
|
||||
t_authenticate(_) ->
|
||||
?assertMatch({ok, _}, emqx_access_control:authenticate(clientinfo())).
|
||||
|
|
|
@ -23,29 +23,29 @@
|
|||
-include_lib("emqx/include/emqx_mqtt.hrl").
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
all() -> emqx_ct:all(?MODULE).
|
||||
all() -> emqx_common_test_helpers:all(?MODULE).
|
||||
|
||||
init_per_testcase(t_size_limit, Config) ->
|
||||
emqx_ct_helpers:boot_modules(all),
|
||||
emqx_ct_helpers:start_apps([]),
|
||||
emqx_common_test_helpers:boot_modules(all),
|
||||
emqx_common_test_helpers:start_apps([]),
|
||||
{ok, _} = emqx:update_config([alarm], #{
|
||||
<<"size_limit">> => 2
|
||||
}),
|
||||
Config;
|
||||
init_per_testcase(t_validity_period, Config) ->
|
||||
emqx_ct_helpers:boot_modules(all),
|
||||
emqx_ct_helpers:start_apps([]),
|
||||
emqx_common_test_helpers:boot_modules(all),
|
||||
emqx_common_test_helpers:start_apps([]),
|
||||
{ok, _} = emqx:update_config([alarm], #{
|
||||
<<"validity_period">> => <<"1s">>
|
||||
}),
|
||||
Config;
|
||||
init_per_testcase(_, Config) ->
|
||||
emqx_ct_helpers:boot_modules(all),
|
||||
emqx_ct_helpers:start_apps([]),
|
||||
emqx_common_test_helpers:boot_modules(all),
|
||||
emqx_common_test_helpers:start_apps([]),
|
||||
Config.
|
||||
|
||||
end_per_testcase(_, _Config) ->
|
||||
emqx_ct_helpers:stop_apps([]).
|
||||
emqx_common_test_helpers:stop_apps([]).
|
||||
|
||||
t_alarm(_) ->
|
||||
ok = emqx_alarm:activate(unknown_alarm),
|
||||
|
|
|
@ -82,15 +82,15 @@ destroy(_State) ->
|
|||
ok.
|
||||
|
||||
all() ->
|
||||
emqx_ct:all(?MODULE).
|
||||
emqx_common_test_helpers:all(?MODULE).
|
||||
|
||||
init_per_suite(Config) ->
|
||||
application:set_env(ekka, strict_mode, true),
|
||||
emqx_ct_helpers:start_apps([]),
|
||||
emqx_common_test_helpers:start_apps([]),
|
||||
Config.
|
||||
|
||||
end_per_suite(_) ->
|
||||
emqx_ct_helpers:stop_apps([]),
|
||||
emqx_common_test_helpers:stop_apps([]),
|
||||
ok.
|
||||
|
||||
init_per_testcase(Case, Config) ->
|
||||
|
@ -297,7 +297,7 @@ update_config(Path, ConfigRequest) ->
|
|||
emqx:update_config(Path, ConfigRequest, #{rawconf_with_defaults => true}).
|
||||
|
||||
certs(Certs) ->
|
||||
CertsPath = emqx_ct_helpers:deps_path(emqx, "etc/certs"),
|
||||
CertsPath = emqx_common_test_helpers:deps_path(emqx, "etc/certs"),
|
||||
lists:foldl(fun({Key, Filename}, Acc) ->
|
||||
{ok, Bin} = file:read_file(filename:join([CertsPath, Filename])),
|
||||
Acc#{Key => Bin}
|
||||
|
|
|
@ -21,15 +21,15 @@
|
|||
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
all() -> emqx_ct:all(?MODULE).
|
||||
all() -> emqx_common_test_helpers:all(?MODULE).
|
||||
|
||||
init_per_suite(Config) ->
|
||||
emqx_ct_helpers:boot_modules(all),
|
||||
emqx_ct_helpers:start_apps([]),
|
||||
emqx_common_test_helpers:boot_modules(all),
|
||||
emqx_common_test_helpers:start_apps([]),
|
||||
Config.
|
||||
|
||||
end_per_suite(_Config) ->
|
||||
emqx_ct_helpers:stop_apps([]).
|
||||
emqx_common_test_helpers:stop_apps([]).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Test cases
|
||||
|
|
|
@ -22,7 +22,7 @@
|
|||
-include_lib("emqx/include/emqx.hrl").
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
all() -> emqx_ct:all(?MODULE).
|
||||
all() -> emqx_common_test_helpers:all(?MODULE).
|
||||
|
||||
init_per_suite(Config) ->
|
||||
application:load(emqx),
|
||||
|
|
|
@ -21,7 +21,7 @@
|
|||
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
all() -> emqx_ct:all(?MODULE).
|
||||
all() -> emqx_common_test_helpers:all(?MODULE).
|
||||
|
||||
t_batch_full_commit(_) ->
|
||||
B0 = emqx_batch:init(#{batch_size => 3,
|
||||
|
|
|
@ -21,7 +21,7 @@
|
|||
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
all() -> emqx_ct:all(?MODULE).
|
||||
all() -> emqx_common_test_helpers:all(?MODULE).
|
||||
|
||||
t_is_enabled(_) ->
|
||||
ok = application:set_env(emqx, boot_modules, all),
|
||||
|
|
|
@ -27,15 +27,15 @@
|
|||
-include_lib("emqx/include/emqx.hrl").
|
||||
-include_lib("emqx/include/emqx_mqtt.hrl").
|
||||
|
||||
all() -> emqx_ct:all(?MODULE).
|
||||
all() -> emqx_common_test_helpers:all(?MODULE).
|
||||
|
||||
init_per_suite(Config) ->
|
||||
emqx_ct_helpers:boot_modules(all),
|
||||
emqx_ct_helpers:start_apps([]),
|
||||
emqx_common_test_helpers:boot_modules(all),
|
||||
emqx_common_test_helpers:start_apps([]),
|
||||
Config.
|
||||
|
||||
end_per_suite(_Config) ->
|
||||
emqx_ct_helpers:stop_apps([]).
|
||||
emqx_common_test_helpers:stop_apps([]).
|
||||
|
||||
init_per_testcase(Case, Config) ->
|
||||
?MODULE:Case({init, Config}).
|
||||
|
|
|
@ -21,7 +21,7 @@
|
|||
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
all() -> emqx_ct:all(?MODULE).
|
||||
all() -> emqx_common_test_helpers:all(?MODULE).
|
||||
|
||||
init_per_testcase(_TestCase, Config) ->
|
||||
emqx_broker_helper:start_link(),
|
||||
|
|
|
@ -25,7 +25,7 @@
|
|||
|
||||
|
||||
all() ->
|
||||
emqx_ct:all(?MODULE).
|
||||
emqx_common_test_helpers:all(?MODULE).
|
||||
|
||||
force_gc_conf() ->
|
||||
#{bytes => 16777216,count => 16000,enable => true}.
|
||||
|
|
|
@ -77,14 +77,14 @@ groups() ->
|
|||
].
|
||||
|
||||
init_per_suite(Config) ->
|
||||
emqx_ct_helpers:boot_modules(all),
|
||||
emqx_ct_helpers:start_apps([]),
|
||||
emqx_common_test_helpers:boot_modules(all),
|
||||
emqx_common_test_helpers:start_apps([]),
|
||||
emqx_config:put_listener_conf(ssl, default, [ssl, verify], verify_peer),
|
||||
emqx_listeners:restart_listener('ssl:default'),
|
||||
Config.
|
||||
|
||||
end_per_suite(_Config) ->
|
||||
emqx_ct_helpers:stop_apps([]).
|
||||
emqx_common_test_helpers:stop_apps([]).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Test cases for MQTT v3
|
||||
|
@ -324,7 +324,7 @@ tls_certcn_as_clientid(TLSVsn) ->
|
|||
tls_certcn_as_clientid(TLSVsn, RequiredTLSVsn) ->
|
||||
CN = <<"Client">>,
|
||||
emqx_config:put_zone_conf(default, [mqtt, peer_cert_as_clientid], cn),
|
||||
SslConf = emqx_ct_helpers:client_ssl_twoway(TLSVsn),
|
||||
SslConf = emqx_common_test_helpers:client_ssl_twoway(TLSVsn),
|
||||
{ok, Client} = emqtt:start_link([{port, 8883}, {ssl, true}, {ssl_opts, SslConf}]),
|
||||
{ok, _} = emqtt:connect(Client),
|
||||
#{clientinfo := #{clientid := CN}} = emqx_cm:get_chan_info(CN),
|
||||
|
|
|
@ -37,15 +37,15 @@
|
|||
%%--------------------------------------------------------------------
|
||||
suite() -> [{timetrap, {minutes, 2}}].
|
||||
|
||||
all() -> emqx_ct:all(?MODULE).
|
||||
all() -> emqx_common_test_helpers:all(?MODULE).
|
||||
|
||||
init_per_suite(Config) ->
|
||||
emqx_ct_helpers:boot_modules(all),
|
||||
emqx_ct_helpers:start_apps([]),
|
||||
emqx_common_test_helpers:boot_modules(all),
|
||||
emqx_common_test_helpers:start_apps([]),
|
||||
Config.
|
||||
|
||||
end_per_suite(_Config) ->
|
||||
emqx_ct_helpers:stop_apps([]).
|
||||
emqx_common_test_helpers:stop_apps([]).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% TODO: Add more test cases
|
||||
|
|
|
@ -21,15 +21,15 @@
|
|||
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
all() -> emqx_ct:all(?MODULE).
|
||||
all() -> emqx_common_test_helpers:all(?MODULE).
|
||||
|
||||
init_per_suite(Config) ->
|
||||
emqx_ct_helpers:boot_modules(all),
|
||||
emqx_ct_helpers:start_apps([]),
|
||||
emqx_common_test_helpers:boot_modules(all),
|
||||
emqx_common_test_helpers:start_apps([]),
|
||||
Config.
|
||||
|
||||
end_per_suite(_Config) ->
|
||||
emqx_ct_helpers:stop_apps([]).
|
||||
emqx_common_test_helpers:stop_apps([]).
|
||||
|
||||
t_start_link(_) ->
|
||||
emqx_cm_locker:start_link().
|
||||
|
|
|
@ -25,15 +25,15 @@
|
|||
%% CT callbacks
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
all() -> emqx_ct:all(?MODULE).
|
||||
all() -> emqx_common_test_helpers:all(?MODULE).
|
||||
|
||||
init_per_suite(Config) ->
|
||||
emqx_ct_helpers:boot_modules(all),
|
||||
emqx_ct_helpers:start_apps([]),
|
||||
emqx_common_test_helpers:boot_modules(all),
|
||||
emqx_common_test_helpers:start_apps([]),
|
||||
Config.
|
||||
|
||||
end_per_suite(_Config) ->
|
||||
emqx_ct_helpers:stop_apps([]).
|
||||
emqx_common_test_helpers:stop_apps([]).
|
||||
|
||||
init_per_testcase(_TestCase, Config) ->
|
||||
Config.
|
||||
|
|
|
@ -0,0 +1,435 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_common_test_helpers).
|
||||
|
||||
-define(THIS_APP, ?MODULE).
|
||||
-include_lib("common_test/include/ct.hrl").
|
||||
|
||||
-type(special_config_handler() :: fun()).
|
||||
|
||||
-type(apps() :: list(atom())).
|
||||
|
||||
-export([ all/1
|
||||
, boot_modules/1
|
||||
, start_apps/1
|
||||
, start_apps/2
|
||||
, start_app/4
|
||||
, stop_apps/1
|
||||
, reload/2
|
||||
, app_path/2
|
||||
, deps_path/2
|
||||
, flush/0
|
||||
, flush/1
|
||||
]).
|
||||
|
||||
-export([ ensure_mnesia_stopped/0
|
||||
, wait_for/4
|
||||
, change_emqx_opts/1
|
||||
, change_emqx_opts/2
|
||||
, client_ssl_twoway/0
|
||||
, client_ssl_twoway/1
|
||||
, client_ssl/0
|
||||
, client_ssl/1
|
||||
, wait_mqtt_payload/1
|
||||
, not_wait_mqtt_payload/1
|
||||
, render_config_file/2
|
||||
, read_schema_configs/2
|
||||
]).
|
||||
|
||||
-define( CERTS_PATH(CertName), filename:join( [ "etc", "certs", CertName ]) ).
|
||||
|
||||
-define( MQTT_SSL_TWOWAY, [ { cacertfile, ?CERTS_PATH( "cacert.pem" ) },
|
||||
{ verify, verify_peer },
|
||||
{ fail_if_no_peer_cert, true } ] ).
|
||||
|
||||
-define( MQTT_SSL_CLIENT_CERTS, [ { keyfile, ?CERTS_PATH( "client-key.pem" ) },
|
||||
{ cacertfile, ?CERTS_PATH( "cacert.pem" ) },
|
||||
{ certfile, ?CERTS_PATH( "client-cert.pem" ) } ] ).
|
||||
|
||||
-define( TLS_1_3_CIPHERS, [ { versions, [ 'tlsv1.3' ] },
|
||||
{ ciphers, [ "TLS_AES_256_GCM_SHA384",
|
||||
"TLS_AES_128_GCM_SHA256",
|
||||
"TLS_CHACHA20_POLY1305_SHA256",
|
||||
"TLS_AES_128_CCM_SHA256",
|
||||
"TLS_AES_128_CCM_8_SHA256"
|
||||
] }
|
||||
]).
|
||||
|
||||
-define( TLS_OLD_CIPHERS, [ { versions, [ 'tlsv1.1', 'tlsv1.2' ] },
|
||||
{ ciphers, [ "ECDHE-ECDSA-AES256-GCM-SHA384",
|
||||
"ECDHE-RSA-AES256-GCM-SHA384",
|
||||
"ECDHE-ECDSA-AES256-SHA384",
|
||||
"ECDHE-RSA-AES256-SHA384",
|
||||
"ECDHE-ECDSA-DES-CBC3-SHA",
|
||||
"ECDH-ECDSA-AES256-GCM-SHA384",
|
||||
"ECDH-RSA-AES256-GCM-SHA384",
|
||||
"ECDH-ECDSA-AES256-SHA384",
|
||||
"ECDH-RSA-AES256-SHA384",
|
||||
"DHE-DSS-AES256-GCM-SHA384",
|
||||
"DHE-DSS-AES256-SHA256",
|
||||
"AES256-GCM-SHA384",
|
||||
"AES256-SHA256",
|
||||
"ECDHE-ECDSA-AES128-GCM-SHA256",
|
||||
"ECDHE-RSA-AES128-GCM-SHA256",
|
||||
"ECDHE-ECDSA-AES128-SHA256",
|
||||
"ECDHE-RSA-AES128-SHA256",
|
||||
"ECDH-ECDSA-AES128-GCM-SHA256",
|
||||
"ECDH-RSA-AES128-GCM-SHA256",
|
||||
"ECDH-ECDSA-AES128-SHA256",
|
||||
"ECDH-RSA-AES128-SHA256",
|
||||
"DHE-DSS-AES128-GCM-SHA256",
|
||||
"DHE-DSS-AES128-SHA256",
|
||||
"AES128-GCM-SHA256",
|
||||
"AES128-SHA256",
|
||||
"ECDHE-ECDSA-AES256-SHA",
|
||||
"ECDHE-RSA-AES256-SHA",
|
||||
"DHE-DSS-AES256-SHA",
|
||||
"ECDH-ECDSA-AES256-SHA",
|
||||
"ECDH-RSA-AES256-SHA",
|
||||
"AES256-SHA",
|
||||
"ECDHE-ECDSA-AES128-SHA",
|
||||
"ECDHE-RSA-AES128-SHA",
|
||||
"DHE-DSS-AES128-SHA",
|
||||
"ECDH-ECDSA-AES128-SHA",
|
||||
"ECDH-RSA-AES128-SHA",
|
||||
"AES128-SHA"
|
||||
] }
|
||||
]).
|
||||
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% APIs
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
all(Suite) ->
|
||||
lists:usort([F || {F, 1} <- Suite:module_info(exports),
|
||||
string:substr(atom_to_list(F), 1, 2) == "t_"
|
||||
]).
|
||||
|
||||
-spec(boot_modules(all|list(atom())) -> ok).
|
||||
boot_modules(Mods) ->
|
||||
application:set_env(emqx, boot_modules, Mods).
|
||||
|
||||
-spec(start_apps(Apps :: apps()) -> ok).
|
||||
start_apps(Apps) ->
|
||||
start_apps(Apps, fun(_) -> ok end).
|
||||
|
||||
-spec(start_apps(Apps :: apps(), Handler :: special_config_handler()) -> ok).
|
||||
start_apps(Apps, Handler) when is_function(Handler) ->
|
||||
%% Load all application code to beam vm first
|
||||
%% Because, minirest, ekka etc.. application will scan these modules
|
||||
lists:foreach(fun load/1, [emqx | Apps]),
|
||||
lists:foreach(fun(App) -> start_app(App, Handler) end, [emqx | Apps]).
|
||||
|
||||
load(App) ->
|
||||
case application:load(App) of
|
||||
ok -> ok;
|
||||
{error, {already_loaded, _}} -> ok;
|
||||
{error, Reason} -> error({failed_to_load_app, App, Reason})
|
||||
end.
|
||||
|
||||
start_app(App, Handler) ->
|
||||
start_app(App,
|
||||
app_schema(App),
|
||||
app_path(App, filename:join(["etc", atom_to_list(App) ++ ".conf"])),
|
||||
Handler).
|
||||
|
||||
%% TODO: get rid of cuttlefish
|
||||
app_schema(App) ->
|
||||
CuttlefishSchema = app_path(App, filename:join(["priv", atom_to_list(App) ++ ".schema"])),
|
||||
case filelib:is_regular(CuttlefishSchema) of
|
||||
true ->
|
||||
CuttlefishSchema;
|
||||
false ->
|
||||
Mod = list_to_atom(atom_to_list(App) ++ "_schema"),
|
||||
try
|
||||
true = is_list(Mod:roots()),
|
||||
Mod
|
||||
catch
|
||||
C : E ->
|
||||
error(#{app => App,
|
||||
file => CuttlefishSchema,
|
||||
module => Mod,
|
||||
exeption => C,
|
||||
reason => E
|
||||
})
|
||||
end
|
||||
end.
|
||||
|
||||
mustache_vars(App) ->
|
||||
[{platform_data_dir, app_path(App, "data")},
|
||||
{platform_etc_dir, app_path(App, "etc")},
|
||||
{platform_log_dir, app_path(App, "log")},
|
||||
{platform_plugins_dir, app_path(App, "plugins")}
|
||||
].
|
||||
|
||||
start_app(App, Schema, ConfigFile, SpecAppConfig) ->
|
||||
Vars = mustache_vars(App),
|
||||
RenderedConfigFile = render_config_file(ConfigFile, Vars),
|
||||
read_schema_configs(Schema, RenderedConfigFile),
|
||||
force_set_config_file_paths(App, [RenderedConfigFile]),
|
||||
SpecAppConfig(App),
|
||||
case application:ensure_all_started(App) of
|
||||
{ok, _} -> ok;
|
||||
{error, Reason} -> error({failed_to_start_app, App, Reason})
|
||||
end.
|
||||
|
||||
render_config_file(ConfigFile, Vars0) ->
|
||||
Temp = case file:read_file(ConfigFile) of
|
||||
{ok, T} -> T;
|
||||
{error, Reason} -> error({failed_to_read_config_template, ConfigFile, Reason})
|
||||
end,
|
||||
Vars = [{atom_to_list(N), iolist_to_binary(V)} || {N, V} <- Vars0],
|
||||
Targ = bbmustache:render(Temp, Vars),
|
||||
NewName = ConfigFile ++ ".rendered",
|
||||
ok = file:write_file(NewName, Targ),
|
||||
NewName.
|
||||
|
||||
read_schema_configs(Schema, ConfigFile) ->
|
||||
NewConfig = generate_config(Schema, ConfigFile),
|
||||
lists:foreach(
|
||||
fun({App, Configs}) ->
|
||||
[application:set_env(App, Par, Value) || {Par, Value} <- Configs]
|
||||
end, NewConfig).
|
||||
|
||||
generate_config(SchemaModule, ConfigFile) when is_atom(SchemaModule) ->
|
||||
{ok, Conf0} = hocon:load(ConfigFile, #{format => richmap}),
|
||||
hocon_schema:generate(SchemaModule, Conf0);
|
||||
generate_config(SchemaFile, ConfigFile) ->
|
||||
{ok, Conf1} = hocon:load(ConfigFile, #{format => proplists}),
|
||||
Schema = cuttlefish_schema:files([SchemaFile]),
|
||||
cuttlefish_generator:map(Schema, Conf1).
|
||||
|
||||
-spec(stop_apps(list()) -> ok).
|
||||
stop_apps(Apps) ->
|
||||
[application:stop(App) || App <- Apps ++ [emqx, mnesia]].
|
||||
|
||||
%% backward compatible
|
||||
deps_path(App, RelativePath) -> app_path(App, RelativePath).
|
||||
|
||||
app_path(App, RelativePath) ->
|
||||
ok = ensure_app_loaded(App),
|
||||
Lib = code:lib_dir(App),
|
||||
safe_relative_path(filename:join([Lib, RelativePath])).
|
||||
|
||||
assert_app_loaded(App) ->
|
||||
case code:lib_dir(App) of
|
||||
{error, bad_name} -> error({not_loaded, ?THIS_APP});
|
||||
_ -> ok
|
||||
end.
|
||||
|
||||
ensure_app_loaded(?THIS_APP) ->
|
||||
ok = assert_app_loaded(?THIS_APP);
|
||||
ensure_app_loaded(App) ->
|
||||
case code:lib_dir(App) of
|
||||
{error, bad_name} ->
|
||||
ok = assert_app_loaded(?THIS_APP),
|
||||
Dir0 = code:lib_dir(?THIS_APP),
|
||||
LibRoot = upper_level(Dir0),
|
||||
Dir = filename:join([LibRoot, atom_to_list(App), "ebin"]),
|
||||
case code:add_pathz(Dir) of
|
||||
true -> ok;
|
||||
{error, bad_directory} -> error({bad_directory, Dir})
|
||||
end,
|
||||
case application:load(App) of
|
||||
ok -> ok;
|
||||
{error, Reason} -> error({failed_to_load, App, Reason})
|
||||
end,
|
||||
ok = assert_app_loaded(App);
|
||||
_ ->
|
||||
ok
|
||||
end.
|
||||
|
||||
upper_level(Dir) ->
|
||||
Split = filename:split(Dir),
|
||||
UpperReverse = tl(lists:reverse(Split)),
|
||||
filename:join(lists:reverse(UpperReverse)).
|
||||
|
||||
safe_relative_path(Path) ->
|
||||
case filename:split(Path) of
|
||||
["/" | T] ->
|
||||
T1 = do_safe_relative_path(filename:join(T)),
|
||||
filename:join(["/", T1]);
|
||||
_ ->
|
||||
do_safe_relative_path(Path)
|
||||
end.
|
||||
|
||||
do_safe_relative_path(Path) ->
|
||||
case safe_relative_path_2(Path) of
|
||||
unsafe -> Path;
|
||||
OK -> OK
|
||||
end.
|
||||
|
||||
-if(?OTP_RELEASE < 23).
|
||||
safe_relative_path_2(Path) ->
|
||||
filename:safe_relative_path(Path).
|
||||
-else.
|
||||
safe_relative_path_2(Path) ->
|
||||
{ok, Cwd} = file:get_cwd(),
|
||||
filelib:safe_relative_path(Path, Cwd).
|
||||
-endif.
|
||||
|
||||
-spec(reload(App :: atom(), SpecAppConfig :: special_config_handler()) -> ok).
|
||||
reload(App, SpecAppConfigHandler) ->
|
||||
application:stop(App),
|
||||
start_app(App, SpecAppConfigHandler),
|
||||
application:start(App).
|
||||
|
||||
ensure_mnesia_stopped() ->
|
||||
ekka_mnesia:ensure_stopped(),
|
||||
ekka_mnesia:delete_schema().
|
||||
|
||||
%% Help function to wait for Fun to yield 'true'.
|
||||
wait_for(Fn, Ln, F, Timeout) ->
|
||||
{Pid, Mref} = erlang:spawn_monitor(fun() -> wait_loop(F, catch_call(F)) end),
|
||||
wait_for_down(Fn, Ln, Timeout, Pid, Mref, false).
|
||||
|
||||
change_emqx_opts(SslType) ->
|
||||
change_emqx_opts(SslType, []).
|
||||
|
||||
change_emqx_opts(SslType, MoreOpts) ->
|
||||
{ok, Listeners} = application:get_env(emqx, listeners),
|
||||
NewListeners =
|
||||
lists:map(fun(Listener) ->
|
||||
maybe_inject_listener_ssl_options(SslType, MoreOpts, Listener)
|
||||
end, Listeners),
|
||||
application:set_env(emqx, listeners, NewListeners).
|
||||
|
||||
maybe_inject_listener_ssl_options(SslType, MoreOpts, {sll, Port, Opts}) ->
|
||||
%% this clause is kept to be backward compatible
|
||||
%% new config for listener is a map, old is a three-element tuple
|
||||
{ssl, Port, inject_listener_ssl_options(SslType, Opts, MoreOpts)};
|
||||
maybe_inject_listener_ssl_options(SslType, MoreOpts, #{proto := ssl, opts := Opts} = Listener) ->
|
||||
Listener#{opts := inject_listener_ssl_options(SslType, Opts, MoreOpts)};
|
||||
maybe_inject_listener_ssl_options(_SslType, _MoreOpts, Listener) ->
|
||||
Listener.
|
||||
|
||||
inject_listener_ssl_options(SslType, Opts, MoreOpts) ->
|
||||
SslOpts = proplists:get_value(ssl_options, Opts),
|
||||
Keyfile = app_path(emqx, filename:join(["etc", "certs", "key.pem"])),
|
||||
Certfile = app_path(emqx, filename:join(["etc", "certs", "cert.pem"])),
|
||||
TupleList1 = lists:keyreplace(keyfile, 1, SslOpts, {keyfile, Keyfile}),
|
||||
TupleList2 = lists:keyreplace(certfile, 1, TupleList1, {certfile, Certfile}),
|
||||
TupleList3 =
|
||||
case SslType of
|
||||
ssl_twoway ->
|
||||
CAfile = app_path(emqx, proplists:get_value(cacertfile, ?MQTT_SSL_TWOWAY)),
|
||||
MutSslList = lists:keyreplace(cacertfile, 1, ?MQTT_SSL_TWOWAY, {cacertfile, CAfile}),
|
||||
lists:merge(TupleList2, MutSslList);
|
||||
_ ->
|
||||
lists:filter(fun ({cacertfile, _}) -> false;
|
||||
({verify, _}) -> false;
|
||||
({fail_if_no_peer_cert, _}) -> false;
|
||||
(_) -> true
|
||||
end, TupleList2)
|
||||
end,
|
||||
TupleList4 = emqx_misc:merge_opts(TupleList3, proplists:get_value(ssl_options, MoreOpts, [])),
|
||||
NMoreOpts = emqx_misc:merge_opts(MoreOpts, [{ssl_options, TupleList4}]),
|
||||
emqx_misc:merge_opts(Opts, NMoreOpts).
|
||||
|
||||
flush() ->
|
||||
flush([]).
|
||||
|
||||
flush(Msgs) ->
|
||||
receive
|
||||
M -> flush([M|Msgs])
|
||||
after
|
||||
0 -> lists:reverse(Msgs)
|
||||
end.
|
||||
|
||||
client_ssl_twoway() ->
|
||||
client_ssl_twoway(default).
|
||||
|
||||
client_ssl_twoway(TLSVsn) ->
|
||||
client_certs() ++ ciphers(TLSVsn).
|
||||
|
||||
%% Paths prepended to cert filenames
|
||||
client_certs() ->
|
||||
[ { Key, app_path(emqx, FilePath) } || { Key, FilePath } <- ?MQTT_SSL_CLIENT_CERTS ].
|
||||
|
||||
client_ssl() ->
|
||||
client_ssl(default).
|
||||
|
||||
client_ssl(TLSVsn) ->
|
||||
ciphers(TLSVsn) ++ [{reuse_sessions, true}].
|
||||
|
||||
ciphers(default) -> []; %% determined via config file defaults
|
||||
ciphers('tlsv1.3') -> ?TLS_1_3_CIPHERS;
|
||||
ciphers(_OlderTLSVsn) -> ?TLS_OLD_CIPHERS.
|
||||
|
||||
wait_mqtt_payload(Payload) ->
|
||||
receive
|
||||
{publish, #{payload := Payload}} ->
|
||||
ct:pal("OK - received msg: ~p~n", [Payload])
|
||||
after 1000 ->
|
||||
ct:fail({timeout, Payload, {msg_box, flush()}})
|
||||
end.
|
||||
|
||||
not_wait_mqtt_payload(Payload) ->
|
||||
receive
|
||||
{publish, #{payload := Payload}} ->
|
||||
ct:fail({received, Payload})
|
||||
after 1000 ->
|
||||
ct:pal("OK - msg ~p is not received", [Payload])
|
||||
end.
|
||||
|
||||
wait_for_down(Fn, Ln, Timeout, Pid, Mref, Kill) ->
|
||||
receive
|
||||
{'DOWN', Mref, process, Pid, normal} ->
|
||||
ok;
|
||||
{'DOWN', Mref, process, Pid, {unexpected, Result}} ->
|
||||
erlang:error({unexpected, Fn, Ln, Result});
|
||||
{'DOWN', Mref, process, Pid, {crashed, {C, E, S}}} ->
|
||||
erlang:raise(C, {Fn, Ln, E}, S)
|
||||
after
|
||||
Timeout ->
|
||||
case Kill of
|
||||
true ->
|
||||
erlang:demonitor(Mref, [flush]),
|
||||
erlang:exit(Pid, kill),
|
||||
erlang:error({Fn, Ln, timeout});
|
||||
false ->
|
||||
Pid ! stop,
|
||||
wait_for_down(Fn, Ln, Timeout, Pid, Mref, true)
|
||||
end
|
||||
end.
|
||||
|
||||
wait_loop(_F, ok) -> exit(normal);
|
||||
wait_loop(F, LastRes) ->
|
||||
receive
|
||||
stop -> erlang:exit(LastRes)
|
||||
after
|
||||
100 ->
|
||||
Res = catch_call(F),
|
||||
wait_loop(F, Res)
|
||||
end.
|
||||
|
||||
catch_call(F) ->
|
||||
try
|
||||
case F() of
|
||||
true -> ok;
|
||||
Other -> {unexpected, Other}
|
||||
end
|
||||
catch
|
||||
C : E : S ->
|
||||
{crashed, {C, E, S}}
|
||||
end.
|
||||
|
||||
force_set_config_file_paths(emqx, Paths) ->
|
||||
application:set_env(emqx, config_files, Paths);
|
||||
force_set_config_file_paths(_, _) ->
|
||||
ok.
|
|
@ -20,15 +20,15 @@
|
|||
-compile(nowarn_export_all).
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
all() -> emqx_ct:all(?MODULE).
|
||||
all() -> emqx_common_test_helpers:all(?MODULE).
|
||||
|
||||
init_per_suite(Config) ->
|
||||
emqx_ct_helpers:boot_modules(all),
|
||||
emqx_ct_helpers:start_apps([]),
|
||||
emqx_common_test_helpers:boot_modules(all),
|
||||
emqx_common_test_helpers:start_apps([]),
|
||||
Config.
|
||||
|
||||
end_per_suite(_Config) ->
|
||||
emqx_ct_helpers:stop_apps([]).
|
||||
emqx_common_test_helpers:stop_apps([]).
|
||||
|
||||
t_fill_default_values(_) ->
|
||||
Conf = #{
|
||||
|
|
|
@ -23,7 +23,7 @@
|
|||
-include_lib("eunit/include/eunit.hrl").
|
||||
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||
|
||||
all() -> emqx_ct:all(?MODULE).
|
||||
all() -> emqx_common_test_helpers:all(?MODULE).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% CT callbacks
|
||||
|
|
|
@ -22,7 +22,7 @@
|
|||
-include_lib("eunit/include/eunit.hrl").
|
||||
-include_lib("common_test/include/ct.hrl").
|
||||
|
||||
all() -> emqx_ct:all(?MODULE).
|
||||
all() -> emqx_common_test_helpers:all(?MODULE).
|
||||
|
||||
init_per_suite(Config) ->
|
||||
ok = emqx_logger:set_log_level(emergency),
|
||||
|
@ -66,13 +66,13 @@ t_run_commands(_) ->
|
|||
|
||||
t_print(_) ->
|
||||
ok = emqx_ctl:print("help"),
|
||||
ok = emqx_ctl:print("~s", [help]),
|
||||
ok = emqx_ctl:print("~s", [<<"~!@#$%^&*()">>]),
|
||||
ok = emqx_ctl:print("~ts", [help]),
|
||||
ok = emqx_ctl:print("~ts", [<<"~!@#$%^&*()">>]),
|
||||
% - check the output of the usage
|
||||
mock_print(),
|
||||
?assertEqual("help", emqx_ctl:print("help")),
|
||||
?assertEqual("help", emqx_ctl:print("~s", [help])),
|
||||
?assertEqual("~!@#$%^&*()", emqx_ctl:print("~s", [<<"~!@#$%^&*()">>])),
|
||||
?assertEqual("help", emqx_ctl:print("~ts", [help])),
|
||||
?assertEqual("~!@#$%^&*()", emqx_ctl:print("~ts", [<<"~!@#$%^&*()">>])),
|
||||
unmock_print().
|
||||
|
||||
t_usage(_) ->
|
||||
|
|
|
@ -21,11 +21,11 @@
|
|||
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
all() -> emqx_ct:all(?MODULE).
|
||||
all() -> emqx_common_test_helpers:all(?MODULE).
|
||||
|
||||
init_per_suite(Config) ->
|
||||
emqx_ct_helpers:boot_modules(all),
|
||||
emqx_ct_helpers:start_apps([]),
|
||||
emqx_common_test_helpers:boot_modules(all),
|
||||
emqx_common_test_helpers:start_apps([]),
|
||||
emqx_config:put_zone_conf(default, [flapping_detect],
|
||||
#{max_count => 3,
|
||||
window_time => 100, % 0.1s
|
||||
|
@ -34,7 +34,7 @@ init_per_suite(Config) ->
|
|||
Config.
|
||||
|
||||
end_per_suite(_Config) ->
|
||||
emqx_ct_helpers:stop_apps([]),
|
||||
emqx_common_test_helpers:stop_apps([]),
|
||||
ekka_mnesia:delete_schema(), %% Clean emqx_banned table
|
||||
ok.
|
||||
|
||||
|
|
|
@ -21,7 +21,7 @@
|
|||
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
all() -> emqx_ct:all(?MODULE).
|
||||
all() -> emqx_common_test_helpers:all(?MODULE).
|
||||
|
||||
t_init(_) ->
|
||||
GC1 = emqx_gc:init(#{count => 10, bytes => 0}),
|
||||
|
|
|
@ -21,7 +21,7 @@
|
|||
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
all() -> emqx_ct:all(?MODULE).
|
||||
all() -> emqx_common_test_helpers:all(?MODULE).
|
||||
|
||||
t_guid_gen(_) ->
|
||||
Guid1 = emqx_guid:gen(),
|
||||
|
|
|
@ -21,7 +21,7 @@
|
|||
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
all() -> emqx_ct:all(?MODULE).
|
||||
all() -> emqx_common_test_helpers:all(?MODULE).
|
||||
|
||||
% t_lookup(_) ->
|
||||
% error('TODO').
|
||||
|
|
|
@ -21,7 +21,7 @@
|
|||
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
all() -> emqx_ct:all(?MODULE).
|
||||
all() -> emqx_common_test_helpers:all(?MODULE).
|
||||
|
||||
t_contain(_) ->
|
||||
Inflight = emqx_inflight:insert(k, v, emqx_inflight:new()),
|
||||
|
|
|
@ -69,7 +69,7 @@
|
|||
%m #{<<"foo">> => [{}]} NOT SUPPORT
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
all() -> emqx_ct:all(?MODULE).
|
||||
all() -> emqx_common_test_helpers:all(?MODULE).
|
||||
|
||||
t_decode_encode(_) ->
|
||||
null = decode(encode(null)),
|
||||
|
|
|
@ -21,7 +21,7 @@
|
|||
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
all() -> emqx_ct:all(?MODULE).
|
||||
all() -> emqx_common_test_helpers:all(?MODULE).
|
||||
|
||||
t_check(_) ->
|
||||
Keepalive = emqx_keepalive:init(60),
|
||||
|
|
|
@ -25,7 +25,7 @@
|
|||
%% Setups
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
all() -> emqx_ct:all(?MODULE).
|
||||
all() -> emqx_common_test_helpers:all(?MODULE).
|
||||
|
||||
init_per_testcase(_, Cfg) ->
|
||||
_ = esockd_limiter:start_link(),
|
||||
|
|
|
@ -23,7 +23,7 @@
|
|||
-include_lib("emqx/include/emqx_mqtt.hrl").
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
all() -> emqx_ct:all(?MODULE).
|
||||
all() -> emqx_common_test_helpers:all(?MODULE).
|
||||
|
||||
init_per_suite(Config) ->
|
||||
NewConfig = generate_config(),
|
||||
|
|
|
@ -25,7 +25,7 @@
|
|||
-define(a, "a").
|
||||
-define(SUPPORTED_LEVELS, [emergency, alert, critical, error, warning, notice, info, debug]).
|
||||
|
||||
all() -> emqx_ct:all(?MODULE).
|
||||
all() -> emqx_common_test_helpers:all(?MODULE).
|
||||
|
||||
init_per_testcase(_TestCase, Config) ->
|
||||
Config.
|
||||
|
|
|
@ -23,7 +23,7 @@
|
|||
-include_lib("emqx/include/emqx_mqtt.hrl").
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
all() -> emqx_ct:all(?MODULE).
|
||||
all() -> emqx_common_test_helpers:all(?MODULE).
|
||||
|
||||
suite() ->
|
||||
[{ct_hooks, [cth_surefire]}, {timetrap, {seconds, 30}}].
|
||||
|
|
|
@ -22,7 +22,7 @@
|
|||
-include_lib("emqx/include/emqx_mqtt.hrl").
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
all() -> emqx_ct:all(?MODULE).
|
||||
all() -> emqx_common_test_helpers:all(?MODULE).
|
||||
|
||||
t_new(_) ->
|
||||
with_metrics_server(
|
||||
|
|
|
@ -28,7 +28,7 @@
|
|||
{nodelay, true}
|
||||
]).
|
||||
|
||||
all() -> emqx_ct:all(?MODULE).
|
||||
all() -> emqx_common_test_helpers:all(?MODULE).
|
||||
|
||||
t_merge_opts(_) ->
|
||||
Opts = emqx_misc:merge_opts(?SOCKOPTS, [raw,
|
||||
|
|
|
@ -28,7 +28,7 @@
|
|||
-include_lib("emqx/include/emqx.hrl").
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
all() -> emqx_ct:all(?MODULE).
|
||||
all() -> emqx_common_test_helpers:all(?MODULE).
|
||||
|
||||
t_mount(_) ->
|
||||
Msg = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>),
|
||||
|
|
|
@ -29,15 +29,15 @@
|
|||
send_pend
|
||||
]).
|
||||
|
||||
all() -> emqx_ct:all(?MODULE).
|
||||
all() -> emqx_common_test_helpers:all(?MODULE).
|
||||
|
||||
init_per_suite(Config) ->
|
||||
emqx_ct_helpers:boot_modules(all),
|
||||
emqx_ct_helpers:start_apps([]),
|
||||
emqx_common_test_helpers:boot_modules(all),
|
||||
emqx_common_test_helpers:start_apps([]),
|
||||
Config.
|
||||
|
||||
end_per_suite(_Config) ->
|
||||
emqx_ct_helpers:stop_apps([]).
|
||||
emqx_common_test_helpers:stop_apps([]).
|
||||
|
||||
init_per_testcase(TestCase, Config) ->
|
||||
case erlang:function_exported(?MODULE, TestCase, 2) of
|
||||
|
|
|
@ -22,7 +22,7 @@
|
|||
-include_lib("emqx/include/emqx_mqtt.hrl").
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
all() -> emqx_ct:all(?MODULE).
|
||||
all() -> emqx_common_test_helpers:all(?MODULE).
|
||||
|
||||
t_check_pub(_) ->
|
||||
OldConf = emqx:get_config([zones]),
|
||||
|
|
|
@ -22,7 +22,7 @@
|
|||
-include_lib("emqx/include/emqx_mqtt.hrl").
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
all() -> emqx_ct:all(?MODULE).
|
||||
all() -> emqx_common_test_helpers:all(?MODULE).
|
||||
|
||||
t_id(_) ->
|
||||
foreach_prop(
|
||||
|
|
|
@ -39,19 +39,19 @@ all() ->
|
|||
].
|
||||
|
||||
groups() ->
|
||||
TCs = emqx_ct:all(?MODULE),
|
||||
TCs = emqx_common_test_helpers:all(?MODULE),
|
||||
[ {tcp, [], TCs}
|
||||
, {quic, [], TCs}
|
||||
].
|
||||
|
||||
init_per_group(tcp, Config) ->
|
||||
emqx_ct_helpers:start_apps([]),
|
||||
emqx_common_test_helpers:start_apps([]),
|
||||
[ {port, 1883}, {conn_fun, connect} | Config];
|
||||
init_per_group(quic, Config) ->
|
||||
emqx_ct_helpers:start_apps([]),
|
||||
emqx_common_test_helpers:start_apps([]),
|
||||
[ {port, 14567}, {conn_fun, quic_connect} | Config];
|
||||
init_per_group(_, Config) ->
|
||||
emqx_ct_helpers:stop_apps([]),
|
||||
emqx_common_test_helpers:stop_apps([]),
|
||||
Config.
|
||||
|
||||
end_per_group(_Group, _Config) ->
|
||||
|
@ -59,12 +59,12 @@ end_per_group(_Group, _Config) ->
|
|||
|
||||
init_per_suite(Config) ->
|
||||
%% Start Apps
|
||||
emqx_ct_helpers:boot_modules(all),
|
||||
emqx_ct_helpers:start_apps([]),
|
||||
emqx_common_test_helpers:boot_modules(all),
|
||||
emqx_common_test_helpers:start_apps([]),
|
||||
Config.
|
||||
|
||||
end_per_suite(_Config) ->
|
||||
emqx_ct_helpers:stop_apps([]).
|
||||
emqx_common_test_helpers:stop_apps([]).
|
||||
|
||||
init_per_testcase(TestCase, Config) ->
|
||||
case erlang:function_exported(?MODULE, TestCase, 2) of
|
||||
|
|
|
@ -27,7 +27,7 @@
|
|||
|
||||
-define(Q, emqx_mqueue).
|
||||
|
||||
all() -> emqx_ct:all(?MODULE).
|
||||
all() -> emqx_common_test_helpers:all(?MODULE).
|
||||
|
||||
t_info(_) ->
|
||||
Q = ?Q:init(#{max_len => 5, store_qos0 => true}),
|
||||
|
|
|
@ -0,0 +1,118 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||
%%
|
||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||
%% you may not use this file except in compliance with the License.
|
||||
%% You may obtain a copy of the License at
|
||||
%%
|
||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||
%%
|
||||
%% Unless required by applicable law or agreed to in writing, software
|
||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
%% See the License for the specific language governing permissions and
|
||||
%% limitations under the License.
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-module(emqx_olp_SUITE).
|
||||
|
||||
-compile(export_all).
|
||||
-compile(nowarn_export_all).
|
||||
|
||||
-include_lib("emqx/include/emqx.hrl").
|
||||
-include_lib("emqx/include/emqx_mqtt.hrl").
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
-include_lib("lc/include/lc.hrl").
|
||||
|
||||
all() -> emqx_common_test_helpers:all(?MODULE).
|
||||
|
||||
init_per_suite(Config) ->
|
||||
emqx_common_test_helpers:start_apps([]),
|
||||
Config.
|
||||
|
||||
end_per_suite(_Config) ->
|
||||
emqx_common_test_helpers:stop_apps([]).
|
||||
|
||||
init_per_testcase(_, Config) ->
|
||||
emqx_olp:enable(),
|
||||
case wait_for(fun() -> lc_sup:whereis_runq_flagman() end, 10) of
|
||||
true -> ok;
|
||||
false ->
|
||||
ct:fail("runq_flagman is not up")
|
||||
end,
|
||||
ok = load_ctl:put_config(#{ ?RUNQ_MON_F0 => true
|
||||
, ?RUNQ_MON_F1 => 5
|
||||
, ?RUNQ_MON_F2 => 1
|
||||
, ?RUNQ_MON_T1 => 200
|
||||
, ?RUNQ_MON_T2 => 50
|
||||
, ?RUNQ_MON_C1 => 2
|
||||
, ?RUNQ_MON_F5 => -1
|
||||
}),
|
||||
Config.
|
||||
|
||||
%% Test that olp could be enabled/disabled globally
|
||||
t_disable_enable(_Config) ->
|
||||
Old = load_ctl:whereis_runq_flagman(),
|
||||
ok = emqx_olp:disable(),
|
||||
?assert(not is_process_alive(Old)),
|
||||
{ok, Pid} = emqx_olp:enable(),
|
||||
timer:sleep(1000),
|
||||
?assert(is_process_alive(Pid)).
|
||||
|
||||
%% Test that overload detection works
|
||||
t_is_overloaded(_Config) ->
|
||||
P = burst_runq(),
|
||||
timer:sleep(3000),
|
||||
?assert(emqx_olp:is_overloaded()),
|
||||
exit(P, kill),
|
||||
timer:sleep(3000),
|
||||
?assert(not emqx_olp:is_overloaded()).
|
||||
|
||||
%% Test that new conn is rejected when olp is enabled
|
||||
t_overloaded_conn(_Config) ->
|
||||
process_flag(trap_exit, true),
|
||||
?assert(erlang:is_process_alive(load_ctl:whereis_runq_flagman())),
|
||||
emqx_config:put([overload_protection, enable], true),
|
||||
P = burst_runq(),
|
||||
timer:sleep(1000),
|
||||
?assert(emqx_olp:is_overloaded()),
|
||||
true = emqx:is_running(node()),
|
||||
{ok, C} = emqtt:start_link([{host, "localhost"}, {clientid, "myclient"}]),
|
||||
?assertNotMatch({ok, _Pid}, emqtt:connect(C)),
|
||||
exit(P, kill).
|
||||
|
||||
%% Test that new conn is rejected when olp is enabled
|
||||
t_overload_cooldown_conn(Config) ->
|
||||
t_overloaded_conn(Config),
|
||||
timer:sleep(1000),
|
||||
?assert(not emqx_olp:is_overloaded()),
|
||||
{ok, C} = emqtt:start_link([{host, "localhost"}, {clientid, "myclient"}]),
|
||||
?assertMatch({ok, _Pid}, emqtt:connect(C)),
|
||||
emqtt:stop(C).
|
||||
|
||||
-spec burst_runq() -> ParentToKill :: pid().
|
||||
burst_runq() ->
|
||||
NProc = erlang:system_info(schedulers_online),
|
||||
spawn(?MODULE, worker_parent, [NProc * 10, {?MODULE, busy_loop, []}]).
|
||||
|
||||
%% internal helpers
|
||||
worker_parent(N, {M, F, A}) ->
|
||||
lists:foreach(fun(_) ->
|
||||
proc_lib:spawn_link(fun() -> apply(M, F, A) end)
|
||||
end, lists:seq(1, N)),
|
||||
receive stop -> ok end.
|
||||
|
||||
busy_loop() ->
|
||||
erlang:yield(),
|
||||
busy_loop().
|
||||
|
||||
wait_for(_Fun, 0) ->
|
||||
false;
|
||||
wait_for(Fun, Retry) ->
|
||||
case is_pid(Fun()) of
|
||||
true ->
|
||||
true;
|
||||
false ->
|
||||
timer:sleep(10),
|
||||
wait_for(Fun, Retry - 1)
|
||||
end.
|
|
@ -21,7 +21,7 @@
|
|||
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
all() -> emqx_ct:all(?MODULE).
|
||||
all() -> emqx_common_test_helpers:all(?MODULE).
|
||||
|
||||
init_per_suite(Config) ->
|
||||
emqx_config:put([sysmon, os], #{
|
||||
|
|
|
@ -40,7 +40,7 @@
|
|||
{?AUTH, 'AUTH', ?AUTH_PACKET()}
|
||||
]).
|
||||
|
||||
all() -> emqx_ct:all(?MODULE).
|
||||
all() -> emqx_common_test_helpers:all(?MODULE).
|
||||
|
||||
t_type(_) ->
|
||||
lists:foreach(fun({Type, _Name, Packet}) ->
|
||||
|
@ -290,24 +290,24 @@ t_will_msg(_) ->
|
|||
?assertEqual(<<"topic">>, Msg2#message.topic).
|
||||
|
||||
t_format(_) ->
|
||||
io:format("~s", [emqx_packet:format(#mqtt_packet{header = #mqtt_packet_header{type = ?CONNACK, retain = true, dup = 0}, variable = undefined})]),
|
||||
io:format("~s", [emqx_packet:format(#mqtt_packet{header = #mqtt_packet_header{type = ?CONNACK}, variable = 1, payload = <<"payload">>})]),
|
||||
io:format("~s", [emqx_packet:format(?CONNECT_PACKET(#mqtt_packet_connect{will_flag = true,
|
||||
io:format("~ts", [emqx_packet:format(#mqtt_packet{header = #mqtt_packet_header{type = ?CONNACK, retain = true, dup = 0}, variable = undefined})]),
|
||||
io:format("~ts", [emqx_packet:format(#mqtt_packet{header = #mqtt_packet_header{type = ?CONNACK}, variable = 1, payload = <<"payload">>})]),
|
||||
io:format("~ts", [emqx_packet:format(?CONNECT_PACKET(#mqtt_packet_connect{will_flag = true,
|
||||
will_retain = true,
|
||||
will_qos = ?QOS_2,
|
||||
will_topic = <<"topic">>,
|
||||
will_payload = <<"payload">>}))]),
|
||||
io:format("~s", [emqx_packet:format(?CONNECT_PACKET(#mqtt_packet_connect{password = password}))]),
|
||||
io:format("~s", [emqx_packet:format(?CONNACK_PACKET(?CONNACK_SERVER))]),
|
||||
io:format("~s", [emqx_packet:format(?PUBLISH_PACKET(?QOS_1, 1))]),
|
||||
io:format("~s", [emqx_packet:format(?PUBLISH_PACKET(?QOS_2, <<"topic">>, 10, <<"payload">>))]),
|
||||
io:format("~s", [emqx_packet:format(?PUBACK_PACKET(?PUBACK, 98))]),
|
||||
io:format("~s", [emqx_packet:format(?PUBREL_PACKET(99))]),
|
||||
io:format("~s", [emqx_packet:format(?SUBSCRIBE_PACKET(15, [{<<"topic">>, ?QOS_0}, {<<"topic1">>, ?QOS_1}]))]),
|
||||
io:format("~s", [emqx_packet:format(?SUBACK_PACKET(40, [?QOS_0, ?QOS_1]))]),
|
||||
io:format("~s", [emqx_packet:format(?UNSUBSCRIBE_PACKET(89, [<<"t">>, <<"t2">>]))]),
|
||||
io:format("~s", [emqx_packet:format(?UNSUBACK_PACKET(90))]),
|
||||
io:format("~s", [emqx_packet:format(?DISCONNECT_PACKET(128))]).
|
||||
io:format("~ts", [emqx_packet:format(?CONNECT_PACKET(#mqtt_packet_connect{password = password}))]),
|
||||
io:format("~ts", [emqx_packet:format(?CONNACK_PACKET(?CONNACK_SERVER))]),
|
||||
io:format("~ts", [emqx_packet:format(?PUBLISH_PACKET(?QOS_1, 1))]),
|
||||
io:format("~ts", [emqx_packet:format(?PUBLISH_PACKET(?QOS_2, <<"topic">>, 10, <<"payload">>))]),
|
||||
io:format("~ts", [emqx_packet:format(?PUBACK_PACKET(?PUBACK, 98))]),
|
||||
io:format("~ts", [emqx_packet:format(?PUBREL_PACKET(99))]),
|
||||
io:format("~ts", [emqx_packet:format(?SUBSCRIBE_PACKET(15, [{<<"topic">>, ?QOS_0}, {<<"topic1">>, ?QOS_1}]))]),
|
||||
io:format("~ts", [emqx_packet:format(?SUBACK_PACKET(40, [?QOS_0, ?QOS_1]))]),
|
||||
io:format("~ts", [emqx_packet:format(?UNSUBSCRIBE_PACKET(89, [<<"t">>, <<"t2">>]))]),
|
||||
io:format("~ts", [emqx_packet:format(?UNSUBACK_PACKET(90))]),
|
||||
io:format("~ts", [emqx_packet:format(?DISCONNECT_PACKET(128))]).
|
||||
|
||||
t_parse_empty_publish(_) ->
|
||||
%% 52: 0011(type=PUBLISH) 0100 (QoS=2)
|
||||
|
|
|
@ -21,7 +21,7 @@
|
|||
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
all() -> emqx_ct:all(?MODULE).
|
||||
all() -> emqx_common_test_helpers:all(?MODULE).
|
||||
|
||||
t_update_counter(_) ->
|
||||
?assertEqual(undefined, emqx_pd:inc_counter(bytes, 1)),
|
||||
|
|
|
@ -22,7 +22,7 @@
|
|||
-include_lib("emqx/include/emqx.hrl").
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
all() -> emqx_ct:all(?MODULE).
|
||||
all() -> emqx_common_test_helpers:all(?MODULE).
|
||||
|
||||
init_per_suite(Config) ->
|
||||
|
||||
|
@ -31,23 +31,23 @@ init_per_suite(Config) ->
|
|||
DataPath = proplists:get_value(data_dir, Config),
|
||||
AppPath = filename:join([DataPath, "emqx_mini_plugin"]),
|
||||
HoconPath = filename:join([DataPath, "emqx_hocon_plugin"]),
|
||||
Cmd = lists:flatten(io_lib:format("cd ~s && make", [AppPath])),
|
||||
CmdPath = lists:flatten(io_lib:format("cd ~s && make", [HoconPath])),
|
||||
Cmd = lists:flatten(io_lib:format("cd ~ts && make", [AppPath])),
|
||||
CmdPath = lists:flatten(io_lib:format("cd ~ts && make", [HoconPath])),
|
||||
|
||||
ct:pal("Executing ~s~n", [Cmd]),
|
||||
ct:pal("~n ~s~n", [os:cmd(Cmd)]),
|
||||
ct:pal("Executing ~ts~n", [Cmd]),
|
||||
ct:pal("~n ~ts~n", [os:cmd(Cmd)]),
|
||||
|
||||
ct:pal("Executing ~s~n", [CmdPath]),
|
||||
ct:pal("~n ~s~n", [os:cmd(CmdPath)]),
|
||||
ct:pal("Executing ~ts~n", [CmdPath]),
|
||||
ct:pal("~n ~ts~n", [os:cmd(CmdPath)]),
|
||||
|
||||
emqx_ct_helpers:boot_modules([]),
|
||||
emqx_ct_helpers:start_apps([]),
|
||||
emqx_common_test_helpers:boot_modules([]),
|
||||
emqx_common_test_helpers:start_apps([]),
|
||||
emqx_config:put([plugins, expand_plugins_dir], DataPath),
|
||||
?assertEqual(ok, emqx_plugins:load()),
|
||||
Config.
|
||||
|
||||
end_per_suite(_Config) ->
|
||||
emqx_ct_helpers:stop_apps([]).
|
||||
emqx_common_test_helpers:stop_apps([]).
|
||||
|
||||
t_load(_) ->
|
||||
?assertEqual(ok, emqx_plugins:load()),
|
||||
|
|
|
@ -21,7 +21,7 @@
|
|||
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
all() -> emqx_ct:all(?MODULE).
|
||||
all() -> emqx_common_test_helpers:all(?MODULE).
|
||||
|
||||
% t_new(_) ->
|
||||
% error('TODO').
|
||||
|
|
|
@ -24,7 +24,7 @@
|
|||
-define(PQ, emqx_pqueue).
|
||||
-define(SUITE, ?MODULE).
|
||||
|
||||
all() -> emqx_ct:all(?SUITE).
|
||||
all() -> emqx_common_test_helpers:all(?SUITE).
|
||||
|
||||
t_is_queue(_) ->
|
||||
Q = ?PQ:new(),
|
||||
|
|
|
@ -22,7 +22,7 @@
|
|||
-include_lib("emqx/include/emqx_mqtt.hrl").
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
all() -> emqx_ct:all(?MODULE).
|
||||
all() -> emqx_common_test_helpers:all(?MODULE).
|
||||
|
||||
t_frame_error(_) ->
|
||||
?assertEqual(?RC_PACKET_TOO_LARGE, emqx_reason_codes:frame_error(frame_too_large)),
|
||||
|
|
|
@ -65,7 +65,7 @@ handle_msg(ReqMsg, RequestHandler, Parent) ->
|
|||
props = RspProps,
|
||||
payload = RspPayload
|
||||
},
|
||||
emqx_logger:debug("~p sending response msg to topic ~s with~n"
|
||||
emqx_logger:debug("~p sending response msg to topic ~ts with~n"
|
||||
"corr-data=~p~npayload=~p",
|
||||
[?MODULE, RspTopic, CorrData, RspPayload]),
|
||||
ok = send_response(RspMsg);
|
||||
|
|
|
@ -22,12 +22,12 @@
|
|||
-include_lib("common_test/include/ct.hrl").
|
||||
|
||||
init_per_suite(Config) ->
|
||||
emqx_ct_helpers:boot_modules(all),
|
||||
emqx_ct_helpers:start_apps([]),
|
||||
emqx_common_test_helpers:boot_modules(all),
|
||||
emqx_common_test_helpers:start_apps([]),
|
||||
Config.
|
||||
|
||||
end_per_suite(_Config) ->
|
||||
emqx_ct_helpers:stop_apps([]).
|
||||
emqx_common_test_helpers:stop_apps([]).
|
||||
|
||||
all() ->
|
||||
[request_response].
|
||||
|
|
|
@ -24,15 +24,15 @@
|
|||
|
||||
-define(R, emqx_router).
|
||||
|
||||
all() -> emqx_ct:all(?MODULE).
|
||||
all() -> emqx_common_test_helpers:all(?MODULE).
|
||||
|
||||
init_per_suite(Config) ->
|
||||
emqx_ct_helpers:boot_modules([router]),
|
||||
emqx_ct_helpers:start_apps([]),
|
||||
emqx_common_test_helpers:boot_modules([router]),
|
||||
emqx_common_test_helpers:start_apps([]),
|
||||
Config.
|
||||
|
||||
end_per_suite(_Config) ->
|
||||
emqx_ct_helpers:stop_apps([]).
|
||||
emqx_common_test_helpers:stop_apps([]).
|
||||
|
||||
init_per_testcase(_TestCase, Config) ->
|
||||
clear_tables(),
|
||||
|
|
|
@ -23,14 +23,14 @@
|
|||
|
||||
-define(ROUTER_HELPER, emqx_router_helper).
|
||||
|
||||
all() -> emqx_ct:all(?MODULE).
|
||||
all() -> emqx_common_test_helpers:all(?MODULE).
|
||||
|
||||
init_per_suite(Config) ->
|
||||
emqx_ct_helpers:start_apps([]),
|
||||
emqx_common_test_helpers:start_apps([]),
|
||||
Config.
|
||||
|
||||
end_per_suite(_Config) ->
|
||||
emqx_ct_helpers:stop_apps([]).
|
||||
emqx_common_test_helpers:stop_apps([]).
|
||||
|
||||
t_monitor(_) ->
|
||||
ok = emqx_router_helper:monitor({undefined, node()}),
|
||||
|
|
|
@ -27,7 +27,7 @@
|
|||
, reclaim/2
|
||||
]).
|
||||
|
||||
all() -> emqx_ct:all(?MODULE).
|
||||
all() -> emqx_common_test_helpers:all(?MODULE).
|
||||
|
||||
% t_currval(_) ->
|
||||
% error('TODO').
|
||||
|
|
|
@ -22,7 +22,7 @@
|
|||
-include_lib("emqx/include/emqx_mqtt.hrl").
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
all() -> emqx_ct:all(?MODULE).
|
||||
all() -> emqx_common_test_helpers:all(?MODULE).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% CT callbacks
|
||||
|
|
|
@ -26,21 +26,21 @@
|
|||
-define(SUITE, ?MODULE).
|
||||
|
||||
-define(wait(For, Timeout),
|
||||
emqx_ct_helpers:wait_for(
|
||||
emqx_common_test_helpers:wait_for(
|
||||
?FUNCTION_NAME, ?LINE, fun() -> For end, Timeout)).
|
||||
|
||||
-define(ack, shared_sub_ack).
|
||||
-define(no_ack, no_ack).
|
||||
|
||||
all() -> emqx_ct:all(?SUITE).
|
||||
all() -> emqx_common_test_helpers:all(?SUITE).
|
||||
|
||||
init_per_suite(Config) ->
|
||||
emqx_ct_helpers:boot_modules(all),
|
||||
emqx_ct_helpers:start_apps([]),
|
||||
emqx_common_test_helpers:boot_modules(all),
|
||||
emqx_common_test_helpers:start_apps([]),
|
||||
Config.
|
||||
|
||||
end_per_suite(_Config) ->
|
||||
emqx_ct_helpers:stop_apps([]).
|
||||
emqx_common_test_helpers:stop_apps([]).
|
||||
|
||||
t_is_ack_required(_) ->
|
||||
?assertEqual(false, emqx_shared_sub:is_ack_required(#message{headers = #{}})).
|
||||
|
|
|
@ -21,7 +21,7 @@
|
|||
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
all() -> emqx_ct:all(?MODULE).
|
||||
all() -> emqx_common_test_helpers:all(?MODULE).
|
||||
|
||||
t_cast_useless_msg(_)->
|
||||
emqx_stats:setstat('notExis', 1),
|
||||
|
|
|
@ -21,15 +21,15 @@
|
|||
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
all() -> emqx_ct:all(?MODULE).
|
||||
all() -> emqx_common_test_helpers:all(?MODULE).
|
||||
|
||||
init_per_suite(Config) ->
|
||||
emqx_ct_helpers:boot_modules(all),
|
||||
emqx_ct_helpers:start_apps([]),
|
||||
emqx_common_test_helpers:boot_modules(all),
|
||||
emqx_common_test_helpers:start_apps([]),
|
||||
Config.
|
||||
|
||||
end_per_suite(_Config) ->
|
||||
emqx_ct_helpers:stop_apps([]).
|
||||
emqx_common_test_helpers:stop_apps([]).
|
||||
|
||||
t_child(_) ->
|
||||
?assertMatch({error, _}, emqx_sup:start_child(undef, worker)),
|
||||
|
|
|
@ -21,7 +21,7 @@
|
|||
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
all() -> emqx_ct:all(?MODULE).
|
||||
all() -> emqx_common_test_helpers:all(?MODULE).
|
||||
|
||||
init_per_suite(Config) ->
|
||||
application:load(emqx),
|
||||
|
|
|
@ -42,11 +42,11 @@
|
|||
fmt("long_schedule warning: port = ~p", [?FAKE_PORT]), ?FAKE_INFO}
|
||||
]).
|
||||
|
||||
all() -> emqx_ct:all(?MODULE).
|
||||
all() -> emqx_common_test_helpers:all(?MODULE).
|
||||
|
||||
init_per_testcase(t_sys_mon, Config) ->
|
||||
emqx_ct_helpers:boot_modules(all),
|
||||
emqx_ct_helpers:start_apps([],
|
||||
emqx_common_test_helpers:boot_modules(all),
|
||||
emqx_common_test_helpers:start_apps([],
|
||||
fun(emqx) ->
|
||||
application:set_env(emqx, sysmon, [{busy_dist_port,true},
|
||||
{busy_port,false},
|
||||
|
@ -58,8 +58,8 @@ init_per_testcase(t_sys_mon, Config) ->
|
|||
end),
|
||||
Config;
|
||||
init_per_testcase(t_sys_mon2, Config) ->
|
||||
emqx_ct_helpers:boot_modules(all),
|
||||
emqx_ct_helpers:start_apps([],
|
||||
emqx_common_test_helpers:boot_modules(all),
|
||||
emqx_common_test_helpers:start_apps([],
|
||||
fun(emqx) ->
|
||||
application:set_env(emqx, sysmon, [{busy_dist_port,false},
|
||||
{busy_port,true},
|
||||
|
@ -72,12 +72,12 @@ init_per_testcase(t_sys_mon2, Config) ->
|
|||
end),
|
||||
Config;
|
||||
init_per_testcase(_, Config) ->
|
||||
emqx_ct_helpers:boot_modules(all),
|
||||
emqx_ct_helpers:start_apps([]),
|
||||
emqx_common_test_helpers:boot_modules(all),
|
||||
emqx_common_test_helpers:start_apps([]),
|
||||
Config.
|
||||
|
||||
end_per_testcase(_, _Config) ->
|
||||
emqx_ct_helpers:stop_apps([]).
|
||||
emqx_common_test_helpers:stop_apps([]).
|
||||
|
||||
t_procinfo(_) ->
|
||||
ok = meck:new(emqx_vm, [passthrough, no_history]),
|
||||
|
|
|
@ -23,7 +23,7 @@
|
|||
|
||||
-define(TAB, ?MODULE).
|
||||
|
||||
all() -> emqx_ct:all(?MODULE).
|
||||
all() -> emqx_common_test_helpers:all(?MODULE).
|
||||
|
||||
t_new(_) ->
|
||||
ok = emqx_tables:new(?TAB),
|
||||
|
|
|
@ -29,14 +29,14 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% Inital funcs
|
||||
|
||||
all() -> emqx_ct:all(?MODULE).
|
||||
all() -> emqx_common_test_helpers:all(?MODULE).
|
||||
|
||||
init_per_suite(Config) ->
|
||||
emqx_ct_helpers:start_apps([]),
|
||||
emqx_common_test_helpers:start_apps([]),
|
||||
Config.
|
||||
|
||||
end_per_suite(_Config) ->
|
||||
emqx_ct_helpers:stop_apps([]),
|
||||
emqx_common_test_helpers:stop_apps([]),
|
||||
ok.
|
||||
%%--------------------------------------------------------------------
|
||||
%% Testcases
|
||||
|
|
|
@ -36,7 +36,7 @@
|
|||
|
||||
-define(N, 100000).
|
||||
|
||||
all() -> emqx_ct:all(?MODULE).
|
||||
all() -> emqx_common_test_helpers:all(?MODULE).
|
||||
|
||||
t_wildcard(_) ->
|
||||
true = wildcard(<<"a/b/#">>),
|
||||
|
@ -214,5 +214,5 @@ bench(Case, Fun, Args) ->
|
|||
[fun(_) -> apply(Fun, Args) end,
|
||||
lists:seq(1, ?N)
|
||||
]),
|
||||
ct:pal("Time consumed by ~s: ~.3f(us)~nCall ~s per second: ~w",
|
||||
ct:pal("Time consumed by ~ts: ~.3f(us)~nCall ~ts per second: ~w",
|
||||
[Case, Time/?N, Case, (?N * 1000000) div Time]).
|
||||
|
|
|
@ -26,12 +26,12 @@
|
|||
all() -> [t_trace_clientid, t_trace_topic].
|
||||
|
||||
init_per_suite(Config) ->
|
||||
emqx_ct_helpers:boot_modules(all),
|
||||
emqx_ct_helpers:start_apps([]),
|
||||
emqx_common_test_helpers:boot_modules(all),
|
||||
emqx_common_test_helpers:start_apps([]),
|
||||
Config.
|
||||
|
||||
end_per_suite(_Config) ->
|
||||
emqx_ct_helpers:stop_apps([]).
|
||||
emqx_common_test_helpers:stop_apps([]).
|
||||
|
||||
t_trace_clientid(_Config) ->
|
||||
{ok, T} = emqtt:start_link([{host, "localhost"},
|
||||
|
|
|
@ -30,7 +30,7 @@ all() ->
|
|||
].
|
||||
|
||||
groups() ->
|
||||
Cases = emqx_ct:all(?MODULE),
|
||||
Cases = emqx_common_test_helpers:all(?MODULE),
|
||||
[{compact, Cases}, {not_compact, Cases}].
|
||||
|
||||
init_per_group(compact, Config) ->
|
||||
|
|
|
@ -21,7 +21,7 @@
|
|||
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
all() -> emqx_ct:all(?MODULE).
|
||||
all() -> emqx_common_test_helpers:all(?MODULE).
|
||||
|
||||
t_load(_Config) ->
|
||||
?assertMatch([{load1, _}, {load5, _}, {load15, _}], emqx_vm:loads()).
|
||||
|
@ -73,9 +73,6 @@ t_get_memory(_Config) ->
|
|||
t_schedulers(_Config) ->
|
||||
emqx_vm:schedulers().
|
||||
|
||||
t_get_process_group_leader_info(_Config) ->
|
||||
emqx_vm:get_process_group_leader_info(self()).
|
||||
|
||||
t_get_process_limit(_Config) ->
|
||||
emqx_vm:get_process_limit().
|
||||
|
||||
|
|
|
@ -21,11 +21,11 @@
|
|||
|
||||
-include_lib("eunit/include/eunit.hrl").
|
||||
|
||||
all() -> emqx_ct:all(?MODULE).
|
||||
all() -> emqx_common_test_helpers:all(?MODULE).
|
||||
|
||||
init_per_testcase(t_alarms, Config) ->
|
||||
emqx_ct_helpers:boot_modules(all),
|
||||
emqx_ct_helpers:start_apps([]),
|
||||
emqx_common_test_helpers:boot_modules(all),
|
||||
emqx_common_test_helpers:start_apps([]),
|
||||
emqx_config:put([sysmon, vm], #{
|
||||
process_high_watermark => 0,
|
||||
process_low_watermark => 0,
|
||||
|
@ -35,12 +35,12 @@ init_per_testcase(t_alarms, Config) ->
|
|||
{ok, _} = supervisor:restart_child(emqx_sys_sup, emqx_vm_mon),
|
||||
Config;
|
||||
init_per_testcase(_, Config) ->
|
||||
emqx_ct_helpers:boot_modules(all),
|
||||
emqx_ct_helpers:start_apps([]),
|
||||
emqx_common_test_helpers:boot_modules(all),
|
||||
emqx_common_test_helpers:start_apps([]),
|
||||
Config.
|
||||
|
||||
end_per_testcase(_, _Config) ->
|
||||
emqx_ct_helpers:stop_apps([]).
|
||||
emqx_common_test_helpers:stop_apps([]).
|
||||
|
||||
t_alarms(_) ->
|
||||
timer:sleep(500),
|
||||
|
|
|
@ -35,7 +35,7 @@
|
|||
|
||||
-define(ws_conn, emqx_ws_connection).
|
||||
|
||||
all() -> emqx_ct:all(?MODULE).
|
||||
all() -> emqx_common_test_helpers:all(?MODULE).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% CT callbacks
|
||||
|
@ -79,7 +79,7 @@ init_per_testcase(TestCase, Config) when
|
|||
Config;
|
||||
|
||||
init_per_testcase(_, Config) ->
|
||||
ok = emqx_ct_helpers:start_apps([]),
|
||||
ok = emqx_common_test_helpers:start_apps([]),
|
||||
Config.
|
||||
|
||||
end_per_testcase(TestCase, _Config) when
|
||||
|
@ -98,7 +98,7 @@ end_per_testcase(TestCase, _Config) when
|
|||
]);
|
||||
|
||||
end_per_testcase(_, Config) ->
|
||||
emqx_ct_helpers:stop_apps([]),
|
||||
emqx_common_test_helpers:stop_apps([]),
|
||||
Config.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
|
|
|
@ -1991,13 +1991,13 @@ convert_certs(Config) ->
|
|||
serialize_error({not_found, {authenticator, ID}}) ->
|
||||
{404, #{code => <<"NOT_FOUND">>,
|
||||
message => list_to_binary(
|
||||
io_lib:format("Authenticator '~s' does not exist", [ID])
|
||||
io_lib:format("Authenticator '~ts' does not exist", [ID])
|
||||
)}};
|
||||
|
||||
serialize_error({not_found, {listener, ID}}) ->
|
||||
{404, #{code => <<"NOT_FOUND">>,
|
||||
message => list_to_binary(
|
||||
io_lib:format("Listener '~s' does not exist", [ID])
|
||||
io_lib:format("Listener '~ts' does not exist", [ID])
|
||||
)}};
|
||||
|
||||
serialize_error({not_found, {chain, ?GLOBAL}}) ->
|
||||
|
@ -2007,13 +2007,13 @@ serialize_error({not_found, {chain, ?GLOBAL}}) ->
|
|||
serialize_error({not_found, {chain, Name}}) ->
|
||||
{400, #{code => <<"BAD_REQUEST">>,
|
||||
message => list_to_binary(
|
||||
io_lib:format("No authentication has been create for listener '~s'", [Name])
|
||||
io_lib:format("No authentication has been create for listener '~ts'", [Name])
|
||||
)}};
|
||||
|
||||
serialize_error({already_exists, {authenticator, ID}}) ->
|
||||
{409, #{code => <<"ALREADY_EXISTS">>,
|
||||
message => list_to_binary(
|
||||
io_lib:format("Authenticator '~s' already exist", [ID])
|
||||
io_lib:format("Authenticator '~ts' already exist", [ID])
|
||||
)}};
|
||||
|
||||
serialize_error(no_available_provider) ->
|
||||
|
|
|
@ -77,7 +77,7 @@ mnesia(copy) ->
|
|||
%% Hocon Schema
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
namespace() -> "authn:scram:builtin-db".
|
||||
namespace() -> "authn-scram-builtin_db".
|
||||
|
||||
roots() -> [config].
|
||||
|
||||
|
@ -148,7 +148,7 @@ add_user(#{user_id := UserID,
|
|||
case mnesia:read(?TAB, {UserGroup, UserID}, write) of
|
||||
[] ->
|
||||
IsSuperuser = maps:get(is_superuser, UserInfo, false),
|
||||
add_user(UserID, Password, IsSuperuser, State),
|
||||
add_user(UserGroup, UserID, Password, IsSuperuser, State),
|
||||
{ok, #{user_id => UserID, is_superuser => IsSuperuser}};
|
||||
[_] ->
|
||||
{error, already_exist}
|
||||
|
@ -240,9 +240,9 @@ check_client_final_message(Bin, #{is_superuser := IsSuperuser} = Cache, #{algori
|
|||
{error, not_authorized}
|
||||
end.
|
||||
|
||||
add_user(UserID, Password, IsSuperuser, State) ->
|
||||
add_user(UserGroup, UserID, Password, IsSuperuser, State) ->
|
||||
{StoredKey, ServerKey, Salt} = esasl_scram:generate_authentication_info(Password, State),
|
||||
UserInfo = #user_info{user_id = UserID,
|
||||
UserInfo = #user_info{user_id = {UserGroup, UserID},
|
||||
stored_key = StoredKey,
|
||||
server_key = ServerKey,
|
||||
salt = Salt,
|
||||
|
|
|
@ -40,7 +40,7 @@
|
|||
%% Hocon Schema
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
namespace() -> "authn:password-based:http-server".
|
||||
namespace() -> "authn-password_based-http_server".
|
||||
|
||||
roots() ->
|
||||
[ {config, {union, [ hoconsc:ref(?MODULE, get)
|
||||
|
|
|
@ -37,7 +37,7 @@
|
|||
%% Hocon Schema
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
namespace() -> "authn:jwt".
|
||||
namespace() -> "authn-jwt".
|
||||
|
||||
roots() ->
|
||||
[ {config, {union, [ hoconsc:mk('hmac-based')
|
||||
|
|
|
@ -84,7 +84,7 @@ mnesia(copy) ->
|
|||
%% Hocon Schema
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
namespace() -> "authn:password-based:builtin-db".
|
||||
namespace() -> "authn-password_based-builtin_db".
|
||||
|
||||
roots() -> [config].
|
||||
|
||||
|
|
|
@ -39,7 +39,7 @@
|
|||
%% Hocon Schema
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
namespace() -> "authn:password-based:mongodb".
|
||||
namespace() -> "authn-password_based-mongodb".
|
||||
|
||||
roots() ->
|
||||
[ {config, {union, [ hoconsc:mk(standalone)
|
||||
|
|
|
@ -39,7 +39,7 @@
|
|||
%% Hocon Schema
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
namespace() -> "authn:password-based:mysql".
|
||||
namespace() -> "authn-password_based-mysql".
|
||||
|
||||
roots() -> [config].
|
||||
|
||||
|
|
|
@ -40,7 +40,7 @@
|
|||
%% Hocon Schema
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
namespace() -> "authn:password-based:postgresql".
|
||||
namespace() -> "authn-password_based-postgresql".
|
||||
|
||||
roots() -> [config].
|
||||
|
||||
|
|
|
@ -39,7 +39,7 @@
|
|||
%% Hocon Schema
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
namespace() -> "authn:password-based:redis".
|
||||
namespace() -> "authn-password_based-redis".
|
||||
|
||||
roots() ->
|
||||
[ {config, {union, [ hoconsc:mk(standalone)
|
||||
|
|
|
@ -19,4 +19,4 @@
|
|||
-compile(export_all).
|
||||
-compile(nowarn_export_all).
|
||||
|
||||
all() -> emqx_ct:all(?MODULE).
|
||||
all() -> emqx_common_test_helpers:all(?MODULE).
|
Some files were not shown because too many files have changed in this diff Show More
Loading…
Reference in New Issue