refactor: rename emqx_misc to emqx_utils

This commit is contained in:
Stefan Strigler 2023-04-13 14:45:18 +02:00
parent 1880da0a2e
commit 9c11bfce80
101 changed files with 401 additions and 447 deletions

View File

@ -51,7 +51,7 @@ with_node_or_cluster(Node, Fun) ->
-spec lookup_node(atom() | binary()) -> {ok, atom()} | not_found.
lookup_node(BinNode) when is_binary(BinNode) ->
case emqx_misc:safe_to_existing_atom(BinNode, utf8) of
case emqx_utils:safe_to_existing_atom(BinNode, utf8) of
{ok, Node} ->
is_running_node(Node);
_Error ->

View File

@ -277,9 +277,9 @@ atom(Bin) -> binary_to_existing_atom(Bin, utf8).
certs_dir(ChainName, ConfigOrID) ->
DirName = dir(ChainName, ConfigOrID),
SubDir = iolist_to_binary(filename:join(["authn", DirName])),
emqx_misc:safe_filename(SubDir).
emqx_utils:safe_filename(SubDir).
dir(ChainName, ID) when is_binary(ID) ->
emqx_misc:safe_filename(iolist_to_binary([to_bin(ChainName), "-", ID]));
emqx_utils:safe_filename(iolist_to_binary([to_bin(ChainName), "-", ID]));
dir(ChainName, Config) when is_map(Config) ->
dir(ChainName, authenticator_id(Config)).

View File

