From a2823f2ad6310b424202b7e07545be69e28767da Mon Sep 17 00:00:00 2001 From: JianBo He Date: Thu, 16 Dec 2021 14:53:04 +0800 Subject: [PATCH] chore(gw): improve some type specs --- .../src/coap/emqx_coap_channel.erl | 99 ++++++++++++++----- .../emqx_gateway/src/coap/emqx_coap_frame.erl | 13 ++- .../src/coap/emqx_coap_session.erl | 3 +- apps/emqx_gateway/src/coap/emqx_coap_tm.erl | 7 +- .../src/coap/emqx_coap_transport.erl | 7 +- .../coap/handler/emqx_coap_pubsub_handler.erl | 2 +- .../src/coap/include/emqx_coap.hrl | 2 +- .../src/emqx_gateway_api_authn.erl | 4 +- .../src/emqx_gateway_api_clients.erl | 12 +++ apps/emqx_gateway/src/emqx_gateway_http.erl | 21 ++-- apps/emqx_gateway/src/emqx_gateway_schema.erl | 4 +- .../src/lwm2m/emqx_lwm2m_channel.erl | 16 +++ .../src/lwm2m/emqx_lwm2m_session.erl | 6 +- .../src/stomp/emqx_stomp_channel.erl | 14 +-- 14 files changed, 147 insertions(+), 63 deletions(-) diff --git a/apps/emqx_gateway/src/coap/emqx_coap_channel.erl b/apps/emqx_gateway/src/coap/emqx_coap_channel.erl index e1dad1dee..5ccd0f52b 100644 --- a/apps/emqx_gateway/src/coap/emqx_coap_channel.erl +++ b/apps/emqx_gateway/src/coap/emqx_coap_channel.erl @@ -25,7 +25,8 @@ , validator/4 , metrics_inc/2 , run_hooks/3 - , send_request/2]). + , send_request/2 + ]). -export([ init/2 , handle_in/2 @@ -48,40 +49,55 @@ -define(AUTHN, ?EMQX_AUTHENTICATION_CONFIG_ROOT_NAME_ATOM). -record(channel, { - %% Context - ctx :: emqx_gateway_ctx:context(), - %% Connection Info - conninfo :: emqx_types:conninfo(), - %% Client Info - clientinfo :: emqx_types:clientinfo(), - %% Session - session :: emqx_coap_session:session() | undefined, - %% Keepalive - keepalive :: emqx_keepalive:keepalive() | undefined, - %% Timer - timers :: #{atom() => disable | undefined | reference()}, - - connection_required :: boolean(), - - conn_state :: idle | connected | disconnected, - - token :: binary() | undefined - }). + %% Context + ctx :: emqx_gateway_ctx:context(), + %% Connection Info + conninfo :: emqx_types:conninfo(), + %% Client Info + clientinfo :: emqx_types:clientinfo(), + %% Session + session :: emqx_coap_session:session() | undefined, + %% Keepalive + keepalive :: emqx_keepalive:keepalive() | undefined, + %% Timer + timers :: #{atom() => disable | undefined | reference()}, + %% Connection mode + connection_required :: boolean(), + %% Connection State + conn_state :: idle | connected | disconnected, + %% Session token to identity this connection + token :: binary() | undefined + }). -type channel() :: #channel{}. + +-type conn_state() :: idle | connecting | connected | disconnected. + +-type reply() :: {outgoing, coap_message()} + | {outgoing, [coap_message()]} + | {event, conn_state()|updated} + | {close, Reason :: atom()}. + +-type replies() :: reply() | [reply()]. + -define(TOKEN_MAXIMUM, 4294967295). + -define(INFO_KEYS, [conninfo, conn_state, clientinfo, session]). + -define(DEF_IDLE_TIME, timer:seconds(30)). -define(GET_IDLE_TIME(Cfg), maps:get(idle_timeout, Cfg, ?DEF_IDLE_TIME)). -import(emqx_coap_medium, [reply/2, reply/3, reply/4, iter/3, iter/4]). + %%-------------------------------------------------------------------- %% API %%-------------------------------------------------------------------- +-spec info(channel()) -> emqx_types:infos(). info(Channel) -> maps:from_list(info(?INFO_KEYS, Channel)). +-spec info(list(atom())|atom(), channel()) -> term(). info(Keys, Channel) when is_list(Keys) -> [{Key, info(Key, Channel)} || Key <- Keys]; @@ -92,15 +108,17 @@ info(conn_state, #channel{conn_state = CState}) -> info(clientinfo, #channel{clientinfo = ClientInfo}) -> ClientInfo; info(session, #channel{session = Session}) -> - emqx_misc:maybe_apply(fun emqx_session:info/1, Session); + emqx_misc:maybe_apply(fun emqx_coap_session:info/1, Session); info(clientid, #channel{clientinfo = #{clientid := ClientId}}) -> ClientId; info(ctx, #channel{ctx = Ctx}) -> Ctx. +-spec stats(channel()) -> emqx_types:stats(). stats(_) -> []. +-spec init(map(), map()) -> channel(). init(ConnInfoT = #{peername := {PeerHost, _}, sockname := {_, SockPort}}, #{ctx := Ctx} = Config) -> @@ -126,8 +144,8 @@ init(ConnInfoT = #{peername := {PeerHost, _}, } ), - %% because it is possible to disconnect after init, and then trigger the $event.disconnected hook - %% and these two fields are required in the hook + %% because it is possible to disconnect after init, and then trigger the + %% $event.disconnected hook and these two fields are required in the hook ConnInfo = ConnInfoT#{proto_name => <<"CoAP">>, proto_ver => <<"1">>}, Heartbeat = ?GET_IDLE_TIME(Config), @@ -144,13 +162,19 @@ init(ConnInfoT = #{peername := {PeerHost, _}, validator(Type, Topic, Ctx, ClientInfo) -> emqx_gateway_ctx:authorize(Ctx, ClientInfo, Type, Topic). --spec send_request(pid(), emqx_coap_message()) -> any(). +-spec send_request(pid(), coap_message()) -> any(). send_request(Channel, Request) -> gen_server:send_request(Channel, {?FUNCTION_NAME, Request}). %%-------------------------------------------------------------------- %% Handle incoming packet %%-------------------------------------------------------------------- + +-spec handle_in(coap_message() | {frame_error, any()}, channel()) + -> {ok, channel()} + | {ok, replies(), channel()} + | {shutdown, Reason :: term(), channel()} + | {shutdown, Reason :: term(), replies(), channel()}. handle_in(Msg, ChannleT) -> Channel = ensure_keepalive_timer(ChannleT), case emqx_coap_message:is_request(Msg) of @@ -170,6 +194,7 @@ handle_deliver(Delivers, #channel{session = Session, %%-------------------------------------------------------------------- %% Handle timeout %%-------------------------------------------------------------------- + handle_timeout(_, {keepalive, NewVal}, #channel{keepalive = KeepAlive} = Channel) -> case emqx_keepalive:check(NewVal, KeepAlive) of {ok, NewKeepAlive} -> @@ -191,10 +216,28 @@ handle_timeout(_, _, Channel) -> %%-------------------------------------------------------------------- %% Handle call %%-------------------------------------------------------------------- + +-spec(handle_call(Req :: term(), From :: term(), channel()) + -> {reply, Reply :: term(), channel()} + | {reply, Reply :: term(), replies(), channel()} + | {shutdown, Reason :: term(), Reply :: term(), channel()} + | {shutdown, Reason :: term(), Reply :: term(), coap_message(), channel()}). handle_call({send_request, Msg}, From, Channel) -> Result = call_session(handle_out, {{send_request, From}, Msg}, Channel), erlang:setelement(1, Result, noreply); +handle_call({subscribe, _Topic, _SubOpts}, _From, Channel) -> + {reply, {error, nosupport}, Channel}; + +handle_call({unsubscribe, _Topic}, _From, Channel) -> + {reply, {error, noimpl}, Channel}; + +handle_call(subscriptions, _From, Channel) -> + {reply, {error, noimpl}, Channel}; + +handle_call(kick, _From, Channel) -> + {reply, {error, noimpl}, Channel}; + handle_call(Req, _From, Channel) -> ?SLOG(error, #{msg => "unexpected_call", call => Req}), {reply, ignored, Channel}. @@ -202,6 +245,9 @@ handle_call(Req, _From, Channel) -> %%-------------------------------------------------------------------- %% Handle Cast %%-------------------------------------------------------------------- + +-spec handle_cast(Req :: term(), channel()) + -> ok | {ok, channel()} | {shutdown, Reason :: term(), channel()}. handle_cast(Req, Channel) -> ?SLOG(error, #{msg => "unexpected_cast", cast => Req}), {ok, Channel}. @@ -209,9 +255,9 @@ handle_cast(Req, Channel) -> %%-------------------------------------------------------------------- %% Handle Info %%-------------------------------------------------------------------- -handle_info({subscribe, _}, Channel) -> - {ok, Channel}; +-spec(handle_info(Info :: term(), channel()) + -> ok | {ok, channel()} | {shutdown, Reason :: term(), channel()}). handle_info(Info, Channel) -> ?SLOG(error, #{msg => "unexpected_info", info => Info}), {ok, Channel}. @@ -412,6 +458,7 @@ ensure_disconnected(Reason, Channel = #channel{ %%-------------------------------------------------------------------- %% Call Chain %%-------------------------------------------------------------------- + call_session(Fun, Msg, #channel{session = Session} = Channel) -> Result = emqx_coap_session:Fun(Msg, Session), handle_result(Result, Channel). diff --git a/apps/emqx_gateway/src/coap/emqx_coap_frame.erl b/apps/emqx_gateway/src/coap/emqx_coap_frame.erl index e5305b417..a567ea7f4 100644 --- a/apps/emqx_gateway/src/coap/emqx_coap_frame.erl +++ b/apps/emqx_gateway/src/coap/emqx_coap_frame.erl @@ -18,17 +18,15 @@ -behaviour(emqx_gateway_frame). -%% emqx_gateway_frame API +%% emqx_gateway_frame callbacks -export([ initial_parse_state/1 , serialize_opts/0 , serialize_pkt/2 , parse/2 , format/1 , type/1 - , is_message/1]). - -%% API --export([]). + , is_message/1 + ]). -include("include/emqx_coap.hrl"). -include("apps/emqx/include/types.hrl"). @@ -58,9 +56,11 @@ %% API %%-------------------------------------------------------------------- +-spec initial_parse_state(map()) -> emqx_gateway_frame:parse_state(). initial_parse_state(_) -> #{}. +-spec serialize_opts() -> emqx_gateway_frame:serialize_options(). serialize_opts() -> #{}. @@ -235,6 +235,9 @@ method_to_class_code(Method) -> %%-------------------------------------------------------------------- %% parse %%-------------------------------------------------------------------- + +-spec parse(binary(), emqx_gateway_frame:parse_state()) + -> emqx_gateway_frame:parse_result(). parse(<>, ParseState) -> {ok, #coap_message{ type = decode_type(Type) diff --git a/apps/emqx_gateway/src/coap/emqx_coap_session.erl b/apps/emqx_gateway/src/coap/emqx_coap_session.erl index 13f1be240..f94b4e9b3 100644 --- a/apps/emqx_gateway/src/coap/emqx_coap_session.erl +++ b/apps/emqx_gateway/src/coap/emqx_coap_session.erl @@ -22,7 +22,8 @@ %% API -export([ new/0 - , process_subscribe/4]). + , process_subscribe/4 + ]). -export([ info/1 , info/2 diff --git a/apps/emqx_gateway/src/coap/emqx_coap_tm.erl b/apps/emqx_gateway/src/coap/emqx_coap_tm.erl index b5e4deb7f..d6d05fd40 100644 --- a/apps/emqx_gateway/src/coap/emqx_coap_tm.erl +++ b/apps/emqx_gateway/src/coap/emqx_coap_tm.erl @@ -63,7 +63,7 @@ -type event_result(State) :: #{next => State, - outgoing => emqx_coap_message(), + outgoing => coap_message(), timeouts => list(ttimeout()), has_sub => undefined | sub_register(), transport => emqx_coap_transport:transprot()}. @@ -75,12 +75,13 @@ %%-------------------------------------------------------------------- %% API %%-------------------------------------------------------------------- + +-spec new() -> manager(). new() -> #{ seq_id => 1 , next_msg_id => rand:uniform(?MAX_MESSAGE_ID) }. -%% client request handle_request(#coap_message{id = MsgId} = Msg, TM) -> Id = {in, MsgId}, case find_machine(Id, TM) of @@ -296,7 +297,7 @@ new_in_machine(MachineId, #{seq_id := SeqId} = Manager) -> SeqId => Machine, MachineId => SeqId}}. --spec new_out_machine(state_machine_key(), any(), emqx_coap_message(), manager()) -> +-spec new_out_machine(state_machine_key(), any(), coap_message(), manager()) -> {state_machine(), manager()}. new_out_machine(MachineId, Ctx, diff --git a/apps/emqx_gateway/src/coap/emqx_coap_transport.erl b/apps/emqx_gateway/src/coap/emqx_coap_transport.erl index 2e858a2e1..07e522309 100644 --- a/apps/emqx_gateway/src/coap/emqx_coap_transport.erl +++ b/apps/emqx_gateway/src/coap/emqx_coap_transport.erl @@ -11,7 +11,7 @@ -type request_context() :: any(). --record(transport, { cache :: undefined | emqx_coap_message() +-record(transport, { cache :: undefined | coap_message() , req_context :: request_context() , retry_interval :: non_neg_integer() , retry_count :: non_neg_integer() @@ -26,7 +26,6 @@ -export_type([transport/0]). --import(emqx_coap_message, [reset/1]). -import(emqx_coap_medium, [ empty/0, reset/2, proto_out/2 , out/1, out/2, proto_out/1 , reply/2]). @@ -166,7 +165,7 @@ observe(in, {error, _} -> #{next => stop}; _ -> - reset(Message) + emqx_coap_message:reset(Message) end. until_stop(_, _, _) -> @@ -187,5 +186,5 @@ on_response(#coap_message{type = Type} = Message, out(Ack, #{next => NextState, transport => Transport#transport{cache = Ack}})); true -> - reset(Message) + emqx_coap_message:reset(Message) end. diff --git a/apps/emqx_gateway/src/coap/handler/emqx_coap_pubsub_handler.erl b/apps/emqx_gateway/src/coap/handler/emqx_coap_pubsub_handler.erl index 76d065523..a5ee55b16 100644 --- a/apps/emqx_gateway/src/coap/handler/emqx_coap_pubsub_handler.erl +++ b/apps/emqx_gateway/src/coap/handler/emqx_coap_pubsub_handler.erl @@ -28,7 +28,7 @@ -define(UNSUB(Topic, Msg), #{subscribe => {Topic, Msg}}). -define(SUB(Topic, Token, Msg), #{subscribe => {{Topic, Token}, Msg}}). --define(SUBOPTS, #{qos => 0, rh => 0, rap => 0, nl => 0, is_new => false}). +-define(SUBOPTS, #{qos => 0, rh => 1, rap => 0, nl => 0, is_new => false}). %% TODO maybe can merge this code into emqx_coap_session, simplify the call chain diff --git a/apps/emqx_gateway/src/coap/include/emqx_coap.hrl b/apps/emqx_gateway/src/coap/include/emqx_coap.hrl index d47dd17fd..3ec37cc4a 100644 --- a/apps/emqx_gateway/src/coap/include/emqx_coap.hrl +++ b/apps/emqx_gateway/src/coap/include/emqx_coap.hrl @@ -73,4 +73,4 @@ , options = #{} , payload = <<>>}). --type emqx_coap_message() :: #coap_message{}. +-type coap_message() :: #coap_message{}. diff --git a/apps/emqx_gateway/src/emqx_gateway_api_authn.erl b/apps/emqx_gateway/src/emqx_gateway_api_authn.erl index 701890633..aa56e6fdd 100644 --- a/apps/emqx_gateway/src/emqx_gateway_api_authn.erl +++ b/apps/emqx_gateway/src/emqx_gateway_api_authn.erl @@ -194,8 +194,8 @@ schema("/gateway/:name/authentication/users") -> , responses => ?STANDARD_RESP( #{ 200 => emqx_dashboard_swagger:schema_with_example( - ref(emqx_authn_api, response_user), - emqx_authn_api:response_user_examples()) + ref(emqx_authn_api, response_users), + emqx_authn_api:response_users_example()) }) }, post => diff --git a/apps/emqx_gateway/src/emqx_gateway_api_clients.erl b/apps/emqx_gateway/src/emqx_gateway_api_clients.erl index 9fe36d25e..be3b81bdd 100644 --- a/apps/emqx_gateway/src/emqx_gateway_api_clients.erl +++ b/apps/emqx_gateway/src/emqx_gateway_api_clients.erl @@ -148,6 +148,10 @@ subscriptions(get, #{ bindings := #{name := Name0, ClientId = emqx_mgmt_util:urldecode(ClientId0), with_gateway(Name0, fun(GwName, _) -> case emqx_gateway_http:list_client_subscriptions(GwName, ClientId) of + {error, nosupport} -> + return_http_error(405, <<"Not support to list subscriptions">>); + {error, noimpl} -> + return_http_error(501, <<"Not implemented now">>); {error, Reason} -> return_http_error(500, Reason); {ok, Subs} -> @@ -168,6 +172,14 @@ subscriptions(post, #{ bindings := #{name := Name0, {Topic, QoS} -> case emqx_gateway_http:client_subscribe( GwName, ClientId, Topic, QoS) of + {error, nosupport} -> + return_http_error( + 405, + <<"Not support to add a subscription">>); + {error, noimpl} -> + return_http_error( + 501, + <<"Not implemented now">>); {error, Reason} -> return_http_error(404, Reason); ok -> diff --git a/apps/emqx_gateway/src/emqx_gateway_http.erl b/apps/emqx_gateway/src/emqx_gateway_http.erl index 434a0bc49..6e97865a0 100644 --- a/apps/emqx_gateway/src/emqx_gateway_http.erl +++ b/apps/emqx_gateway/src/emqx_gateway_http.erl @@ -274,15 +274,18 @@ kickout_client(Node, GwName, ClientId) -> -> {error, any()} | {ok, list()}. list_client_subscriptions(GwName, ClientId) -> - %% Get the subscriptions from session-info with_channel(GwName, ClientId, fun(Pid) -> - Subs = emqx_gateway_conn:call( - Pid, - subscriptions, ?DEFAULT_CALL_TIMEOUT), - {ok, lists:map(fun({Topic, SubOpts}) -> - SubOpts#{topic => Topic} - end, Subs)} + case emqx_gateway_conn:call( + Pid, + subscriptions, ?DEFAULT_CALL_TIMEOUT) of + {ok, Subs} -> + {ok, lists:map(fun({Topic, SubOpts}) -> + SubOpts#{topic => Topic} + end, Subs)}; + {error, Reason} -> + {error, Reason} + end end). -spec client_subscribe(gateway_name(), emqx_type:clientid(), @@ -330,7 +333,9 @@ return_http_error(Code, Msg) -> codestr(400) -> 'BAD_REQUEST'; codestr(401) -> 'NOT_SUPPORTED_NOW'; codestr(404) -> 'RESOURCE_NOT_FOUND'; -codestr(500) -> 'UNKNOW_ERROR'. +codestr(405) -> 'METHOD_NOT_ALLOWED'; +codestr(500) -> 'UNKNOW_ERROR'; +codestr(501) -> 'NOT_IMPLEMENTED'. -spec with_authn(binary(), function()) -> any(). with_authn(GwName0, Fun) -> diff --git a/apps/emqx_gateway/src/emqx_gateway_schema.erl b/apps/emqx_gateway/src/emqx_gateway_schema.erl index 18b195c5b..dfdf6ea2a 100644 --- a/apps/emqx_gateway/src/emqx_gateway_schema.erl +++ b/apps/emqx_gateway/src/emqx_gateway_schema.erl @@ -449,7 +449,7 @@ it has two purposes: sc(ref(clientinfo_override), #{ desc => "" })} - , {?EMQX_AUTHENTICATION_CONFIG_ROOT_NAME, authentication_schema()} + , {?EMQX_AUTHENTICATION_CONFIG_ROOT_NAME_ATOM, authentication_schema()} ]. common_listener_opts() -> @@ -468,7 +468,7 @@ common_listener_opts() -> sc(integer(), #{ default => 1000 })} - , {?EMQX_AUTHENTICATION_CONFIG_ROOT_NAME, authentication_schema()} + , {?EMQX_AUTHENTICATION_CONFIG_ROOT_NAME_ATOM, authentication_schema()} , {mountpoint, sc(binary(), #{ default => undefined diff --git a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_channel.erl b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_channel.erl index f3c64d678..5919701cb 100644 --- a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_channel.erl +++ b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_channel.erl @@ -174,6 +174,7 @@ handle_timeout(_, _, Channel) -> %%-------------------------------------------------------------------- %% Handle call %%-------------------------------------------------------------------- + handle_call({lookup_cmd, Path, Type}, _From, #channel{session = Session} = Channel) -> Result = emqx_lwm2m_session:find_cmd_record(Path, Type, Session), {reply, {ok, Result}, Channel}; @@ -182,6 +183,21 @@ handle_call({send_cmd, Cmd}, _From, Channel) -> {ok, Outs, Channel2} = call_session(send_cmd, Cmd, Channel), {reply, ok, Outs, Channel2}; +handle_call({subscribe, _Topic, _SubOpts}, _From, Channel) -> + {reply, {error, noimpl}, Channel}; + +handle_call({unsubscribe, _Topic}, _From, Channel) -> + {reply, {error, noimpl}, Channel}; + +handle_call(subscriptions, _From, Channel) -> + {reply, {error, noimpl}, Channel}; + +handle_call(kick, _From, Channel) -> + {reply, {error, noimpl}, Channel}; + +handle_call(discard, _From, Channel) -> + {reply, {error, noimpl}, Channel}; + handle_call(Req, _From, Channel) -> ?SLOG(error, #{ msg => "unexpected_call" , call => Req diff --git a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_session.erl b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_session.erl index 6def4874a..3cf8fde78 100644 --- a/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_session.erl +++ b/apps/emqx_gateway/src/lwm2m/emqx_lwm2m_session.erl @@ -42,7 +42,7 @@ -type request_context() :: map(). -type timestamp() :: non_neg_integer(). --type queued_request() :: {timestamp(), request_context(), emqx_coap_message()}. +-type queued_request() :: {timestamp(), request_context(), coap_message()}. -type cmd_path() :: binary(). -type cmd_type() :: binary(). @@ -120,7 +120,7 @@ new() -> , cmd_record = #{queue => queue:new()} , lifetime = emqx:get_config([gateway, lwm2m, lifetime_max])}. --spec init(emqx_coap_message(), binary(), function(), session()) -> map(). +-spec init(coap_message(), binary(), function(), session()) -> map(). init(#coap_message{options = Opts, payload = Payload} = Msg, MountPoint, WithContext, Session) -> Query = maps:get(uri_query, Opts), @@ -347,7 +347,7 @@ get_lifetime(#{<<"lt">> := _} = NewRegInfo, _) -> get_lifetime(_, OldRegInfo) -> get_lifetime(OldRegInfo). --spec update(emqx_coap_message(), function(), binary(), session()) -> map(). +-spec update(coap_message(), function(), binary(), session()) -> map(). update(#coap_message{options = Opts, payload = Payload} = Msg, WithContext, CmdType, diff --git a/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl b/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl index c6570adfc..4c5d84caa 100644 --- a/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl +++ b/apps/emqx_gateway/src/stomp/emqx_stomp_channel.erl @@ -73,16 +73,16 @@ transaction :: #{binary() => list()} }). --type(channel() :: #channel{}). +-type channel() :: #channel{}. --type(conn_state() :: idle | connecting | connected | disconnected). +-type conn_state() :: idle | connecting | connected | disconnected. --type(reply() :: {outgoing, stomp_frame()} +-type reply() :: {outgoing, stomp_frame()} | {outgoing, [stomp_frame()]} | {event, conn_state()|updated} - | {close, Reason :: atom()}). + | {close, Reason :: atom()}. --type(replies() :: reply() | [reply()]). +-type replies() :: reply() | [reply()]. -define(TIMER_TABLE, #{ incoming_timer => keepalive, @@ -155,7 +155,7 @@ setting_peercert_infos(Peercert, ClientInfo) -> info(Channel) -> maps:from_list(info(?INFO_KEYS, Channel)). --spec(info(list(atom())|atom(), channel()) -> term()). +-spec info(list(atom())|atom(), channel()) -> term(). info(Keys, Channel) when is_list(Keys) -> [{Key, info(Key, Channel)} || Key <- Keys]; @@ -174,7 +174,7 @@ info(clientid, #channel{clientinfo = #{clientid := ClientId}}) -> info(ctx, #channel{ctx = Ctx}) -> Ctx. --spec(stats(channel()) -> emqx_types:stats()). +-spec stats(channel()) -> emqx_types:stats(). stats(#channel{subscriptions = Subs}) -> [{subscriptions_cnt, length(Subs)}].