diff --git a/apps/emqx/src/emqx_api_lib.erl b/apps/emqx/src/emqx_api_lib.erl index f1d65f350..5eddf6188 100644 --- a/apps/emqx/src/emqx_api_lib.erl +++ b/apps/emqx/src/emqx_api_lib.erl @@ -51,7 +51,7 @@ with_node_or_cluster(Node, Fun) -> -spec lookup_node(atom() | binary()) -> {ok, atom()} | not_found. lookup_node(BinNode) when is_binary(BinNode) -> - case emqx_misc:safe_to_existing_atom(BinNode, utf8) of + case emqx_utils:safe_to_existing_atom(BinNode, utf8) of {ok, Node} -> is_running_node(Node); _Error -> diff --git a/apps/emqx/src/emqx_authentication_config.erl b/apps/emqx/src/emqx_authentication_config.erl index 5471b66fc..be3b35f57 100644 --- a/apps/emqx/src/emqx_authentication_config.erl +++ b/apps/emqx/src/emqx_authentication_config.erl @@ -277,9 +277,9 @@ atom(Bin) -> binary_to_existing_atom(Bin, utf8). certs_dir(ChainName, ConfigOrID) -> DirName = dir(ChainName, ConfigOrID), SubDir = iolist_to_binary(filename:join(["authn", DirName])), - emqx_misc:safe_filename(SubDir). + emqx_utils:safe_filename(SubDir). dir(ChainName, ID) when is_binary(ID) -> - emqx_misc:safe_filename(iolist_to_binary([to_bin(ChainName), "-", ID])); + emqx_utils:safe_filename(iolist_to_binary([to_bin(ChainName), "-", ID])); dir(ChainName, Config) when is_map(Config) -> dir(ChainName, authenticator_id(Config)). diff --git a/apps/emqx/src/emqx_banned.erl b/apps/emqx/src/emqx_banned.erl index 758c570da..a0ccd93d7 100644 --- a/apps/emqx/src/emqx_banned.erl +++ b/apps/emqx/src/emqx_banned.erl @@ -243,7 +243,7 @@ handle_info(Info, State) -> {noreply, State}. terminate(_Reason, #{expiry_timer := TRef}) -> - emqx_misc:cancel_timer(TRef). + emqx_utils:cancel_timer(TRef). code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -254,10 +254,10 @@ code_change(_OldVsn, State, _Extra) -> -ifdef(TEST). ensure_expiry_timer(State) -> - State#{expiry_timer := emqx_misc:start_timer(10, expire)}. + State#{expiry_timer := emqx_utils:start_timer(10, expire)}. -else. ensure_expiry_timer(State) -> - State#{expiry_timer := emqx_misc:start_timer(timer:minutes(1), expire)}. + State#{expiry_timer := emqx_utils:start_timer(timer:minutes(1), expire)}. -endif. expire_banned_items(Now) -> diff --git a/apps/emqx/src/emqx_batch.erl b/apps/emqx/src/emqx_batch.erl index 2fe09942c..22e812975 100644 --- a/apps/emqx/src/emqx_batch.erl +++ b/apps/emqx/src/emqx_batch.erl @@ -85,7 +85,7 @@ commit(Batch = #batch{batch_q = Q, commit_fun = Commit}) -> reset(Batch). reset(Batch = #batch{linger_timer = TRef}) -> - _ = emqx_misc:cancel_timer(TRef), + _ = emqx_utils:cancel_timer(TRef), Batch#batch{batch_q = [], linger_timer = undefined}. -spec size(batch()) -> non_neg_integer(). diff --git a/apps/emqx/src/emqx_broker.erl b/apps/emqx/src/emqx_broker.erl index d56620123..7e00c050e 100644 --- a/apps/emqx/src/emqx_broker.erl +++ b/apps/emqx/src/emqx_broker.erl @@ -92,7 +92,7 @@ start_link(Pool, Id) -> ok = create_tabs(), gen_server:start_link( - {local, emqx_misc:proc_name(?BROKER, Id)}, + {local, emqx_utils:proc_name(?BROKER, Id)}, ?MODULE, [Pool, Id], [] diff --git a/apps/emqx/src/emqx_broker_helper.erl b/apps/emqx/src/emqx_broker_helper.erl index 91b4c4994..87b291361 100644 --- a/apps/emqx/src/emqx_broker_helper.erl +++ b/apps/emqx/src/emqx_broker_helper.erl @@ -131,7 +131,7 @@ handle_cast(Msg, State) -> {noreply, State}. handle_info({'DOWN', _MRef, process, SubPid, _Reason}, State = #{pmon := PMon}) -> - SubPids = [SubPid | emqx_misc:drain_down(?BATCH_SIZE)], + SubPids = [SubPid | emqx_utils:drain_down(?BATCH_SIZE)], ok = emqx_pool:async_submit( fun lists:foreach/2, [fun clean_down/1, SubPids] ), diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 29a59e482..745429536 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -61,7 +61,7 @@ -export([set_field/3]). -import( - emqx_misc, + emqx_utils, [ run_fold/3, pipeline/3, @@ -622,7 +622,7 @@ process_connect( NChannel = Channel#channel{session = Session}, handle_out(connack, {?RC_SUCCESS, sp(false), AckProps}, ensure_connected(NChannel)); {ok, #{session := Session, present := true, pendings := Pendings}} -> - Pendings1 = lists:usort(lists:append(Pendings, emqx_misc:drain_deliver())), + Pendings1 = lists:usort(lists:append(Pendings, emqx_utils:drain_deliver())), NChannel = Channel#channel{ session = Session, resuming = true, @@ -1203,7 +1203,7 @@ handle_call( ) -> ok = emqx_session:takeover(Session), %% TODO: Should not drain deliver here (side effect) - Delivers = emqx_misc:drain_deliver(), + Delivers = emqx_utils:drain_deliver(), AllPendings = lists:append(Delivers, Pendings), disconnect_and_shutdown(takenover, AllPendings, Channel); handle_call(list_authz_cache, Channel) -> @@ -1402,7 +1402,7 @@ ensure_timer(Name, Channel = #channel{timers = Timers}) -> ensure_timer(Name, Time, Channel = #channel{timers = Timers}) -> Msg = maps:get(Name, ?TIMER_TABLE), - TRef = emqx_misc:start_timer(Time, Msg), + TRef = emqx_utils:start_timer(Time, Msg), Channel#channel{timers = Timers#{Name => TRef}}. reset_timer(Name, Channel) -> @@ -2045,7 +2045,7 @@ clear_keepalive(Channel = #channel{timers = Timers}) -> undefined -> Channel; TRef -> - emqx_misc:cancel_timer(TRef), + emqx_utils:cancel_timer(TRef), Channel#channel{timers = maps:without([alive_timer], Timers)} end. %%-------------------------------------------------------------------- @@ -2241,7 +2241,7 @@ get_mqtt_conf(Zone, Key, Default) -> %%-------------------------------------------------------------------- set_field(Name, Value, Channel) -> - Pos = emqx_misc:index_of(Name, record_info(fields, channel)), + Pos = emqx_utils:index_of(Name, record_info(fields, channel)), setelement(Pos + 1, Channel, Value). get_mqueue(#channel{session = Session}) -> diff --git a/apps/emqx/src/emqx_cm.erl b/apps/emqx/src/emqx_cm.erl index f8c510482..346e8dcb5 100644 --- a/apps/emqx/src/emqx_cm.erl +++ b/apps/emqx/src/emqx_cm.erl @@ -672,7 +672,7 @@ handle_cast(Msg, State) -> handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{chan_pmon := PMon}) -> ?tp(emqx_cm_process_down, #{stale_pid => Pid, reason => _Reason}), - ChanPids = [Pid | emqx_misc:drain_down(?BATCH_SIZE)], + ChanPids = [Pid | emqx_utils:drain_down(?BATCH_SIZE)], {Items, PMon1} = emqx_pmon:erase_all(ChanPids, PMon), lists:foreach(fun mark_channel_disconnected/1, ChanPids), ok = emqx_pool:async_submit(fun lists:foreach/2, [fun ?MODULE:clean_down/1, Items]), diff --git a/apps/emqx/src/emqx_connection.erl b/apps/emqx/src/emqx_connection.erl index e5002cab4..8d47f033c 100644 --- a/apps/emqx/src/emqx_connection.erl +++ b/apps/emqx/src/emqx_connection.erl @@ -77,7 +77,7 @@ -export([set_field/3]). -import( - emqx_misc, + emqx_utils, [start_timer/2] ). @@ -260,7 +260,7 @@ stats(#state{ {error, _} -> [] end, ChanStats = emqx_channel:stats(Channel), - ProcStats = emqx_misc:proc_stats(), + ProcStats = emqx_utils:proc_stats(), lists:append([SockStats, ChanStats, ProcStats]). %% @doc Set TCP keepalive socket options to override system defaults. @@ -392,7 +392,7 @@ run_loop( emqx_channel:info(zone, Channel), [force_shutdown] ), - emqx_misc:tune_heap_size(ShutdownPolicy), + emqx_utils:tune_heap_size(ShutdownPolicy), case activate_socket(State) of {ok, NState} -> hibernate(Parent, NState); @@ -472,7 +472,7 @@ ensure_stats_timer(_Timeout, State) -> -compile({inline, [cancel_stats_timer/1]}). cancel_stats_timer(State = #state{stats_timer = TRef}) when is_reference(TRef) -> ?tp(debug, cancel_stats_timer, #{}), - ok = emqx_misc:cancel_timer(TRef), + ok = emqx_utils:cancel_timer(TRef), State#state{stats_timer = undefined}; cancel_stats_timer(State) -> State. @@ -558,7 +558,7 @@ handle_msg( {incoming, Packet = ?CONNECT_PACKET(ConnPkt)}, State = #state{idle_timer = IdleTimer} ) -> - ok = emqx_misc:cancel_timer(IdleTimer), + ok = emqx_utils:cancel_timer(IdleTimer), Serialize = emqx_frame:serialize_opts(ConnPkt), NState = State#state{ serialize = Serialize, @@ -593,7 +593,7 @@ handle_msg( #state{listener = {Type, Listener}} = State ) -> ActiveN = get_active_n(Type, Listener), - Delivers = [Deliver | emqx_misc:drain_deliver(ActiveN)], + Delivers = [Deliver | emqx_utils:drain_deliver(ActiveN)], with_channel(handle_deliver, [Delivers], State); %% Something sent handle_msg({inet_reply, _Sock, ok}, State = #state{listener = {Type, Listener}}) -> @@ -1073,7 +1073,7 @@ check_oom(State = #state{channel = Channel}) -> emqx_channel:info(zone, Channel), [force_shutdown] ), ?tp(debug, check_oom, #{policy => ShutdownPolicy}), - case emqx_misc:check_oom(ShutdownPolicy) of + case emqx_utils:check_oom(ShutdownPolicy) of {shutdown, Reason} -> %% triggers terminate/2 callback immediately erlang:exit({shutdown, Reason}); @@ -1200,7 +1200,7 @@ inc_counter(Key, Inc) -> %%-------------------------------------------------------------------- set_field(Name, Value, State) -> - Pos = emqx_misc:index_of(Name, record_info(fields, state)), + Pos = emqx_utils:index_of(Name, record_info(fields, state)), setelement(Pos + 1, State, Value). get_state(Pid) -> diff --git a/apps/emqx/src/emqx_crl_cache.erl b/apps/emqx/src/emqx_crl_cache.erl index 79e47a6dc..084313420 100644 --- a/apps/emqx/src/emqx_crl_cache.erl +++ b/apps/emqx/src/emqx_crl_cache.erl @@ -117,7 +117,7 @@ handle_call(Call, _From, State) -> handle_cast({evict, URL}, State0 = #state{refresh_timers = RefreshTimers0}) -> emqx_ssl_crl_cache:delete(URL), MTimer = maps:get(URL, RefreshTimers0, undefined), - emqx_misc:cancel_timer(MTimer), + emqx_utils:cancel_timer(MTimer), RefreshTimers = maps:without([URL], RefreshTimers0), State = State0#state{refresh_timers = RefreshTimers}, ?tp( @@ -223,9 +223,9 @@ ensure_timer(URL, State = #state{refresh_interval = Timeout}) -> ensure_timer(URL, State = #state{refresh_timers = RefreshTimers0}, Timeout) -> ?tp(crl_cache_ensure_timer, #{url => URL, timeout => Timeout}), MTimer = maps:get(URL, RefreshTimers0, undefined), - emqx_misc:cancel_timer(MTimer), + emqx_utils:cancel_timer(MTimer), RefreshTimers = RefreshTimers0#{ - URL => emqx_misc:start_timer( + URL => emqx_utils:start_timer( Timeout, {refresh, URL} ) @@ -297,7 +297,7 @@ handle_cache_overflow(State0) -> {_Time, OldestURL, InsertionTimes} = gb_trees:take_smallest(InsertionTimes0), emqx_ssl_crl_cache:delete(OldestURL), MTimer = maps:get(OldestURL, RefreshTimers0, undefined), - emqx_misc:cancel_timer(MTimer), + emqx_utils:cancel_timer(MTimer), RefreshTimers = maps:remove(OldestURL, RefreshTimers0), CachedURLs = sets:del_element(OldestURL, CachedURLs0), ?tp(debug, crl_cache_overflow, #{oldest_url => OldestURL}), diff --git a/apps/emqx/src/emqx_flapping.erl b/apps/emqx/src/emqx_flapping.erl index 64e4ed6c3..2cfed0a8e 100644 --- a/apps/emqx/src/emqx_flapping.erl +++ b/apps/emqx/src/emqx_flapping.erl @@ -184,7 +184,7 @@ code_change(_OldVsn, State, _Extra) -> start_timer(Zone) -> WindTime = maps:get(window_time, get_policy(Zone)), - emqx_misc:start_timer(WindTime, {garbage_collect, Zone}). + emqx_utils:start_timer(WindTime, {garbage_collect, Zone}). start_timers() -> lists:foreach( diff --git a/apps/emqx/src/emqx_guid.erl b/apps/emqx/src/emqx_guid.erl index fea4e70b0..d313723fb 100644 --- a/apps/emqx/src/emqx_guid.erl +++ b/apps/emqx/src/emqx_guid.erl @@ -145,10 +145,10 @@ npid() -> NPid. to_hexstr(I) when byte_size(I) =:= 16 -> - emqx_misc:bin_to_hexstr(I, upper). + emqx_utils:bin_to_hexstr(I, upper). from_hexstr(S) when byte_size(S) =:= 32 -> - emqx_misc:hexstr_to_bin(S). + emqx_utils:hexstr_to_bin(S). to_base62(<>) -> emqx_base62:encode(I). diff --git a/apps/emqx/src/emqx_limiter/src/emqx_htb_limiter.erl b/apps/emqx/src/emqx_limiter/src/emqx_htb_limiter.erl index 83bc2ec72..bbebd9460 100644 --- a/apps/emqx/src/emqx_limiter/src/emqx_htb_limiter.erl +++ b/apps/emqx/src/emqx_limiter/src/emqx_htb_limiter.erl @@ -375,7 +375,7 @@ return_pause(infinity, PauseType, Fun, Diff, Limiter) -> {PauseType, ?MINIMUM_PAUSE, make_retry_context(Fun, Diff), Limiter}; return_pause(Rate, PauseType, Fun, Diff, Limiter) -> Val = erlang:round(Diff * emqx_limiter_schema:default_period() / Rate), - Pause = emqx_misc:clamp(Val, ?MINIMUM_PAUSE, ?MAXIMUM_PAUSE), + Pause = emqx_utils:clamp(Val, ?MINIMUM_PAUSE, ?MAXIMUM_PAUSE), {PauseType, Pause, make_retry_context(Fun, Diff), Limiter}. -spec make_retry_context(undefined | retry_fun(Limiter), non_neg_integer()) -> diff --git a/apps/emqx/src/emqx_listeners.erl b/apps/emqx/src/emqx_listeners.erl index 4e5843166..96e28b1de 100644 --- a/apps/emqx/src/emqx_listeners.erl +++ b/apps/emqx/src/emqx_listeners.erl @@ -538,7 +538,7 @@ esockd_access_rules(StrRules) -> [A, CIDR] = string:tokens(S, " "), %% esockd rules only use words 'allow' and 'deny', both are existing %% comparison of strings may be better, but there is a loss of backward compatibility - case emqx_misc:safe_to_existing_atom(A) of + case emqx_utils:safe_to_existing_atom(A) of {ok, Action} -> [ { @@ -560,7 +560,7 @@ esockd_access_rules(StrRules) -> merge_default(Options) -> case lists:keytake(tcp_options, 1, Options) of {value, {tcp_options, TcpOpts}, Options1} -> - [{tcp_options, emqx_misc:merge_opts(?MQTT_SOCKOPTS, TcpOpts)} | Options1]; + [{tcp_options, emqx_utils:merge_opts(?MQTT_SOCKOPTS, TcpOpts)} | Options1]; false -> [{tcp_options, ?MQTT_SOCKOPTS} | Options] end. diff --git a/apps/emqx/src/emqx_ocsp_cache.erl b/apps/emqx/src/emqx_ocsp_cache.erl index 4e7ada044..7044d992e 100644 --- a/apps/emqx/src/emqx_ocsp_cache.erl +++ b/apps/emqx/src/emqx_ocsp_cache.erl @@ -476,9 +476,9 @@ ensure_timer(ListenerID, State, Timeout) -> ensure_timer(ListenerID, {refresh, ListenerID}, State, Timeout). ensure_timer(ListenerID, Message, State, Timeout) -> - emqx_misc:cancel_timer(maps:get(?REFRESH_TIMER(ListenerID), State, undefined)), + emqx_utils:cancel_timer(maps:get(?REFRESH_TIMER(ListenerID), State, undefined)), State#{ - ?REFRESH_TIMER(ListenerID) => emqx_misc:start_timer( + ?REFRESH_TIMER(ListenerID) => emqx_utils:start_timer( Timeout, Message ) diff --git a/apps/emqx/src/emqx_os_mon.erl b/apps/emqx/src/emqx_os_mon.erl index c5ce35bf9..4810798eb 100644 --- a/apps/emqx/src/emqx_os_mon.erl +++ b/apps/emqx/src/emqx_os_mon.erl @@ -180,8 +180,8 @@ code_change(_OldVsn, State, _Extra) -> %% Internal functions %%-------------------------------------------------------------------- cancel_outdated_timer(#{mem_time_ref := MemRef, cpu_time_ref := CpuRef}) -> - emqx_misc:cancel_timer(MemRef), - emqx_misc:cancel_timer(CpuRef), + emqx_utils:cancel_timer(MemRef), + emqx_utils:cancel_timer(CpuRef), ok. start_cpu_check_timer() -> @@ -204,7 +204,7 @@ start_mem_check_timer() -> end. start_timer(Interval, Msg) -> - emqx_misc:start_timer(Interval, Msg). + emqx_utils:start_timer(Interval, Msg). update_mem_alarm_status(HWM) when HWM > 1.0 orelse HWM < 0.0 -> ?SLOG(warning, #{msg => "discarded_out_of_range_mem_alarm_threshold", value => HWM}), diff --git a/apps/emqx/src/emqx_pool.erl b/apps/emqx/src/emqx_pool.erl index 1691a533a..1cb5f429c 100644 --- a/apps/emqx/src/emqx_pool.erl +++ b/apps/emqx/src/emqx_pool.erl @@ -57,7 +57,7 @@ -spec start_link(atom(), pos_integer()) -> startlink_ret(). start_link(Pool, Id) -> gen_server:start_link( - {local, emqx_misc:proc_name(?MODULE, Id)}, + {local, emqx_utils:proc_name(?MODULE, Id)}, ?MODULE, [Pool, Id], [{hibernate_after, 1000}] diff --git a/apps/emqx/src/emqx_router.erl b/apps/emqx/src/emqx_router.erl index 7c9cc61b0..42430af5d 100644 --- a/apps/emqx/src/emqx_router.erl +++ b/apps/emqx/src/emqx_router.erl @@ -98,7 +98,7 @@ mnesia(boot) -> -spec start_link(atom(), pos_integer()) -> startlink_ret(). start_link(Pool, Id) -> gen_server:start_link( - {local, emqx_misc:proc_name(?MODULE, Id)}, + {local, emqx_utils:proc_name(?MODULE, Id)}, ?MODULE, [Pool, Id], [{hibernate_after, 1000}] diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 8c91ae782..96d9aea34 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -2327,7 +2327,7 @@ mqtt_ssl_listener_ssl_options_validator(Conf) -> fun ocsp_outer_validator/1, fun crl_outer_validator/1 ], - case emqx_misc:pipeline(Checks, Conf, not_used) of + case emqx_utils:pipeline(Checks, Conf, not_used) of {ok, _, _} -> ok; {error, Reason, _NotUsed} -> diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index a13dfe491..8b15340e9 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -941,7 +941,7 @@ age(Now, Ts) -> Now - Ts. %%-------------------------------------------------------------------- set_field(Name, Value, Session) -> - Pos = emqx_misc:index_of(Name, record_info(fields, session)), + Pos = emqx_utils:index_of(Name, record_info(fields, session)), setelement(Pos + 1, Session, Value). get_mqueue(#session{mqueue = Q}) -> diff --git a/apps/emqx/src/emqx_session_router.erl b/apps/emqx/src/emqx_session_router.erl index 0d4972e8c..8a21d4d03 100644 --- a/apps/emqx/src/emqx_session_router.erl +++ b/apps/emqx/src/emqx_session_router.erl @@ -104,7 +104,7 @@ create_init_tab() -> -spec start_link(atom(), pos_integer()) -> startlink_ret(). start_link(Pool, Id) -> gen_server:start_link( - {local, emqx_misc:proc_name(?MODULE, Id)}, + {local, emqx_utils:proc_name(?MODULE, Id)}, ?MODULE, [Pool, Id], [{hibernate_after, 1000}] diff --git a/apps/emqx/src/emqx_stats.erl b/apps/emqx/src/emqx_stats.erl index ed901d9a9..017b83116 100644 --- a/apps/emqx/src/emqx_stats.erl +++ b/apps/emqx/src/emqx_stats.erl @@ -213,7 +213,7 @@ init(#{tick_ms := TickMs}) -> {ok, start_timer(#state{updates = [], tick_ms = TickMs}), hibernate}. start_timer(#state{tick_ms = Ms} = State) -> - State#state{timer = emqx_misc:start_timer(Ms, tick)}. + State#state{timer = emqx_utils:start_timer(Ms, tick)}. handle_call(stop, _From, State) -> {stop, normal, ok, State}; @@ -301,7 +301,7 @@ handle_info(Info, State) -> {noreply, State}. terminate(_Reason, #state{timer = TRef}) -> - emqx_misc:cancel_timer(TRef). + emqx_utils:cancel_timer(TRef). code_change(_OldVsn, State, _Extra) -> {ok, State}. diff --git a/apps/emqx/src/emqx_sys.erl b/apps/emqx/src/emqx_sys.erl index af2b052aa..509429796 100644 --- a/apps/emqx/src/emqx_sys.erl +++ b/apps/emqx/src/emqx_sys.erl @@ -62,7 +62,7 @@ -endif. -import(emqx_topic, [systop/1]). --import(emqx_misc, [start_timer/2]). +-import(emqx_utils, [start_timer/2]). -record(state, { heartbeat :: maybe(reference()), @@ -222,7 +222,7 @@ handle_info(Info, State) -> terminate(_Reason, #state{heartbeat = TRef1, ticker = TRef2}) -> _ = emqx_config_handler:remove_handler(?CONF_KEY_PATH), unload_event_hooks(sys_event_messages()), - lists:foreach(fun emqx_misc:cancel_timer/1, [TRef1, TRef2]). + lists:foreach(fun emqx_utils:cancel_timer/1, [TRef1, TRef2]). unload_event_hooks([]) -> ok; diff --git a/apps/emqx/src/emqx_sys_mon.erl b/apps/emqx/src/emqx_sys_mon.erl index 6ff68820e..f1190f586 100644 --- a/apps/emqx/src/emqx_sys_mon.erl +++ b/apps/emqx/src/emqx_sys_mon.erl @@ -77,7 +77,7 @@ init([]) -> {ok, start_timer(#{timer => undefined, events => []})}. start_timer(State) -> - State#{timer := emqx_misc:start_timer(timer:seconds(2), reset)}. + State#{timer := emqx_utils:start_timer(timer:seconds(2), reset)}. sysm_opts(VM) -> sysm_opts(maps:to_list(VM), []). @@ -204,7 +204,7 @@ handle_info(Info, State) -> {noreply, State}. terminate(_Reason, #{timer := TRef}) -> - emqx_misc:cancel_timer(TRef), + emqx_utils:cancel_timer(TRef), ok. code_change(_OldVsn, State, _Extra) -> diff --git a/apps/emqx/src/emqx_trace/emqx_trace.erl b/apps/emqx/src/emqx_trace/emqx_trace.erl index f14dc0c15..91194772f 100644 --- a/apps/emqx/src/emqx_trace/emqx_trace.erl +++ b/apps/emqx/src/emqx_trace/emqx_trace.erl @@ -272,7 +272,7 @@ handle_info({timeout, TRef, update_trace}, #{timer := TRef} = State) -> ?tp(update_trace_done, #{}), {noreply, State#{timer => NextTRef}}; handle_info({mnesia_table_event, _Events}, State = #{timer := TRef}) -> - emqx_misc:cancel_timer(TRef), + emqx_utils:cancel_timer(TRef), handle_info({timeout, TRef, update_trace}, State); handle_info(Info, State) -> ?SLOG(error, #{unexpected_info => Info}), @@ -280,7 +280,7 @@ handle_info(Info, State) -> terminate(_Reason, #{timer := TRef}) -> _ = mnesia:unsubscribe({table, ?TRACE, simple}), - emqx_misc:cancel_timer(TRef), + emqx_utils:cancel_timer(TRef), stop_all_trace_handler(), update_trace_handler(), _ = file:del_dir_r(zip_dir()), @@ -302,7 +302,7 @@ update_trace(Traces) -> ok = stop_trace(NeedStop, Started), clean_stale_trace_files(), NextTime = find_closest_time(Traces, Now), - emqx_misc:start_timer(NextTime, update_trace). + emqx_utils:start_timer(NextTime, update_trace). stop_all_trace_handler() -> lists:foreach( diff --git a/apps/emqx/src/emqx_trace/emqx_trace_handler.erl b/apps/emqx/src/emqx_trace/emqx_trace_handler.erl index 231fd5e7b..528bc4d42 100644 --- a/apps/emqx/src/emqx_trace/emqx_trace_handler.erl +++ b/apps/emqx/src/emqx_trace/emqx_trace_handler.erl @@ -196,7 +196,7 @@ handler_id(Name, Type) -> do_handler_id(Name, Type) catch _:_ -> - Hash = emqx_misc:bin_to_hexstr(crypto:hash(md5, Name), lower), + Hash = emqx_utils:bin_to_hexstr(crypto:hash(md5, Name), lower), do_handler_id(Hash, Type) end. diff --git a/apps/emqx/src/emqx_vm_mon.erl b/apps/emqx/src/emqx_vm_mon.erl index 1327a1bb0..d90d4139b 100644 --- a/apps/emqx/src/emqx_vm_mon.erl +++ b/apps/emqx/src/emqx_vm_mon.erl @@ -107,7 +107,7 @@ code_change(_OldVsn, State, _Extra) -> start_check_timer() -> Interval = emqx:get_config([sysmon, vm, process_check_interval]), - emqx_misc:start_timer(Interval, check). + emqx_utils:start_timer(Interval, check). usage(Percent) -> integer_to_list(floor(Percent * 100)) ++ "%". diff --git a/apps/emqx/src/emqx_ws_connection.erl b/apps/emqx/src/emqx_ws_connection.erl index ead609ed8..20962809f 100644 --- a/apps/emqx/src/emqx_ws_connection.erl +++ b/apps/emqx/src/emqx_ws_connection.erl @@ -52,7 +52,7 @@ -export([set_field/3]). -import( - emqx_misc, + emqx_utils, [ maybe_apply/2, start_timer/2 @@ -172,7 +172,7 @@ stats(WsPid) when is_pid(WsPid) -> stats(#state{channel = Channel}) -> SockStats = emqx_pd:get_counters(?SOCK_STATS), ChanStats = emqx_channel:stats(Channel), - ProcStats = emqx_misc:proc_stats(), + ProcStats = emqx_utils:proc_stats(), lists:append([SockStats, ChanStats, ProcStats]). %% kick|discard|takeover @@ -340,7 +340,7 @@ tune_heap_size(Channel) -> ) of #{enable := false} -> ok; - ShutdownPolicy -> emqx_misc:tune_heap_size(ShutdownPolicy) + ShutdownPolicy -> emqx_utils:tune_heap_size(ShutdownPolicy) end. get_stats_enable(Zone) -> @@ -454,7 +454,7 @@ websocket_info( State = #state{listener = {Type, Listener}} ) -> ActiveN = get_active_n(Type, Listener), - Delivers = [Deliver | emqx_misc:drain_deliver(ActiveN)], + Delivers = [Deliver | emqx_utils:drain_deliver(ActiveN)], with_channel(handle_deliver, [Delivers], State); websocket_info( {timeout, _, limit_timeout}, @@ -678,7 +678,7 @@ check_oom(State = #state{channel = Channel}) -> #{enable := false} -> State; #{enable := true} -> - case emqx_misc:check_oom(ShutdownPolicy) of + case emqx_utils:check_oom(ShutdownPolicy) of Shutdown = {shutdown, _Reason} -> postpone(Shutdown, State); _Other -> @@ -913,7 +913,7 @@ inc_qos_stats_key(_, _) -> undefined. %% Cancel idle timer cancel_idle_timer(State = #state{idle_timer = IdleTimer}) -> - ok = emqx_misc:cancel_timer(IdleTimer), + ok = emqx_utils:cancel_timer(IdleTimer), State#state{idle_timer = undefined}. %%-------------------------------------------------------------------- @@ -1046,7 +1046,7 @@ check_max_connection(Type, Listener) -> %%-------------------------------------------------------------------- set_field(Name, Value, State) -> - Pos = emqx_misc:index_of(Name, record_info(fields, state)), + Pos = emqx_utils:index_of(Name, record_info(fields, state)), setelement(Pos + 1, State, Value). %% ensure lowercase letters in headers diff --git a/apps/emqx/test/emqx_connection_SUITE.erl b/apps/emqx/test/emqx_connection_SUITE.erl index cc9e03168..21ed45119 100644 --- a/apps/emqx/test/emqx_connection_SUITE.erl +++ b/apps/emqx/test/emqx_connection_SUITE.erl @@ -496,16 +496,16 @@ t_get_conn_info(_) -> t_oom_shutdown(init, Config) -> ok = snabbkaffe:start_trace(), - ok = meck:new(emqx_misc, [non_strict, passthrough, no_history, no_link]), + ok = meck:new(emqx_utils, [non_strict, passthrough, no_history, no_link]), meck:expect( - emqx_misc, + emqx_utils, check_oom, fun(_) -> {shutdown, "fake_oom"} end ), Config; t_oom_shutdown('end', _Config) -> snabbkaffe:stop(), - meck:unload(emqx_misc), + meck:unload(emqx_utils), ok. t_oom_shutdown(_) -> diff --git a/apps/emqx/test/emqx_misc_SUITE.erl b/apps/emqx/test/emqx_misc_SUITE.erl deleted file mode 100644 index c068a087e..000000000 --- a/apps/emqx/test/emqx_misc_SUITE.erl +++ /dev/null @@ -1,209 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2018-2023 EMQ Technologies Co., Ltd. All Rights Reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_misc_SUITE). - --compile(export_all). --compile(nowarn_export_all). - --include_lib("eunit/include/eunit.hrl"). - --define(SOCKOPTS, [ - binary, - {packet, raw}, - {reuseaddr, true}, - {backlog, 512}, - {nodelay, true} -]). - -all() -> emqx_common_test_helpers:all(?MODULE). - -t_merge_opts(_) -> - Opts = emqx_misc:merge_opts(?SOCKOPTS, [ - raw, - binary, - {backlog, 1024}, - {nodelay, false}, - {max_clients, 1024}, - {acceptors, 16} - ]), - ?assertEqual(1024, proplists:get_value(backlog, Opts)), - ?assertEqual(1024, proplists:get_value(max_clients, Opts)), - ?assertEqual( - [ - binary, - raw, - {acceptors, 16}, - {backlog, 1024}, - {max_clients, 1024}, - {nodelay, false}, - {packet, raw}, - {reuseaddr, true} - ], - lists:sort(Opts) - ). - -t_maybe_apply(_) -> - ?assertEqual(undefined, emqx_misc:maybe_apply(fun(A) -> A end, undefined)), - ?assertEqual(a, emqx_misc:maybe_apply(fun(A) -> A end, a)). - -t_run_fold(_) -> - ?assertEqual(1, emqx_misc:run_fold([], 1, state)), - Add = fun(I, St) -> I + St end, - Mul = fun(I, St) -> I * St end, - ?assertEqual(6, emqx_misc:run_fold([Add, Mul], 1, 2)). - -t_pipeline(_) -> - ?assertEqual({ok, input, state}, emqx_misc:pipeline([], input, state)), - Funs = [ - fun(_I, _St) -> ok end, - fun(_I, St) -> {ok, St + 1} end, - fun(I, St) -> {ok, I + 1, St + 1} end, - fun(I, St) -> {ok, I * 2, St * 2} end - ], - ?assertEqual({ok, 4, 6}, emqx_misc:pipeline(Funs, 1, 1)), - ?assertEqual( - {error, undefined, 1}, emqx_misc:pipeline([fun(_I) -> {error, undefined} end], 1, 1) - ), - ?assertEqual( - {error, undefined, 2}, emqx_misc:pipeline([fun(_I, _St) -> {error, undefined, 2} end], 1, 1) - ). - -t_start_timer(_) -> - TRef = emqx_misc:start_timer(1, tmsg), - timer:sleep(2), - ?assertEqual([{timeout, TRef, tmsg}], drain()), - ok = emqx_misc:cancel_timer(TRef). - -t_cancel_timer(_) -> - Timer = emqx_misc:start_timer(0, foo), - ok = emqx_misc:cancel_timer(Timer), - ?assertEqual([], drain()), - ok = emqx_misc:cancel_timer(undefined). - -t_proc_name(_) -> - ?assertEqual(emqx_pool_1, emqx_misc:proc_name(emqx_pool, 1)). - -t_proc_stats(_) -> - Pid1 = spawn(fun() -> exit(normal) end), - timer:sleep(10), - ?assertEqual([], emqx_misc:proc_stats(Pid1)), - Pid2 = spawn(fun() -> - ?assertMatch([{mailbox_len, 0} | _], emqx_misc:proc_stats()), - timer:sleep(200) - end), - timer:sleep(10), - Pid2 ! msg, - timer:sleep(10), - ?assertMatch([{mailbox_len, 1} | _], emqx_misc:proc_stats(Pid2)). - -t_drain_deliver(_) -> - self() ! {deliver, t1, m1}, - self() ! {deliver, t2, m2}, - ?assertEqual( - [ - {deliver, t1, m1}, - {deliver, t2, m2} - ], - emqx_misc:drain_deliver(2) - ). - -t_drain_down(_) -> - {Pid1, _Ref1} = erlang:spawn_monitor(fun() -> ok end), - {Pid2, _Ref2} = erlang:spawn_monitor(fun() -> ok end), - timer:sleep(100), - ?assertEqual([Pid1, Pid2], lists:sort(emqx_misc:drain_down(2))), - ?assertEqual([], emqx_misc:drain_down(1)). - -t_index_of(_) -> - try emqx_misc:index_of(a, []) of - _ -> ct:fail(should_throw_error) - catch - error:Reason -> - ?assertEqual(badarg, Reason) - end, - ?assertEqual(3, emqx_misc:index_of(a, [b, c, a, e, f])). - -t_check(_) -> - Policy = #{ - max_message_queue_len => 10, - max_heap_size => 1024 * 1024 * 8, - enable => true - }, - [self() ! {msg, I} || I <- lists:seq(1, 5)], - ?assertEqual(ok, emqx_misc:check_oom(Policy)), - [self() ! {msg, I} || I <- lists:seq(1, 6)], - ?assertEqual( - {shutdown, #{reason => message_queue_too_long, value => 11, max => 10}}, - emqx_misc:check_oom(Policy) - ). - -drain() -> - drain([]). - -drain(Acc) -> - receive - Msg -> drain([Msg | Acc]) - after 0 -> - lists:reverse(Acc) - end. - -t_rand_seed(_) -> - ?assert(is_tuple(emqx_misc:rand_seed())). - -t_now_to_secs(_) -> - ?assert(is_integer(emqx_misc:now_to_secs(os:timestamp()))). - -t_now_to_ms(_) -> - ?assert(is_integer(emqx_misc:now_to_ms(os:timestamp()))). - -t_gen_id(_) -> - ?assertEqual(10, length(emqx_misc:gen_id(10))), - ?assertEqual(20, length(emqx_misc:gen_id(20))). - -t_pmap_normal(_) -> - ?assertEqual( - [5, 7, 9], - emqx_misc:pmap( - fun({A, B}) -> A + B end, - [{2, 3}, {3, 4}, {4, 5}] - ) - ). - -t_pmap_timeout(_) -> - ?assertExit( - timeout, - emqx_misc:pmap( - fun - (timeout) -> ct:sleep(1000); - ({A, B}) -> A + B - end, - [{2, 3}, {3, 4}, timeout], - 100 - ) - ). - -t_pmap_exception(_) -> - ?assertError( - foobar, - emqx_misc:pmap( - fun - (error) -> error(foobar); - ({A, B}) -> A + B - end, - [{2, 3}, {3, 4}, error] - ) - ). diff --git a/apps/emqx/test/emqx_router_SUITE.erl b/apps/emqx/test/emqx_router_SUITE.erl index 298a33fe8..2db0acf82 100644 --- a/apps/emqx/test/emqx_router_SUITE.erl +++ b/apps/emqx/test/emqx_router_SUITE.erl @@ -119,7 +119,7 @@ t_has_routes(_) -> ?R:delete_route(<<"devices/+/messages">>). t_unexpected(_) -> - Router = emqx_misc:proc_name(?R, 1), + Router = emqx_utils:proc_name(?R, 1), ?assertEqual(ignored, gen_server:call(Router, bad_request)), ?assertEqual(ok, gen_server:cast(Router, bad_message)), Router ! bad_info. diff --git a/apps/emqx_authz/src/emqx_authz_file.erl b/apps/emqx_authz/src/emqx_authz_file.erl index 9aa2d506f..ede4a9582 100644 --- a/apps/emqx_authz/src/emqx_authz_file.erl +++ b/apps/emqx_authz/src/emqx_authz_file.erl @@ -47,7 +47,7 @@ create(#{path := Path} = Source) -> ?SLOG(alert, #{ msg => failed_to_read_acl_file, path => Path, - explain => emqx_misc:explain_posix(Reason) + explain => emqx_utils:explain_posix(Reason) }), throw(failed_to_read_acl_file); {error, Reason} -> diff --git a/apps/emqx_authz/test/emqx_authz_api_sources_SUITE.erl b/apps/emqx_authz/test/emqx_authz_api_sources_SUITE.erl index b717775a6..7a7dbb7e9 100644 --- a/apps/emqx_authz/test/emqx_authz_api_sources_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_api_sources_SUITE.erl @@ -148,8 +148,8 @@ set_special_configs(_App) -> ok. init_per_testcase(t_api, Config) -> - meck:new(emqx_misc, [non_strict, passthrough, no_history, no_link]), - meck:expect(emqx_misc, gen_id, fun() -> "fake" end), + meck:new(emqx_utils, [non_strict, passthrough, no_history, no_link]), + meck:expect(emqx_utils, gen_id, fun() -> "fake" end), meck:new(emqx, [non_strict, passthrough, no_history, no_link]), meck:expect( @@ -165,7 +165,7 @@ init_per_testcase(_, Config) -> Config. end_per_testcase(t_api, _Config) -> - meck:unload(emqx_misc), + meck:unload(emqx_utils), meck:unload(emqx), ok; end_per_testcase(_, _Config) -> diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index 2a20c5994..087bc6a3f 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -296,7 +296,7 @@ create(BridgeType, BridgeName, RawConf) -> brige_action => create, bridge_type => BridgeType, bridge_name => BridgeName, - bridge_raw_config => emqx_misc:redact(RawConf) + bridge_raw_config => emqx_utils:redact(RawConf) }), emqx_conf:update( emqx_bridge:config_key_path() ++ [BridgeType, BridgeName], diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index 675541311..814fcdeb7 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -668,7 +668,7 @@ get_metrics_from_local_node(BridgeType, BridgeName) -> false -> ?BRIDGE_NOT_ENABLED; true -> - case emqx_misc:safe_to_existing_atom(Node, utf8) of + case emqx_utils:safe_to_existing_atom(Node, utf8) of {ok, TargetNode} -> call_operation(TargetNode, OperFunc, [ TargetNode, BridgeType, BridgeName @@ -835,7 +835,7 @@ format_resource_data(ResData) -> format_resource_data(error, undefined, Result) -> Result; format_resource_data(error, Error, Result) -> - Result#{status_reason => emqx_misc:readable_error_msg(Error)}; + Result#{status_reason => emqx_utils:readable_error_msg(Error)}; format_resource_data(K, V, Result) -> Result#{K => V}. @@ -1004,7 +1004,7 @@ supported_versions(get_metrics_from_all_nodes) -> [4]; supported_versions(_Call) -> [1, 2, 3, 4]. redact(Term) -> - emqx_misc:redact(Term). + emqx_utils:redact(Term). deobfuscate(NewConf, OldConf) -> maps:fold( @@ -1015,7 +1015,7 @@ deobfuscate(NewConf, OldConf) -> {ok, OldV} when is_map(V), is_map(OldV) -> Acc#{K => deobfuscate(V, OldV)}; {ok, OldV} -> - case emqx_misc:is_redacted(K, V) of + case emqx_utils:is_redacted(K, V) of true -> Acc#{K => OldV}; _ -> diff --git a/apps/emqx_bridge/src/emqx_bridge_resource.erl b/apps/emqx_bridge/src/emqx_bridge_resource.erl index b43cbe0ec..903e86443 100644 --- a/apps/emqx_bridge/src/emqx_bridge_resource.erl +++ b/apps/emqx_bridge/src/emqx_bridge_resource.erl @@ -157,7 +157,7 @@ create(Type, Name, Conf, Opts0) -> msg => "create bridge", type => Type, name => Name, - config => emqx_misc:redact(Conf) + config => emqx_utils:redact(Conf) }), Opts = override_start_after_created(Conf, Opts0), {ok, _Data} = emqx_resource:create_local( @@ -192,7 +192,7 @@ update(Type, Name, {OldConf, Conf}, Opts0) -> msg => "update bridge", type => Type, name => Name, - config => emqx_misc:redact(Conf) + config => emqx_utils:redact(Conf) }), case recreate(Type, Name, Conf, Opts) of {ok, _} -> @@ -202,7 +202,7 @@ update(Type, Name, {OldConf, Conf}, Opts0) -> msg => "updating_a_non_existing_bridge", type => Type, name => Name, - config => emqx_misc:redact(Conf) + config => emqx_utils:redact(Conf) }), create(Type, Name, Conf, Opts); {error, Reason} -> @@ -236,8 +236,8 @@ recreate(Type, Name, Conf, Opts) -> ). create_dry_run(Type, Conf0) -> - TmpPath0 = iolist_to_binary([?TEST_ID_PREFIX, emqx_misc:gen_id(8)]), - TmpPath = emqx_misc:safe_filename(TmpPath0), + TmpPath0 = iolist_to_binary([?TEST_ID_PREFIX, emqx_utils:gen_id(8)]), + TmpPath = emqx_utils:safe_filename(TmpPath0), Conf = emqx_map_lib:safe_atom_key_map(Conf0), case emqx_connector_ssl:convert_certs(TmpPath, Conf) of {error, Reason} -> diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl index 3ec6061d8..ef997f7e3 100644 --- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl @@ -975,7 +975,7 @@ t_with_redact_update(Config) -> ), %% update with redacted config - BridgeConf = emqx_misc:redact(Template), + BridgeConf = emqx_utils:redact(Template), BridgeID = emqx_bridge_resource:bridge_id(Type, Name), {ok, 200, _} = request(put, uri(["bridges", BridgeID]), BridgeConf, Config), ?assertEqual( diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl index 2dc43a130..fdfa3300c 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl @@ -156,7 +156,7 @@ on_start(InstanceId, Config) -> msg => "failed_to_start_kafka_consumer_client", instance_id => InstanceId, kafka_hosts => BootstrapHosts, - reason => emqx_misc:redact(Reason) + reason => emqx_utils:redact(Reason) }), throw(?CLIENT_DOWN_MESSAGE) end, @@ -344,7 +344,7 @@ start_consumer(Config, InstanceId, ClientID) -> msg => "failed_to_start_kafka_consumer", instance_id => InstanceId, kafka_hosts => emqx_bridge_kafka_impl:hosts(BootstrapHosts0), - reason => emqx_misc:redact(Reason2) + reason => emqx_utils:redact(Reason2) }), stop_client(ClientID), throw(failed_to_start_kafka_consumer) diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl index fc9a6dc29..254284b75 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl @@ -299,7 +299,7 @@ init_per_testcase(TestCase, Config) when common_init_per_testcase(TestCase, Config) end; init_per_testcase(t_cluster_group = TestCase, Config0) -> - Config = emqx_misc:merge_opts(Config0, [{num_partitions, 6}]), + Config = emqx_utils:merge_opts(Config0, [{num_partitions, 6}]), common_init_per_testcase(TestCase, Config); init_per_testcase(t_multiple_topic_mappings = TestCase, Config0) -> KafkaTopicBase = @@ -1543,7 +1543,7 @@ do_t_receive_after_recovery(Config) -> %% 2) publish messages while the consumer is down. %% we use `pmap' to avoid wolff sending the whole %% batch to a single partition. - emqx_misc:pmap(fun(Msg) -> publish(Config, [Msg]) end, Messages1), + emqx_utils:pmap(fun(Msg) -> publish(Config, [Msg]) end, Messages1), ok end), %% 3) restore and consume messages @@ -1667,7 +1667,7 @@ t_cluster_group(Config) -> || {Name, Opts} <- Cluster ], on_exit(fun() -> - emqx_misc:pmap( + emqx_utils:pmap( fun(N) -> ct:pal("stopping ~p", [N]), ok = emqx_common_test_helpers:stop_slave(N) @@ -1889,7 +1889,7 @@ t_cluster_node_down(Config) -> Cluster ), on_exit(fun() -> - emqx_misc:pmap( + emqx_utils:pmap( fun(N) -> ct:pal("stopping ~p", [N]), ok = emqx_common_test_helpers:stop_slave(N) diff --git a/apps/emqx_conf/src/emqx_cluster_rpc.erl b/apps/emqx_conf/src/emqx_cluster_rpc.erl index 0382045d4..c82191bc3 100644 --- a/apps/emqx_conf/src/emqx_cluster_rpc.erl +++ b/apps/emqx_conf/src/emqx_cluster_rpc.erl @@ -501,15 +501,17 @@ log_and_alarm(IsSuccess, Res, #{kind := ?APPLY_KIND_INITIATE} = Meta) -> %% because nothing is committed case IsSuccess of true -> - ?SLOG(debug, Meta#{msg => "cluster_rpc_apply_result", result => emqx_misc:redact(Res)}); + ?SLOG(debug, Meta#{msg => "cluster_rpc_apply_result", result => emqx_utils:redact(Res)}); false -> - ?SLOG(warning, Meta#{msg => "cluster_rpc_apply_result", result => emqx_misc:redact(Res)}) + ?SLOG(warning, Meta#{ + msg => "cluster_rpc_apply_result", result => emqx_utils:redact(Res) + }) end; log_and_alarm(true, Res, Meta) -> - ?SLOG(debug, Meta#{msg => "cluster_rpc_apply_ok", result => emqx_misc:redact(Res)}), + ?SLOG(debug, Meta#{msg => "cluster_rpc_apply_ok", result => emqx_utils:redact(Res)}), do_alarm(deactivate, Res, Meta); log_and_alarm(false, Res, Meta) -> - ?SLOG(error, Meta#{msg => "cluster_rpc_apply_failed", result => emqx_misc:redact(Res)}), + ?SLOG(error, Meta#{msg => "cluster_rpc_apply_failed", result => emqx_utils:redact(Res)}), do_alarm(activate, Res, Meta). do_alarm(Fun, Res, #{tnx_id := Id} = Meta) -> diff --git a/apps/emqx_conf/src/emqx_cluster_rpc_cleaner.erl b/apps/emqx_conf/src/emqx_cluster_rpc_cleaner.erl index bce866c2d..fe72cd65b 100644 --- a/apps/emqx_conf/src/emqx_cluster_rpc_cleaner.erl +++ b/apps/emqx_conf/src/emqx_cluster_rpc_cleaner.erl @@ -73,7 +73,7 @@ handle_info(Info, State) -> {noreply, State}. terminate(_Reason, #{timer := TRef}) -> - emqx_misc:cancel_timer(TRef). + emqx_utils:cancel_timer(TRef). code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -82,7 +82,7 @@ code_change(_OldVsn, State, _Extra) -> %% Internal functions %%-------------------------------------------------------------------- ensure_timer(State = #{cleanup_ms := Ms}) -> - State#{timer := emqx_misc:start_timer(Ms, del_stale_mfa)}. + State#{timer := emqx_utils:start_timer(Ms, del_stale_mfa)}. %% @doc Keep the latest completed 100 records for querying and troubleshooting. del_stale_mfa(MaxHistory) -> diff --git a/apps/emqx_connector/src/emqx_connector_http.erl b/apps/emqx_connector/src/emqx_connector_http.erl index dfa6dab81..ef2e11eb7 100644 --- a/apps/emqx_connector/src/emqx_connector_http.erl +++ b/apps/emqx_connector/src/emqx_connector_http.erl @@ -219,7 +219,7 @@ on_start( SSLOpts = emqx_tls_lib:to_client_opts(maps:get(ssl, Config)), {tls, SSLOpts} end, - NTransportOpts = emqx_misc:ipv6_probe(TransportOpts), + NTransportOpts = emqx_utils:ipv6_probe(TransportOpts), PoolOpts = [ {host, Host}, {port, Port}, @@ -425,7 +425,7 @@ do_get_status(PoolName, Timeout) -> Error end end, - try emqx_misc:pmap(DoPerWorker, Workers, Timeout) of + try emqx_utils:pmap(DoPerWorker, Workers, Timeout) of % we crash in case of non-empty lists since we don't know what to do in that case [_ | _] = Results -> case [E || {error, _} = E <- Results] of @@ -603,7 +603,7 @@ is_sensitive_key(_) -> %% Function that will do a deep traversal of Data and remove sensitive %% information (i.e., passwords) redact(Data) -> - emqx_misc:redact(Data, fun is_sensitive_key/1). + emqx_utils:redact(Data, fun is_sensitive_key/1). %% because the body may contain some sensitive data %% and at the same time the redact function will not scan the binary data diff --git a/apps/emqx_connector/src/emqx_connector_ldap.erl b/apps/emqx_connector/src/emqx_connector_ldap.erl index 82d622e09..ac2af301e 100644 --- a/apps/emqx_connector/src/emqx_connector_ldap.erl +++ b/apps/emqx_connector/src/emqx_connector_ldap.erl @@ -65,7 +65,7 @@ on_start( ?SLOG(info, #{ msg => "starting_ldap_connector", connector => InstId, - config => emqx_misc:redact(Config) + config => emqx_utils:redact(Config) }), Servers = emqx_schema:parse_servers(Servers0, ?LDAP_HOST_OPTIONS), SslOpts = diff --git a/apps/emqx_connector/src/emqx_connector_mongo.erl b/apps/emqx_connector/src/emqx_connector_mongo.erl index ead3e2e49..a5873bcf6 100644 --- a/apps/emqx_connector/src/emqx_connector_mongo.erl +++ b/apps/emqx_connector/src/emqx_connector_mongo.erl @@ -162,7 +162,7 @@ on_start( rs -> "starting_mongodb_replica_set_connector"; sharded -> "starting_mongodb_sharded_connector" end, - ?SLOG(info, #{msg => Msg, connector => InstId, config => emqx_misc:redact(Config)}), + ?SLOG(info, #{msg => Msg, connector => InstId, config => emqx_utils:redact(Config)}), NConfig = #{hosts := Hosts} = maybe_resolve_srv_and_txt_records(Config), SslOpts = case maps:get(enable, SSL) of diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl index 8f2d06517..5b488825b 100644 --- a/apps/emqx_connector/src/emqx_connector_mqtt.erl +++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl @@ -149,7 +149,7 @@ on_start(InstanceId, Conf) -> ?SLOG(info, #{ msg => "starting_mqtt_connector", connector => InstanceId, - config => emqx_misc:redact(Conf) + config => emqx_utils:redact(Conf) }), BasicConf = basic_config(Conf), BridgeConf = BasicConf#{ diff --git a/apps/emqx_connector/src/emqx_connector_mysql.erl b/apps/emqx_connector/src/emqx_connector_mysql.erl index c69cf5ca0..6600a5f77 100644 --- a/apps/emqx_connector/src/emqx_connector_mysql.erl +++ b/apps/emqx_connector/src/emqx_connector_mysql.erl @@ -102,7 +102,7 @@ on_start( ?SLOG(info, #{ msg => "starting_mysql_connector", connector => InstId, - config => emqx_misc:redact(Config) + config => emqx_utils:redact(Config) }), SslOpts = case maps:get(enable, SSL) of diff --git a/apps/emqx_connector/src/emqx_connector_pgsql.erl b/apps/emqx_connector/src/emqx_connector_pgsql.erl index 14cbbc80f..8796e00a5 100644 --- a/apps/emqx_connector/src/emqx_connector_pgsql.erl +++ b/apps/emqx_connector/src/emqx_connector_pgsql.erl @@ -95,7 +95,7 @@ on_start( ?SLOG(info, #{ msg => "starting_postgresql_connector", connector => InstId, - config => emqx_misc:redact(Config) + config => emqx_utils:redact(Config) }), SslOpts = case maps:get(enable, SSL) of diff --git a/apps/emqx_connector/src/emqx_connector_redis.erl b/apps/emqx_connector/src/emqx_connector_redis.erl index c70e766af..4ef778e6b 100644 --- a/apps/emqx_connector/src/emqx_connector_redis.erl +++ b/apps/emqx_connector/src/emqx_connector_redis.erl @@ -123,7 +123,7 @@ on_start( ?SLOG(info, #{ msg => "starting_redis_connector", connector => InstId, - config => emqx_misc:redact(Config) + config => emqx_utils:redact(Config) }), ConfKey = case Type of diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl index 67fc40efa..df1114483 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_msg.erl @@ -85,7 +85,7 @@ to_remote_msg(MapMsg, #{ qos = QoS, retain = Retain, topic = topic(Mountpoint, Topic), - props = emqx_misc:pub_props_to_packet(PubProps), + props = emqx_utils:pub_props_to_packet(PubProps), payload = Payload }; to_remote_msg(#message{topic = Topic} = Msg, #{mountpoint := Mountpoint}) -> @@ -112,7 +112,7 @@ to_broker_msg( Retain = replace_simple_var(RetainToken, MapMsg), PubProps = maps:get(pub_props, MapMsg, #{}), set_headers( - Props#{properties => emqx_misc:pub_props_to_packet(PubProps)}, + Props#{properties => emqx_utils:pub_props_to_packet(PubProps)}, emqx_message:set_flags( #{dup => Dup, retain => Retain}, emqx_message:make(bridge, QoS, topic(Mountpoint, Topic), Payload) diff --git a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl index 8e41a9f0f..5197a35df 100644 --- a/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl +++ b/apps/emqx_connector/src/mqtt/emqx_connector_mqtt_worker.erl @@ -124,7 +124,7 @@ start_link(Name, BridgeOpts) -> {error, Reason} = Error -> ?SLOG(error, #{ msg => "client_start_failed", - config => emqx_misc:redact(BridgeOpts), + config => emqx_utils:redact(BridgeOpts), reason => Reason }), Error diff --git a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl index eb7f6c741..22cf484ff 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_swagger.erl @@ -915,4 +915,4 @@ schema_converter(Options) -> maps:get(schema_converter, Options, fun hocon_schema_to_spec/2). hocon_error_msg(Reason) -> - emqx_misc:readable_error_msg(Reason). + emqx_utils:readable_error_msg(Reason). diff --git a/apps/emqx_exhook/src/emqx_exhook_api.erl b/apps/emqx_exhook/src/emqx_exhook_api.erl index aa5d1897f..9bfae9579 100644 --- a/apps/emqx_exhook/src/emqx_exhook_api.erl +++ b/apps/emqx_exhook/src/emqx_exhook_api.erl @@ -478,7 +478,7 @@ call_cluster(Fun) -> %%-------------------------------------------------------------------- %% Internal Funcs %%-------------------------------------------------------------------- -err_msg(Msg) -> emqx_misc:readable_error_msg(Msg). +err_msg(Msg) -> emqx_utils:readable_error_msg(Msg). get_raw_config() -> RawConfig = emqx:get_raw_config([exhook, servers], []), diff --git a/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl b/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl index 7f37061ac..4145a92a7 100644 --- a/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl +++ b/apps/emqx_gateway/src/bhvrs/emqx_gateway_conn.erl @@ -173,7 +173,7 @@ stats(#state{ end, ConnStats = emqx_pd:get_counters(?CONN_STATS), ChanStats = ChannMod:stats(Channel), - ProcStats = emqx_misc:proc_stats(), + ProcStats = emqx_utils:proc_stats(), lists:append([SockStats, ConnStats, ChanStats, ProcStats]). call(Pid, Req) -> @@ -297,7 +297,7 @@ init_state(WrappedSock, Peername, Options, FrameMod, ChannMod) -> StatsTimer = emqx_gateway_utils:stats_timer(Options), IdleTimeout = emqx_gateway_utils:idle_timeout(Options), OomPolicy = emqx_gateway_utils:oom_policy(Options), - IdleTimer = emqx_misc:start_timer(IdleTimeout, idle_timeout), + IdleTimer = emqx_utils:start_timer(IdleTimeout, idle_timeout), #state{ socket = WrappedSock, peername = Peername, @@ -327,7 +327,7 @@ run_loop( } ) -> emqx_logger:set_metadata_peername(esockd:format(Peername)), - _ = emqx_misc:tune_heap_size(OomPolicy), + _ = emqx_utils:tune_heap_size(OomPolicy), case activate_socket(State) of {ok, NState} -> hibernate(Parent, NState); @@ -383,14 +383,14 @@ wakeup_from_hib(Parent, State) -> %% Ensure/cancel stats timer ensure_stats_timer(Timeout, State = #state{stats_timer = undefined}) -> - State#state{stats_timer = emqx_misc:start_timer(Timeout, emit_stats)}; + State#state{stats_timer = emqx_utils:start_timer(Timeout, emit_stats)}; ensure_stats_timer(_Timeout, State) -> State. cancel_stats_timer(State = #state{stats_timer = TRef}) when is_reference(TRef) -> - ok = emqx_misc:cancel_timer(TRef), + ok = emqx_utils:cancel_timer(TRef), State#state{stats_timer = undefined}; cancel_stats_timer(State) -> State. @@ -471,7 +471,7 @@ handle_msg( State = #state{idle_timer = IdleTimer} ) -> IdleTimer /= undefined andalso - emqx_misc:cancel_timer(IdleTimer), + emqx_utils:cancel_timer(IdleTimer), NState = State#state{idle_timer = undefined}, handle_incoming(Packet, NState); handle_msg({outgoing, Data}, State) -> @@ -501,7 +501,7 @@ handle_msg( Deliver = {deliver, _Topic, _Msg}, State = #state{active_n = ActiveN} ) -> - Delivers = [Deliver | emqx_misc:drain_deliver(ActiveN)], + Delivers = [Deliver | emqx_utils:drain_deliver(ActiveN)], with_channel(handle_deliver, [Delivers], State); %% Something sent %% TODO: Who will deliver this message? @@ -904,7 +904,7 @@ handle_info(Info, State) -> %% msg => "reach_rate_limit", %% pause => Time %% }), -%% TRef = emqx_misc:start_timer(Time, limit_timeout), +%% TRef = emqx_utils:start_timer(Time, limit_timeout), %% State#state{ %% sockstate = blocked, %% limiter = Limiter1, @@ -928,7 +928,7 @@ run_gc(Stats, State = #state{gc_state = GcSt}) -> end. check_oom(State = #state{oom_policy = OomPolicy}) -> - case ?ENABLED(OomPolicy) andalso emqx_misc:check_oom(OomPolicy) of + case ?ENABLED(OomPolicy) andalso emqx_utils:check_oom(OomPolicy) of {shutdown, Reason} -> %% triggers terminate/2 callback immediately erlang:exit({shutdown, Reason}); diff --git a/apps/emqx_gateway/src/emqx_gateway_api_clients.erl b/apps/emqx_gateway/src/emqx_gateway_api_clients.erl index e64e918b4..cd387e3bb 100644 --- a/apps/emqx_gateway/src/emqx_gateway_api_clients.erl +++ b/apps/emqx_gateway/src/emqx_gateway_api_clients.erl @@ -115,7 +115,7 @@ clients(get, #{ fun ?MODULE:format_channel_info/2 ); Node0 -> - case emqx_misc:safe_to_existing_atom(Node0) of + case emqx_utils:safe_to_existing_atom(Node0) of {ok, Node1} -> QStringWithoutNode = maps:without([<<"node">>], QString), emqx_mgmt_api:node_query( diff --git a/apps/emqx_gateway/src/emqx_gateway_cm.erl b/apps/emqx_gateway/src/emqx_gateway_cm.erl index 71ec4bf59..5de6da63f 100644 --- a/apps/emqx_gateway/src/emqx_gateway_cm.erl +++ b/apps/emqx_gateway/src/emqx_gateway_cm.erl @@ -802,7 +802,7 @@ handle_info( {'DOWN', _MRef, process, Pid, _Reason}, State = #state{gwname = GwName, chan_pmon = PMon} ) -> - ChanPids = [Pid | emqx_misc:drain_down(?DEFAULT_BATCH_SIZE)], + ChanPids = [Pid | emqx_utils:drain_down(?DEFAULT_BATCH_SIZE)], {Items, PMon1} = emqx_pmon:erase_all(ChanPids, PMon), CmTabs = cmtabs(GwName), diff --git a/apps/emqx_gateway/src/emqx_gateway_http.erl b/apps/emqx_gateway/src/emqx_gateway_http.erl index 5982334b4..9bb5821f5 100644 --- a/apps/emqx_gateway/src/emqx_gateway_http.erl +++ b/apps/emqx_gateway/src/emqx_gateway_http.erl @@ -495,7 +495,7 @@ reason2msg( reason2msg( {#{roots := [{gateway, _}]}, [_ | _]} = Error ) -> - Bin = emqx_misc:readable_error_msg(Error), + Bin = emqx_utils:readable_error_msg(Error), <<"Invalid configurations: ", Bin/binary>>; reason2msg(_) -> error. diff --git a/apps/emqx_gateway/src/emqx_gateway_utils.erl b/apps/emqx_gateway/src/emqx_gateway_utils.erl index 9d80de00e..07185ef42 100644 --- a/apps/emqx_gateway/src/emqx_gateway_utils.erl +++ b/apps/emqx_gateway/src/emqx_gateway_utils.erl @@ -223,7 +223,7 @@ merge_default(Udp, Options) -> case lists:keytake(Key, 1, Options) of {value, {Key, TcpOpts}, Options1} -> [ - {Key, emqx_misc:merge_opts(Default, TcpOpts)} + {Key, emqx_utils:merge_opts(Default, TcpOpts)} | Options1 ]; false -> @@ -482,7 +482,7 @@ frame_options(Options) -> -spec init_gc_state(map()) -> emqx_gc:gc_state() | undefined. init_gc_state(Options) -> - emqx_misc:maybe_apply(fun emqx_gc:init/1, force_gc_policy(Options)). + emqx_utils:maybe_apply(fun emqx_gc:init/1, force_gc_policy(Options)). -spec force_gc_policy(map()) -> emqx_gc:opts() | undefined. force_gc_policy(Options) -> diff --git a/apps/emqx_gateway_coap/src/emqx_coap_channel.erl b/apps/emqx_gateway_coap/src/emqx_coap_channel.erl index 4cf362d9d..b90fd630d 100644 --- a/apps/emqx_gateway_coap/src/emqx_coap_channel.erl +++ b/apps/emqx_gateway_coap/src/emqx_coap_channel.erl @@ -111,7 +111,7 @@ info(conn_state, #channel{conn_state = ConnState}) -> info(clientinfo, #channel{clientinfo = ClientInfo}) -> ClientInfo; info(session, #channel{session = Session}) -> - emqx_misc:maybe_apply(fun emqx_coap_session:info/1, Session); + emqx_utils:maybe_apply(fun emqx_coap_session:info/1, Session); info(clientid, #channel{clientinfo = #{clientid := ClientId}}) -> ClientId; info(ctx, #channel{ctx = Ctx}) -> @@ -366,7 +366,7 @@ ensure_timer(Name, Time, Msg, #channel{timers = Timers} = Channel) -> end. make_timer(Name, Time, Msg, Channel = #channel{timers = Timers}) -> - TRef = emqx_misc:start_timer(Time, Msg), + TRef = emqx_utils:start_timer(Time, Msg), Channel#channel{timers = Timers#{Name => TRef}}. ensure_keepalive_timer(Channel) -> @@ -710,7 +710,7 @@ process_connection( ) -> Queries = emqx_coap_message:get_option(uri_query, Req), case - emqx_misc:pipeline( + emqx_utils:pipeline( [ fun enrich_conninfo/2, fun run_conn_hooks/2, diff --git a/apps/emqx_gateway_coap/src/emqx_coap_session.erl b/apps/emqx_gateway_coap/src/emqx_coap_session.erl index 688defcbb..5ae169675 100644 --- a/apps/emqx_gateway_coap/src/emqx_coap_session.erl +++ b/apps/emqx_gateway_coap/src/emqx_coap_session.erl @@ -81,7 +81,7 @@ %%%------------------------------------------------------------------- -spec new() -> session(). new() -> - _ = emqx_misc:rand_seed(), + _ = emqx_utils:rand_seed(), #session{ transport_manager = emqx_coap_tm:new(), observe_manager = emqx_coap_observe_res:new_manager(), diff --git a/apps/emqx_gateway_coap/src/emqx_coap_tm.erl b/apps/emqx_gateway_coap/src/emqx_coap_tm.erl index 82f616b25..68a7ae237 100644 --- a/apps/emqx_gateway_coap/src/emqx_coap_tm.erl +++ b/apps/emqx_gateway_coap/src/emqx_coap_tm.erl @@ -272,12 +272,12 @@ cancel_state_timer(#state_machine{timers = Timers} = Machine) -> undefined -> Machine; Ref -> - _ = emqx_misc:cancel_timer(Ref), + _ = emqx_utils:cancel_timer(Ref), Machine#state_machine{timers = maps:remove(state_timer, Timers)} end. process_timer(SeqId, {Type, Interval, Msg}, Timers) -> - Ref = emqx_misc:start_timer(Interval, {state_machine, {SeqId, Type, Msg}}), + Ref = emqx_utils:start_timer(Interval, {state_machine, {SeqId, Type, Msg}}), Timers#{Type => Ref}. -spec delete_machine(manager_key(), manager()) -> manager(). @@ -293,7 +293,7 @@ delete_machine(Id, Manager) -> } -> lists:foreach( fun({_, Ref}) -> - emqx_misc:cancel_timer(Ref) + emqx_utils:cancel_timer(Ref) end, maps:to_list(Timers) ), diff --git a/apps/emqx_gateway_coap/src/emqx_coap_transport.erl b/apps/emqx_gateway_coap/src/emqx_coap_transport.erl index 1948c969d..daea13ba8 100644 --- a/apps/emqx_gateway_coap/src/emqx_coap_transport.erl +++ b/apps/emqx_gateway_coap/src/emqx_coap_transport.erl @@ -119,7 +119,7 @@ idle(out, #coap_message{type = non} = Msg, _) -> timeouts => [{stop_timeout, ?NON_LIFETIME}] }); idle(out, Msg, Transport) -> - _ = emqx_misc:rand_seed(), + _ = emqx_utils:rand_seed(), Timeout = ?ACK_TIMEOUT + rand:uniform(?ACK_RANDOM_FACTOR), out(Msg, #{ next => wait_ack, diff --git a/apps/emqx_gateway_exproto/src/emqx_exproto_channel.erl b/apps/emqx_gateway_exproto/src/emqx_exproto_channel.erl index 7234e7a2f..3b2c8d73b 100644 --- a/apps/emqx_gateway_exproto/src/emqx_exproto_channel.erl +++ b/apps/emqx_gateway_exproto/src/emqx_exproto_channel.erl @@ -681,14 +681,14 @@ ensure_timer(Name, Channel = #channel{timers = Timers}) -> ensure_timer(Name, Time, Channel = #channel{timers = Timers}) -> Msg = maps:get(Name, ?TIMER_TABLE), - TRef = emqx_misc:start_timer(Time, Msg), + TRef = emqx_utils:start_timer(Time, Msg), Channel#channel{timers = Timers#{Name => TRef}}. reset_timer(Name, Channel) -> ensure_timer(Name, remove_timer_ref(Name, Channel)). cancel_timer(Name, Channel = #channel{timers = Timers}) -> - emqx_misc:cancel_timer(maps:get(Name, Timers, undefined)), + emqx_utils:cancel_timer(maps:get(Name, Timers, undefined)), remove_timer_ref(Name, Channel). remove_timer_ref(Name, Channel = #channel{timers = Timers}) -> @@ -792,4 +792,4 @@ proto_name_to_protocol(ProtoName) when is_binary(ProtoName) -> binary_to_atom(ProtoName). anonymous_clientid() -> - iolist_to_binary(["exproto-", emqx_misc:gen_id()]). + iolist_to_binary(["exproto-", emqx_utils:gen_id()]). diff --git a/apps/emqx_gateway_exproto/src/emqx_exproto_gcli.erl b/apps/emqx_gateway_exproto/src/emqx_exproto_gcli.erl index af15ef9d3..34883cdce 100644 --- a/apps/emqx_gateway_exproto/src/emqx_exproto_gcli.erl +++ b/apps/emqx_gateway_exproto/src/emqx_exproto_gcli.erl @@ -50,7 +50,7 @@ start_link(Pool, Id) -> gen_server:start_link( - {local, emqx_misc:proc_name(?MODULE, Id)}, + {local, emqx_utils:proc_name(?MODULE, Id)}, ?MODULE, [Pool, Id], [] diff --git a/apps/emqx_gateway_lwm2m/src/emqx_lwm2m_channel.erl b/apps/emqx_gateway_lwm2m/src/emqx_lwm2m_channel.erl index a424375dd..77652744a 100644 --- a/apps/emqx_gateway_lwm2m/src/emqx_lwm2m_channel.erl +++ b/apps/emqx_gateway_lwm2m/src/emqx_lwm2m_channel.erl @@ -105,7 +105,7 @@ info(conn_state, #channel{conn_state = ConnState}) -> info(clientinfo, #channel{clientinfo = ClientInfo}) -> ClientInfo; info(session, #channel{session = Session}) -> - emqx_misc:maybe_apply(fun emqx_lwm2m_session:info/1, Session); + emqx_utils:maybe_apply(fun emqx_lwm2m_session:info/1, Session); info(clientid, #channel{clientinfo = #{clientid := ClientId}}) -> ClientId; info(ctx, #channel{ctx = Ctx}) -> @@ -286,7 +286,7 @@ handle_call(discard, _From, Channel) -> % pendings = Pendings}) -> % ok = emqx_session:takeover(Session), % %% TODO: Should not drain deliver here (side effect) -% Delivers = emqx_misc:drain_deliver(), +% Delivers = emqx_utils:drain_deliver(), % AllPendings = lists:append(Delivers, Pendings), % shutdown_and_reply(takenover, AllPendings, Channel); @@ -390,7 +390,7 @@ set_peercert_infos(Peercert, ClientInfo) -> ClientInfo#{dn => DN, cn => CN}. make_timer(Name, Time, Msg, Channel = #channel{timers = Timers}) -> - TRef = emqx_misc:start_timer(Time, Msg), + TRef = emqx_utils:start_timer(Time, Msg), Channel#channel{timers = Timers#{Name => TRef}}. update_life_timer(#channel{session = Session, timers = Timers} = Channel) -> @@ -413,7 +413,7 @@ do_takeover(_DesireId, Msg, Channel) -> do_connect(Req, Result, Channel, Iter) -> case - emqx_misc:pipeline( + emqx_utils:pipeline( [ fun check_lwm2m_version/2, fun enrich_conninfo/2, diff --git a/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_channel.erl b/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_channel.erl index c27c0ba3f..1ccc8b95a 100644 --- a/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_channel.erl +++ b/apps/emqx_gateway_mqttsn/src/emqx_mqttsn_channel.erl @@ -218,7 +218,7 @@ info(conn_state, #channel{conn_state = ConnState}) -> info(clientinfo, #channel{clientinfo = ClientInfo}) -> ClientInfo; info(session, #channel{session = Session}) -> - emqx_misc:maybe_apply(fun emqx_session:info/1, Session); + emqx_utils:maybe_apply(fun emqx_session:info/1, Session); info(will_msg, #channel{will_msg = WillMsg}) -> WillMsg; info(clientid, #channel{clientinfo = #{clientid := ClientId}}) -> @@ -282,7 +282,7 @@ enrich_clientinfo( feedvar(Override, Packet, ConnInfo, ClientInfo0), ClientInfo0 ), - {ok, NPacket, NClientInfo} = emqx_misc:pipeline( + {ok, NPacket, NClientInfo} = emqx_utils:pipeline( [ fun maybe_assign_clientid/2, %% FIXME: CALL After authentication successfully @@ -414,7 +414,7 @@ process_connect( Channel#channel{session = Session} ); {ok, #{session := Session, present := true, pendings := Pendings}} -> - Pendings1 = lists:usort(lists:append(Pendings, emqx_misc:drain_deliver())), + Pendings1 = lists:usort(lists:append(Pendings, emqx_utils:drain_deliver())), NChannel = Channel#channel{ session = Session, resuming = true, @@ -595,7 +595,7 @@ handle_in( Channel = #channel{conn_state = idle} ) -> case - emqx_misc:pipeline( + emqx_utils:pipeline( [ fun enrich_conninfo/2, fun run_conn_hooks/2, @@ -718,7 +718,7 @@ handle_in(PubPkt = ?SN_PUBLISH_MSG(_Flags, TopicId0, MsgId, _Data), Channel) -> Id end, case - emqx_misc:pipeline( + emqx_utils:pipeline( [ fun check_qos3_enable/2, fun preproc_pub_pkt/2, @@ -877,7 +877,7 @@ handle_in( end; handle_in(SubPkt = ?SN_SUBSCRIBE_MSG(_, MsgId, _), Channel) -> case - emqx_misc:pipeline( + emqx_utils:pipeline( [ fun preproc_subs_type/2, fun check_subscribe_authz/2, @@ -911,7 +911,7 @@ handle_in( Channel ) -> case - emqx_misc:pipeline( + emqx_utils:pipeline( [ fun preproc_unsub_type/2, fun run_client_unsub_hook/2, @@ -1823,7 +1823,7 @@ handle_call( ) -> ok = emqx_session:takeover(Session), %% TODO: Should not drain deliver here (side effect) - Delivers = emqx_misc:drain_deliver(), + Delivers = emqx_utils:drain_deliver(), AllPendings = lists:append(Delivers, Pendings), shutdown_and_reply(takenover, AllPendings, Channel); %handle_call(list_authz_cache, _From, Channel) -> @@ -2247,7 +2247,7 @@ ensure_register_timer(Channel) -> ensure_register_timer(RetryTimes, Channel = #channel{timers = Timers}) -> Msg = maps:get(register_timer, ?TIMER_TABLE), - TRef = emqx_misc:start_timer(?REGISTER_TIMEOUT, {Msg, RetryTimes}), + TRef = emqx_utils:start_timer(?REGISTER_TIMEOUT, {Msg, RetryTimes}), Channel#channel{timers = Timers#{register_timer => TRef}}. cancel_timer(Name, Channel = #channel{timers = Timers}) -> @@ -2255,7 +2255,7 @@ cancel_timer(Name, Channel = #channel{timers = Timers}) -> undefined -> Channel; TRef -> - emqx_misc:cancel_timer(TRef), + emqx_utils:cancel_timer(TRef), Channel#channel{timers = maps:without([Name], Timers)} end. @@ -2270,7 +2270,7 @@ ensure_timer(Name, Channel = #channel{timers = Timers}) -> ensure_timer(Name, Time, Channel = #channel{timers = Timers}) -> Msg = maps:get(Name, ?TIMER_TABLE), - TRef = emqx_misc:start_timer(Time, Msg), + TRef = emqx_utils:start_timer(Time, Msg), Channel#channel{timers = Timers#{Name => TRef}}. reset_timer(Name, Channel) -> diff --git a/apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl b/apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl index 13b70348a..316432dea 100644 --- a/apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl +++ b/apps/emqx_gateway_stomp/src/emqx_stomp_channel.erl @@ -252,7 +252,7 @@ enrich_clientinfo( feedvar(Override, Packet, ConnInfo, ClientInfo0), ClientInfo0 ), - {ok, NPacket, NClientInfo} = emqx_misc:pipeline( + {ok, NPacket, NClientInfo} = emqx_utils:pipeline( [ fun maybe_assign_clientid/2, fun parse_heartbeat/2, @@ -416,7 +416,7 @@ handle_in( {error, unexpected_connect, Channel}; handle_in(Packet = ?PACKET(?CMD_CONNECT), Channel) -> case - emqx_misc:pipeline( + emqx_utils:pipeline( [ fun enrich_conninfo/2, fun negotiate_version/2, @@ -474,7 +474,7 @@ handle_in( Topic = header(<<"destination">>, Headers), Ack = header(<<"ack">>, Headers, <<"auto">>), case - emqx_misc:pipeline( + emqx_utils:pipeline( [ fun parse_topic_filter/2, fun check_subscribed_status/2, @@ -796,7 +796,7 @@ handle_call( reply({error, no_subid}, Channel); SubId -> case - emqx_misc:pipeline( + emqx_utils:pipeline( [ fun parse_topic_filter/2, fun check_subscribed_status/2 @@ -869,7 +869,7 @@ handle_call(discard, _From, Channel) -> % pendings = Pendings}) -> % ok = emqx_session:takeover(Session), % %% TODO: Should not drain deliver here (side effect) -% Delivers = emqx_misc:drain_deliver(), +% Delivers = emqx_utils:drain_deliver(), % AllPendings = lists:append(Delivers, Pendings), % shutdown_and_reply(takenover, AllPendings, Channel); @@ -1289,7 +1289,7 @@ ensure_timer(Name, Channel = #channel{timers = Timers}) -> ensure_timer(Name, Time, Channel = #channel{timers = Timers}) -> Msg = maps:get(Name, ?TIMER_TABLE), - TRef = emqx_misc:start_timer(Time, Msg), + TRef = emqx_utils:start_timer(Time, Msg), Channel#channel{timers = Timers#{Name => TRef}}. reset_timer(Name, Channel) -> diff --git a/apps/emqx_machine/src/emqx_global_gc.erl b/apps/emqx_machine/src/emqx_global_gc.erl index f17ab2d16..121855e68 100644 --- a/apps/emqx_machine/src/emqx_global_gc.erl +++ b/apps/emqx_machine/src/emqx_global_gc.erl @@ -86,7 +86,7 @@ ensure_timer(State) -> disabled -> State; Interval when is_integer(Interval) -> - TRef = emqx_misc:start_timer(Interval, run), + TRef = emqx_utils:start_timer(Interval, run), State#{timer := TRef} end. diff --git a/apps/emqx_management/src/emqx_mgmt_api_clients.erl b/apps/emqx_management/src/emqx_mgmt_api_clients.erl index cac3edaed..cd4829342 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_clients.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_clients.erl @@ -644,7 +644,7 @@ list_clients(QString) -> fun ?MODULE:format_channel_info/2 ); Node0 -> - case emqx_misc:safe_to_existing_atom(Node0) of + case emqx_utils:safe_to_existing_atom(Node0) of {ok, Node1} -> QStringWithoutNode = maps:without([<<"node">>], QString), emqx_mgmt_api:node_query( diff --git a/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl b/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl index 409af4d95..1b69835f9 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl @@ -149,7 +149,7 @@ subscriptions(get, #{query_string := QString}) -> fun ?MODULE:format/2 ); Node0 -> - case emqx_misc:safe_to_existing_atom(Node0) of + case emqx_utils:safe_to_existing_atom(Node0) of {ok, Node1} -> emqx_mgmt_api:node_query( Node1, diff --git a/apps/emqx_management/src/emqx_mgmt_api_trace.erl b/apps/emqx_management/src/emqx_mgmt_api_trace.erl index a69837eb3..5df641add 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_trace.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_trace.erl @@ -498,7 +498,7 @@ download_trace_log(get, #{bindings := #{name := Name}, query_string := Query}) - %% We generate a session ID so that we name files %% with unique names. Then we won't cause %% overwrites for concurrent requests. - SessionId = emqx_misc:gen_id(), + SessionId = emqx_utils:gen_id(), ZipDir = filename:join([emqx_trace:zip_dir(), SessionId]), ok = file:make_dir(ZipDir), %% Write files to ZipDir and create an in-memory zip file diff --git a/apps/emqx_management/src/emqx_mgmt_auth.erl b/apps/emqx_management/src/emqx_mgmt_auth.erl index 6f2a27414..12a7a6641 100644 --- a/apps/emqx_management/src/emqx_mgmt_auth.erl +++ b/apps/emqx_management/src/emqx_mgmt_auth.erl @@ -174,7 +174,7 @@ create_app(Name, ApiSecret, Enable, ExpiredAt, Desc) -> desc = Desc, created_at = erlang:system_time(second), api_secret_hash = emqx_dashboard_admin:hash(ApiSecret), - api_key = list_to_binary(emqx_misc:gen_id(16)) + api_key = list_to_binary(emqx_utils:gen_id(16)) }, case create_app(App) of {ok, Res} -> @@ -213,7 +213,7 @@ do_force_create_app(App, ApiKey, NamePrefix) -> end. generate_unique_name(NamePrefix) -> - New = list_to_binary(NamePrefix ++ emqx_misc:gen_id(16)), + New = list_to_binary(NamePrefix ++ emqx_utils:gen_id(16)), case mnesia:read(?APP, New) of [] -> New; _ -> generate_unique_name(NamePrefix) @@ -246,7 +246,7 @@ init_bootstrap_file(File) -> {ok, MP} = re:compile(<<"(\.+):(\.+$)">>, [ungreedy]), init_bootstrap_file(File, Dev, MP); {error, Reason0} -> - Reason = emqx_misc:explain_posix(Reason0), + Reason = emqx_utils:explain_posix(Reason0), ?SLOG( error, #{ diff --git a/apps/emqx_management/src/emqx_mgmt_cli.erl b/apps/emqx_management/src/emqx_mgmt_cli.erl index 442d5c7de..253d527ac 100644 --- a/apps/emqx_management/src/emqx_mgmt_cli.erl +++ b/apps/emqx_management/src/emqx_mgmt_cli.erl @@ -356,7 +356,7 @@ mnesia(_) -> %% @doc Logger Command log(["set-level", Level]) -> - case emqx_misc:safe_to_existing_atom(Level) of + case emqx_utils:safe_to_existing_atom(Level) of {ok, Level1} -> case emqx_logger:set_log_level(Level1) of ok -> emqx_ctl:print("~ts~n", [Level]); @@ -369,7 +369,7 @@ log(["primary-level"]) -> Level = emqx_logger:get_primary_log_level(), emqx_ctl:print("~ts~n", [Level]); log(["primary-level", Level]) -> - case emqx_misc:safe_to_existing_atom(Level) of + case emqx_utils:safe_to_existing_atom(Level) of {ok, Level1} -> _ = emqx_logger:set_primary_log_level(Level1), ok; @@ -392,7 +392,7 @@ log(["handlers", "list"]) -> ], ok; log(["handlers", "start", HandlerId]) -> - case emqx_misc:safe_to_existing_atom(HandlerId) of + case emqx_utils:safe_to_existing_atom(HandlerId) of {ok, HandlerId1} -> case emqx_logger:start_log_handler(HandlerId1) of ok -> @@ -406,7 +406,7 @@ log(["handlers", "start", HandlerId]) -> emqx_ctl:print("[error] invalid handler:~ts~n", [HandlerId]) end; log(["handlers", "stop", HandlerId]) -> - case emqx_misc:safe_to_existing_atom(HandlerId) of + case emqx_utils:safe_to_existing_atom(HandlerId) of {ok, HandlerId1} -> case emqx_logger:stop_log_handler(HandlerId1) of ok -> @@ -420,9 +420,9 @@ log(["handlers", "stop", HandlerId]) -> emqx_ctl:print("[error] invalid handler:~ts~n", [HandlerId]) end; log(["handlers", "set-level", HandlerId, Level]) -> - case emqx_misc:safe_to_existing_atom(HandlerId) of + case emqx_utils:safe_to_existing_atom(HandlerId) of {ok, HandlerId1} -> - case emqx_misc:safe_to_existing_atom(Level) of + case emqx_utils:safe_to_existing_atom(Level) of {ok, Level1} -> case emqx_logger:set_log_handler_level(HandlerId1, Level1) of ok -> @@ -628,7 +628,7 @@ listeners([]) -> emqx_listeners:list() ); listeners(["stop", ListenerId]) -> - case emqx_misc:safe_to_existing_atom(ListenerId) of + case emqx_utils:safe_to_existing_atom(ListenerId) of {ok, ListenerId1} -> case emqx_listeners:stop_listener(ListenerId1) of ok -> @@ -640,7 +640,7 @@ listeners(["stop", ListenerId]) -> emqx_ctl:print("Invalid listener: ~0p~n", [ListenerId]) end; listeners(["start", ListenerId]) -> - case emqx_misc:safe_to_existing_atom(ListenerId) of + case emqx_utils:safe_to_existing_atom(ListenerId) of {ok, ListenerId1} -> case emqx_listeners:start_listener(ListenerId1) of ok -> @@ -652,7 +652,7 @@ listeners(["start", ListenerId]) -> emqx_ctl:print("Invalid listener: ~0p~n", [ListenerId]) end; listeners(["restart", ListenerId]) -> - case emqx_misc:safe_to_existing_atom(ListenerId) of + case emqx_utils:safe_to_existing_atom(ListenerId) of {ok, ListenerId1} -> case emqx_listeners:restart_listener(ListenerId1) of ok -> diff --git a/apps/emqx_modules/src/emqx_delayed.erl b/apps/emqx_modules/src/emqx_delayed.erl index 91cdbedd3..85313c181 100644 --- a/apps/emqx_modules/src/emqx_delayed.erl +++ b/apps/emqx_modules/src/emqx_delayed.erl @@ -328,7 +328,7 @@ handle_info(Info, State) -> terminate(_Reason, #{stats_timer := StatsTimer} = State) -> emqx_conf:remove_handler([delayed]), - emqx_misc:cancel_timer(StatsTimer), + emqx_utils:cancel_timer(StatsTimer), do_load_or_unload(false, State). code_change(_Vsn, State, _Extra) -> @@ -370,14 +370,14 @@ ensure_publish_timer({Ts, _Id}, State = #{publish_timer := undefined}) -> ensure_publish_timer({Ts, _Id}, State = #{publish_timer := TRef, publish_at := PubAt}) when Ts < PubAt -> - ok = emqx_misc:cancel_timer(TRef), + ok = emqx_utils:cancel_timer(TRef), ensure_publish_timer(Ts, ?NOW, State); ensure_publish_timer(_Key, State) -> State. ensure_publish_timer(Ts, Now, State) -> Interval = max(1, Ts - Now), - TRef = emqx_misc:start_timer(Interval, do_publish), + TRef = emqx_utils:start_timer(Interval, do_publish), State#{publish_timer := TRef, publish_at := Now + Interval}. do_publish(Key, Now) -> @@ -418,7 +418,7 @@ do_load_or_unload(true, State) -> State; do_load_or_unload(false, #{publish_timer := PubTimer} = State) -> emqx_hooks:del('message.publish', {?MODULE, on_message_publish}), - emqx_misc:cancel_timer(PubTimer), + emqx_utils:cancel_timer(PubTimer), ets:delete_all_objects(?TAB), State#{publish_timer := undefined, publish_at := 0}; do_load_or_unload(_, State) -> diff --git a/apps/emqx_modules/src/emqx_telemetry.erl b/apps/emqx_modules/src/emqx_telemetry.erl index 1d49fb3e7..3b27302df 100644 --- a/apps/emqx_modules/src/emqx_telemetry.erl +++ b/apps/emqx_modules/src/emqx_telemetry.erl @@ -161,7 +161,7 @@ handle_call(enable, _From, State) -> FirstReportTimeoutMS = timer:seconds(10), {reply, ok, ensure_report_timer(FirstReportTimeoutMS, State)}; handle_call(disable, _From, State = #state{timer = Timer}) -> - emqx_misc:cancel_timer(Timer), + emqx_utils:cancel_timer(Timer), {reply, ok, State#state{timer = undefined}}; handle_call(get_node_uuid, _From, State = #state{node_uuid = UUID}) -> {reply, {ok, UUID}, State}; @@ -205,7 +205,7 @@ ensure_report_timer(State = #state{report_interval = ReportInterval}) -> ensure_report_timer(ReportInterval, State). ensure_report_timer(ReportInterval, State) -> - State#state{timer = emqx_misc:start_timer(ReportInterval, time_to_report_telemetry_data)}. + State#state{timer = emqx_utils:start_timer(ReportInterval, time_to_report_telemetry_data)}. os_info() -> case erlang:system_info(os_type) of diff --git a/apps/emqx_plugin_libs/src/emqx_plugin_libs_pool.erl b/apps/emqx_plugin_libs/src/emqx_plugin_libs_pool.erl index 9b286f360..a3048e5dd 100644 --- a/apps/emqx_plugin_libs/src/emqx_plugin_libs_pool.erl +++ b/apps/emqx_plugin_libs/src/emqx_plugin_libs_pool.erl @@ -79,7 +79,7 @@ health_check_ecpool_workers(PoolName, CheckFunc, Timeout) -> false end end, - try emqx_misc:pmap(DoPerWorker, Workers, Timeout) of + try emqx_utils:pmap(DoPerWorker, Workers, Timeout) of [_ | _] = Status -> lists:all(fun(St) -> St =:= true end, Status); [] -> diff --git a/apps/emqx_plugin_libs/src/emqx_plugin_libs_rule.erl b/apps/emqx_plugin_libs/src/emqx_plugin_libs_rule.erl index 40f88a0dc..8844fe586 100644 --- a/apps/emqx_plugin_libs/src/emqx_plugin_libs_rule.erl +++ b/apps/emqx_plugin_libs/src/emqx_plugin_libs_rule.erl @@ -225,7 +225,7 @@ tcp_connectivity(Host, Port) -> ) -> ok | {error, Reason :: term()}. tcp_connectivity(Host, Port, Timeout) -> - case gen_tcp:connect(Host, Port, emqx_misc:ipv6_probe([]), Timeout) of + case gen_tcp:connect(Host, Port, emqx_utils:ipv6_probe([]), Timeout) of {ok, Sock} -> gen_tcp:close(Sock), ok; diff --git a/apps/emqx_prometheus/src/emqx_prometheus.erl b/apps/emqx_prometheus/src/emqx_prometheus.erl index 60d52f58b..05d9508b6 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus.erl @@ -144,7 +144,7 @@ terminate(_Reason, _State) -> ok. ensure_timer(Interval) -> - emqx_misc:start_timer(Interval, ?TIMER_MSG). + emqx_utils:start_timer(Interval, ?TIMER_MSG). %%-------------------------------------------------------------------- %% prometheus callbacks diff --git a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl index 0fa4c0bd8..331480064 100644 --- a/apps/emqx_resource/src/emqx_resource_buffer_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_buffer_worker.erl @@ -1535,7 +1535,7 @@ queue_count(Q) -> disk_queue_dir(Id, Index) -> QDir0 = binary_to_list(Id) ++ ":" ++ integer_to_list(Index), QDir = filename:join([emqx:data_dir(), "bufs", node(), QDir0]), - emqx_misc:safe_filename(QDir). + emqx_utils:safe_filename(QDir). clear_disk_queue_dir(Id, Index) -> ReplayQDir = disk_queue_dir(Id, Index), diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index 9fe0fbdf2..7ecf56c18 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -539,7 +539,7 @@ stop_resource(#data{state = ResState, id = ResId} = Data) -> Data#data{status = stopped}. make_test_id() -> - RandId = iolist_to_binary(emqx_misc:gen_id(16)), + RandId = iolist_to_binary(emqx_utils:gen_id(16)), <>. handle_manually_health_check(From, Data) -> @@ -613,7 +613,7 @@ maybe_alarm(_Status, ResId, Error) -> HrError = case Error of undefined -> <<"Unknown reason">>; - _Else -> emqx_misc:readable_error_msg(Error) + _Else -> emqx_utils:readable_error_msg(Error) end, emqx_alarm:activate( ResId, diff --git a/apps/emqx_retainer/src/emqx_retainer.app.src b/apps/emqx_retainer/src/emqx_retainer.app.src index 11013cdd3..7bfc8ee4e 100644 --- a/apps/emqx_retainer/src/emqx_retainer.app.src +++ b/apps/emqx_retainer/src/emqx_retainer.app.src @@ -2,7 +2,7 @@ {application, emqx_retainer, [ {description, "EMQX Retainer"}, % strict semver, bump manually! - {vsn, "5.0.11"}, + {vsn, "5.0.12"}, {modules, []}, {registered, [emqx_retainer_sup]}, {applications, [kernel, stdlib, emqx, emqx_ctl]}, diff --git a/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl b/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl index 454a65eb3..32cbba084 100644 --- a/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl +++ b/apps/emqx_retainer/src/emqx_retainer_dispatcher.erl @@ -91,7 +91,7 @@ worker() -> | ignore. start_link(Pool, Id) -> gen_server:start_link( - {local, emqx_misc:proc_name(?MODULE, Id)}, + {local, emqx_utils:proc_name(?MODULE, Id)}, ?MODULE, [Pool, Id], [{hibernate_after, 1000}] diff --git a/apps/emqx_rule_engine/src/emqx_rule_actions.erl b/apps/emqx_rule_engine/src/emqx_rule_actions.erl index 40158d342..a9f24ddcd 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_actions.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_actions.erl @@ -175,7 +175,7 @@ safe_publish(RuleId, Topic, QoS, Flags, Payload, PubProps) -> flags = Flags, headers = #{ republish_by => RuleId, - properties => emqx_misc:pub_props_to_packet(PubProps) + properties => emqx_utils:pub_props_to_packet(PubProps) }, topic = Topic, payload = Payload, diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl index c63d6275f..d66f2c1c9 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_engine_api.erl @@ -343,7 +343,7 @@ param_path_id() -> {200, Result} end; '/rules'(post, #{body := Params0}) -> - case maps:get(<<"id">>, Params0, list_to_binary(emqx_misc:gen_id(8))) of + case maps:get(<<"id">>, Params0, list_to_binary(emqx_utils:gen_id(8))) of <<>> -> {400, #{code => 'BAD_REQUEST', message => <<"empty rule id is not allowed">>}}; Id -> @@ -459,11 +459,11 @@ param_path_id() -> %%------------------------------------------------------------------------------ err_msg({RuleError, {_E, Reason, _S}}) -> - emqx_misc:readable_error_msg(encode_nested_error(RuleError, Reason)); + emqx_utils:readable_error_msg(encode_nested_error(RuleError, Reason)); err_msg({Reason, _Details}) -> - emqx_misc:readable_error_msg(Reason); + emqx_utils:readable_error_msg(Reason); err_msg(Msg) -> - emqx_misc:readable_error_msg(Msg). + emqx_utils:readable_error_msg(Msg). encode_nested_error(RuleError, Reason) when is_tuple(Reason) -> encode_nested_error(RuleError, element(1, Reason)); diff --git a/apps/emqx_rule_engine/src/emqx_rule_funcs.erl b/apps/emqx_rule_engine/src/emqx_rule_funcs.erl index 944aa02f8..81435d9ac 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_funcs.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_funcs.erl @@ -643,10 +643,10 @@ map(Data) -> emqx_plugin_libs_rule:map(Data). bin2hexstr(Bin) when is_binary(Bin) -> - emqx_misc:bin_to_hexstr(Bin, upper). + emqx_utils:bin_to_hexstr(Bin, upper). hexstr2bin(Str) when is_binary(Str) -> - emqx_misc:hexstr_to_bin(Str). + emqx_utils:hexstr_to_bin(Str). %%------------------------------------------------------------------------------ %% NULL Funcs @@ -944,7 +944,7 @@ sha256(S) when is_binary(S) -> hash(sha256, S). hash(Type, Data) -> - emqx_misc:bin_to_hexstr(crypto:hash(Type, Data), lower). + emqx_utils:bin_to_hexstr(crypto:hash(Type, Data), lower). %%------------------------------------------------------------------------------ %% gzip Funcs diff --git a/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl b/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl index e97c45b35..455efe389 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_sqltester.erl @@ -48,7 +48,7 @@ test(#{sql := Sql, context := Context}) -> end. test_rule(Sql, Select, Context, EventTopics) -> - RuleId = iolist_to_binary(["sql_tester:", emqx_misc:gen_id(16)]), + RuleId = iolist_to_binary(["sql_tester:", emqx_utils:gen_id(16)]), ok = emqx_rule_engine:maybe_add_metrics_for_rule(RuleId), Rule = #{ id => RuleId, diff --git a/apps/emqx_statsd/src/emqx_statsd.app.src b/apps/emqx_statsd/src/emqx_statsd.app.src index 9d40c6857..412e0b685 100644 --- a/apps/emqx_statsd/src/emqx_statsd.app.src +++ b/apps/emqx_statsd/src/emqx_statsd.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_statsd, [ {description, "EMQX Statsd"}, - {vsn, "5.0.7"}, + {vsn, "5.0.8"}, {registered, []}, {mod, {emqx_statsd_app, []}}, {applications, [ diff --git a/apps/emqx_statsd/src/emqx_statsd.erl b/apps/emqx_statsd/src/emqx_statsd.erl index c2e1819ac..c5a7fc1c8 100644 --- a/apps/emqx_statsd/src/emqx_statsd.erl +++ b/apps/emqx_statsd/src/emqx_statsd.erl @@ -144,7 +144,7 @@ flush_interval(_FlushInterval, SampleInterval) -> SampleInterval. ensure_timer(State = #{sample_time_interval := SampleTimeInterval}) -> - State#{timer => emqx_misc:start_timer(SampleTimeInterval, ?SAMPLE_TIMEOUT)}. + State#{timer => emqx_utils:start_timer(SampleTimeInterval, ?SAMPLE_TIMEOUT)}. check_multicall_result({Results, []}) -> case diff --git a/apps/emqx/src/emqx_misc.erl b/apps/emqx_utils/src/emqx_utils.erl similarity index 99% rename from apps/emqx/src/emqx_misc.erl rename to apps/emqx_utils/src/emqx_utils.erl index d0c7397c8..21dbd339d 100644 --- a/apps/emqx/src/emqx_misc.erl +++ b/apps/emqx_utils/src/emqx_utils.erl @@ -14,14 +14,12 @@ %% limitations under the License. %%-------------------------------------------------------------------- --module(emqx_misc). +-module(emqx_utils). -compile(inline). +%% [TODO] Cleanup so the instruction below is not necessary. -elvis([{elvis_style, god_modules, disable}]). --include("types.hrl"). --include("logger.hrl"). - -export([ merge_opts/2, maybe_apply/2, @@ -71,6 +69,8 @@ -export([clamp/3, redact/1, redact/2, is_redacted/2, is_redacted/3]). +-type maybe(T) :: undefined | T. + -dialyzer({nowarn_function, [nolink_apply/2]}). -define(SHORT, 8). @@ -221,6 +221,7 @@ drain_down(Cnt, Acc) -> %% `ok': There is nothing out of the ordinary. %% `shutdown': Some numbers (message queue length hit the limit), %% hence shutdown for greater good (system stability). +%% [FIXME] cross-dependency on `emqx_types`. -spec check_oom(emqx_types:oom_policy()) -> ok | {shutdown, term()}. check_oom(Policy) -> check_oom(self(), Policy). @@ -279,6 +280,7 @@ proc_name(Mod, Id) -> list_to_atom(lists:concat([Mod, "_", Id])). %% Get Proc's Stats. +%% [FIXME] cross-dependency on `emqx_types`. -spec proc_stats() -> emqx_types:stats(). proc_stats() -> proc_stats(self()). diff --git a/apps/emqx_utils/test/emqx_utils_SUITE.erl b/apps/emqx_utils/test/emqx_utils_SUITE.erl index 09a8d085c..598f05a21 100644 --- a/apps/emqx_utils/test/emqx_utils_SUITE.erl +++ b/apps/emqx_utils/test/emqx_utils_SUITE.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%% Copyright (c) 2018-2023 EMQ Technologies Co., Ltd. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -20,29 +20,188 @@ -compile(nowarn_export_all). -include_lib("eunit/include/eunit.hrl"). --include_lib("common_test/include/ct.hrl"). -all() -> - emqx_common_test_helpers:all(?MODULE). +-define(SOCKOPTS, [ + binary, + {packet, raw}, + {reuseaddr, true}, + {backlog, 512}, + {nodelay, true} +]). -init_per_suite(Config) -> - emqx_common_test_helpers:start_apps([emqx_conf, emqx_utils]), - Config. +all() -> emqx_common_test_helpers:all(?MODULE). -end_per_suite(_Config) -> - emqx_common_test_helpers:stop_apps([emqx_conf, emqx_utils]), - ok. +t_merge_opts(_) -> + Opts = emqx_utils:merge_opts(?SOCKOPTS, [ + raw, + binary, + {backlog, 1024}, + {nodelay, false}, + {max_clients, 1024}, + {acceptors, 16} + ]), + ?assertEqual(1024, proplists:get_value(backlog, Opts)), + ?assertEqual(1024, proplists:get_value(max_clients, Opts)), + ?assertEqual( + [ + binary, + raw, + {acceptors, 16}, + {backlog, 1024}, + {max_clients, 1024}, + {nodelay, false}, + {packet, raw}, + {reuseaddr, true} + ], + lists:sort(Opts) + ). -init_per_testcase(TestCase, Config) -> - emqx_common_test_helpers:init_per_testcase(?MODULE, TestCase, Config). +t_maybe_apply(_) -> + ?assertEqual(undefined, emqx_utils:maybe_apply(fun(A) -> A end, undefined)), + ?assertEqual(a, emqx_utils:maybe_apply(fun(A) -> A end, a)). -end_per_testcase(TestCase, Config) -> - emqx_common_test_helpers:end_per_testcase(?MODULE, TestCase, Config). +t_run_fold(_) -> + ?assertEqual(1, emqx_utils:run_fold([], 1, state)), + Add = fun(I, St) -> I + St end, + Mul = fun(I, St) -> I * St end, + ?assertEqual(6, emqx_utils:run_fold([Add, Mul], 1, 2)). -t_fail(init, Config) -> - Config; -t_fail('end', _Config) -> - ok. +t_pipeline(_) -> + ?assertEqual({ok, input, state}, emqx_utils:pipeline([], input, state)), + Funs = [ + fun(_I, _St) -> ok end, + fun(_I, St) -> {ok, St + 1} end, + fun(I, St) -> {ok, I + 1, St + 1} end, + fun(I, St) -> {ok, I * 2, St * 2} end + ], + ?assertEqual({ok, 4, 6}, emqx_utils:pipeline(Funs, 1, 1)), + ?assertEqual( + {error, undefined, 1}, emqx_utils:pipeline([fun(_I) -> {error, undefined} end], 1, 1) + ), + ?assertEqual( + {error, undefined, 2}, + emqx_utils:pipeline([fun(_I, _St) -> {error, undefined, 2} end], 1, 1) + ). -t_fail(_Config) -> - ?assert(false). +t_start_timer(_) -> + TRef = emqx_utils:start_timer(1, tmsg), + timer:sleep(2), + ?assertEqual([{timeout, TRef, tmsg}], drain()), + ok = emqx_utils:cancel_timer(TRef). + +t_cancel_timer(_) -> + Timer = emqx_utils:start_timer(0, foo), + ok = emqx_utils:cancel_timer(Timer), + ?assertEqual([], drain()), + ok = emqx_utils:cancel_timer(undefined). + +t_proc_name(_) -> + ?assertEqual(emqx_pool_1, emqx_utils:proc_name(emqx_pool, 1)). + +t_proc_stats(_) -> + Pid1 = spawn(fun() -> exit(normal) end), + timer:sleep(10), + ?assertEqual([], emqx_utils:proc_stats(Pid1)), + Pid2 = spawn(fun() -> + ?assertMatch([{mailbox_len, 0} | _], emqx_utils:proc_stats()), + timer:sleep(200) + end), + timer:sleep(10), + Pid2 ! msg, + timer:sleep(10), + ?assertMatch([{mailbox_len, 1} | _], emqx_utils:proc_stats(Pid2)). + +t_drain_deliver(_) -> + self() ! {deliver, t1, m1}, + self() ! {deliver, t2, m2}, + ?assertEqual( + [ + {deliver, t1, m1}, + {deliver, t2, m2} + ], + emqx_utils:drain_deliver(2) + ). + +t_drain_down(_) -> + {Pid1, _Ref1} = erlang:spawn_monitor(fun() -> ok end), + {Pid2, _Ref2} = erlang:spawn_monitor(fun() -> ok end), + timer:sleep(100), + ?assertEqual([Pid1, Pid2], lists:sort(emqx_utils:drain_down(2))), + ?assertEqual([], emqx_utils:drain_down(1)). + +t_index_of(_) -> + try emqx_utils:index_of(a, []) of + _ -> ct:fail(should_throw_error) + catch + error:Reason -> + ?assertEqual(badarg, Reason) + end, + ?assertEqual(3, emqx_utils:index_of(a, [b, c, a, e, f])). + +t_check(_) -> + Policy = #{ + max_message_queue_len => 10, + max_heap_size => 1024 * 1024 * 8, + enable => true + }, + [self() ! {msg, I} || I <- lists:seq(1, 5)], + ?assertEqual(ok, emqx_utils:check_oom(Policy)), + [self() ! {msg, I} || I <- lists:seq(1, 6)], + ?assertEqual({shutdown, message_queue_too_long}, emqx_utils:check_oom(Policy)). + +drain() -> + drain([]). + +drain(Acc) -> + receive + Msg -> drain([Msg | Acc]) + after 0 -> + lists:reverse(Acc) + end. + +t_rand_seed(_) -> + ?assert(is_tuple(emqx_utils:rand_seed())). + +t_now_to_secs(_) -> + ?assert(is_integer(emqx_utils:now_to_secs(os:timestamp()))). + +t_now_to_ms(_) -> + ?assert(is_integer(emqx_utils:now_to_ms(os:timestamp()))). + +t_gen_id(_) -> + ?assertEqual(10, length(emqx_utils:gen_id(10))), + ?assertEqual(20, length(emqx_utils:gen_id(20))). + +t_pmap_normal(_) -> + ?assertEqual( + [5, 7, 9], + emqx_utils:pmap( + fun({A, B}) -> A + B end, + [{2, 3}, {3, 4}, {4, 5}] + ) + ). + +t_pmap_timeout(_) -> + ?assertExit( + timeout, + emqx_utils:pmap( + fun + (timeout) -> ct:sleep(1000); + ({A, B}) -> A + B + end, + [{2, 3}, {3, 4}, timeout], + 100 + ) + ). + +t_pmap_exception(_) -> + ?assertError( + foobar, + emqx_utils:pmap( + fun + (error) -> error(foobar); + ({A, B}) -> A + B + end, + [{2, 3}, {3, 4}, error] + ) + ). diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_dynamo_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_dynamo_SUITE.erl index e899b0fcd..133ef0430 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_dynamo_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_dynamo_SUITE.erl @@ -272,7 +272,7 @@ t_setup_via_config_and_publish(Config) -> {ok, _}, create_bridge(Config) ), - MsgId = emqx_misc:gen_id(), + MsgId = emqx_utils:gen_id(), SentData = #{id => MsgId, payload => ?PAYLOAD}, ?check_trace( begin @@ -309,7 +309,7 @@ t_setup_via_http_api_and_publish(Config) -> {ok, _}, create_bridge_http(PgsqlConfig) ), - MsgId = emqx_misc:gen_id(), + MsgId = emqx_utils:gen_id(), SentData = #{id => MsgId, payload => ?PAYLOAD}, ?check_trace( begin @@ -375,7 +375,7 @@ t_write_failure(Config) -> #{?snk_kind := resource_connected_enter}, 20_000 ), - SentData = #{id => emqx_misc:gen_id(), payload => ?PAYLOAD}, + SentData = #{id => emqx_utils:gen_id(), payload => ?PAYLOAD}, emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() -> ?assertMatch( {error, {resource_error, #{reason := timeout}}}, send_message(Config, SentData) diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl index 3cc4c75e4..49bc444c6 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_gcp_pubsub_SUITE.erl @@ -809,7 +809,7 @@ test_publish_success_batch(Config) -> %% making 1-sized batches. also important to note that the pool %% size for the resource (replayq buffering) must be set to 1 to %% avoid further segmentation of batches. - emqx_misc:pmap(fun emqx:publish/1, Messages), + emqx_utils:pmap(fun emqx:publish/1, Messages), DecodedMessages0 = assert_http_request(ServiceAccountJSON), ?assertEqual(BatchSize, length(DecodedMessages0)), DecodedMessages1 = assert_http_request(ServiceAccountJSON), diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl index 69b1e26ce..be07a2bb7 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_mysql_SUITE.erl @@ -615,7 +615,7 @@ t_workload_fits_prepared_statement_limit(Config) -> create_bridge(Config) ), Results = lists:append( - emqx_misc:pmap( + emqx_utils:pmap( fun(_) -> [ begin diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_cassa.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_cassa.erl index ce964dd17..86b908038 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_cassa.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_cassa.erl @@ -102,7 +102,7 @@ on_start( ?SLOG(info, #{ msg => "starting_cassandra_connector", connector => InstId, - config => emqx_misc:redact(Config) + config => emqx_utils:redact(Config) }), Options = [ diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_clickhouse.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_clickhouse.erl index 3749ecc6e..2872e8cf0 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_clickhouse.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_clickhouse.erl @@ -139,7 +139,7 @@ on_start( ?SLOG(info, #{ msg => "starting_clickhouse_connector", connector => InstanceID, - config => emqx_misc:redact(Config) + config => emqx_utils:redact(Config) }), PoolName = emqx_plugin_libs_pool:pool_name(InstanceID), Options = [ @@ -181,7 +181,7 @@ log_start_error(Config, Reason, Stacktrace) -> #{ msg => "clickhouse_connector_start_failed", error_reason => Reason, - config => emqx_misc:redact(Config) + config => emqx_utils:redact(Config) }, ?SLOG(info, maps:merge(LogMessage, StacktraceMap)), ?tp( @@ -318,7 +318,7 @@ do_get_status(PoolName, Timeout) -> Error end end, - try emqx_misc:pmap(DoPerWorker, Workers, Timeout) of + try emqx_utils:pmap(DoPerWorker, Workers, Timeout) of Results -> case [E || {error, _} = E <- Results] of [] -> diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_dynamo.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_dynamo.erl index ee56e0c5f..85daefbb7 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_dynamo.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_dynamo.erl @@ -95,7 +95,7 @@ on_start( ?SLOG(info, #{ msg => "starting_dynamo_connector", connector => InstanceId, - config => emqx_misc:redact(Config) + config => emqx_utils:redact(Config) }), {Schema, Server} = get_host_schema(to_str(Url)), diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_gcp_pubsub.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_gcp_pubsub.erl index a2045b8c5..2295f63ab 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_gcp_pubsub.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_gcp_pubsub.erl @@ -86,7 +86,7 @@ on_start( PoolType = random, Transport = tls, TransportOpts = emqx_tls_lib:to_client_opts(#{enable => true, verify => verify_none}), - NTransportOpts = emqx_misc:ipv6_probe(TransportOpts), + NTransportOpts = emqx_utils:ipv6_probe(TransportOpts), PoolOpts = [ {host, Host}, {port, Port}, @@ -587,7 +587,7 @@ do_get_status(InstanceId, PoolName, Timeout) -> false end end, - try emqx_misc:pmap(DoPerWorker, Workers, Timeout) of + try emqx_utils:pmap(DoPerWorker, Workers, Timeout) of [_ | _] = Status -> lists:all(fun(St) -> St =:= true end, Status); [] -> diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl index afe17cae6..700eb2a81 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl @@ -216,8 +216,8 @@ start_client(InstId, Config) -> ?SLOG(info, #{ msg => "starting influxdb connector", connector => InstId, - config => emqx_misc:redact(Config), - client_config => emqx_misc:redact(ClientConfig) + config => emqx_utils:redact(Config), + client_config => emqx_utils:redact(ClientConfig) }), try do_start_client(InstId, ClientConfig, Config) @@ -353,7 +353,7 @@ password(_) -> []. redact_auth(Term) -> - emqx_misc:redact(Term, fun is_auth_key/1). + emqx_utils:redact(Term, fun is_auth_key/1). is_auth_key(Key) when is_binary(Key) -> string:equal("authorization", Key, true); diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_rocketmq.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_rocketmq.erl index d296b0b4f..205359bb8 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_rocketmq.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_rocketmq.erl @@ -265,7 +265,7 @@ client_id(InstanceId) -> erlang:binary_to_atom(Name, utf8). redact(Msg) -> - emqx_misc:redact(Msg, fun is_sensitive_key/1). + emqx_utils:redact(Msg, fun is_sensitive_key/1). is_sensitive_key(security_token) -> true; diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_tdengine.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_tdengine.erl index 746ab5485..9b6718882 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_tdengine.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_tdengine.erl @@ -93,7 +93,7 @@ on_start( ?SLOG(info, #{ msg => "starting_tdengine_connector", connector => InstanceId, - config => emqx_misc:redact(Config) + config => emqx_utils:redact(Config) }), {Host, Port} = emqx_schema:parse_server(Server, ?TD_HOST_OPTIONS), diff --git a/lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_SUITE.erl b/lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_SUITE.erl index 3c26d1966..43ccd9fc1 100644 --- a/lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_SUITE.erl +++ b/lib-ee/emqx_ee_schema_registry/test/emqx_ee_schema_registry_SUITE.erl @@ -218,7 +218,7 @@ start_cluster(Cluster) -> || {Name, Opts} <- Cluster ], on_exit(fun() -> - emqx_misc:pmap( + emqx_utils:pmap( fun(N) -> ct:pal("stopping ~p", [N]), ok = emqx_common_test_helpers:stop_slave(N) diff --git a/lib-ee/emqx_license/src/emqx_license_http_api.erl b/lib-ee/emqx_license/src/emqx_license_http_api.erl index 0133cd637..ef0e1ee52 100644 --- a/lib-ee/emqx_license/src/emqx_license_http_api.erl +++ b/lib-ee/emqx_license/src/emqx_license_http_api.erl @@ -95,7 +95,7 @@ sample_license_info_response() -> }. error_msg(Code, Msg) -> - #{code => Code, message => emqx_misc:readable_error_msg(Msg)}. + #{code => Code, message => emqx_utils:readable_error_msg(Msg)}. %% read license info '/license'(get, _Params) ->