@ -243,7 +243,7 @@ handle_info(Info, State) ->
{noreply, State}.
terminate(_Reason, #{expiry_timer := TRef}) ->
emqx_misc:cancel_timer(TRef).
emqx_utils:cancel_timer(TRef).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
@ -254,10 +254,10 @@ code_change(_OldVsn, State, _Extra) ->
-ifdef(TEST).
ensure_expiry_timer(State) ->
State#{expiry_timer := emqx_misc:start_timer(10, expire)}.
State#{expiry_timer := emqx_utils:start_timer(10, expire)}.
-else.
ensure_expiry_timer(State) ->
State#{expiry_timer := emqx_misc:start_timer(timer:minutes(1), expire)}.
State#{expiry_timer := emqx_utils:start_timer(timer:minutes(1), expire)}.
-endif.
expire_banned_items(Now) ->

View File

@ -85,7 +85,7 @@ commit(Batch = #batch{batch_q = Q, commit_fun = Commit}) ->
reset(Batch).
reset(Batch = #batch{linger_timer = TRef}) ->
_ = emqx_misc:cancel_timer(TRef),
_ = emqx_utils:cancel_timer(TRef),
Batch#batch{batch_q = [], linger_timer = undefined}.
-spec size(batch()) -> non_neg_integer().

View File

@ -92,7 +92,7 @@
start_link(Pool, Id) ->
ok = create_tabs(),
gen_server:start_link(
{local, emqx_misc:proc_name(?BROKER, Id)},
{local, emqx_utils:proc_name(?BROKER, Id)},
?MODULE,
[Pool, Id],
[]

View File

@ -131,7 +131,7 @@ handle_cast(Msg, State) ->
{noreply, State}.
handle_info({'DOWN', _MRef, process, SubPid, _Reason}, State = #{pmon := PMon}) ->
SubPids = [SubPid | emqx_misc:drain_down(?BATCH_SIZE)],
SubPids = [SubPid | emqx_utils:drain_down(?BATCH_SIZE)],
ok = emqx_pool:async_submit(
fun lists:foreach/2, [fun clean_down/1, SubPids]
),

View File

@ -61,7 +61,7 @@
-export([set_field/3]).
-import(
emqx_misc,
emqx_utils,
[
run_fold/3,
pipeline/3,
@ -622,7 +622,7 @@ process_connect(
NChannel = Channel#channel{session = Session},
handle_out(connack, {?RC_SUCCESS, sp(false), AckProps}, ensure_connected(NChannel));
{ok, #{session := Session, present := true, pendings := Pendings}} ->
Pendings1 = lists:usort(lists:append(Pendings, emqx_misc:drain_deliver())),
Pendings1 = lists:usort(lists:append(Pendings, emqx_utils:drain_deliver())),
NChannel = Channel#channel{
session = Session,
resuming = true,
@ -1203,7 +1203,7 @@ handle_call(
) ->
ok = emqx_session:takeover(Session),
%% TODO: Should not drain deliver here (side effect)
Delivers = emqx_misc:drain_deliver(),
Delivers = emqx_utils:drain_deliver(),
AllPendings = lists:append(Delivers, Pendings),
disconnect_and_shutdown(takenover, AllPendings, Channel);
handle_call(list_authz_cache, Channel) ->
@ -1402,7 +1402,7 @@ ensure_timer(Name, Channel = #channel{timers = Timers}) ->
ensure_timer(Name, Time, Channel = #channel{timers = Timers}) ->
Msg = maps:get(Name, ?TIMER_TABLE),
TRef = emqx_misc:start_timer(Time, Msg),
TRef = emqx_utils:start_timer(Time, Msg),
Channel#channel{timers = Timers#{Name => TRef}}.
reset_timer(Name, Channel) ->
@ -2045,7 +2045,7 @@ clear_keepalive(Channel = #channel{timers = Timers}) ->
undefined ->
Channel;
TRef ->
emqx_misc:cancel_timer(TRef),
emqx_utils:cancel_timer(TRef),
Channel#channel{timers = maps:without([alive_timer], Timers)}
end.
%%--------------------------------------------------------------------
@ -2241,7 +2241,7 @@ get_mqtt_conf(Zone, Key, Default) ->
%%--------------------------------------------------------------------
set_field(Name, Value, Channel) ->
Pos = emqx_misc:index_of(Name, record_info(fields, channel)),
Pos = emqx_utils:index_of(Name, record_info(fields, channel)),
setelement(Pos + 1, Channel, Value).
get_mqueue(#channel{session = Session}) ->

View File

@ -672,7 +672,7 @@ handle_cast(Msg, State) ->
handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{chan_pmon := PMon}) ->
?tp(emqx_cm_process_down, #{stale_pid => Pid, reason => _Reason}),
ChanPids = [Pid | emqx_misc:drain_down(?BATCH_SIZE)],
ChanPids = [Pid | emqx_utils:drain_down(?BATCH_SIZE)],
{Items, PMon1} = emqx_pmon:erase_all(ChanPids, PMon),
lists:foreach(fun mark_channel_disconnected/1, ChanPids),
ok = emqx_pool:async_submit(fun lists:foreach/2, [fun ?MODULE:clean_down/1, Items]),

View File

@ -77,7 +77,7 @@
-export([set_field/3]).
-import(
emqx_misc,
emqx_utils,
[start_timer/2]
).
@ -260,7 +260,7 @@ stats(#state{
{error, _} -> []
end,
ChanStats = emqx_channel:stats(Channel),
ProcStats = emqx_misc:proc_stats(),
ProcStats = emqx_utils:proc_stats(),
lists:append([SockStats, ChanStats, ProcStats]).
%% @doc Set TCP keepalive socket options to override system defaults.
@ -392,7 +392,7 @@ run_loop(
emqx_channel:info(zone, Channel),
[force_shutdown]
),
emqx_misc:tune_heap_size(ShutdownPolicy),
emqx_utils:tune_heap_size(ShutdownPolicy),
case activate_socket(State) of
{ok, NState} ->
hibernate(Parent, NState);
@ -472,7 +472,7 @@ ensure_stats_timer(_Timeout, State) ->
-compile({inline, [cancel_stats_timer/1]}).
cancel_stats_timer(State = #state{stats_timer = TRef}) when is_reference(TRef) ->
?tp(debug, cancel_stats_timer, #{}),
ok = emqx_misc:cancel_timer(TRef),
ok = emqx_utils:cancel_timer(TRef),
State#state{stats_timer = undefined};
cancel_stats_timer(State) ->
State.
@ -558,7 +558,7 @@ handle_msg(
{incoming, Packet = ?CONNECT_PACKET(ConnPkt)},
State = #state{idle_timer = IdleTimer}
) ->
ok = emqx_misc:cancel_timer(IdleTimer),
ok = emqx_utils:cancel_timer(IdleTimer),
Serialize = emqx_frame:serialize_opts(ConnPkt),
NState = State#state{
serialize = Serialize,
@ -593,7 +593,7 @@ handle_msg(
#state{listener = {Type, Listener}} = State
) ->
ActiveN = get_active_n(Type, Listener),
Delivers = [Deliver | emqx_misc:drain_deliver(ActiveN)],
Delivers = [Deliver | emqx_utils:drain_deliver(ActiveN)],
with_channel(handle_deliver, [Delivers], State);
%% Something sent
handle_msg({inet_reply, _Sock, ok}, State = #state{listener = {Type, Listener}}) ->
@ -1073,7 +1073,7 @@ check_oom(State = #state{channel = Channel}) ->
emqx_channel:info(zone, Channel), [force_shutdown]
),
?tp(debug, check_oom, #{policy => ShutdownPolicy}),
case emqx_misc:check_oom(ShutdownPolicy) of
case emqx_utils:check_oom(ShutdownPolicy) of
{shutdown, Reason} ->
%% triggers terminate/2 callback immediately
erlang:exit({shutdown, Reason});
@ -1200,7 +1200,7 @@ inc_counter(Key, Inc) ->
%%--------------------------------------------------------------------
set_field(Name, Value, State) ->
Pos = emqx_misc:index_of(Name, record_info(fields, state)),
Pos = emqx_utils:index_of(Name, record_info(fields, state)),
setelement(Pos + 1, State, Value).
get_state(Pid) ->

View File

@ -117,7 +117,7 @@ handle_call(Call, _From, State) ->
handle_cast({evict, URL}, State0 = #state{refresh_timers = RefreshTimers0}) ->
emqx_ssl_crl_cache:delete(URL),
MTimer = maps:get(URL, RefreshTimers0, undefined),
emqx_misc:cancel_timer(MTimer),
emqx_utils:cancel_timer(MTimer),
RefreshTimers = maps:without([URL], RefreshTimers0),
State = State0#state{refresh_timers = RefreshTimers},
?tp(
@ -223,9 +223,9 @@ ensure_timer(URL, State = #state{refresh_interval = Timeout}) ->
ensure_timer(URL, State = #state{refresh_timers = RefreshTimers0}, Timeout) ->
?tp(crl_cache_ensure_timer, #{url => URL, timeout => Timeout}),
MTimer = maps:get(URL, RefreshTimers0, undefined),
emqx_misc:cancel_timer(MTimer),
emqx_utils:cancel_timer(MTimer),
RefreshTimers = RefreshTimers0#{
URL => emqx_misc:start_timer(
URL => emqx_utils:start_timer(
Timeout,
{refresh, URL}
)
@ -297,7 +297,7 @@ handle_cache_overflow(State0) ->
{_Time, OldestURL, InsertionTimes} = gb_trees:take_smallest(InsertionTimes0),
emqx_ssl_crl_cache:delete(OldestURL),
MTimer = maps:get(OldestURL, RefreshTimers0, undefined),
emqx_misc:cancel_timer(MTimer),
emqx_utils:cancel_timer(MTimer),
RefreshTimers = maps:remove(OldestURL, RefreshTimers0),
CachedURLs = sets:del_element(OldestURL, CachedURLs0),
?tp(debug, crl_cache_overflow, #{oldest_url => OldestURL}),

View File

@ -184,7 +184,7 @@ code_change(_OldVsn, State, _Extra) ->
start_timer(Zone) ->
WindTime = maps:get(window_time, get_policy(Zone)),
emqx_misc:start_timer(WindTime, {garbage_collect, Zone}).
emqx_utils:start_timer(WindTime, {garbage_collect, Zone}).
start_timers() ->
lists:foreach(

View File

@ -145,10 +145,10 @@ npid() ->
NPid.
to_hexstr(I) when byte_size(I) =:= 16 ->
emqx_misc:bin_to_hexstr(I, upper).
emqx_utils:bin_to_hexstr(I, upper).
from_hexstr(S) when byte_size(S) =:= 32 ->
emqx_misc:hexstr_to_bin(S).
emqx_utils:hexstr_to_bin(S).
to_base62(<<I:128>>) ->
emqx_base62:encode(I).

View File

@ -375,7 +375,7 @@ return_pause(infinity, PauseType, Fun, Diff, Limiter) ->
{PauseType, ?MINIMUM_PAUSE, make_retry_context(Fun, Diff), Limiter};
return_pause(Rate, PauseType, Fun, Diff, Limiter) ->
Val = erlang:round(Diff * emqx_limiter_schema:default_period() / Rate),
Pause = emqx_misc:clamp(Val, ?MINIMUM_PAUSE, ?MAXIMUM_PAUSE),
Pause = emqx_utils:clamp(Val, ?MINIMUM_PAUSE, ?MAXIMUM_PAUSE),
{PauseType, Pause, make_retry_context(Fun, Diff), Limiter}.
-spec make_retry_context(undefined | retry_fun(Limiter), non_neg_integer()) ->

View File

@ -538,7 +538,7 @@ esockd_access_rules(StrRules) ->
[A, CIDR] = string:tokens(S, " "),
%% esockd rules only use words 'allow' and 'deny', both are existing
%% comparison of strings may be better, but there is a loss of backward compatibility
case emqx_misc:safe_to_existing_atom(A) of
case emqx_utils:safe_to_existing_atom(A) of
{ok, Action} ->
[
{
@ -560,7 +560,7 @@ esockd_access_rules(StrRules) ->
merge_default(Options) ->
case lists:keytake(tcp_options, 1, Options) of
{value, {tcp_options, TcpOpts}, Options1} ->
[{tcp_options, emqx_misc:merge_opts(?MQTT_SOCKOPTS, TcpOpts)} | Options1];
[{tcp_options, emqx_utils:merge_opts(?MQTT_SOCKOPTS, TcpOpts)} | Options1];
false ->
[{tcp_options, ?MQTT_SOCKOPTS} | Options]
end.

View File

@ -476,9 +476,9 @@ ensure_timer(ListenerID, State, Timeout) ->
ensure_timer(ListenerID, {refresh, ListenerID}, State, Timeout).
ensure_timer(ListenerID, Message, State, Timeout) ->
emqx_misc:cancel_timer(maps:get(?REFRESH_TIMER(ListenerID), State, undefined)),
emqx_utils:cancel_timer(maps:get(?REFRESH_TIMER(ListenerID), State, undefined)),
State#{
?REFRESH_TIMER(ListenerID) => emqx_misc:start_timer(
?REFRESH_TIMER(ListenerID) => emqx_utils:start_timer(
Timeout,
Message
)

View File

@ -180,8 +180,8 @@ code_change(_OldVsn, State, _Extra) ->
%% Internal functions
%%--------------------------------------------------------------------
cancel_outdated_timer(#{mem_time_ref := MemRef, cpu_time_ref := CpuRef}) ->
emqx_misc:cancel_timer(MemRef),
emqx_misc:cancel_timer(CpuRef),
emqx_utils:cancel_timer(MemRef),
emqx_utils:cancel_timer(CpuRef),
ok.
start_cpu_check_timer() ->
@ -204,7 +204,7 @@ start_mem_check_timer() ->
end.
start_timer(Interval, Msg) ->
emqx_misc:start_timer(Interval, Msg).
emqx_utils:start_timer(Interval, Msg).
update_mem_alarm_status(HWM) when HWM > 1.0 orelse HWM < 0.0 ->
?SLOG(warning, #{msg => "discarded_out_of_range_mem_alarm_threshold", value => HWM}),

View File

@ -57,7 +57,7 @@
-spec start_link(atom(), pos_integer()) -> startlink_ret().
start_link(Pool, Id) ->
gen_server:start_link(
{local, emqx_misc:proc_name(?MODULE, Id)},
{local, emqx_utils:proc_name(?MODULE, Id)},
?MODULE,
[Pool, Id],
[{hibernate_after, 1000}]

View File

@ -98,7 +98,7 @@ mnesia(boot) ->
-spec start_link(atom(), pos_integer()) -> startlink_ret().
start_link(Pool, Id) ->
gen_server:start_link(
{local, emqx_misc:proc_name(?MODULE, Id)},
{local, emqx_utils:proc_name(?MODULE, Id)},
?MODULE,
[Pool, Id],
[{hibernate_after, 1000}]

View File

@ -2327,7 +2327,7 @@ mqtt_ssl_listener_ssl_options_validator(Conf) ->
fun ocsp_outer_validator/1,
fun crl_outer_validator/1
],
case emqx_misc:pipeline(Checks, Conf, not_used) of
case emqx_utils:pipeline(Checks, Conf, not_used) of
{ok, _, _} ->
ok;
{error, Reason, _NotUsed} ->

View File

@ -941,7 +941,7 @@ age(Now, Ts) -> Now - Ts.
%%--------------------------------------------------------------------
set_field(Name, Value, Session) ->
Pos = emqx_misc:index_of(Name, record_info(fields, session)),
Pos = emqx_utils:index_of(Name, record_info(fields, session)),
setelement(Pos + 1, Session, Value).
get_mqueue(#session{mqueue = Q}) ->

View File

@ -104,7 +104,7 @@ create_init_tab() ->
-spec start_link(atom(), pos_integer()) -> startlink_ret().
start_link(Pool, Id) ->
gen_server:start_link(
{local, emqx_misc:proc_name(?MODULE, Id)},
{local, emqx_utils:proc_name(?MODULE, Id)},
?MODULE,
[Pool, Id],
[{hibernate_after, 1000}]

View File

@ -213,7 +213,7 @@ init(#{tick_ms := TickMs}) ->
{ok, start_timer(#state{updates = [], tick_ms = TickMs}), hibernate}.
start_timer(#state{tick_ms = Ms} = State) ->
State#state{timer = emqx_misc:start_timer(Ms, tick)}.
State#state{timer = emqx_utils:start_timer(Ms, tick)}.
handle_call(stop, _From, State) ->
{stop, normal, ok, State};
@ -301,7 +301,7 @@ handle_info(Info, State) ->
{noreply, State}.
terminate(_Reason, #state{timer = TRef}) ->
emqx_misc:cancel_timer(TRef).
emqx_utils:cancel_timer(TRef).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.

View File

@ -62,7 +62,7 @@
-endif.
-import(emqx_topic, [systop/1]).
-import(emqx_misc, [start_timer/2]).
-import(emqx_utils, [start_timer/2]).
-record(state, {
heartbeat :: maybe(reference()),
@ -222,7 +222,7 @@ handle_info(Info, State) ->
terminate(_Reason, #state{heartbeat = TRef1, ticker = TRef2}) ->
_ = emqx_config_handler:remove_handler(?CONF_KEY_PATH),
unload_event_hooks(sys_event_messages()),
lists:foreach(fun emqx_misc:cancel_timer/1, [TRef1, TRef2]).
lists:foreach(fun emqx_utils:cancel_timer/1, [TRef1, TRef2]).
unload_event_hooks([]) ->
ok;

View File

@ -77,7 +77,7 @@ init([]) ->
{ok, start_timer(#{timer => undefined, events => []})}.
start_timer(State) ->
State#{timer := emqx_misc:start_timer(timer:seconds(2), reset)}.
State#{timer := emqx_utils:start_timer(timer:seconds(2), reset)}.
sysm_opts(VM) ->
sysm_opts(maps:to_list(VM), []).
@ -204,7 +204,7 @@ handle_info(Info, State) ->
{noreply, State}.
terminate(_Reason, #{timer := TRef}) ->
emqx_misc:cancel_timer(TRef),
emqx_utils:cancel_timer(TRef),
ok.
code_change(_OldVsn, State, _Extra) ->

View File

@ -272,7 +272,7 @@ handle_info({timeout, TRef, update_trace}, #{timer := TRef} = State) ->
?tp(update_trace_done, #{}),
{noreply, State#{timer => NextTRef}};
handle_info({mnesia_table_event, _Events}, State = #{timer := TRef}) ->
emqx_misc:cancel_timer(TRef),
emqx_utils:cancel_timer(TRef),
handle_info({timeout, TRef, update_trace}, State);
handle_info(Info, State) ->
?SLOG(error, #{unexpected_info => Info}),
@ -280,7 +280,7 @@ handle_info(Info, State) ->
terminate(_Reason, #{timer := TRef}) ->
_ = mnesia:unsubscribe({table, ?TRACE, simple}),
emqx_misc:cancel_timer(TRef),
emqx_utils:cancel_timer(TRef),
stop_all_trace_handler(),
update_trace_handler(),
_ = file:del_dir_r(zip_dir()),
@ -302,7 +302,7 @@ update_trace(Traces) ->
ok = stop_trace(NeedStop, Started),
clean_stale_trace_files(),
NextTime = find_closest_time(Traces, Now),
emqx_misc:start_timer(NextTime, update_trace).
emqx_utils:start_timer(NextTime, update_trace).
stop_all_trace_handler() ->
lists:foreach(

View File

@ -196,7 +196,7 @@ handler_id(Name, Type) ->
do_handler_id(Name, Type)
catch
_:_ ->
Hash = emqx_misc:bin_to_hexstr(crypto:hash(md5, Name), lower),
Hash = emqx_utils:bin_to_hexstr(crypto:hash(md5, Name), lower),
do_handler_id(Hash, Type)
end.

View File

@ -107,7 +107,7 @@ code_change(_OldVsn, State, _Extra) ->
start_check_timer() ->
Interval = emqx:get_config([sysmon, vm, process_check_interval]),
emqx_misc:start_timer(Interval, check).
emqx_utils:start_timer(Interval, check).
usage(Percent) ->
integer_to_list(floor(Percent * 100)) ++ "%".

View File

@ -52,7 +52,7 @@
-export([set_field/3]).
-import(
emqx_misc,
emqx_utils,
[
maybe_apply/2,
start_timer/2
@ -172,7 +172,7 @@ stats(WsPid) when is_pid(WsPid) ->
stats(#state{channel = Channel}) ->
SockStats = emqx_pd:get_counters(?SOCK_STATS),
ChanStats = emqx_channel:stats(Channel),
ProcStats = emqx_misc:proc_stats(),
ProcStats = emqx_utils:proc_stats(),
lists:append([SockStats, ChanStats, ProcStats]).
%% kick|discard|takeover
@ -340,7 +340,7 @@ tune_heap_size(Channel) ->
)
of
#{enable := false} -> ok;
ShutdownPolicy -> emqx_misc:tune_heap_size(ShutdownPolicy)
ShutdownPolicy -> emqx_utils:tune_heap_size(ShutdownPolicy)
end.
get_stats_enable(Zone) ->
@ -454,7 +454,7 @@ websocket_info(
State = #state{listener = {Type, Listener}}
) ->
ActiveN = get_active_n(Type, Listener),
Delivers = [Deliver | emqx_misc:drain_deliver(ActiveN)],
Delivers = [Deliver | emqx_utils:drain_deliver(ActiveN)],
with_channel(handle_deliver, [Delivers], State);
websocket_info(
{timeout, _, limit_timeout},
@ -678,7 +678,7 @@ check_oom(State = #state{channel = Channel}) ->
#{enable := false} ->
State;
#{enable := true} ->
case emqx_misc:check_oom(ShutdownPolicy) of
case emqx_utils:check_oom(ShutdownPolicy) of
Shutdown = {shutdown, _Reason} ->
postpone(Shutdown, State);
_Other ->
@ -913,7 +913,7 @@ inc_qos_stats_key(_, _) -> undefined.
%% Cancel idle timer
cancel_idle_timer(State = #state{idle_timer = IdleTimer}) ->
ok = emqx_misc:cancel_timer(IdleTimer),
ok = emqx_utils:cancel_timer(IdleTimer),
State#state{idle_timer = undefined}.
%%--------------------------------------------------------------------
@ -1046,7 +1046,7 @@ check_max_connection(Type, Listener) ->
%%--------------------------------------------------------------------
set_field(Name, Value, State) ->
Pos = emqx_misc:index_of(Name, record_info(fields, state)),
Pos = emqx_utils:index_of(Name, record_info(fields, state)),
setelement(Pos + 1, State, Value).
%% ensure lowercase letters in headers

View File

@ -496,16 +496,16 @@ t_get_conn_info(_) ->
t_oom_shutdown(init, Config) ->
ok = snabbkaffe:start_trace(),
ok = meck:new(emqx_misc, [non_strict, passthrough, no_history, no_link]),
ok = meck:new(emqx_utils, [non_strict, passthrough, no_history, no_link]),
meck:expect(
emqx_misc,
emqx_utils,
check_oom,
fun(_) -> {shutdown, "fake_oom"} end
),
Config;
t_oom_shutdown('end', _Config) ->
snabbkaffe:stop(),
meck:unload(emqx_misc),
meck:unload(emqx_utils),
ok.
t_oom_shutdown(_) ->

View File

@ -1,209 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2018-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_misc_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
-define(SOCKOPTS, [
binary,
{packet, raw},
{reuseaddr, true},
{backlog, 512},
{nodelay, true}
]).
all() -> emqx_common_test_helpers:all(?MODULE).
t_merge_opts(_) ->
Opts = emqx_misc:merge_opts(?SOCKOPTS, [
raw,
binary,
{backlog, 1024},
{nodelay, false},
{max_clients, 1024},
{acceptors, 16}
]),
?assertEqual(1024, proplists:get_value(backlog, Opts)),
?assertEqual(1024, proplists:get_value(max_clients, Opts)),
?assertEqual(
[
binary,
raw,
{acceptors, 16},
{backlog, 1024},
{max_clients, 1024},
{nodelay, false},
{packet, raw},
{reuseaddr, true}
],
lists:sort(Opts)
).
t_maybe_apply(_) ->
?assertEqual(undefined, emqx_misc:maybe_apply(fun(A) -> A end, undefined)),
?assertEqual(a, emqx_misc:maybe_apply(fun(A) -> A end, a)).
t_run_fold(_) ->
?assertEqual(1, emqx_misc:run_fold([], 1, state)),
Add = fun(I, St) -> I + St end,
Mul = fun(I, St) -> I * St end,
?assertEqual(6, emqx_misc:run_fold([Add, Mul], 1, 2)).
t_pipeline(_) ->
?assertEqual({ok, input, state}, emqx_misc:pipeline([], input, state)),
Funs = [
fun(_I, _St) -> ok end,
fun(_I, St) -> {ok, St + 1} end,
fun(I, St) -> {ok, I + 1, St + 1} end,
fun(I, St) -> {ok, I * 2, St * 2} end
],
?assertEqual({ok, 4, 6}, emqx_misc:pipeline(Funs, 1, 1)),
?assertEqual(
{error, undefined, 1}, emqx_misc:pipeline([fun(_I) -> {error, undefined} end], 1, 1)
),
?assertEqual(
{error, undefined, 2}, emqx_misc:pipeline([fun(_I, _St) -> {error, undefined, 2} end], 1, 1)
).
t_start_timer(_) ->
TRef = emqx_misc:start_timer(1, tmsg),
timer:sleep(2),
?assertEqual([{timeout, TRef, tmsg}], drain()),
ok = emqx_misc:cancel_timer(TRef).
t_cancel_timer(_) ->
Timer = emqx_misc:start_timer(0, foo),
ok = emqx_misc:cancel_timer(Timer),
?assertEqual([], drain()),
ok = emqx_misc:cancel_timer(undefined).
t_proc_name(_) ->
?assertEqual(emqx_pool_1, emqx_misc:proc_name(emqx_pool, 1)).
t_proc_stats(_) ->
Pid1 = spawn(fun() -> exit(normal) end),
timer:sleep(10),
?assertEqual([], emqx_misc:proc_stats(Pid1)),
Pid2 = spawn(fun() ->
?assertMatch([{mailbox_len, 0} | _], emqx_misc:proc_stats()),
timer:sleep(200)
end),
timer:sleep(10),
Pid2 ! msg,
timer:sleep(10),
?assertMatch([{mailbox_len, 1} | _], emqx_misc:proc_stats(Pid2)).
t_drain_deliver(_) ->
self() ! {deliver, t1, m1},
self() ! {deliver, t2, m2},
?assertEqual(
[
{deliver, t1, m1},
{deliver, t2, m2}
],
emqx_misc:drain_deliver(2)
).
t_drain_down(_) ->
{Pid1, _Ref1} = erlang:spawn_monitor(fun() -> ok end),
{Pid2, _Ref2} = erlang:spawn_monitor(fun() -> ok end),
timer:sleep(100),
?assertEqual([Pid1, Pid2], lists:sort(emqx_misc:drain_down(2))),
?assertEqual([], emqx_misc:drain_down(1)).
t_index_of(_) ->
try emqx_misc:index_of(a, []) of
_ -> ct:fail(should_throw_error)
catch
error:Reason ->
?assertEqual(badarg, Reason)
end,
?assertEqual(3, emqx_misc:index_of(a, [b, c, a, e, f])).
t_check(_) ->
Policy = #{
max_message_queue_len => 10,
max_heap_size => 1024 * 1024 * 8,
enable => true
},
[self() ! {msg, I} || I <- lists:seq(1, 5)],
?assertEqual(ok, emqx_misc:check_oom(Policy)),
[self() ! {msg, I} || I <- lists:seq(1, 6)],
?assertEqual(
{shutdown, #{reason => message_queue_too_long, value => 11, max => 10}},
emqx_misc:check_oom(Policy)
).
drain() ->
drain([]).
drain(Acc) ->
receive
Msg -> drain([Msg | Acc])
after 0 ->
lists:reverse(Acc)
end.
t_rand_seed(_) ->
?assert(is_tuple(emqx_misc:rand_seed())).
t_now_to_secs(_) ->
?assert(is_integer(emqx_misc:now_to_secs(os:timestamp()))).
t_now_to_ms(_) ->
?assert(is_integer(emqx_misc:now_to_ms(os:timestamp()))).
t_gen_id(_) ->
?assertEqual(10, length(emqx_misc:gen_id(10))),
?assertEqual(20, length(emqx_misc:gen_id(20))).
t_pmap_normal(_) ->
?assertEqual(
[5, 7, 9],
emqx_misc:pmap(
fun({A, B}) -> A + B end,
[{2, 3}, {3, 4}, {4, 5}]
)
).
t_pmap_timeout(_) ->
?assertExit(
timeout,
emqx_misc:pmap(
fun
(timeout) -> ct:sleep(1000);
({A, B}) -> A + B
end,
[{2, 3}, {3, 4}, timeout],
100
)
).
t_pmap_exception(_) ->
?assertError(
foobar,
emqx_misc:pmap(
fun
(error) -> error(foobar);
({A, B}) -> A + B
end,
[{2, 3}, {3, 4}, error]
)
).

View File

@ -119,7 +119,7 @@ t_has_routes(_) ->
?R:delete_route(<<"devices/+/messages">>).
t_unexpected(_) ->
Router = emqx_misc:proc_name(?R, 1),
Router = emqx_utils:proc_name(?R, 1),
?assertEqual(ignored, gen_server:call(Router, bad_request)),
?assertEqual(ok, gen_server:cast(Router, bad_message)),
Router ! bad_info.

View File

@ -47,7 +47,7 @@ create(#{path := Path} = Source) ->
?SLOG(alert, #{
msg => failed_to_read_acl_file,
path => Path,
explain => emqx_misc:explain_posix(Reason)
explain => emqx_utils:explain_posix(Reason)
}),
throw(failed_to_read_acl_file);
{error, Reason} ->

View File

@ -148,8 +148,8 @@ set_special_configs(_App) ->
ok.
init_per_testcase(t_api, Config) ->
meck:new(emqx_misc, [non_strict, passthrough, no_history, no_link]),
meck:expect(emqx_misc, gen_id, fun() -> "fake" end),
meck:new(emqx_utils, [non_strict, passthrough, no_history, no_link]),
meck:expect(emqx_utils, gen_id, fun() -> "fake" end),
meck:new(emqx, [non_strict, passthrough, no_history, no_link]),
meck:expect(
@ -165,7 +165,7 @@ init_per_testcase(_, Config) ->
Config.
end_per_testcase(t_api, _Config) ->
meck:unload(emqx_misc),
meck:unload(emqx_utils),
meck:unload(emqx),
ok;
end_per_testcase(_, _Config) ->

View File

@ -296,7 +296,7 @@ create(BridgeType, BridgeName, RawConf) ->
brige_action => create,
bridge_type => BridgeType,
bridge_name => BridgeName,
bridge_raw_config => emqx_misc:redact(RawConf)
bridge_raw_config => emqx_utils:redact(RawConf)
}),
emqx_conf:update(
emqx_bridge:config_key_path() ++ [BridgeType, BridgeName],

View File

@ -668,7 +668,7 @@ get_metrics_from_local_node(BridgeType, BridgeName) ->
false ->
?BRIDGE_NOT_ENABLED;
true ->
case emqx_misc:safe_to_existing_atom(Node, utf8) of
case emqx_utils:safe_to_existing_atom(Node, utf8) of
{ok, TargetNode} ->
call_operation(TargetNode, OperFunc, [
TargetNode, BridgeType, BridgeName
@ -835,7 +835,7 @@ format_resource_data(ResData) ->
format_resource_data(error, undefined, Result) ->
Result;
format_resource_data(error, Error, Result) ->
Result#{status_reason => emqx_misc:readable_error_msg(Error)};
Result#{status_reason => emqx_utils:readable_error_msg(Error)};
format_resource_data(K, V, Result) ->
Result#{K => V}.
@ -1004,7 +1004,7 @@ supported_versions(get_metrics_from_all_nodes) -> [4];
supported_versions(_Call) -> [1, 2, 3, 4].
redact(Term) ->
emqx_misc:redact(Term).
emqx_utils:redact(Term).
deobfuscate(NewConf, OldConf) ->
maps:fold(
@ -1015,7 +1015,7 @@ deobfuscate(NewConf, OldConf) ->
{ok, OldV} when is_map(V), is_map(OldV) ->
Acc#{K => deobfuscate(V, OldV)};
{ok, OldV} ->
case emqx_misc:is_redacted(K, V) of
case emqx_utils:is_redacted(K, V) of
true ->
Acc#{K => OldV};
_ ->

View File

@ -157,7 +157,7 @@ create(Type, Name, Conf, Opts0) ->
msg => "create bridge",
type => Type,
name => Name,
config => emqx_misc:redact(Conf)
config => emqx_utils:redact(Conf)
}),
Opts = override_start_after_created(Conf, Opts0),
{ok, _Data} = emqx_resource:create_local(
@ -192,7 +192,7 @@ update(Type, Name, {OldConf, Conf}, Opts0) ->
msg => "update bridge",
type => Type,
name => Name,
config => emqx_misc:redact(Conf)
config => emqx_utils:redact(Conf)
}),
case recreate(Type, Name, Conf, Opts) of
{ok, _} ->
@ -202,7 +202,7 @@ update(Type, Name, {OldConf, Conf}, Opts0) ->
msg => "updating_a_non_existing_bridge",
type => Type,
name => Name,
config => emqx_misc:redact(Conf)
config => emqx_utils:redact(Conf)
}),
create(Type, Name, Conf, Opts);
{error, Reason} ->
@ -236,8 +236,8 @@ recreate(Type, Name, Conf, Opts) ->
).
create_dry_run(Type, Conf0) ->
TmpPath0 = iolist_to_binary([?TEST_ID_PREFIX, emqx_misc:gen_id(8)]),
TmpPath = emqx_misc:safe_filename(TmpPath0),
TmpPath0 = iolist_to_binary([?TEST_ID_PREFIX, emqx_utils:gen_id(8)]),
TmpPath = emqx_utils:safe_filename(TmpPath0),
Conf = emqx_map_lib:safe_atom_key_map(Conf0),
case emqx_connector_ssl:convert_certs(TmpPath, Conf) of
{error, Reason} ->

View File

@ -975,7 +975,7 @@ t_with_redact_update(Config) ->
),
%% update with redacted config
BridgeConf = emqx_misc:redact(Template),
BridgeConf = emqx_utils:redact(Template),
BridgeID = emqx_bridge_resource:bridge_id(Type, Name),
{ok, 200, _} = request(put, uri(["bridges", BridgeID]), BridgeConf, Config),
?assertEqual(

View File

@ -156,7 +156,7 @@ on_start(InstanceId, Config) ->
msg => "failed_to_start_kafka_consumer_client",
instance_id => InstanceId,
kafka_hosts => BootstrapHosts,
reason => emqx_misc:redact(Reason)
reason => emqx_utils:redact(Reason)
}),
throw(?CLIENT_DOWN_MESSAGE)
end,
@ -344,7 +344,7 @@ start_consumer(Config, InstanceId, ClientID) ->
msg => "failed_to_start_kafka_consumer",
instance_id => InstanceId,
kafka_hosts => emqx_bridge_kafka_impl:hosts(BootstrapHosts0),
reason => emqx_misc:redact(Reason2)
reason => emqx_utils:redact(Reason2)
}),
stop_client(ClientID),
throw(failed_to_start_kafka_consumer)

View File

@ -299,7 +299,7 @@ init_per_testcase(TestCase, Config) when
common_init_per_testcase(TestCase, Config)
end;
init_per_testcase(t_cluster_group = TestCase, Config0) ->
Config = emqx_misc:merge_opts(Config0, [{num_partitions, 6}]),
Config = emqx_utils:merge_opts(Config0, [{num_partitions, 6}]),
common_init_per_testcase(TestCase, Config);
init_per_testcase(t_multiple_topic_mappings = TestCase, Config0) ->
KafkaTopicBase =
@ -1543,7 +1543,7 @@ do_t_receive_after_recovery(Config) ->
%% 2) publish messages while the consumer is down.
%% we use `pmap' to avoid wolff sending the whole
%% batch to a single partition.
emqx_misc:pmap(fun(Msg) -> publish(Config, [Msg]) end, Messages1),
emqx_utils:pmap(fun(Msg) -> publish(Config, [Msg]) end, Messages1),
ok
end),
%% 3) restore and consume messages
@ -1667,7 +1667,7 @@ t_cluster_group(Config) ->
|| {Name, Opts} <- Cluster
],
on_exit(fun() ->
emqx_misc:pmap(
emqx_utils:pmap(
fun(N) ->
ct:pal("stopping ~p", [N]),
ok = emqx_common_test_helpers:stop_slave(N)
@ -1889,7 +1889,7 @@ t_cluster_node_down(Config) ->
Cluster
),
on_exit(fun() ->
emqx_misc:pmap(
emqx_utils:pmap(
fun(N) ->
ct:pal("stopping ~p", [N]),
ok = emqx_common_test_helpers:stop_slave(N)

View File

@ -501,15 +501,17 @@ log_and_alarm(IsSuccess, Res, #{kind := ?APPLY_KIND_INITIATE} = Meta) ->
%% because nothing is committed
case IsSuccess of
true ->
?SLOG(debug, Meta#{msg => "cluster_rpc_apply_result", result => emqx_misc:redact(Res)});
?SLOG(debug, Meta#{msg => "cluster_rpc_apply_result", result => emqx_utils:redact(Res)});
false ->
?SLOG(warning, Meta#{msg => "cluster_rpc_apply_result", result => emqx_misc:redact(Res)})
?SLOG(warning, Meta#{
msg => "cluster_rpc_apply_result", result => emqx_utils:redact(Res)
})
end;
log_and_alarm(true, Res, Meta) ->
?SLOG(debug, Meta#{msg => "cluster_rpc_apply_ok", result => emqx_misc:redact(Res)}),
?SLOG(debug, Meta#{msg => "cluster_rpc_apply_ok", result => emqx_utils:redact(Res)}),
do_alarm(deactivate, Res, Meta);
log_and_alarm(false, Res, Meta) ->
?SLOG(error, Meta#{msg => "cluster_rpc_apply_failed", result => emqx_misc:redact(Res)}),
?SLOG(error, Meta#{msg => "cluster_rpc_apply_failed", result => emqx_utils:redact(Res)}),
do_alarm(activate, Res, Meta).
do_alarm(Fun, Res, #{tnx_id := Id} = Meta) ->

View File

@ -73,7 +73,7 @@ handle_info(Info, State) ->
{noreply, State}.
terminate(_Reason, #{timer := TRef}) ->
emqx_misc:cancel_timer(TRef).
emqx_utils:cancel_timer(TRef).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
@ -82,7 +82,7 @@ code_change(_OldVsn, State, _Extra) ->
%% Internal functions
%%--------------------------------------------------------------------
ensure_timer(State = #{cleanup_ms := Ms}) ->
State#{timer := emqx_misc:start_timer(Ms, del_stale_mfa)}.
State#{timer := emqx_utils:start_timer(Ms, del_stale_mfa)}.
%% @doc Keep the latest completed 100 records for querying and troubleshooting.
del_stale_mfa(MaxHistory) ->

View File

@ -219,7 +219,7 @@ on_start(
SSLOpts = emqx_tls_lib:to_client_opts(maps:get(ssl, Config)),
{tls, SSLOpts}
end,
NTransportOpts = emqx_misc:ipv6_probe(TransportOpts),
NTransportOpts = emqx_utils:ipv6_probe(TransportOpts),
PoolOpts = [
{host, Host},
{port, Port},
@ -425,7 +425,7 @@ do_get_status(PoolName, Timeout) ->
Error
end
end,
try emqx_misc:pmap(DoPerWorker, Workers, Timeout) of
try emqx_utils:pmap(DoPerWorker, Workers, Timeout) of
% we crash in case of non-empty lists since we don't know what to do in that case
[_ | _] = Results ->
case [E || {error, _} = E <- Results] of
@ -603,7 +603,7 @@ is_sensitive_key(_) ->
%% Function that will do a deep traversal of Data and remove sensitive
%% information (i.e., passwords)
redact(Data) ->
emqx_misc:redact(Data, fun is_sensitive_key/1).
emqx_utils:redact(Data, fun is_sensitive_key/1).
%% because the body may contain some sensitive data
%% and at the same time the redact function will not scan the binary data

View File

@ -65,7 +65,7 @@ on_start(
?SLOG(info, #{
msg => "starting_ldap_connector",
connector => InstId,
config => emqx_misc:redact(Config)
config => emqx_utils:redact(Config)
}),
Servers = emqx_schema:parse_servers(Servers0, ?LDAP_HOST_OPTIONS),
SslOpts =

View File

@ -162,7 +162,7 @@ on_start(
rs -> "starting_mongodb_replica_set_connector";
sharded -> "starting_mongodb_sharded_connector"
end,
?SLOG(info, #{msg => Msg, connector => InstId, config => emqx_misc:redact(Config)}),
?SLOG(info, #{msg => Msg, connector => InstId, config => emqx_utils:redact(Config)}),
NConfig = #{hosts := Hosts} = maybe_resolve_srv_and_txt_records(Config),
SslOpts =
case maps:get(enable, SSL) of

View File

@ -149,7 +149,7 @@ on_start(InstanceId, Conf) ->
?SLOG(info, #{
msg => "starting_mqtt_connector",
connector => InstanceId,
config => emqx_misc:redact(Conf)
config => emqx_utils:redact(Conf)
}),
BasicConf = basic_config(Conf),
BridgeConf = BasicConf#{

View File

@ -102,7 +102,7 @@ on_start(
?SLOG(info, #{
msg => "starting_mysql_connector",
connector => InstId,
config => emqx_misc:redact(Config)
config => emqx_utils:redact(Config)
}),
SslOpts =
case maps:get(enable, SSL) of

View File

@ -95,7 +95,7 @@ on_start(
?SLOG(info, #{
msg => "starting_postgresql_connector",
connector => InstId,
config => emqx_misc:redact(Config)
config => emqx_utils:redact(Config)
}),
SslOpts =
case maps:get(enable, SSL) of

View File

@ -123,7 +123,7 @@ on_start(
?SLOG(info, #{
msg => "starting_redis_connector",
connector => InstId,
config => emqx_misc:redact(Config)
config => emqx_utils:redact(Config)
}),
ConfKey =
case Type of

View File

@ -85,7 +85,7 @@ to_remote_msg(MapMsg, #{
qos = QoS,
retain = Retain,
topic = topic(Mountpoint, Topic),
props = emqx_misc:pub_props_to_packet(PubProps),
props = emqx_utils:pub_props_to_packet(PubProps),
payload = Payload
};
to_remote_msg(#message{topic = Topic} = Msg, #{mountpoint := Mountpoint}) ->
@ -112,7 +112,7 @@ to_broker_msg(
Retain = replace_simple_var(RetainToken, MapMsg),
PubProps = maps:get(pub_props, MapMsg, #{}),
set_headers(
Props#{properties => emqx_misc:pub_props_to_packet(PubProps)},
Props#{properties => emqx_utils:pub_props_to_packet(PubProps)},
emqx_message:set_flags(
#{dup => Dup, retain => Retain},
emqx_message:make(bridge, QoS, topic(Mountpoint, Topic), Payload)

View File

@ -124,7 +124,7 @@ start_link(Name, BridgeOpts) ->
{error, Reason} = Error ->
?SLOG(error, #{
msg => "client_start_failed",
config => emqx_misc:redact(BridgeOpts),
config => emqx_utils:redact(BridgeOpts),
reason => Reason
}),
Error

View File

@ -915,4 +915,4 @@ schema_converter(Options) ->
maps:get(schema_converter, Options, fun hocon_schema_to_spec/2).
hocon_error_msg(Reason) ->
emqx_misc:readable_error_msg(Reason).
emqx_utils:readable_error_msg(Reason).

View File

@ -478,7 +478,7 @@ call_cluster(Fun) ->
%%--------------------------------------------------------------------
%% Internal Funcs
%%--------------------------------------------------------------------
err_msg(Msg) -> emqx_misc:readable_error_msg(Msg).
err_msg(Msg) -> emqx_utils:readable_error_msg(Msg).
get_raw_config() ->
RawConfig = emqx:get_raw_config([exhook, servers], []),

View File

@ -173,7 +173,7 @@ stats(#state{
end,
ConnStats = emqx_pd:get_counters(?CONN_STATS),
ChanStats = ChannMod:stats(Channel),
ProcStats = emqx_misc:proc_stats(),
ProcStats = emqx_utils:proc_stats(),
lists:append([SockStats, ConnStats, ChanStats, ProcStats]).
call(Pid, Req) ->
@ -297,7 +297,7 @@ init_state(WrappedSock, Peername, Options, FrameMod, ChannMod) ->
StatsTimer = emqx_gateway_utils:stats_timer(Options),
IdleTimeout = emqx_gateway_utils:idle_timeout(Options),
OomPolicy = emqx_gateway_utils:oom_policy(Options),
IdleTimer = emqx_misc:start_timer(IdleTimeout, idle_timeout),
IdleTimer = emqx_utils:start_timer(IdleTimeout, idle_timeout),
#state{
socket = WrappedSock,
peername = Peername,
@ -327,7 +327,7 @@ run_loop(
}
) ->
emqx_logger:set_metadata_peername(esockd:format(Peername)),
_ = emqx_misc:tune_heap_size(OomPolicy),
_ = emqx_utils:tune_heap_size(OomPolicy),
case activate_socket(State) of
{ok, NState} ->
hibernate(Parent, NState);
@ -383,14 +383,14 @@ wakeup_from_hib(Parent, State) ->
%% Ensure/cancel stats timer
ensure_stats_timer(Timeout, State = #state{stats_timer = undefined}) ->
State#state{stats_timer = emqx_misc:start_timer(Timeout, emit_stats)};
State#state{stats_timer = emqx_utils:start_timer(Timeout, emit_stats)};
ensure_stats_timer(_Timeout, State) ->
State.
cancel_stats_timer(State = #state{stats_timer = TRef}) when
is_reference(TRef)
->
ok = emqx_misc:cancel_timer(TRef),
ok = emqx_utils:cancel_timer(TRef),
State#state{stats_timer = undefined};
cancel_stats_timer(State) ->
State.
@ -471,7 +471,7 @@ handle_msg(
State = #state{idle_timer = IdleTimer}
) ->
IdleTimer /= undefined andalso
emqx_misc:cancel_timer(IdleTimer),
emqx_utils:cancel_timer(IdleTimer),
NState = State#state{idle_timer = undefined},
handle_incoming(Packet, NState);
handle_msg({outgoing, Data}, State) ->
@ -501,7 +501,7 @@ handle_msg(
Deliver = {deliver, _Topic, _Msg},
State = #state{active_n = ActiveN}
) ->
Delivers = [Deliver | emqx_misc:drain_deliver(ActiveN)],
Delivers = [Deliver | emqx_utils:drain_deliver(ActiveN)],
with_channel(handle_deliver, [Delivers], State);
%% Something sent
%% TODO: Who will deliver this message?
@ -904,7 +904,7 @@ handle_info(Info, State) ->
%% msg => "reach_rate_limit",
%% pause => Time
%% }),
%% TRef = emqx_misc:start_timer(Time, limit_timeout),
%% TRef = emqx_utils:start_timer(Time, limit_timeout),
%% State#state{
%% sockstate = blocked,
%% limiter = Limiter1,
@ -928,7 +928,7 @@ run_gc(Stats, State = #state{gc_state = GcSt}) ->
end.
check_oom(State = #state{oom_policy = OomPolicy}) ->
case ?ENABLED(OomPolicy) andalso emqx_misc:check_oom(OomPolicy) of
case ?ENABLED(OomPolicy) andalso emqx_utils:check_oom(OomPolicy) of
{shutdown, Reason} ->
%% triggers terminate/2 callback immediately
erlang:exit({shutdown, Reason});

View File

@ -115,7 +115,7 @@ clients(get, #{
fun ?MODULE:format_channel_info/2
);
Node0 ->
case emqx_misc:safe_to_existing_atom(Node0) of
case emqx_utils:safe_to_existing_atom(Node0) of
{ok, Node1} ->
QStringWithoutNode = maps:without([<<"node">>], QString),
emqx_mgmt_api:node_query(

View File

@ -802,7 +802,7 @@ handle_info(
{'DOWN', _MRef, process, Pid, _Reason},
State = #state{gwname = GwName, chan_pmon = PMon}
) ->
ChanPids = [Pid | emqx_misc:drain_down(?DEFAULT_BATCH_SIZE)],
ChanPids = [Pid | emqx_utils:drain_down(?DEFAULT_BATCH_SIZE)],
{Items, PMon1} = emqx_pmon:erase_all(ChanPids, PMon),
CmTabs = cmtabs(GwName),

View File

@ -495,7 +495,7 @@ reason2msg(
reason2msg(
{#{roots := [{gateway, _}]}, [_ | _]} = Error
) ->
Bin = emqx_misc:readable_error_msg(Error),
Bin = emqx_utils:readable_error_msg(Error),
<<"Invalid configurations: ", Bin/binary>>;
reason2msg(_) ->
error.

View File

@ -223,7 +223,7 @@ merge_default(Udp, Options) ->
case lists:keytake(Key, 1, Options) of
{value, {Key, TcpOpts}, Options1} ->
[
{Key, emqx_misc:merge_opts(Default, TcpOpts)}
{Key, emqx_utils:merge_opts(Default, TcpOpts)}
| Options1
];
false ->
@ -482,7 +482,7 @@ frame_options(Options) ->
-spec init_gc_state(map()) -> emqx_gc:gc_state() | undefined.
init_gc_state(Options) ->
emqx_misc:maybe_apply(fun emqx_gc:init/1, force_gc_policy(Options)).
emqx_utils:maybe_apply(fun emqx_gc:init/1, force_gc_policy(Options)).
-spec force_gc_policy(map()) -> emqx_gc:opts() | undefined.
force_gc_policy(Options) ->

View File

@ -111,7 +111,7 @@ info(conn_state, #channel{conn_state = ConnState}) ->
info(clientinfo, #channel{clientinfo = ClientInfo}) ->
ClientInfo;
info(session, #channel{session = Session}) ->
emqx_misc:maybe_apply(fun emqx_coap_session:info/1, Session);
emqx_utils:maybe_apply(fun emqx_coap_session:info/1, Session);
info(clientid, #channel{clientinfo = #{clientid := ClientId}}) ->
ClientId;
info(ctx, #channel{ctx = Ctx}) ->
@ -366,7 +366,7 @@ ensure_timer(Name, Time, Msg, #channel{timers = Timers} = Channel) ->
end.
make_timer(Name, Time, Msg, Channel = #channel{timers = Timers}) ->
TRef = emqx_misc:start_timer(Time, Msg),
TRef = emqx_utils:start_timer(Time, Msg),
Channel#channel{timers = Timers#{Name => TRef}}.
ensure_keepalive_timer(Channel) ->
@ -710,7 +710,7 @@ process_connection(
) ->
Queries = emqx_coap_message:get_option(uri_query, Req),
case
emqx_misc:pipeline(
emqx_utils:pipeline(
[
fun enrich_conninfo/2,
fun run_conn_hooks/2,

View File

@ -81,7 +81,7 @@
%%%-------------------------------------------------------------------
-spec new() -> session().
new() ->
_ = emqx_misc:rand_seed(),
_ = emqx_utils:rand_seed(),
#session{
transport_manager = emqx_coap_tm:new(),
observe_manager = emqx_coap_observe_res:new_manager(),

View File

@ -272,12 +272,12 @@ cancel_state_timer(#state_machine{timers = Timers} = Machine) ->
undefined ->
Machine;
Ref ->
_ = emqx_misc:cancel_timer(Ref),
_ = emqx_utils:cancel_timer(Ref),
Machine#state_machine{timers = maps:remove(state_timer, Timers)}
end.
process_timer(SeqId, {Type, Interval, Msg}, Timers) ->
Ref = emqx_misc:start_timer(Interval, {state_machine, {SeqId, Type, Msg}}),
Ref = emqx_utils:start_timer(Interval, {state_machine, {SeqId, Type, Msg}}),
Timers#{Type => Ref}.
-spec delete_machine(manager_key(), manager()) -> manager().
@ -293,7 +293,7 @@ delete_machine(Id, Manager) ->
} ->
lists:foreach(
fun({_, Ref}) ->
emqx_misc:cancel_timer(Ref)
emqx_utils:cancel_timer(Ref)
end,
maps:to_list(Timers)
),

View File

@ -119,7 +119,7 @@ idle(out, #coap_message{type = non} = Msg, _) ->
timeouts => [{stop_timeout, ?NON_LIFETIME}]
});
idle(out, Msg, Transport) ->
_ = emqx_misc:rand_seed(),
_ = emqx_utils:rand_seed(),
Timeout = ?ACK_TIMEOUT + rand:uniform(?ACK_RANDOM_FACTOR),
out(Msg, #{
next => wait_ack,

View File

@ -681,14 +681,14 @@ ensure_timer(Name, Channel = #channel{timers = Timers}) ->
ensure_timer(Name, Time, Channel = #channel{timers = Timers}) ->
Msg = maps:get(Name, ?TIMER_TABLE),
TRef = emqx_misc:start_timer(Time, Msg),
TRef = emqx_utils:start_timer(Time, Msg),
Channel#channel{timers = Timers#{Name => TRef}}.
reset_timer(Name, Channel) ->
ensure_timer(Name, remove_timer_ref(Name, Channel)).
cancel_timer(Name, Channel = #channel{timers = Timers}) ->
emqx_misc:cancel_timer(maps:get(Name, Timers, undefined)),
emqx_utils:cancel_timer(maps:get(Name, Timers, undefined)),
remove_timer_ref(Name, Channel).
remove_timer_ref(Name, Channel = #channel{timers = Timers}) ->
@ -792,4 +792,4 @@ proto_name_to_protocol(ProtoName) when is_binary(ProtoName) ->
binary_to_atom(ProtoName).
anonymous_clientid() ->
iolist_to_binary(["exproto-", emqx_misc:gen_id()]).
iolist_to_binary(["exproto-", emqx_utils:gen_id()]).

View File

@ -50,7 +50,7 @@
start_link(Pool, Id) ->
gen_server:start_link(
{local, emqx_misc:proc_name(?MODULE, Id)},
{local, emqx_utils:proc_name(?MODULE, Id)},
?MODULE,
[Pool, Id],
[]

View File

@ -105,7 +105,7 @@ info(conn_state, #channel{conn_state = ConnState}) ->
info(clientinfo, #channel{clientinfo = ClientInfo}) ->
ClientInfo;
info(session, #channel{session = Session}) ->
emqx_misc:maybe_apply(fun emqx_lwm2m_session:info/1, Session);
emqx_utils:maybe_apply(fun emqx_lwm2m_session:info/1, Session);
info(clientid, #channel{clientinfo = #{clientid := ClientId}}) ->
ClientId;
info(ctx, #channel{ctx = Ctx}) ->
@ -286,7 +286,7 @@ handle_call(discard, _From, Channel) ->
% pendings = Pendings}) ->
% ok = emqx_session:takeover(Session),
% %% TODO: Should not drain deliver here (side effect)
% Delivers = emqx_misc:drain_deliver(),
% Delivers = emqx_utils:drain_deliver(),
% AllPendings = lists:append(Delivers, Pendings),
% shutdown_and_reply(takenover, AllPendings, Channel);
@ -390,7 +390,7 @@ set_peercert_infos(Peercert, ClientInfo) ->
ClientInfo#{dn => DN, cn => CN}.
make_timer(Name, Time, Msg, Channel = #channel{timers = Timers}) ->
TRef = emqx_misc:start_timer(Time, Msg),
TRef = emqx_utils:start_timer(Time, Msg),
Channel#channel{timers = Timers#{Name => TRef}}.
update_life_timer(#channel{session = Session, timers = Timers} = Channel) ->
@ -413,7 +413,7 @@ do_takeover(_DesireId, Msg, Channel) ->
do_connect(Req, Result, Channel, Iter) ->
case
emqx_misc:pipeline(
emqx_utils:pipeline(
[
fun check_lwm2m_version/2,
fun enrich_conninfo/2,

View File

@ -218,7 +218,7 @@ info(conn_state, #channel{conn_state = ConnState}) ->
info(clientinfo, #channel{clientinfo = ClientInfo}) ->
ClientInfo;
info(session, #channel{session = Session}) ->
emqx_misc:maybe_apply(fun emqx_session:info/1, Session);
emqx_utils:maybe_apply(fun emqx_session:info/1, Session);
info(will_msg, #channel{will_msg = WillMsg}) ->
WillMsg;
info(clientid, #channel{clientinfo = #{clientid := ClientId}}) ->
@ -282,7 +282,7 @@ enrich_clientinfo(
feedvar(Override, Packet, ConnInfo, ClientInfo0),
ClientInfo0
),
{ok, NPacket, NClientInfo} = emqx_misc:pipeline(
{ok, NPacket, NClientInfo} = emqx_utils:pipeline(
[
fun maybe_assign_clientid/2,
%% FIXME: CALL After authentication successfully
@ -414,7 +414,7 @@ process_connect(
Channel#channel{session = Session}
);
{ok, #{session := Session, present := true, pendings := Pendings}} ->
Pendings1 = lists:usort(lists:append(Pendings, emqx_misc:drain_deliver())),
Pendings1 = lists:usort(lists:append(Pendings, emqx_utils:drain_deliver())),
NChannel = Channel#channel{
session = Session,
resuming = true,
@ -595,7 +595,7 @@ handle_in(
Channel = #channel{conn_state = idle}
) ->
case
emqx_misc:pipeline(
emqx_utils:pipeline(
[
fun enrich_conninfo/2,
fun run_conn_hooks/2,
@ -718,7 +718,7 @@ handle_in(PubPkt = ?SN_PUBLISH_MSG(_Flags, TopicId0, MsgId, _Data), Channel) ->
Id
end,
case
emqx_misc:pipeline(
emqx_utils:pipeline(
[
fun check_qos3_enable/2,
fun preproc_pub_pkt/2,
@ -877,7 +877,7 @@ handle_in(
end;
handle_in(SubPkt = ?SN_SUBSCRIBE_MSG(_, MsgId, _), Channel) ->
case
emqx_misc:pipeline(
emqx_utils:pipeline(
[
fun preproc_subs_type/2,
fun check_subscribe_authz/2,
@ -911,7 +911,7 @@ handle_in(
Channel
) ->
case
emqx_misc:pipeline(
emqx_utils:pipeline(
[
fun preproc_unsub_type/2,
fun run_client_unsub_hook/2,
@ -1823,7 +1823,7 @@ handle_call(
) ->
ok = emqx_session:takeover(Session),
%% TODO: Should not drain deliver here (side effect)
Delivers = emqx_misc:drain_deliver(),
Delivers = emqx_utils:drain_deliver(),
AllPendings = lists:append(Delivers, Pendings),
shutdown_and_reply(takenover, AllPendings, Channel);
%handle_call(list_authz_cache, _From, Channel) ->
@ -2247,7 +2247,7 @@ ensure_register_timer(Channel) ->
ensure_register_timer(RetryTimes, Channel = #channel{timers = Timers}) ->
Msg = maps:get(register_timer, ?TIMER_TABLE),
TRef = emqx_misc:start_timer(?REGISTER_TIMEOUT, {Msg, RetryTimes}),
TRef = emqx_utils:start_timer(?REGISTER_TIMEOUT, {Msg, RetryTimes}),
Channel#channel{timers = Timers#{register_timer => TRef}}.
cancel_timer(Name, Channel = #channel{timers = Timers}) ->
@ -2255,7 +2255,7 @@ cancel_timer(Name, Channel = #channel{timers = Timers}) ->
undefined ->
Channel;
TRef ->
emqx_misc:cancel_timer(TRef),
emqx_utils:cancel_timer(TRef),
Channel#channel{timers = maps:without([Name], Timers)}
end.
@ -2270,7 +2270,7 @@ ensure_timer(Name, Channel = #channel{timers = Timers}) ->
ensure_timer(Name, Time, Channel = #channel{timers = Timers}) ->
Msg = maps:get(Name, ?TIMER_TABLE),
TRef = emqx_misc:start_timer(Time, Msg),
TRef = emqx_utils:start_timer(Time, Msg),
Channel#channel{timers = Timers#{Name => TRef}}.
reset_timer(Name, Channel) ->

View File

@ -252,7 +252,7 @@ enrich_clientinfo(
feedvar(Override, Packet, ConnInfo, ClientInfo0),
ClientInfo0
),
{ok, NPacket, NClientInfo} = emqx_misc:pipeline(
{ok, NPacket, NClientInfo} = emqx_utils:pipeline(
[
fun maybe_assign_clientid/2,
fun parse_heartbeat/2,
@ -416,7 +416,7 @@ handle_in(
{error, unexpected_connect, Channel};
handle_in(Packet = ?PACKET(?CMD_CONNECT), Channel) ->
case
emqx_misc:pipeline(
emqx_utils:pipeline(
[
fun enrich_conninfo/2,
fun negotiate_version/2,
@ -474,7 +474,7 @@ handle_in(
Topic = header(<<"destination">>, Headers),
Ack = header(<<"ack">>, Headers, <<"auto">>),
case
emqx_misc:pipeline(
emqx_utils:pipeline(
[
fun parse_topic_filter/2,
fun check_subscribed_status/2,
@ -796,7 +796,7 @@ handle_call(
reply({error, no_subid}, Channel);
SubId ->
case
emqx_misc:pipeline(
emqx_utils:pipeline(
[
fun parse_topic_filter/2,
fun check_subscribed_status/2
@ -869,7 +869,7 @@ handle_call(discard, _From, Channel) ->
% pendings = Pendings}) ->
% ok = emqx_session:takeover(Session),
% %% TODO: Should not drain deliver here (side effect)
% Delivers = emqx_misc:drain_deliver(),
% Delivers = emqx_utils:drain_deliver(),
% AllPendings = lists:append(Delivers, Pendings),
% shutdown_and_reply(takenover, AllPendings, Channel);
@ -1289,7 +1289,7 @@ ensure_timer(Name, Channel = #channel{timers = Timers}) ->
ensure_timer(Name, Time, Channel = #channel{timers = Timers}) ->
Msg = maps:get(Name, ?TIMER_TABLE),
TRef = emqx_misc:start_timer(Time, Msg),
TRef = emqx_utils:start_timer(Time, Msg),
Channel#channel{timers = Timers#{Name => TRef}}.
reset_timer(Name, Channel) ->

View File

@ -86,7 +86,7 @@ ensure_timer(State) ->
disabled ->
State;
Interval when is_integer(Interval) ->
TRef = emqx_misc:start_timer(Interval, run),
TRef = emqx_utils:start_timer(Interval, run),
State#{timer := TRef}
end.

View File

@ -644,7 +644,7 @@ list_clients(QString) ->
fun ?MODULE:format_channel_info/2
);
Node0 ->
case emqx_misc:safe_to_existing_atom(Node0) of
case emqx_utils:safe_to_existing_atom(Node0) of
{ok, Node1} ->
QStringWithoutNode = maps:without([<<"node">>], QString),
emqx_mgmt_api:node_query(

View File

@ -149,7 +149,7 @@ subscriptions(get, #{query_string := QString}) ->
fun ?MODULE:format/2
);
Node0 ->
case emqx_misc:safe_to_existing_atom(Node0) of
case emqx_utils:safe_to_existing_atom(Node0) of
{ok, Node1} ->
emqx_mgmt_api:node_query(
Node1,

View File

@ -498,7 +498,7 @@ download_trace_log(get, #{bindings := #{name := Name}, query_string := Query}) -
%% We generate a session ID so that we name files
%% with unique names. Then we won't cause
%% overwrites for concurrent requests.
SessionId = emqx_misc:gen_id(),
SessionId = emqx_utils:gen_id(),
ZipDir = filename:join([emqx_trace:zip_dir(), SessionId]),
ok = file:make_dir(ZipDir),
%% Write files to ZipDir and create an in-memory zip file

View File

@ -174,7 +174,7 @@ create_app(Name, ApiSecret, Enable, ExpiredAt, Desc) ->
desc = Desc,
created_at = erlang:system_time(second),
api_secret_hash = emqx_dashboard_admin:hash(ApiSecret),
api_key = list_to_binary(emqx_misc:gen_id(16))
api_key = list_to_binary(emqx_utils:gen_id(16))
},
case create_app(App) of
{ok, Res} ->
@ -213,7 +213,7 @@ do_force_create_app(App, ApiKey, NamePrefix) ->
end.
generate_unique_name(NamePrefix) ->
New = list_to_binary(NamePrefix ++ emqx_misc:gen_id(16)),
New = list_to_binary(NamePrefix ++ emqx_utils:gen_id(16)),
case mnesia:read(?APP, New) of
[] -> New;
_ -> generate_unique_name(NamePrefix)
@ -246,7 +246,7 @@ init_bootstrap_file(File) ->
{ok, MP} = re:compile(<<"(\.+):(\.+$)">>, [ungreedy]),
init_bootstrap_file(File, Dev, MP);
{error, Reason0} ->
Reason = emqx_misc:explain_posix(Reason0),
Reason = emqx_utils:explain_posix(Reason0),
?SLOG(
error,
#{

View File

@ -356,7 +356,7 @@ mnesia(_) ->
%% @doc Logger Command
log(["set-level", Level]) ->
case emqx_misc:safe_to_existing_atom(Level) of
case emqx_utils:safe_to_existing_atom(Level) of
{ok, Level1} ->
case emqx_logger:set_log_level(Level1) of
ok -> emqx_ctl:print("~ts~n", [Level]);
@ -369,7 +369,7 @@ log(["primary-level"]) ->
Level = emqx_logger:get_primary_log_level(),
emqx_ctl:print("~ts~n", [Level]);
log(["primary-level", Level]) ->
case emqx_misc:safe_to_existing_atom(Level) of
case emqx_utils:safe_to_existing_atom(Level) of
{ok, Level1} ->
_ = emqx_logger:set_primary_log_level(Level1),
ok;
@ -392,7 +392,7 @@ log(["handlers", "list"]) ->
],
ok;
log(["handlers", "start", HandlerId]) ->
case emqx_misc:safe_to_existing_atom(HandlerId) of
case emqx_utils:safe_to_existing_atom(HandlerId) of
{ok, HandlerId1} ->
case emqx_logger:start_log_handler(HandlerId1) of
ok ->
@ -406,7 +406,7 @@ log(["handlers", "start", HandlerId]) ->
emqx_ctl:print("[error] invalid handler:~ts~n", [HandlerId])
end;
log(["handlers", "stop", HandlerId]) ->
case emqx_misc:safe_to_existing_atom(HandlerId) of
case emqx_utils:safe_to_existing_atom(HandlerId) of
{ok, HandlerId1} ->
case emqx_logger:stop_log_handler(HandlerId1) of
ok ->
@ -420,9 +420,9 @@ log(["handlers", "stop", HandlerId]) ->
emqx_ctl:print("[error] invalid handler:~ts~n", [HandlerId])
end;
log(["handlers", "set-level", HandlerId, Level]) ->
case emqx_misc:safe_to_existing_atom(HandlerId) of
case emqx_utils:safe_to_existing_atom(HandlerId) of
{ok, HandlerId1} ->
case emqx_misc:safe_to_existing_atom(Level) of
case emqx_utils:safe_to_existing_atom(Level) of
{ok, Level1} ->
case emqx_logger:set_log_handler_level(HandlerId1, Level1) of
ok ->
@ -628,7 +628,7 @@ listeners([]) ->
emqx_listeners:list()
);
listeners(["stop", ListenerId]) ->
case emqx_misc:safe_to_existing_atom(ListenerId) of
case emqx_utils:safe_to_existing_atom(ListenerId) of
{ok, ListenerId1} ->
case emqx_listeners:stop_listener(ListenerId1) of
ok ->
@ -640,7 +640,7 @@ listeners(["stop", ListenerId]) ->
emqx_ctl:print("Invalid listener: ~0p~n", [ListenerId])
end;
listeners(["start", ListenerId]) ->
case emqx_misc:safe_to_existing_atom(ListenerId) of
case emqx_utils:safe_to_existing_atom(ListenerId) of
{ok, ListenerId1} ->
case emqx_listeners:start_listener(ListenerId1) of
ok ->
@ -652,7 +652,7 @@ listeners(["start", ListenerId]) ->
emqx_ctl:print("Invalid listener: ~0p~n", [ListenerId])
end;
listeners(["restart", ListenerId]) ->
case emqx_misc:safe_to_existing_atom(ListenerId) of
case emqx_utils:safe_to_existing_atom(ListenerId) of
{ok, ListenerId1} ->
case emqx_listeners:restart_listener(ListenerId1) of
ok ->

View File

@ -328,7 +328,7 @@ handle_info(Info, State) ->
terminate(_Reason, #{stats_timer := StatsTimer} = State) ->
emqx_conf:remove_handler([delayed]),
emqx_misc:cancel_timer(StatsTimer),
emqx_utils:cancel_timer(StatsTimer),
do_load_or_unload(false, State).
code_change(_Vsn, State, _Extra) ->
@ -370,14 +370,14 @@ ensure_publish_timer({Ts, _Id}, State = #{publish_timer := undefined}) ->
ensure_publish_timer({Ts, _Id}, State = #{publish_timer := TRef, publish_at := PubAt}) when
Ts < PubAt
->
ok = emqx_misc:cancel_timer(TRef),
ok = emqx_utils:cancel_timer(TRef),
ensure_publish_timer(Ts, ?NOW, State);
ensure_publish_timer(_Key, State) ->
State.
ensure_publish_timer(Ts, Now, State) ->
Interval = max(1, Ts - Now),
TRef = emqx_misc:start_timer(Interval, do_publish),
TRef = emqx_utils:start_timer(Interval, do_publish),
State#{publish_timer := TRef, publish_at := Now + Interval}.
do_publish(Key, Now) ->
@ -418,7 +418,7 @@ do_load_or_unload(true, State) ->
State;
do_load_or_unload(false, #{publish_timer := PubTimer} = State) ->
emqx_hooks:del('message.publish', {?MODULE, on_message_publish}),
emqx_misc:cancel_timer(PubTimer),
emqx_utils:cancel_timer(PubTimer),
ets:delete_all_objects(?TAB),
State#{publish_timer := undefined, publish_at := 0};
do_load_or_unload(_, State) ->

View File

@ -161,7 +161,7 @@ handle_call(enable, _From, State) ->
FirstReportTimeoutMS = timer:seconds(10),
{reply, ok, ensure_report_timer(FirstReportTimeoutMS, State)};
handle_call(disable, _From, State = #state{timer = Timer}) ->
emqx_misc:cancel_timer(Timer),
emqx_utils:cancel_timer(Timer),
{reply, ok, State#state{timer = undefined}};
handle_call(get_node_uuid, _From, State = #state{node_uuid = UUID}) ->
{reply, {ok, UUID}, State};
@ -205,7 +205,7 @@ ensure_report_timer(State = #state{report_interval = ReportInterval}) ->
ensure_report_timer(ReportInterval, State).
ensure_report_timer(ReportInterval, State) ->
State#state{timer = emqx_misc:start_timer(ReportInterval, time_to_report_telemetry_data)}.
State#state{timer = emqx_utils:start_timer(ReportInterval, time_to_report_telemetry_data)}.
os_info() ->
case erlang:system_info(os_type) of

View File

@ -79,7 +79,7 @@ health_check_ecpool_workers(PoolName, CheckFunc, Timeout) ->
false
end
end,
try emqx_misc:pmap(DoPerWorker, Workers, Timeout) of
try emqx_utils:pmap(DoPerWorker, Workers, Timeout) of
[_ | _] = Status ->
lists:all(fun(St) -> St =:= true end, Status);
[] ->

View File

@ -225,7 +225,7 @@ tcp_connectivity(Host, Port) ->
) ->
ok | {error, Reason :: term()}.
tcp_connectivity(Host, Port, Timeout) ->
case gen_tcp:connect(Host, Port, emqx_misc:ipv6_probe([]), Timeout) of
case gen_tcp:connect(Host, Port, emqx_utils:ipv6_probe([]), Timeout) of
{ok, Sock} ->
gen_tcp:close(Sock),
ok;

View File

@ -144,7 +144,7 @@ terminate(_Reason, _State) ->
ok.
ensure_timer(Interval) ->
emqx_misc:start_timer(Interval, ?TIMER_MSG).
emqx_utils:start_timer(Interval, ?TIMER_MSG).
%%--------------------------------------------------------------------
%% prometheus callbacks

View File

@ -1535,7 +1535,7 @@ queue_count(Q) ->
disk_queue_dir(Id, Index) ->
QDir0 = binary_to_list(Id) ++ ":" ++ integer_to_list(Index),
QDir = filename:join([emqx:data_dir(), "bufs", node(), QDir0]),
emqx_misc:safe_filename(QDir).
emqx_utils:safe_filename(QDir).
clear_disk_queue_dir(Id, Index) ->
ReplayQDir = disk_queue_dir(Id, Index),

View File

@ -539,7 +539,7 @@ stop_resource(#data{state = ResState, id = ResId} = Data) ->
Data#data{status = stopped}.
make_test_id() ->
RandId = iolist_to_binary(emqx_misc:gen_id(16)),
RandId = iolist_to_binary(emqx_utils:gen_id(16)),
<<?TEST_ID_PREFIX, RandId/binary>>.
handle_manually_health_check(From, Data) ->
@ -613,7 +613,7 @@ maybe_alarm(_Status, ResId, Error) ->
HrError =
case Error of
undefined -> <<"Unknown reason">>;
_Else -> emqx_misc:readable_error_msg(Error)
_Else -> emqx_utils:readable_error_msg(Error)
end,
emqx_alarm:activate(
ResId,

View File

@ -2,7 +2,7 @@
{application, emqx_retainer, [
{description, "EMQX Retainer"},
% strict semver, bump manually!
{vsn, "5.0.11"},
{vsn, "5.0.12"},
{modules, []},
{registered, [emqx_retainer_sup]},
{applications, [kernel, stdlib, emqx, emqx_ctl]},

View File

@ -91,7 +91,7 @@ worker() ->
| ignore.
start_link(Pool, Id) ->
gen_server:start_link(
{local, emqx_misc:proc_name(?MODULE, Id)},
{local, emqx_utils:proc_name(?MODULE, Id)},
?MODULE,
[Pool, Id],
[{hibernate_after, 1000}]

View File

@ -175,7 +175,7 @@ safe_publish(RuleId, Topic, QoS, Flags, Payload, PubProps) ->
flags = Flags,
headers = #{
republish_by => RuleId,
properties => emqx_misc:pub_props_to_packet(PubProps)
properties => emqx_utils:pub_props_to_packet(PubProps)
},
topic = Topic,
payload = Payload,

View File

@ -343,7 +343,7 @@ param_path_id() ->
{200, Result}
end;
'/rules'(post, #{body := Params0}) ->
case maps:get(<<"id">>, Params0, list_to_binary(emqx_misc:gen_id(8))) of
case maps:get(<<"id">>, Params0, list_to_binary(emqx_utils:gen_id(8))) of
<<>> ->
{400, #{code => 'BAD_REQUEST', message => <<"empty rule id is not allowed">>}};
Id ->
@ -459,11 +459,11 @@ param_path_id() ->
%%------------------------------------------------------------------------------
err_msg({RuleError, {_E, Reason, _S}}) ->
emqx_misc:readable_error_msg(encode_nested_error(RuleError, Reason));
emqx_utils:readable_error_msg(encode_nested_error(RuleError, Reason));
err_msg({Reason, _Details}) ->
emqx_misc:readable_error_msg(Reason);
emqx_utils:readable_error_msg(Reason);
err_msg(Msg) ->
emqx_misc:readable_error_msg(Msg).
emqx_utils:readable_error_msg(Msg).
encode_nested_error(RuleError, Reason) when is_tuple(Reason) ->
encode_nested_error(RuleError, element(1, Reason));

View File

@ -643,10 +643,10 @@ map(Data) ->
emqx_plugin_libs_rule:map(Data).
bin2hexstr(Bin) when is_binary(Bin) ->
emqx_misc:bin_to_hexstr(Bin, upper).
emqx_utils:bin_to_hexstr(Bin, upper).
hexstr2bin(Str) when is_binary(Str) ->
emqx_misc:hexstr_to_bin(Str).
emqx_utils:hexstr_to_bin(Str).
%%------------------------------------------------------------------------------
%% NULL Funcs
@ -944,7 +944,7 @@ sha256(S) when is_binary(S) ->
hash(sha256, S).
hash(Type, Data) ->
emqx_misc:bin_to_hexstr(crypto:hash(Type, Data), lower).
emqx_utils:bin_to_hexstr(crypto:hash(Type, Data), lower).
%%------------------------------------------------------------------------------
%% gzip Funcs

View File

@ -48,7 +48,7 @@ test(#{sql := Sql, context := Context}) ->
end.
test_rule(Sql, Select, Context, EventTopics) ->
RuleId = iolist_to_binary(["sql_tester:", emqx_misc:gen_id(16)]),
RuleId = iolist_to_binary(["sql_tester:", emqx_utils:gen_id(16)]),
ok = emqx_rule_engine:maybe_add_metrics_for_rule(RuleId),
Rule = #{
id => RuleId,

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_statsd, [
{description, "EMQX Statsd"},
{vsn, "5.0.7"},
{vsn, "5.0.8"},
{registered, []},
{mod, {emqx_statsd_app, []}},
{applications, [

View File

@ -144,7 +144,7 @@ flush_interval(_FlushInterval, SampleInterval) ->
SampleInterval.
ensure_timer(State = #{sample_time_interval := SampleTimeInterval}) ->
State#{timer => emqx_misc:start_timer(SampleTimeInterval, ?SAMPLE_TIMEOUT)}.
State#{timer => emqx_utils:start_timer(SampleTimeInterval, ?SAMPLE_TIMEOUT)}.
check_multicall_result({Results, []}) ->
case

View File

@ -14,14 +14,12 @@
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_misc).
-module(emqx_utils).
-compile(inline).
%% [TODO] Cleanup so the instruction below is not necessary.
-elvis([{elvis_style, god_modules, disable}]).
-include("types.hrl").
-include("logger.hrl").
-export([
merge_opts/2,
maybe_apply/2,
@ -71,6 +69,8 @@
-export([clamp/3, redact/1, redact/2, is_redacted/2, is_redacted/3]).
-type maybe(T) :: undefined | T.
-dialyzer({nowarn_function, [nolink_apply/2]}).
-define(SHORT, 8).
@ -221,6 +221,7 @@ drain_down(Cnt, Acc) ->
%% `ok': There is nothing out of the ordinary.
%% `shutdown': Some numbers (message queue length hit the limit),
%% hence shutdown for greater good (system stability).
%% [FIXME] cross-dependency on `emqx_types`.
-spec check_oom(emqx_types:oom_policy()) -> ok | {shutdown, term()}.
check_oom(Policy) ->
check_oom(self(), Policy).
@ -279,6 +280,7 @@ proc_name(Mod, Id) ->
list_to_atom(lists:concat([Mod, "_", Id])).
%% Get Proc's Stats.
%% [FIXME] cross-dependency on `emqx_types`.
-spec proc_stats() -> emqx_types:stats().
proc_stats() -> proc_stats(self()).

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%% Copyright (c) 2018-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -20,29 +20,188 @@
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
all() ->
emqx_common_test_helpers:all(?MODULE).
-define(SOCKOPTS, [
binary,
{packet, raw},
{reuseaddr, true},
{backlog, 512},
{nodelay, true}
]).
init_per_suite(Config) ->
emqx_common_test_helpers:start_apps([emqx_conf, emqx_utils]),
Config.
all() -> emqx_common_test_helpers:all(?MODULE).
end_per_suite(_Config) ->
emqx_common_test_helpers:stop_apps([emqx_conf, emqx_utils]),
ok.
t_merge_opts(_) ->
Opts = emqx_utils:merge_opts(?SOCKOPTS, [
raw,
binary,
{backlog, 1024},
{nodelay, false},
{max_clients, 1024},
{acceptors, 16}
]),
?assertEqual(1024, proplists:get_value(backlog, Opts)),
?assertEqual(1024, proplists:get_value(max_clients, Opts)),
?assertEqual(
[
binary,
raw,
{acceptors, 16},
{backlog, 1024},
{max_clients, 1024},
{nodelay, false},
{packet, raw},
{reuseaddr, true}
],
lists:sort(Opts)
).
init_per_testcase(TestCase, Config) ->
emqx_common_test_helpers:init_per_testcase(?MODULE, TestCase, Config).
t_maybe_apply(_) ->
?assertEqual(undefined, emqx_utils:maybe_apply(fun(A) -> A end, undefined)),
?assertEqual(a, emqx_utils:maybe_apply(fun(A) -> A end, a)).
end_per_testcase(TestCase, Config) ->
emqx_common_test_helpers:end_per_testcase(?MODULE, TestCase, Config).
t_run_fold(_) ->
?assertEqual(1, emqx_utils:run_fold([], 1, state)),
Add = fun(I, St) -> I + St end,
Mul = fun(I, St) -> I * St end,
?assertEqual(6, emqx_utils:run_fold([Add, Mul], 1, 2)).
t_fail(init, Config) ->
Config;
t_fail('end', _Config) ->
ok.
t_pipeline(_) ->
?assertEqual({ok, input, state}, emqx_utils:pipeline([], input, state)),
Funs = [
fun(_I, _St) -> ok end,
fun(_I, St) -> {ok, St + 1} end,
fun(I, St) -> {ok, I + 1, St + 1} end,
fun(I, St) -> {ok, I * 2, St * 2} end
],
?assertEqual({ok, 4, 6}, emqx_utils:pipeline(Funs, 1, 1)),
?assertEqual(
{error, undefined, 1}, emqx_utils:pipeline([fun(_I) -> {error, undefined} end], 1, 1)
),
?assertEqual(
{error, undefined, 2},
emqx_utils:pipeline([fun(_I, _St) -> {error, undefined, 2} end], 1, 1)
).
t_fail(_Config) ->
?assert(false).
t_start_timer(_) ->
TRef = emqx_utils:start_timer(1, tmsg),
timer:sleep(2),
?assertEqual([{timeout, TRef, tmsg}], drain()),
ok = emqx_utils:cancel_timer(TRef).
t_cancel_timer(_) ->
Timer = emqx_utils:start_timer(0, foo),
ok = emqx_utils:cancel_timer(Timer),
?assertEqual([], drain()),
ok = emqx_utils:cancel_timer(undefined).
t_proc_name(_) ->
?assertEqual(emqx_pool_1, emqx_utils:proc_name(emqx_pool, 1)).
t_proc_stats(_) ->
Pid1 = spawn(fun() -> exit(normal) end),
timer:sleep(10),
?assertEqual([], emqx_utils:proc_stats(Pid1)),
Pid2 = spawn(fun() ->
?assertMatch([{mailbox_len, 0} | _], emqx_utils:proc_stats()),
timer:sleep(200)
end),
timer:sleep(10),
Pid2 ! msg,
timer:sleep(10),
?assertMatch([{mailbox_len, 1} | _], emqx_utils:proc_stats(Pid2)).
t_drain_deliver(_) ->
self() ! {deliver, t1, m1},
self() ! {deliver, t2, m2},
?assertEqual(
[
{deliver, t1, m1},
{deliver, t2, m2}
],
emqx_utils:drain_deliver(2)
).
t_drain_down(_) ->
{Pid1, _Ref1} = erlang:spawn_monitor(fun() -> ok end),
{Pid2, _Ref2} = erlang:spawn_monitor(fun() -> ok end),
timer:sleep(100),
?assertEqual([Pid1, Pid2], lists:sort(emqx_utils:drain_down(2))),
?assertEqual([], emqx_utils:drain_down(1)).
t_index_of(_) ->
try emqx_utils:index_of(a, []) of
_ -> ct:fail(should_throw_error)
catch
error:Reason ->
?assertEqual(badarg, Reason)
end,
?assertEqual(3, emqx_utils:index_of(a, [b, c, a, e, f])).
t_check(_) ->
Policy = #{
max_message_queue_len => 10,
max_heap_size => 1024 * 1024 * 8,
enable => true
},
[self() ! {msg, I} || I <- lists:seq(1, 5)],
?assertEqual(ok, emqx_utils:check_oom(Policy)),
[self() ! {msg, I} || I <- lists:seq(1, 6)],
?assertEqual({shutdown, message_queue_too_long}, emqx_utils:check_oom(Policy)).
drain() ->
drain([]).
drain(Acc) ->
receive
Msg -> drain([Msg | Acc])
after 0 ->
lists:reverse(Acc)
end.
t_rand_seed(_) ->
?assert(is_tuple(emqx_utils:rand_seed())).
t_now_to_secs(_) ->
?assert(is_integer(emqx_utils:now_to_secs(os:timestamp()))).
t_now_to_ms(_) ->
?assert(is_integer(emqx_utils:now_to_ms(os:timestamp()))).
t_gen_id(_) ->
?assertEqual(10, length(emqx_utils:gen_id(10))),
?assertEqual(20, length(emqx_utils:gen_id(20))).
t_pmap_normal(_) ->
?assertEqual(
[5, 7, 9],
emqx_utils:pmap(
fun({A, B}) -> A + B end,
[{2, 3}, {3, 4}, {4, 5}]
)
).
t_pmap_timeout(_) ->
?assertExit(
timeout,
emqx_utils:pmap(
fun
(timeout) -> ct:sleep(1000);
({A, B}) -> A + B
end,
[{2, 3}, {3, 4}, timeout],
100
)
).
t_pmap_exception(_) ->
?assertError(
foobar,
emqx_utils:pmap(
fun
(error) -> error(foobar);
({A, B}) -> A + B
end,
[{2, 3}, {3, 4}, error]
)
).

View File

@ -272,7 +272,7 @@ t_setup_via_config_and_publish(Config) ->
{ok, _},
create_bridge(Config)
),
MsgId = emqx_misc:gen_id(),
MsgId = emqx_utils:gen_id(),
SentData = #{id => MsgId, payload => ?PAYLOAD},
?check_trace(
begin
@ -309,7 +309,7 @@ t_setup_via_http_api_and_publish(Config) ->
{ok, _},
create_bridge_http(PgsqlConfig)
),
MsgId = emqx_misc:gen_id(),
MsgId = emqx_utils:gen_id(),
SentData = #{id => MsgId, payload => ?PAYLOAD},
?check_trace(
begin
@ -375,7 +375,7 @@ t_write_failure(Config) ->
#{?snk_kind := resource_connected_enter},
20_000
),
SentData = #{id => emqx_misc:gen_id(), payload => ?PAYLOAD},
SentData = #{id => emqx_utils:gen_id(), payload => ?PAYLOAD},
emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
?assertMatch(
{error, {resource_error, #{reason := timeout}}}, send_message(Config, SentData)

View File

@ -809,7 +809,7 @@ test_publish_success_batch(Config) ->
%% making 1-sized batches. also important to note that the pool
%% size for the resource (replayq buffering) must be set to 1 to
%% avoid further segmentation of batches.
emqx_misc:pmap(fun emqx:publish/1, Messages),
emqx_utils:pmap(fun emqx:publish/1, Messages),
DecodedMessages0 = assert_http_request(ServiceAccountJSON),
?assertEqual(BatchSize, length(DecodedMessages0)),
DecodedMessages1 = assert_http_request(ServiceAccountJSON),

View File

@ -615,7 +615,7 @@ t_workload_fits_prepared_statement_limit(Config) ->
create_bridge(Config)
),
Results = lists:append(
emqx_misc:pmap(
emqx_utils:pmap(
fun(_) ->
[
begin

View File

@ -102,7 +102,7 @@ on_start(
?SLOG(info, #{
msg => "starting_cassandra_connector",
connector => InstId,
config => emqx_misc:redact(Config)
config => emqx_utils:redact(Config)
}),
Options = [

View File

@ -139,7 +139,7 @@ on_start(
?SLOG(info, #{
msg => "starting_clickhouse_connector",
connector => InstanceID,
config => emqx_misc:redact(Config)
config => emqx_utils:redact(Config)
}),
PoolName = emqx_plugin_libs_pool:pool_name(InstanceID),
Options = [
@ -181,7 +181,7 @@ log_start_error(Config, Reason, Stacktrace) ->
#{
msg => "clickhouse_connector_start_failed",
error_reason => Reason,
config => emqx_misc:redact(Config)
config => emqx_utils:redact(Config)
},
?SLOG(info, maps:merge(LogMessage, StacktraceMap)),
?tp(
@ -318,7 +318,7 @@ do_get_status(PoolName, Timeout) ->
Error
end
end,
try emqx_misc:pmap(DoPerWorker, Workers, Timeout) of
try emqx_utils:pmap(DoPerWorker, Workers, Timeout) of
Results ->
case [E || {error, _} = E <- Results] of
[] ->

View File

@ -95,7 +95,7 @@ on_start(
?SLOG(info, #{
msg => "starting_dynamo_connector",
connector => InstanceId,
config => emqx_misc:redact(Config)
config => emqx_utils:redact(Config)
}),
{Schema, Server} = get_host_schema(to_str(Url)),

View File

@ -86,7 +86,7 @@ on_start(
PoolType = random,
Transport = tls,
TransportOpts = emqx_tls_lib:to_client_opts(#{enable => true, verify => verify_none}),
NTransportOpts = emqx_misc:ipv6_probe(TransportOpts),
NTransportOpts = emqx_utils:ipv6_probe(TransportOpts),
PoolOpts = [
{host, Host},
{port, Port},
@ -587,7 +587,7 @@ do_get_status(InstanceId, PoolName, Timeout) ->
false
end
end,
try emqx_misc:pmap(DoPerWorker, Workers, Timeout) of
try emqx_utils:pmap(DoPerWorker, Workers, Timeout) of
[_ | _] = Status ->
lists:all(fun(St) -> St =:= true end, Status);
[] ->

View File

@ -216,8 +216,8 @@ start_client(InstId, Config) ->
?SLOG(info, #{
msg => "starting influxdb connector",
connector => InstId,
config => emqx_misc:redact(Config),
client_config => emqx_misc:redact(ClientConfig)
config => emqx_utils:redact(Config),
client_config => emqx_utils:redact(ClientConfig)
}),
try
do_start_client(InstId, ClientConfig, Config)
@ -353,7 +353,7 @@ password(_) ->
[].
redact_auth(Term) ->
emqx_misc:redact(Term, fun is_auth_key/1).
emqx_utils:redact(Term, fun is_auth_key/1).
is_auth_key(Key) when is_binary(Key) ->
string:equal("authorization", Key, true);

View File

@ -265,7 +265,7 @@ client_id(InstanceId) ->
erlang:binary_to_atom(Name, utf8).
redact(Msg) ->
emqx_misc:redact(Msg, fun is_sensitive_key/1).
emqx_utils:redact(Msg, fun is_sensitive_key/1).
is_sensitive_key(security_token) ->
true;

View File

@ -93,7 +93,7 @@ on_start(
?SLOG(info, #{
msg => "starting_tdengine_connector",
connector => InstanceId,
config => emqx_misc:redact(Config)
config => emqx_utils:redact(Config)
}),
{Host, Port} = emqx_schema:parse_server(Server, ?TD_HOST_OPTIONS),

View File

@ -218,7 +218,7 @@ start_cluster(Cluster) ->
|| {Name, Opts} <- Cluster
],
on_exit(fun() ->
emqx_misc:pmap(
emqx_utils:pmap(
fun(N) ->
ct:pal("stopping ~p", [N]),
ok = emqx_common_test_helpers:stop_slave(N)

Some files were not shown because too many files have changed in this diff Show More