From e3489b9d465934129e6d5e57e083d536bc70a4ad Mon Sep 17 00:00:00 2001 From: JianBo He Date: Tue, 16 Jun 2020 16:08:44 +0800 Subject: [PATCH] refactor(style): improve all types declaration - Add dialyzer for code analysis - Correct all the module type declarations - Use `-type()` to declaration a type instead of `-opaque` (It is advantageous to the code dialyzer) BROKEN CHANGES: - Add a ?DEFAULT_SUBOPTS for emqx_broker:subscribe/1,2,3 - Remove the needless internal function `emqx_vm:port_info(PortTerm, Keys)` --- .github/workflows/run_test_case.yaml | 4 ++++ Makefile | 4 ++++ include/emqx.hrl | 11 +++++----- include/types.hrl | 2 ++ src/emqx.erl | 8 ++++--- src/emqx_boot.erl | 2 +- src/emqx_broker.erl | 18 +++++++++------- src/emqx_broker_helper.erl | 2 +- src/emqx_channel.erl | 13 ++++++------ src/emqx_cm.erl | 13 ++++++------ src/emqx_connection.erl | 7 +++++++ src/emqx_ctl.erl | 2 +- src/emqx_frame.erl | 8 ++++--- src/emqx_gen_mod.erl | 13 ------------ src/emqx_hooks.erl | 8 +++---- src/emqx_logger.erl | 2 +- src/emqx_logger_formatter.erl | 4 +--- src/emqx_message.erl | 4 +--- src/emqx_misc.erl | 12 ++++++++--- src/emqx_mod_acl_internal.erl | 7 ++----- src/emqx_mod_delayed.erl | 14 +++---------- src/emqx_mod_topic_metrics.erl | 12 +++++------ src/emqx_mqtt_caps.erl | 7 ++++--- src/emqx_mqtt_props.erl | 31 +++++++++++++--------------- src/emqx_mqueue.erl | 2 +- src/emqx_os_mon.erl | 3 --- src/emqx_packet.erl | 7 ++++--- src/emqx_plugins.erl | 5 +++++ src/emqx_pool_sup.erl | 6 ++++-- src/emqx_router_helper.erl | 2 ++ src/emqx_session.erl | 18 +++++++--------- src/emqx_stats.erl | 4 ++-- src/emqx_sys.erl | 17 +++++---------- src/emqx_sys_mon.erl | 10 +++++---- src/emqx_topic.erl | 3 ++- src/emqx_tracer.erl | 4 +++- src/emqx_types.erl | 4 ++-- src/emqx_vm.erl | 7 ++----- src/emqx_ws_connection.erl | 7 +++++-- src/emqx_zone.erl | 2 +- test/emqx_SUITE.erl | 5 ++++- test/emqx_broker_SUITE.erl | 20 +++++++++++------- test/emqx_vm_SUITE.erl | 3 +-- 43 files changed, 175 insertions(+), 162 deletions(-) diff --git a/.github/workflows/run_test_case.yaml b/.github/workflows/run_test_case.yaml index 8bb918294..3d50e757f 100644 --- a/.github/workflows/run_test_case.yaml +++ b/.github/workflows/run_test_case.yaml @@ -13,6 +13,10 @@ jobs: steps: - uses: actions/checkout@v1 + - name: Code dialyzer + run: | + make dialyzer + rm -f rebar.lock - name: Run tests run: | make xref diff --git a/Makefile b/Makefile index f0ef7d6e8..2b97df326 100644 --- a/Makefile +++ b/Makefile @@ -14,6 +14,10 @@ RUN_NODE_NAME = emqxdebug@127.0.0.1 .PHONY: all all: compile +.PHONY: dialyzer +dialyzer: + @rebar3 dialyzer + .PHONY: tests tests: eunit ct diff --git a/include/emqx.hrl b/include/emqx.hrl index bbeb5defd..5a10b84e6 100644 --- a/include/emqx.hrl +++ b/include/emqx.hrl @@ -99,7 +99,7 @@ node_id :: trie_node_id(), edge_count = 0 :: non_neg_integer(), topic :: binary() | undefined, - flags :: list(atom()) + flags :: list(atom()) | undefined }). -record(trie_edge, { @@ -121,7 +121,8 @@ severity :: notice | warning | error | critical, title :: iolist(), summary :: iolist(), - timestamp :: erlang:timestamp() + %% Timestamp (Unit: millisecond) + timestamp :: integer() | undefined }). %%-------------------------------------------------------------------- @@ -130,11 +131,11 @@ -record(plugin, { name :: atom(), - dir :: string(), + dir :: string() | undefined, descr :: string(), - vendor :: string(), + vendor :: string() | undefined, active = false :: boolean(), - info :: map(), + info = #{} :: map(), type :: atom() }). diff --git a/include/types.hrl b/include/types.hrl index 0cd10246c..e92f3f2a2 100644 --- a/include/types.hrl +++ b/include/types.hrl @@ -22,3 +22,5 @@ -type(ok_or_error(Value, Reason) :: {ok, Value} | {error, Reason}). +-type(mfargs() :: {module(), atom(), [term()]}). + diff --git a/src/emqx.erl b/src/emqx.erl index c21f252d5..2c6d444e7 100644 --- a/src/emqx.erl +++ b/src/emqx.erl @@ -146,7 +146,7 @@ unsubscribe(Topic) -> -spec(topics() -> list(emqx_topic:topic())). topics() -> emqx_router:topics(). --spec(subscribers(emqx_topic:topic() | string()) -> list(emqx_types:subscriber())). +-spec(subscribers(emqx_topic:topic() | string()) -> [pid()]). subscribers(Topic) -> emqx_broker:subscribers(iolist_to_binary(Topic)). @@ -168,7 +168,9 @@ subscribed(SubId, Topic) when is_atom(SubId); is_binary(SubId) -> hook(HookPoint, Action) -> emqx_hooks:add(HookPoint, Action). --spec(hook(emqx_hooks:hookpoint(), emqx_hooks:action(), emqx_hooks:filter() | integer()) +-spec(hook(emqx_hooks:hookpoint(), + emqx_hooks:action(), + emqx_hooks:filter() | integer() | list()) -> ok | {error, already_exists}). hook(HookPoint, Action, Priority) when is_integer(Priority) -> emqx_hooks:add(HookPoint, Action, Priority); @@ -182,7 +184,7 @@ hook(HookPoint, Action, InitArgs) when is_list(InitArgs) -> hook(HookPoint, Action, Filter, Priority) -> emqx_hooks:add(HookPoint, Action, Filter, Priority). --spec(unhook(emqx_hooks:hookpoint(), emqx_hooks:action()) -> ok). +-spec(unhook(emqx_hooks:hookpoint(), function() | {module(), atom()}) -> ok). unhook(HookPoint, Action) -> emqx_hooks:del(HookPoint, Action). diff --git a/src/emqx_boot.erl b/src/emqx_boot.erl index 015a1a95e..be27607be 100644 --- a/src/emqx_boot.erl +++ b/src/emqx_boot.erl @@ -20,7 +20,7 @@ -define(BOOT_MODULES, [router, broker, listeners]). --spec(is_enabled(all|list(router|broker|listeners)) -> boolean()). +-spec(is_enabled(all|router|broker|listeners) -> boolean()). is_enabled(Mod) -> (BootMods = boot_modules()) =:= all orelse lists:member(Mod, BootMods). diff --git a/src/emqx_broker.erl b/src/emqx_broker.erl index df5939ef5..c5d9c6793 100644 --- a/src/emqx_broker.erl +++ b/src/emqx_broker.erl @@ -21,6 +21,7 @@ -include("emqx.hrl"). -include("logger.hrl"). -include("types.hrl"). +-include("emqx_mqtt.hrl"). -logger_header("[Broker]"). @@ -118,12 +119,13 @@ subscribe(Topic) when is_binary(Topic) -> -spec(subscribe(emqx_topic:topic(), emqx_types:subid() | emqx_types:subopts()) -> ok). subscribe(Topic, SubId) when is_binary(Topic), ?is_subid(SubId) -> - subscribe(Topic, SubId, #{qos => 0}); + subscribe(Topic, SubId, ?DEFAULT_SUBOPTS); subscribe(Topic, SubOpts) when is_binary(Topic), is_map(SubOpts) -> subscribe(Topic, undefined, SubOpts). -spec(subscribe(emqx_topic:topic(), emqx_types:subid(), emqx_types:subopts()) -> ok). -subscribe(Topic, SubId, SubOpts) when is_binary(Topic), ?is_subid(SubId), is_map(SubOpts) -> +subscribe(Topic, SubId, SubOpts0) when is_binary(Topic), ?is_subid(SubId), is_map(SubOpts0) -> + SubOpts = maps:merge(?DEFAULT_SUBOPTS, SubOpts0), case ets:member(?SUBOPTION, {SubPid = self(), Topic}) of false -> %% New ok = emqx_broker_helper:register_sub(SubPid, SubId), @@ -215,9 +217,8 @@ safe_publish(Msg) when is_record(Msg, message) -> catch _:Error:Stk-> ?LOG(error, "Publish error: ~0p~n~s~n~0p", - [Error, emqx_message:format(Msg), Stk]) - after - [] + [Error, emqx_message:format(Msg), Stk]), + [] end. -compile({inline, [delivery/1]}). @@ -283,7 +284,7 @@ forward(Node, To, Delivery, sync) -> dispatch(Topic, #delivery{message = Msg}) -> case subscribers(Topic) of [] -> ok = emqx_hooks:run('message.dropped', [Msg, #{node => node()}, no_subscribers]), - ok = inc_dropped_cnt(Topic), + ok = inc_dropped_cnt(Msg), {error, no_subscribers}; [Sub] -> %% optimize? dispatch(Sub, Topic, Msg); @@ -322,7 +323,8 @@ inc_dropped_cnt(Msg) -> end. -compile({inline, [subscribers/1]}). --spec(subscribers(emqx_topic:topic()) -> [pid()]). +-spec(subscribers(emqx_topic:topic() | {shard, emqx_topic:topic(), non_neg_integer()}) + -> [pid()]). subscribers(Topic) when is_binary(Topic) -> lookup_value(?SUBSCRIBER, Topic, []); subscribers(Shard = {shard, _Topic, _I}) -> @@ -367,7 +369,7 @@ subscriptions(SubId) -> undefined -> [] end. --spec(subscribed(pid(), emqx_topic:topic()) -> boolean()). +-spec(subscribed(pid() | emqx_types:subid(), emqx_topic:topic()) -> boolean()). subscribed(SubPid, Topic) when is_pid(SubPid) -> ets:member(?SUBOPTION, {SubPid, Topic}); subscribed(SubId, Topic) when ?is_subid(SubId) -> diff --git a/src/emqx_broker_helper.erl b/src/emqx_broker_helper.erl index e0452fbcb..18ebbccf2 100644 --- a/src/emqx_broker_helper.erl +++ b/src/emqx_broker_helper.erl @@ -75,7 +75,7 @@ register_sub(SubPid, SubId) when is_pid(SubPid) -> lookup_subid(SubPid) when is_pid(SubPid) -> emqx_tables:lookup_value(?SUBMON, SubPid). --spec(lookup_subpid(emqx_types:subid()) -> pid()). +-spec(lookup_subpid(emqx_types:subid()) -> maybe(pid())). lookup_subpid(SubId) -> emqx_tables:lookup_value(?SUBID, SubId). diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index ff1ca4661..1fd35dd60 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -62,9 +62,9 @@ %% MQTT ClientInfo clientinfo :: emqx_types:clientinfo(), %% MQTT Session - session :: emqx_session:session(), + session :: maybe(emqx_session:session()), %% Keepalive - keepalive :: emqx_keepalive:keepalive(), + keepalive :: maybe(emqx_keepalive:keepalive()), %% MQTT Will Msg will_msg :: maybe(emqx_types:message()), %% MQTT Topic Aliases @@ -108,6 +108,8 @@ -define(INFO_KEYS, [conninfo, conn_state, clientinfo, session, will_msg]). +-dialyzer({no_match, [shutdown/4, ensure_timer/2, interval/2]}). + %%-------------------------------------------------------------------- %% Info, Attrs and Caps %%-------------------------------------------------------------------- @@ -818,7 +820,8 @@ return_unsuback(Packet, Channel) -> -spec(handle_call(Req :: term(), channel()) -> {reply, Reply :: term(), channel()} - | {shutdown, Reason :: term(), Reply :: term(), channel()}). + | {shutdown, Reason :: term(), Reply :: term(), channel()} + | {shutdown, Reason :: term(), Reply :: term(), emqx_types:packet(), channel()}). handle_call(kick, Channel) -> Channel1 = ensure_disconnected(kicked, Channel), disconnect_and_shutdown(kicked, ok, Channel1); @@ -936,9 +939,6 @@ handle_timeout(_TRef, retry_delivery, case emqx_session:retry(Session) of {ok, NSession} -> {ok, clean_timer(retry_timer, Channel#channel{session = NSession})}; - {ok, Publishes, NSession} -> - NChannel = Channel#channel{session = NSession}, - handle_out(publish, Publishes, reset_timer(retry_timer, NChannel)); {ok, Publishes, Timeout, NSession} -> NChannel = Channel#channel{session = NSession}, handle_out(publish, Publishes, reset_timer(retry_timer, Timeout, NChannel)) @@ -1013,6 +1013,7 @@ interval(will_timer, #channel{will_msg = WillMsg}) -> %% Terminate %%-------------------------------------------------------------------- +-spec(terminate(any(), channel()) -> ok). terminate(_, #channel{conn_state = idle}) -> ok; terminate(normal, Channel) -> run_terminate_hook(normal, Channel); diff --git a/src/emqx_cm.erl b/src/emqx_cm.erl index 92d54c056..a24bb8fba 100644 --- a/src/emqx_cm.erl +++ b/src/emqx_cm.erl @@ -107,7 +107,7 @@ start_link() -> register_channel(ClientId, Info = #{conninfo := ConnInfo}, Stats) -> Chan = {ClientId, ChanPid = self()}, true = ets:insert(?CHAN_INFO_TAB, {Chan, Info, Stats}), - register_channel(ClientId, ChanPid, ConnInfo); + register_channel_(ClientId, ChanPid, ConnInfo). %% @private %% @doc Register a channel with pid and conn_mod. @@ -117,7 +117,7 @@ register_channel(ClientId, Info = #{conninfo := ConnInfo}, Stats) -> %% the conn_mod first for taking up the clientid access right. %% %% Note that: It should be called on a lock transaction -register_channel(ClientId, ChanPid, #{conn_mod := ConnMod}) when is_pid(ChanPid) -> +register_channel_(ClientId, ChanPid, #{conn_mod := ConnMod}) when is_pid(ChanPid) -> Chan = {ClientId, ChanPid}, true = ets:insert(?CHAN_TAB, Chan), true = ets:insert(?CHAN_CONN_TAB, {Chan, ConnMod}), @@ -211,7 +211,7 @@ open_session(true, ClientInfo = #{clientid := ClientId}, ConnInfo) -> CleanStart = fun(_) -> ok = discard_session(ClientId), Session = create_session(ClientInfo, ConnInfo), - register_channel(ClientId, Self, ConnInfo), + register_channel_(ClientId, Self, ConnInfo), {ok, #{session => Session, present => false}} end, emqx_cm_locker:trans(ClientId, CleanStart); @@ -223,13 +223,13 @@ open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) -> {ok, ConnMod, ChanPid, Session} -> ok = emqx_session:resume(ClientInfo, Session), Pendings = ConnMod:call(ChanPid, {takeover, 'end'}), - register_channel(ClientId, Self, ConnInfo), + register_channel_(ClientId, Self, ConnInfo), {ok, #{session => Session, present => true, pendings => Pendings}}; {error, not_found} -> Session = create_session(ClientInfo, ConnInfo), - register_channel(ClientId, Self, ConnInfo), + register_channel_(ClientId, Self, ConnInfo), {ok, #{session => Session, present => false}} end end, @@ -243,7 +243,8 @@ create_session(ClientInfo, ConnInfo) -> %% @doc Try to takeover a session. -spec(takeover_session(emqx_types:clientid()) - -> {ok, emqx_session:session()} | {error, Reason :: term()}). + -> {error, term()} + | {ok, atom(), pid(), emqx_session:session()}). takeover_session(ClientId) -> case lookup_channels(ClientId) of [] -> {error, not_found}; diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index 3554ae893..e54495a71 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -103,6 +103,13 @@ -define(ENABLED(X), (X =/= undefined)). +-dialyzer({no_match, [info/2]}). +-dialyzer({nowarn_function, [ init/4 + , init_state/3 + , run_loop/2 + , system_terminate/4 + ]}). + -spec(start_link(esockd:transport(), esockd:socket(), proplists:proplist()) -> {ok, pid()}). start_link(Transport, Socket, Options) -> diff --git a/src/emqx_ctl.erl b/src/emqx_ctl.erl index 4abe0ebf3..45cf34543 100644 --- a/src/emqx_ctl.erl +++ b/src/emqx_ctl.erl @@ -159,7 +159,7 @@ format(Msg) -> format(Format, Args) -> lists:flatten(io_lib:format(Format, Args)). --spec(format_usage([cmd_usage()]) -> ok). +-spec(format_usage([cmd_usage()]) -> [string()]). format_usage(UsageList) -> lists:map( fun({CmdParams, Desc}) -> diff --git a/src/emqx_frame.erl b/src/emqx_frame.erl index dce697054..1c27548f7 100644 --- a/src/emqx_frame.erl +++ b/src/emqx_frame.erl @@ -42,10 +42,10 @@ version => emqx_types:version() }). --opaque(parse_state() :: {none, options()} | cont_fun()). +-type(parse_state() :: {none, options()} | cont_fun()). --opaque(parse_result() :: {more, cont_fun()} - | {ok, emqx_types:packet(), binary(), parse_state()}). +-type(parse_result() :: {more, cont_fun()} + | {ok, emqx_types:packet(), binary(), parse_state()}). -type(cont_fun() :: fun((binary()) -> parse_result())). @@ -59,6 +59,8 @@ version => ?MQTT_PROTO_V4 }). +-dialyzer({no_match, [serialize_utf8_string/2]}). + %%-------------------------------------------------------------------- %% Init Parse State %%-------------------------------------------------------------------- diff --git a/src/emqx_gen_mod.erl b/src/emqx_gen_mod.erl index e320ec877..26a3c13cb 100644 --- a/src/emqx_gen_mod.erl +++ b/src/emqx_gen_mod.erl @@ -16,21 +16,8 @@ -module(emqx_gen_mod). --ifdef(use_specs). - -callback(load(Opts :: any()) -> ok | {error, term()}). -callback(unload(State :: term()) -> term()). -callback(description() -> any()). - --else. - --export([behaviour_info/1]). - -behaviour_info(callbacks) -> - [{load, 1}, {unload, 1}]; -behaviour_info(_Other) -> - undefined. - --endif. diff --git a/src/emqx_hooks.erl b/src/emqx_hooks.erl index d079a38c3..273071d75 100644 --- a/src/emqx_hooks.erl +++ b/src/emqx_hooks.erl @@ -60,12 +60,12 @@ %% equal priority values. -type(hookpoint() :: atom()). --type(action() :: function() | mfa()). --type(filter() :: function() | mfa()). +-type(action() :: function() | {function(), [term()]} | mfargs()). +-type(filter() :: function() | mfargs()). -record(callback, { action :: action(), - filter :: filter(), + filter :: maybe(filter()), priority :: integer() }). @@ -112,7 +112,7 @@ add(HookPoint, Action, Filter, Priority) when is_integer(Priority) -> add(HookPoint, #callback{action = Action, filter = Filter, priority = Priority}). %% @doc Unregister a callback. --spec(del(hookpoint(), action()) -> ok). +-spec(del(hookpoint(), function() | {module(), atom()}) -> ok). del(HookPoint, Action) -> gen_server:cast(?SERVER, {del, HookPoint, Action}). diff --git a/src/emqx_logger.erl b/src/emqx_logger.erl index ebd2ac697..f250c7e93 100644 --- a/src/emqx_logger.erl +++ b/src/emqx_logger.erl @@ -221,7 +221,7 @@ trans([{eof, L} | AST], LogHeader, ResAST) -> trans([{attribute, _, module, _Mod} = M | AST], Header, ResAST) -> trans(AST, Header, [export_header_fun(), M | ResAST]); trans([{attribute, _, logger_header, Header} | AST], _, ResAST) -> - io_lib:printable_list(Header) orelse error({invalid_string, Header}), + io_lib:printable_list(Header) orelse erlang:error({invalid_string, Header}), trans(AST, Header, ResAST); trans([F | AST], LogHeader, ResAST) -> trans(AST, LogHeader, [F | ResAST]). diff --git a/src/emqx_logger_formatter.erl b/src/emqx_logger_formatter.erl index ad5c9cef7..2528a63b5 100644 --- a/src/emqx_logger_formatter.erl +++ b/src/emqx_logger_formatter.erl @@ -253,9 +253,7 @@ format_time(SysTime,#{}) {Date, _Time = {H, Mi, S}} = calendar:system_time_to_local_time(SysTime, microsecond), format_time({Date, {H, Mi, S, Ms}}). format_time({{Y, M, D}, {H, Mi, S, Ms}}) -> - io_lib:format("~b-~2..0b-~2..0b ~2..0b:~2..0b:~2..0b.~3..0b", [Y, M, D, H, Mi, S, Ms]); -format_time({{Y, M, D}, {H, Mi, S}}) -> - io_lib:format("~b-~2..0b-~2..0b ~2..0b:~2..0b:~2..0b", [Y, M, D, H, Mi, S]). + io_lib:format("~b-~2..0b-~2..0b ~2..0b:~2..0b:~2..0b.~3..0b", [Y, M, D, H, Mi, S, Ms]). format_mfa({M,F,A},_) when is_atom(M), is_atom(F), is_integer(A) -> atom_to_list(M)++":"++atom_to_list(F)++"/"++integer_to_list(A); diff --git a/src/emqx_message.erl b/src/emqx_message.erl index 0b5cbb56f..45e751193 100644 --- a/src/emqx_message.erl +++ b/src/emqx_message.erl @@ -278,7 +278,7 @@ to_map(#message{ }. %% @doc Message to tuple list --spec(to_list(emqx_types:message()) -> map()). +-spec(to_list(emqx_types:message()) -> list()). to_list(Msg) -> lists:zip(record_info(fields, message), tl(tuple_to_list(Msg))). @@ -290,8 +290,6 @@ format(#message{id = Id, qos = QoS, topic = Topic, from = From, flags = Flags, h io_lib:format("Message(Id=~s, QoS=~w, Topic=~s, From=~p, Flags=~s, Headers=~s)", [Id, QoS, Topic, From, format(flags, Flags), format(headers, Headers)]). -format(_, undefined) -> - ""; format(flags, Flags) -> io_lib:format("~p", [[Flag || {Flag, true} <- maps:to_list(Flags)]]); format(headers, Headers) -> diff --git a/src/emqx_misc.erl b/src/emqx_misc.erl index d08d3ba35..88e3c91a6 100644 --- a/src/emqx_misc.erl +++ b/src/emqx_misc.erl @@ -62,11 +62,17 @@ maybe_apply(_Fun, undefined) -> undefined; maybe_apply(Fun, Arg) when is_function(Fun) -> erlang:apply(Fun, [Arg]). --spec(compose(list(F)) -> G when F :: fun((any()) -> any()), - G :: fun((any()) -> any())). +-spec(compose(list(F)) -> G + when F :: fun((any()) -> any()), + G :: fun((any()) -> any())). compose([F|More]) -> compose(F, More). --spec(compose(fun((X) -> Y), fun((Y) -> Z)) -> fun((X) -> Z)). +-spec(compose(F, G|[Gs]) -> C + when F :: fun((X1) -> X2), + G :: fun((X2) -> X3), + Gs :: [fun((Xn) -> Xn1)], + C :: fun((X1) -> Xm), + X3 :: any(), Xn :: any(), Xn1 :: any(), Xm :: any()). compose(F, G) when is_function(G) -> fun(X) -> G(F(X)) end; compose(F, [G]) -> compose(F, G); compose(F, [G|More]) -> compose(compose(F, G), More). diff --git a/src/emqx_mod_acl_internal.erl b/src/emqx_mod_acl_internal.erl index 0a4037a31..770f90c83 100644 --- a/src/emqx_mod_acl_internal.erl +++ b/src/emqx_mod_acl_internal.erl @@ -33,8 +33,6 @@ , description/0 ]). --define(MFA(M, F, A), {M, F, A}). - -type(acl_rules() :: #{publish => [emqx_access_rule:rule()], subscribe => [emqx_access_rule:rule()]}). @@ -44,11 +42,10 @@ load(_Env) -> Rules = rules_from_file(emqx:get_env(acl_file)), - emqx_hooks:add('client.check_acl', ?MFA(?MODULE, check_acl, [Rules]), -1). + emqx_hooks:add('client.check_acl', {?MODULE, check_acl, [Rules]}, -1). unload(_Env) -> - Rules = rules_from_file(emqx:get_env(acl_file)), - emqx_hooks:del('client.check_acl', ?MFA(?MODULE, check_acl, [Rules])). + emqx_hooks:del('client.check_acl', {?MODULE, check_acl}). reload(_Env) -> emqx_acl_cache:is_enabled() andalso ( diff --git a/src/emqx_mod_delayed.erl b/src/emqx_mod_delayed.erl index f9a1c4756..b4ea28e77 100644 --- a/src/emqx_mod_delayed.erl +++ b/src/emqx_mod_delayed.erl @@ -61,7 +61,7 @@ load(_Env) -> -spec(unload(list()) -> ok). unload(_Env) -> - emqx:unhook('message.publish', {?MODULE, on_message_publish, []}), + emqx:unhook('message.publish', {?MODULE, on_message_publish}), emqx_mod_sup:stop_child(?MODULE). description() -> @@ -83,21 +83,13 @@ on_message_publish(Msg = #message{id = Id, topic = <<"$delayed/", Topic/binary>> end end, PubMsg = Msg#message{topic = Topic1}, - Headers = case PubMsg#message.headers of - undefined -> #{}; - Headers0 -> Headers0 - end, - ok = store(#delayed_message{key = {PubAt, delayed_mid(Id)}, msg = PubMsg}), + Headers = PubMsg#message.headers, + ok = store(#delayed_message{key = {PubAt, Id}, msg = PubMsg}), {stop, PubMsg#message{headers = Headers#{allow_publish => false}}}; on_message_publish(Msg) -> {ok, Msg}. -%% @private -delayed_mid(undefined) -> - emqx_guid:gen(); -delayed_mid(MsgId) -> MsgId. - %%-------------------------------------------------------------------- %% Start delayed publish server %%-------------------------------------------------------------------- diff --git a/src/emqx_mod_topic_metrics.erl b/src/emqx_mod_topic_metrics.erl index 4e9b44fa8..66fac6ee4 100644 --- a/src/emqx_mod_topic_metrics.erl +++ b/src/emqx_mod_topic_metrics.erl @@ -97,14 +97,14 @@ load(_Env) -> emqx_mod_sup:start_child(?MODULE, worker), - emqx:hook('message.publish', fun ?MODULE:on_message_publish/1, []), - emqx:hook('message.dropped', fun ?MODULE:on_message_dropped/3, []), - emqx:hook('message.delivered', fun ?MODULE:on_message_delivered/2, []). + emqx:hook('message.publish', {?MODULE, on_message_publish, []}), + emqx:hook('message.dropped', {?MODULE, on_message_dropped, []}), + emqx:hook('message.delivered', {?MODULE, on_message_delivered, []}). unload(_Env) -> - emqx:unhook('message.publish', fun ?MODULE:on_message_publish/1), - emqx:unhook('message.dropped', fun ?MODULE:on_message_dropped/3), - emqx:unhook('message.delivered', fun ?MODULE:on_message_delivered/2), + emqx:unhook('message.publish', {?MODULE, on_message_publish}), + emqx:unhook('message.dropped', {?MODULE, on_message_dropped}), + emqx:unhook('message.delivered', {?MODULE, on_message_delivered}), emqx_mod_sup:stop_child(?MODULE). description() -> diff --git a/src/emqx_mqtt_caps.erl b/src/emqx_mqtt_caps.erl index 47f02edda..f29d59915 100644 --- a/src/emqx_mqtt_caps.erl +++ b/src/emqx_mqtt_caps.erl @@ -71,8 +71,9 @@ }). -spec(check_pub(emqx_types:zone(), - #{qos => emqx_types:qos(), - retain => boolean()}) + #{qos := emqx_types:qos(), + retain := boolean(), + topic := emqx_topic:topic()}) -> ok_or_error(emqx_types:reason_code())). check_pub(Zone, Flags) when is_map(Flags) -> do_check_pub(case maps:take(topic, Flags) of @@ -156,4 +157,4 @@ with_env(Zone, Key, InitFun) -> ok = emqx_zone:set_env(Zone, Key, Caps), Caps; ZoneCaps -> ZoneCaps - end. \ No newline at end of file + end. diff --git a/src/emqx_mqtt_props.erl b/src/emqx_mqtt_props.erl index 535c5b696..7acea6761 100644 --- a/src/emqx_mqtt_props.erl +++ b/src/emqx_mqtt_props.erl @@ -128,24 +128,21 @@ name(16#29) -> 'Subscription-Identifier-Available'; name(16#2A) -> 'Shared-Subscription-Available'; name(Id) -> error({unsupported_property, Id}). --spec(filter(emqx_types:packet_type(), emqx_types:properties()|list()) +-spec(filter(emqx_types:packet_type(), emqx_types:properties()) -> emqx_types:properties()). -filter(PacketType, Props) when is_map(Props) -> - maps:from_list(filter(PacketType, maps:to_list(Props))); - -filter(PacketType, Props) when ?CONNECT =< PacketType, - PacketType =< ?AUTH, - is_list(Props) -> - Filter = fun(Name) -> - case maps:find(id(Name), ?PROPS_TABLE) of - {ok, {Name, _Type, 'ALL'}} -> - true; - {ok, {Name, _Type, AllowedTypes}} -> - lists:member(PacketType, AllowedTypes); - error -> false - end - end, - [Prop || Prop = {Name, _} <- Props, Filter(Name)]. +filter(PacketType, Props) when is_map(Props), + PacketType >= ?CONNECT, + PacketType =< ?AUTH -> + F = fun(Name, _) -> + case maps:find(id(Name), ?PROPS_TABLE) of + {ok, {Name, _Type, 'ALL'}} -> + true; + {ok, {Name, _Type, AllowedTypes}} -> + lists:member(PacketType, AllowedTypes); + error -> false + end + end, + maps:filter(F, Props). -spec(validate(emqx_types:properties()) -> ok). validate(Props) when is_map(Props) -> diff --git a/src/emqx_mqueue.erl b/src/emqx_mqueue.erl index bef155e63..f77a1d98f 100644 --- a/src/emqx_mqueue.erl +++ b/src/emqx_mqueue.erl @@ -101,7 +101,7 @@ q = ?PQUEUE:new() :: pq() }). --opaque(mqueue() :: #mqueue{}). +-type(mqueue() :: #mqueue{}). -spec(init(options()) -> mqueue()). init(Opts = #{max_len := MaxLen0, store_qos0 := QoS_0}) -> diff --git a/src/emqx_os_mon.erl b/src/emqx_os_mon.erl index 0f6c7d597..6f10a7cfe 100644 --- a/src/emqx_os_mon.erl +++ b/src/emqx_os_mon.erl @@ -144,9 +144,6 @@ handle_info({timeout, Timer, check}, State = #{timer := Timer, NState = case emqx_vm:cpu_util() of %% TODO: should be improved? 0 -> State#{timer := undefined}; - {error, Reason} -> - ?LOG(error, "Failed to get cpu utilization: ~p", [Reason]), - ensure_check_timer(State); Busy when Busy / 100 >= CPUHighWatermark -> alarm_handler:set_alarm({cpu_high_watermark, Busy}), ensure_check_timer(State#{is_cpu_alarm_set := true}); diff --git a/src/emqx_packet.erl b/src/emqx_packet.erl index 43fb4b51e..015f95e0d 100644 --- a/src/emqx_packet.erl +++ b/src/emqx_packet.erl @@ -484,10 +484,11 @@ format_variable(#mqtt_packet_suback{packet_id = PacketId, format_variable(#mqtt_packet_unsuback{packet_id = PacketId}) -> io_lib:format("PacketId=~p", [PacketId]); -format_variable(PacketId) when is_integer(PacketId) -> - io_lib:format("PacketId=~p", [PacketId]); +format_variable(#mqtt_packet_auth{reason_code = ReasonCode}) -> + io_lib:format("ReasonCode=~p", [ReasonCode]); -format_variable(undefined) -> undefined. +format_variable(PacketId) when is_integer(PacketId) -> + io_lib:format("PacketId=~p", [PacketId]). format_password(undefined) -> undefined; format_password(_Password) -> '******'. diff --git a/src/emqx_plugins.erl b/src/emqx_plugins.erl index eddad6810..faa6abae3 100644 --- a/src/emqx_plugins.erl +++ b/src/emqx_plugins.erl @@ -38,6 +38,11 @@ -compile(export_all). -compile(nowarn_export_all). -endif. + +-dialyzer({no_match, [ plugin_loaded/2 + , plugin_unloaded/2 + ]}). + %%-------------------------------------------------------------------- %% APIs %%-------------------------------------------------------------------- diff --git a/src/emqx_pool_sup.erl b/src/emqx_pool_sup.erl index ff6fec6b8..080cd842b 100644 --- a/src/emqx_pool_sup.erl +++ b/src/emqx_pool_sup.erl @@ -18,6 +18,8 @@ -behaviour(supervisor). +-include("types.hrl"). + -export([spec/1, spec/2]). -export([ start_link/0 @@ -46,12 +48,12 @@ spec(ChildId, Args) -> start_link() -> start_link(?POOL, random, {?POOL, start_link, []}). --spec(start_link(atom() | tuple(), atom(), mfa()) +-spec(start_link(atom() | tuple(), atom(), mfargs()) -> {ok, pid()} | {error, term()}). start_link(Pool, Type, MFA) -> start_link(Pool, Type, emqx_vm:schedulers(), MFA). --spec(start_link(atom() | tuple(), atom(), pos_integer(), mfa()) +-spec(start_link(atom() | tuple(), atom(), pos_integer(), mfargs()) -> {ok, pid()} | {error, term()}). start_link(Pool, Type, Size, MFA) -> supervisor:start_link(?MODULE, [Pool, Type, Size, MFA]). diff --git a/src/emqx_router_helper.erl b/src/emqx_router_helper.erl index 2e587c446..ba5cc5543 100644 --- a/src/emqx_router_helper.erl +++ b/src/emqx_router_helper.erl @@ -53,6 +53,8 @@ -define(ROUTING_NODE, emqx_routing_node). -define(LOCK, {?MODULE, cleanup_routes}). +-dialyzer({nowarn_function, [cleanup_routes/1]}). + %%-------------------------------------------------------------------- %% Mnesia bootstrap %%-------------------------------------------------------------------- diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 1e7a4dc7a..88664e028 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -123,7 +123,7 @@ created_at :: pos_integer() }). --opaque(session() :: #session{}). +-type(session() :: #session{}). -type(publish() :: {maybe(emqx_types:packet_id()), emqx_types:message()}). @@ -152,6 +152,7 @@ -define(DEFAULT_BATCH_N, 1000). + %%-------------------------------------------------------------------- %% Init a Session %%-------------------------------------------------------------------- @@ -616,19 +617,16 @@ resume(ClientInfo = #{clientid := ClientId}, Session = #session{subscriptions = -spec(replay(session()) -> {ok, replies(), session()}). replay(Session = #session{inflight = Inflight}) -> - Pubs = replay(Inflight), + Pubs = lists:map(fun({PacketId, {pubrel, _Ts}}) -> + {pubrel, PacketId}; + ({PacketId, {Msg, _Ts}}) -> + {PacketId, emqx_message:set_flag(dup, true, Msg)} + end, emqx_inflight:to_list(Inflight)), case dequeue(Session) of {ok, NSession} -> {ok, Pubs, NSession}; {ok, More, NSession} -> {ok, lists:append(Pubs, More), NSession} - end; - -replay(Inflight) -> - lists:map(fun({PacketId, {pubrel, _Ts}}) -> - {pubrel, PacketId}; - ({PacketId, {Msg, _Ts}}) -> - {PacketId, emqx_message:set_flag(dup, true, Msg)} - end, emqx_inflight:to_list(Inflight)). + end. -spec(terminate(emqx_types:clientinfo(), Reason :: term(), session()) -> ok). terminate(ClientInfo, discarded, Session) -> diff --git a/src/emqx_stats.erl b/src/emqx_stats.erl index 7548867e5..a6d53cfeb 100644 --- a/src/emqx_stats.erl +++ b/src/emqx_stats.erl @@ -58,7 +58,7 @@ -record(update, {name, countdown, interval, func}). -record(state, { - timer :: reference(), + timer :: maybe(reference()), updates :: [#update{}], tick_ms :: timeout() }). @@ -159,7 +159,7 @@ setstat(Stat, Val) when is_integer(Val) -> %% @doc Set stats with max value. -spec(setstat(Stat :: atom(), MaxStat :: atom(), - Val :: pos_integer()) -> boolean()). + Val :: pos_integer()) -> ok). setstat(Stat, MaxStat, Val) when is_integer(Val) -> cast({setstat, Stat, MaxStat, Val}). diff --git a/src/emqx_sys.erl b/src/emqx_sys.erl index e2698d06a..38387b3ce 100644 --- a/src/emqx_sys.erl +++ b/src/emqx_sys.erl @@ -19,6 +19,7 @@ -behaviour(gen_server). -include("emqx.hrl"). +-include("types.hrl"). -include("logger.hrl"). -logger_header("[SYS]"). @@ -53,20 +54,12 @@ -import(emqx_topic, [systop/1]). -import(emqx_misc, [start_timer/2]). --type(timeref() :: reference()). - --type(tickeref() :: reference()). - --type(version() :: string()). - --type(sysdescr() :: string()). - -record(state, { start_time :: erlang:timestamp() - , heartbeat :: timeref() - , ticker :: tickeref() - , version :: version() - , sysdescr :: sysdescr() + , heartbeat :: maybe(reference()) + , ticker :: maybe(reference()) + , version :: binary() + , sysdescr :: binary() }). -define(APP, emqx). diff --git a/src/emqx_sys_mon.erl b/src/emqx_sys_mon.erl index 8743c7dad..0eabb1253 100644 --- a/src/emqx_sys_mon.erl +++ b/src/emqx_sys_mon.erl @@ -18,8 +18,8 @@ -behavior(gen_server). --include("logger.hrl"). -include("types.hrl"). +-include("logger.hrl"). -logger_header("[SYSMON]"). @@ -171,9 +171,11 @@ handle_partition_event({partition, {healed, _Node}}) -> suppress(Key, SuccFun, State = #{events := Events}) -> case lists:member(Key, Events) of - true -> {noreply, State}; - false -> SuccFun(), - {noreply, State#{events := [Key|Events]}} + true -> + {noreply, State}; + false -> + SuccFun(), + {noreply, State#{events := [Key|Events]}} end. procinfo(Pid) -> diff --git a/src/emqx_topic.erl b/src/emqx_topic.erl index 487b04830..029a46b61 100644 --- a/src/emqx_topic.erl +++ b/src/emqx_topic.erl @@ -43,7 +43,7 @@ -type(topic() :: binary()). -type(word() :: '' | '+' | '#' | binary()). -type(words() :: list(word())). --opaque(triple() :: {root | binary(), word(), binary()}). +-type(triple() :: {root | binary(), word(), binary()}). -define(MAX_TOPIC_LEN, 4096). @@ -184,6 +184,7 @@ word(<<"#">>) -> '#'; word(Bin) -> Bin. %% @doc '$SYS' Topic. +-spec(systop(atom()|string()|binary()) -> topic()). systop(Name) when is_atom(Name); is_list(Name) -> iolist_to_binary(lists:concat(["$SYS/brokers/", node(), "/", Name])); systop(Name) when is_binary(Name) -> diff --git a/src/emqx_tracer.erl b/src/emqx_tracer.erl index a2112cda3..befb02b96 100644 --- a/src/emqx_tracer.erl +++ b/src/emqx_tracer.erl @@ -57,6 +57,8 @@ L =:= info orelse L =:= debug). +-dialyzer({nowarn_function, [install_trace_handler/3]}). + %%------------------------------------------------------------------------------ %% APIs %%------------------------------------------------------------------------------ @@ -68,7 +70,7 @@ trace(publish, #message{from = From, topic = Topic, payload = Payload}) emqx_logger:info(#{topic => Topic, mfa => {?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY} }, "PUBLISH to ~s: ~0p", [Topic, Payload]). %% @doc Start to trace clientid or topic. --spec(start_trace(trace_who(), logger:level(), string()) -> ok | {error, term()}). +-spec(start_trace(trace_who(), logger:level() | all, string()) -> ok | {error, term()}). start_trace(Who, all, LogFile) -> start_trace(Who, debug, LogFile); start_trace(Who, Level, LogFile) -> diff --git a/src/emqx_types.erl b/src/emqx_types.erl index 2475ad88b..718b757a5 100644 --- a/src/emqx_types.erl +++ b/src/emqx_types.erl @@ -136,7 +136,7 @@ is_bridge := boolean(), is_superuser := boolean(), mountpoint := maybe(binary()), - ws_cookie := maybe(list()), + ws_cookie => maybe(list()), password => maybe(binary()), auth_result => auth_result(), anonymous => boolean(), @@ -201,7 +201,7 @@ -type(caps() :: emqx_mqtt_caps:caps()). -type(attrs() :: #{atom() => term()}). -type(infos() :: #{atom() => term()}). --type(stats() :: #{atom() => non_neg_integer()|stats()}). +-type(stats() :: [{atom(), term()}]). -type(oom_policy() :: #{message_queue_len => non_neg_integer(), max_heap_size => non_neg_integer() diff --git a/src/emqx_vm.erl b/src/emqx_vm.erl index b4a36f410..539b8b91b 100644 --- a/src/emqx_vm.erl +++ b/src/emqx_vm.erl @@ -48,13 +48,13 @@ , get_port_info/1 ]). +-export([cpu_util/0]). + -ifdef(TEST). -compile(export_all). -compile(nowarn_export_all). -endif. --export([cpu_util/0]). - -define(UTIL_ALLOCATORS, [temp_alloc, eheap_alloc, binary_alloc, @@ -408,9 +408,6 @@ port_info(PortTerm, specific) -> [] end, {specific, Props}; -port_info(PortTerm, Keys) when is_list(Keys) -> - Port = transform_port(PortTerm), - [erlang:port_info(Port, Key) || Key <- Keys]; port_info(PortTerm, Key) when is_atom(Key) -> Port = transform_port(PortTerm), erlang:port_info(Port, Key). diff --git a/src/emqx_ws_connection.erl b/src/emqx_ws_connection.erl index 1b09cf9e6..25eb8ff6c 100644 --- a/src/emqx_ws_connection.erl +++ b/src/emqx_ws_connection.erl @@ -63,7 +63,7 @@ %% Simulate the active_n opt active_n :: pos_integer(), %% Limiter - limiter :: emqx_limiter:limiter(), + limiter :: maybe(emqx_limiter:limiter()), %% Limit Timer limit_timer :: maybe(reference()), %% Parse State @@ -81,7 +81,7 @@ %% Idle Timeout idle_timeout :: timeout(), %% Idle Timer - idle_timer :: reference() + idle_timer :: maybe(reference()) }). -type(state() :: #state{}). @@ -95,6 +95,9 @@ -define(ENABLED(X), (X =/= undefined)). +-dialyzer({no_match, [info/2]}). +-dialyzer({nowarn_function, [websocket_init/1]}). + %%-------------------------------------------------------------------- %% Info, Stats %%-------------------------------------------------------------------- diff --git a/src/emqx_zone.erl b/src/emqx_zone.erl index 68befdfdc..cf3b14428 100644 --- a/src/emqx_zone.erl +++ b/src/emqx_zone.erl @@ -180,7 +180,7 @@ enable_flapping_detect(Zone) -> ignore_loop_deliver(Zone) -> get_env(Zone, ignore_loop_deliver, false). --spec(server_keepalive(zone()) -> pos_integer()). +-spec(server_keepalive(zone()) -> maybe(pos_integer())). server_keepalive(Zone) -> get_env(Zone, server_keepalive). diff --git a/test/emqx_SUITE.erl b/test/emqx_SUITE.erl index 24ef77753..48c2177e1 100644 --- a/test/emqx_SUITE.erl +++ b/test/emqx_SUITE.erl @@ -77,7 +77,10 @@ t_emqx_pubsub_api(_) -> ?assertEqual([self()], emqx:subscribers(Topic)), ?assertEqual([self()], emqx:subscribers(Topic1)), ?assertEqual([self()], emqx:subscribers(Topic2)), - ?assertEqual([{Topic,#{qos => 0,subid => ClientId}}, {Topic1,#{qos => 1,subid => ClientId}}, {Topic2,#{qos => 2,subid => ClientId}}], emqx:subscriptions(self())), + + ?assertEqual([{Topic, #{nl => 0, qos => 0, rap => 0, rh => 0, subid => ClientId}}, + {Topic1, #{nl => 0, qos => 1, rap => 0, rh => 0, subid => ClientId}}, + {Topic2, #{nl => 0, qos => 2, rap => 0, rh => 0, subid => ClientId}}], emqx:subscriptions(self())), ?assertEqual(true, emqx:subscribed(self(), Topic)), ?assertEqual(true, emqx:subscribed(ClientId, Topic)), ?assertEqual(true, emqx:subscribed(self(), Topic1)), diff --git a/test/emqx_broker_SUITE.erl b/test/emqx_broker_SUITE.erl index 2dd360759..d52132497 100644 --- a/test/emqx_broker_SUITE.erl +++ b/test/emqx_broker_SUITE.erl @@ -59,12 +59,18 @@ t_subopts(_) -> ?assertEqual(undefined, emqx_broker:get_subopts(<<"clientid">>, <<"topic">>)), emqx_broker:subscribe(<<"topic">>, <<"clientid">>, #{qos => 1}), timer:sleep(200), - ?assertEqual(#{qos => 1, subid => <<"clientid">>}, emqx_broker:get_subopts(self(), <<"topic">>)), - ?assertEqual(#{qos => 1, subid => <<"clientid">>}, emqx_broker:get_subopts(<<"clientid">>,<<"topic">>)), + ?assertEqual(#{nl => 0, qos => 1, rap => 0, rh => 0, subid => <<"clientid">>}, + emqx_broker:get_subopts(self(), <<"topic">>)), + ?assertEqual(#{nl => 0, qos => 1, rap => 0, rh => 0, subid => <<"clientid">>}, + emqx_broker:get_subopts(<<"clientid">>,<<"topic">>)), + emqx_broker:subscribe(<<"topic">>, <<"clientid">>, #{qos => 2}), - ?assertEqual(#{qos => 2, subid => <<"clientid">>}, emqx_broker:get_subopts(self(), <<"topic">>)), - ?assertEqual(true, emqx_broker:set_subopts(<<"topic">>, #{qos => 2})), - ?assertEqual(#{qos => 2, subid => <<"clientid">>}, emqx_broker:get_subopts(self(), <<"topic">>)), + ?assertEqual(#{nl => 0, qos => 2, rap => 0, rh => 0, subid => <<"clientid">>}, + emqx_broker:get_subopts(self(), <<"topic">>)), + + ?assertEqual(true, emqx_broker:set_subopts(<<"topic">>, #{qos => 0})), + ?assertEqual(#{nl => 0, qos => 0, rap => 0, rh => 0, subid => <<"clientid">>}, + emqx_broker:get_subopts(self(), <<"topic">>)), emqx_broker:unsubscribe(<<"topic">>). t_topics(_) -> @@ -91,9 +97,9 @@ t_subscribers(_) -> t_subscriptions(_) -> emqx_broker:subscribe(<<"topic">>, <<"clientid">>, #{qos => 1}), ok = timer:sleep(100), - ?assertEqual(#{qos => 1, subid => <<"clientid">>}, + ?assertEqual(#{nl => 0, qos => 1, rap => 0, rh => 0, subid => <<"clientid">>}, proplists:get_value(<<"topic">>, emqx_broker:subscriptions(self()))), - ?assertEqual(#{qos => 1, subid => <<"clientid">>}, + ?assertEqual(#{nl => 0, qos => 1, rap => 0, rh => 0, subid => <<"clientid">>}, proplists:get_value(<<"topic">>, emqx_broker:subscriptions(<<"clientid">>))), emqx_broker:unsubscribe(<<"topic">>). diff --git a/test/emqx_vm_SUITE.erl b/test/emqx_vm_SUITE.erl index acc75e99a..19545f0ea 100644 --- a/test/emqx_vm_SUITE.erl +++ b/test/emqx_vm_SUITE.erl @@ -80,8 +80,7 @@ t_get_port_info(_Config) -> {ok, Sock} = gen_tcp:connect("localhost", 5678, [binary, {packet, 0}]), emqx_vm:get_port_info(), ok = gen_tcp:close(Sock), - [Port | _] = erlang:ports(), - [{connected, _}, {name, _}] = emqx_vm:port_info(Port, [connected, name]). + [Port | _] = erlang:ports(). t_transform_port(_Config) -> [Port | _] = erlang:ports(),