From ea1ae708335ab5347ec885a409cc674196c6749f Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Fri, 31 Aug 2018 16:46:51 +0800 Subject: [PATCH] Fix errors found by dialyzer --- src/emqx.erl | 64 ++++++++---------- src/emqx_acl_internal.erl | 4 +- src/emqx_alarm_mgr.erl | 31 ++++----- src/emqx_bridge.erl | 5 +- src/emqx_broker.erl | 4 +- src/emqx_client.erl | 2 +- src/emqx_connection.erl | 31 +++++---- src/emqx_local_bridge.erl | 14 ++-- src/emqx_metrics.erl | 10 ++- src/emqx_mod_rewrite.erl | 4 +- src/emqx_protocol.erl | 7 +- src/emqx_session.erl | 5 +- src/emqx_sm.erl | 47 ++++++------- src/emqx_sm_registry.erl | 34 +++++----- src/emqx_stats.erl | 3 +- src/emqx_ws_connection.erl | 131 +++++++++++++++++++++---------------- 16 files changed, 200 insertions(+), 196 deletions(-) diff --git a/src/emqx.erl b/src/emqx.erl index 8e1f10168..217611171 100644 --- a/src/emqx.erl +++ b/src/emqx.erl @@ -66,41 +66,36 @@ is_running(Node) -> %% PubSub API %%-------------------------------------------------------------------- --spec(subscribe(emqx_topic:topic() | string()) -> ok | {error, term()}). +-spec(subscribe(emqx_topic:topic() | string()) -> ok). subscribe(Topic) -> emqx_broker:subscribe(iolist_to_binary(Topic)). --spec(subscribe(emqx_topic:topic() | string(), emqx_types:subscriber() | string()) - -> ok | {error, term()}). -subscribe(Topic, Sub) when is_list(Sub)-> - emqx_broker:subscribe(iolist_to_binary(Topic), list_to_subid(Sub)); -subscribe(Topic, Subscriber) when is_tuple(Subscriber) -> - {SubPid, SubId} = Subscriber, - emqx_broker:subscribe(iolist_to_binary(Topic), SubPid, SubId). +-spec(subscribe(emqx_topic:topic() | string(), emqx_types:subid() | pid()) -> ok). +subscribe(Topic, SubId) when is_atom(SubId); is_binary(SubId)-> + emqx_broker:subscribe(iolist_to_binary(Topic), SubId); +subscribe(Topic, SubPid) when is_pid(SubPid) -> + emqx_broker:subscribe(iolist_to_binary(Topic), SubPid). --spec(subscribe(emqx_topic:topic() | string(), emqx_types:subscriber() | string(), - emqx_topic:subopts()) -> ok | {error, term()}). -subscribe(Topic, Sub, Options) when is_list(Sub)-> - emqx_broker:subscribe(iolist_to_binary(Topic), list_to_subid(Sub), Options); -subscribe(Topic, Subscriber, Options) when is_tuple(Subscriber)-> - {SubPid, SubId} = Subscriber, - emqx_broker:subscribe(iolist_to_binary(Topic), SubPid, SubId, Options). +-spec(subscribe(emqx_topic:topic() | string(), emqx_types:subid() | pid(), + emqx_types:subopts()) -> ok). +subscribe(Topic, SubId, Options) when is_atom(SubId); is_binary(SubId)-> + emqx_broker:subscribe(iolist_to_binary(Topic), SubId, Options); +subscribe(Topic, SubPid, Options) when is_pid(SubPid)-> + emqx_broker:subscribe(iolist_to_binary(Topic), SubPid, Options). -spec(publish(emqx_types:message()) -> {ok, emqx_types:deliver_results()}). publish(Msg) -> emqx_broker:publish(Msg). --spec(unsubscribe(emqx_topic:topic() | string()) -> ok | {error, term()}). +-spec(unsubscribe(emqx_topic:topic() | string()) -> ok). unsubscribe(Topic) -> emqx_broker:unsubscribe(iolist_to_binary(Topic)). --spec(unsubscribe(emqx_topic:topic() | string(), emqx_types:subscriber() | string()) - -> ok | {error, term()}). -unsubscribe(Topic, Sub) when is_list(Sub) -> - emqx_broker:unsubscribe(iolist_to_binary(Topic), list_to_subid(Sub)); -unsubscribe(Topic, Subscriber) when is_tuple(Subscriber) -> - {SubPid, SubId} = Subscriber, - emqx_broker:unsubscribe(iolist_to_binary(Topic), SubPid, SubId). +-spec(unsubscribe(emqx_topic:topic() | string(), emqx_types:subid() | pid()) -> ok). +unsubscribe(Topic, SubId) when is_atom(SubId); is_binary(SubId) -> + emqx_broker:unsubscribe(iolist_to_binary(Topic), SubId); +unsubscribe(Topic, SubPid) when is_pid(SubPid) -> + emqx_broker:unsubscribe(iolist_to_binary(Topic), SubPid). %%-------------------------------------------------------------------- %% PubSub management API @@ -109,12 +104,12 @@ unsubscribe(Topic, Subscriber) when is_tuple(Subscriber) -> -spec(get_subopts(emqx_topic:topic() | string(), emqx_types:subscriber()) -> emqx_types:subopts()). get_subopts(Topic, Subscriber) -> - emqx_broker:get_subopts(iolist_to_binary(Topic), list_to_subid(Subscriber)). + emqx_broker:get_subopts(iolist_to_binary(Topic), Subscriber). -spec(set_subopts(emqx_topic:topic() | string(), emqx_types:subscriber(), - emqx_types:subopts()) -> ok). -set_subopts(Topic, Subscriber, Options) when is_list(Options) -> - emqx_broker:set_subopts(iolist_to_binary(Topic), list_to_subid(Subscriber), Options). + emqx_types:subopts()) -> boolean()). +set_subopts(Topic, Subscriber, Options) when is_map(Options) -> + emqx_broker:set_subopts(iolist_to_binary(Topic), Subscriber, Options). -spec(topics() -> list(emqx_topic:topic())). topics() -> emqx_router:topics(). @@ -127,16 +122,11 @@ subscribers(Topic) -> subscriptions(Subscriber) -> emqx_broker:subscriptions(Subscriber). --spec(subscribed(emqx_topic:topic() | string(), emqx_types:subscriber()) -> boolean()). -subscribed(Topic, Subscriber) -> - emqx_broker:subscribed(iolist_to_binary(Topic), list_to_subid(Subscriber)). - -list_to_subid(SubId) when is_binary(SubId) -> - SubId; -list_to_subid(SubId) when is_list(SubId) -> - iolist_to_binary(SubId); -list_to_subid(SubPid) when is_pid(SubPid) -> - SubPid. +-spec(subscribed(emqx_topic:topic() | string(), pid() | emqx_types:subid()) -> boolean()). +subscribed(Topic, SubPid) when is_pid(SubPid) -> + emqx_broker:subscribed(iolist_to_binary(Topic), SubPid); +subscribed(Topic, SubId) when is_atom(SubId); is_binary(SubId) -> + emqx_broker:subscribed(iolist_to_binary(Topic), SubId). %%-------------------------------------------------------------------- %% Hooks API diff --git a/src/emqx_acl_internal.erl b/src/emqx_acl_internal.erl index 0f25e6808..eee7e6c18 100644 --- a/src/emqx_acl_internal.erl +++ b/src/emqx_acl_internal.erl @@ -25,6 +25,8 @@ -define(ACL_RULE_TAB, emqx_acl_rule). +-type(state() :: #{acl_file := string()}). + %%------------------------------------------------------------------------------ %% API %%------------------------------------------------------------------------------ @@ -95,7 +97,7 @@ match(Credentials, Topic, [Rule|Rules]) -> {matched, AllowDeny} end. --spec(reload_acl(#{}) -> ok | {error, term()}). +-spec(reload_acl(state()) -> ok | {error, term()}). reload_acl(#{acl_file := AclFile}) -> case catch load_rules_from_file(AclFile) of true -> diff --git a/src/emqx_alarm_mgr.erl b/src/emqx_alarm_mgr.erl index bb734c8e6..fd2a42aa7 100644 --- a/src/emqx_alarm_mgr.erl +++ b/src/emqx_alarm_mgr.erl @@ -28,10 +28,11 @@ -define(ALARM_MGR, ?MODULE). --record(state, {alarms}). - start_link() -> - start_with(fun(Pid) -> gen_event:add_handler(Pid, ?MODULE, []) end). + start_with( + fun(Pid) -> + gen_event:add_handler(Pid, ?MODULE, []) + end). start_with(Fun) -> case gen_event:start_link({local, ?ALARM_MGR}) of @@ -73,42 +74,42 @@ delete_alarm_handler(Module) when is_atom(Module) -> %% Default Alarm handler %%------------------------------------------------------------------------------ -init(_) -> {ok, #state{alarms = []}}. +init(_) -> {ok, #{alarms => []}}. handle_event({set_alarm, Alarm = #alarm{timestamp = undefined}}, State)-> handle_event({set_alarm, Alarm#alarm{timestamp = os:timestamp()}}, State); -handle_event({set_alarm, Alarm = #alarm{id = AlarmId}}, State = #state{alarms = Alarms}) -> +handle_event({set_alarm, Alarm = #alarm{id = AlarmId}}, State = #{alarms := Alarms}) -> case encode_alarm(Alarm) of {ok, Json} -> emqx_broker:safe_publish(alarm_msg(alert, AlarmId, Json)); {error, Reason} -> emqx_logger:error("[AlarmMgr] Failed to encode alarm: ~p", [Reason]) end, - {ok, State#state{alarms = [Alarm|Alarms]}}; + {ok, State#{alarms := [Alarm|Alarms]}}; -handle_event({clear_alarm, AlarmId}, State = #state{alarms = Alarms}) -> - case emqx_json:safe_encode([{id, AlarmId}, {ts, emqx_time:now_secs()}]) of +handle_event({clear_alarm, AlarmId}, State = #{alarms := Alarms}) -> + case emqx_json:safe_encode([{id, AlarmId}, {ts, os:system_time(second)}]) of {ok, Json} -> emqx_broker:safe_publish(alarm_msg(clear, AlarmId, Json)); {error, Reason} -> emqx_logger:error("[AlarmMgr] Failed to encode clear: ~p", [Reason]) end, - {ok, State#state{alarms = lists:keydelete(AlarmId, 2, Alarms)}, hibernate}; + {ok, State#{alarms := lists:keydelete(AlarmId, 2, Alarms)}, hibernate}; handle_event(Event, State)-> - error_logger:error("[AlarmMgr] unexpected event: ~p", [Event]), + emqx_logger:error("[AlarmMgr] unexpected event: ~p", [Event]), {ok, State}. handle_info(Info, State) -> - error_logger:error("[AlarmMgr] unexpected info: ~p", [Info]), + emqx_logger:error("[AlarmMgr] unexpected info: ~p", [Info]), {ok, State}. -handle_call(get_alarms, State = #state{alarms = Alarms}) -> +handle_call(get_alarms, State = #{alarms := Alarms}) -> {ok, Alarms, State}; handle_call(Req, State) -> - error_logger:error("[AlarmMgr] unexpected call: ~p", [Req]), + emqx_logger:error("[AlarmMgr] unexpected call: ~p", [Req]), {ok, ignored, State}. terminate(swap, State) -> @@ -132,8 +133,8 @@ encode_alarm(#alarm{id = AlarmId, severity = Severity, title = Title, alarm_msg(Type, AlarmId, Json) -> Msg = emqx_message:make(?ALARM_MGR, topic(Type, AlarmId), Json), - emqx_message:set_headers(#{'Content-Type' => <<"application/json">>}, - emqx_message:set_flags(#{sys => true}, Msg)). + emqx_message:set_headers( #{'Content-Type' => <<"application/json">>}, + emqx_message:set_flag(sys, Msg)). topic(alert, AlarmId) -> emqx_topic:systop(<<"alarms/", AlarmId/binary, "/alert">>); diff --git a/src/emqx_bridge.erl b/src/emqx_bridge.erl index d1763d31c..0f64f331d 100644 --- a/src/emqx_bridge.erl +++ b/src/emqx_bridge.erl @@ -133,7 +133,8 @@ handle_info(start, State = #state{options = Options, Subs = get_value(subscriptions, Options, []), [emqx_client:subscribe(ClientPid, {i2b(Topic), Qos}) || {Topic, Qos} <- Subs], ForwardRules = string:tokens(get_value(forward_rule, Options, ""), ","), - [emqx_broker:subscribe(i2b(Topic)) || Topic <- ForwardRules, emqx_topic:validate({filter, i2b(Topic)})], + [emqx_broker:subscribe(i2b(Topic)) || Topic <- ForwardRules, + emqx_topic:validate({filter, i2b(Topic)})], {noreply, State#state{client_pid = ClientPid}}; {error,_} -> erlang:send_after(ReconnectTime, self(), start), @@ -251,4 +252,4 @@ store(disk, Data, Queue, _MaxPendingMsg)-> delete(memory, PkgId, Queue) -> lists:keydelete(PkgId, 1, Queue); delete(disk, PkgId, Queue) -> - lists:keydelete(PkgId, 1, Queue). \ No newline at end of file + lists:keydelete(PkgId, 1, Queue). diff --git a/src/emqx_broker.erl b/src/emqx_broker.erl index 623290961..b2f1bb119 100644 --- a/src/emqx_broker.erl +++ b/src/emqx_broker.erl @@ -260,9 +260,9 @@ subscription(Topic, Subscriber) -> -spec(subscribed(emqx_topic:topic(), pid() | emqx_types:subid() | emqx_types:subscriber()) -> boolean()). subscribed(Topic, SubPid) when is_binary(Topic), is_pid(SubPid) -> - length(ets:match_object(?SUBOPTION, {{Topic, {SubPid, '_'}}, '_'}, 1)) == 1; + length(ets:match_object(?SUBOPTION, {{Topic, {SubPid, '_'}}, '_'}, 1)) >= 1; subscribed(Topic, SubId) when is_binary(Topic), ?is_subid(SubId) -> - length(ets:match_object(?SUBOPTION, {{Topic, {'_', SubId}}, '_'}, 1)) == 1; + length(ets:match_object(?SUBOPTION, {{Topic, {'_', SubId}}, '_'}, 1)) >= 1; subscribed(Topic, {SubPid, SubId}) when is_binary(Topic), is_pid(SubPid), ?is_subid(SubId) -> ets:member(?SUBOPTION, {Topic, {SubPid, SubId}}). diff --git a/src/emqx_client.erl b/src/emqx_client.erl index 5c50519bc..192569ca4 100644 --- a/src/emqx_client.erl +++ b/src/emqx_client.erl @@ -186,7 +186,7 @@ with_owner(Options) -> connect(Client) -> gen_statem:call(Client, connect, infinity). --spec(subscribe(client(), topic() | {topic(), qos() | [subopt()]}) +-spec(subscribe(client(), topic() | {topic(), qos() | [subopt()]} | [{topic(), qos()}]) -> subscribe_ret()). subscribe(Client, Topic) when is_binary(Topic) -> subscribe(Client, {Topic, ?QOS_0}); diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index bcaea297d..adda71450 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -202,20 +202,19 @@ handle_info({deliver, PubOrAck}, State = #state{proto_state = ProtoState}) -> {ok, ProtoState1} -> {noreply, maybe_gc(ensure_stats_timer(State#state{proto_state = ProtoState1}))}; {error, Reason} -> - shutdown(Reason, State); - {error, Reason, ProtoState1} -> - shutdown(Reason, State#state{proto_state = ProtoState1}) + shutdown(Reason, State) end; -handle_info(emit_stats, State = #state{proto_state = ProtoState}) -> +handle_info({timeout, Timer, emit_stats}, + State = #state{stats_timer = Timer, proto_state = ProtoState}) -> emqx_cm:set_conn_stats(emqx_protocol:client_id(ProtoState), stats(State)), {noreply, State#state{stats_timer = undefined}, hibernate}; handle_info(timeout, State) -> shutdown(idle_timeout, State); -handle_info({shutdown, Error}, State) -> - shutdown(Error, State); +handle_info({shutdown, Reason}, State) -> + shutdown(Reason, State); handle_info({shutdown, discard, {ClientId, ByPid}}, State) -> ?LOG(warning, "discarded by ~s:~p", [ClientId, ByPid], State), @@ -311,13 +310,13 @@ handle_packet(Data, State = #state{proto_state = ProtoState, {ok, ProtoState1} -> NewState = State#state{proto_state = ProtoState1}, handle_packet(Rest, inc_publish_cnt(Type, reset_parser(NewState))); - {error, Error} -> - ?LOG(error, "Protocol error - ~p", [Error], State), - shutdown(Error, State); - {error, Error, ProtoState1} -> - shutdown(Error, State#state{proto_state = ProtoState1}); - {stop, Reason, ProtoState1} -> - stop(Reason, State#state{proto_state = ProtoState1}) + {error, Reason} -> + ?LOG(error, "Process packet error - ~p", [Reason], State), + shutdown(Reason, State); + {error, Reason, ProtoState1} -> + shutdown(Reason, State#state{proto_state = ProtoState1}); + {stop, Error, ProtoState1} -> + stop(Error, State#state{proto_state = ProtoState1}) end; {error, Error} -> ?LOG(error, "Framing error - ~p", [Error], State), @@ -371,9 +370,9 @@ run_socket(State = #state{transport = Transport, socket = Socket}) -> %%------------------------------------------------------------------------------ ensure_stats_timer(State = #state{enable_stats = true, - stats_timer = undefined, - idle_timeout = IdleTimeout}) -> - State#state{stats_timer = erlang:send_after(IdleTimeout, self(), emit_stats)}; + stats_timer = undefined, + idle_timeout = IdleTimeout}) -> + State#state{stats_timer = emqx_misc:start_timer(IdleTimeout, emit_stats)}; ensure_stats_timer(State) -> State. shutdown(Reason, State) -> diff --git a/src/emqx_local_bridge.erl b/src/emqx_local_bridge.erl index 66cdf4010..228a64cff 100644 --- a/src/emqx_local_bridge.erl +++ b/src/emqx_local_bridge.erl @@ -60,8 +60,8 @@ init([Pool, Id, Node, Topic, Options]) -> case net_kernel:connect_node(Node) of true -> true = erlang:monitor_node(Node, true), - Share = iolist_to_binary(["$bridge:", atom_to_list(Node), ":", Topic]), - emqx_broker:subscribe(Topic, self(), [{share, Share}, {qos, ?QOS_0}]), + Group = iolist_to_binary(["$bridge:", atom_to_list(Node), ":", Topic]), + emqx_broker:subscribe(Topic, self(), #{share => Group, qos => ?QOS_0}), State = parse_opts(Options, #state{node = Node, subtopic = Topic}), MQueue = emqx_mqueue:init(#{type => simple, max_len => State#state.max_queue_len, @@ -86,11 +86,6 @@ parse_opts([{ping_down_interval, Interval} | Opts], State) -> parse_opts([_Opt | Opts], State) -> parse_opts(Opts, State). -qname(Node, Topic) when is_atom(Node) -> - qname(atom_to_list(Node), Topic); -qname(Node, Topic) -> - iolist_to_binary(["Bridge:", Node, ":", Topic]). - handle_call(Req, _From, State) -> emqx_logger:error("[Bridge] unexpected call: ~p", [Req]), {reply, ignored, State}. @@ -104,7 +99,7 @@ handle_info({dispatch, _Topic, Msg}, State = #state{mqueue = Q, status = down}) {noreply, State#state{mqueue = emqx_mqueue:in(Msg, Q)}}; handle_info({dispatch, _Topic, Msg}, State = #state{node = Node, status = up}) -> - ok = emqx_rpc:cast(Node, emqx_broker, publish, [transform(Msg, State)]), + emqx_rpc:cast(Node, emqx_broker, publish, [transform(Msg, State)]), {noreply, State}; handle_info({nodedown, Node}, State = #state{node = Node, ping_down_interval = Interval}) -> @@ -157,7 +152,6 @@ dequeue(State = #state{mqueue = MQ}) -> dequeue(State#state{mqueue = MQ1}) end. -transform(Msg = #message{topic = Topic}, #state{topic_prefix = Prefix, - topic_suffix = Suffix}) -> +transform(Msg = #message{topic = Topic}, #state{topic_prefix = Prefix, topic_suffix = Suffix}) -> Msg#message{topic = <>}. diff --git a/src/emqx_metrics.erl b/src/emqx_metrics.erl index 0a9ad67aa..e319a425b 100644 --- a/src/emqx_metrics.erl +++ b/src/emqx_metrics.erl @@ -26,8 +26,6 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, {}). - %% Bytes sent and received of Broker -define(BYTES_METRICS, [ {counter, 'bytes/received'}, % Total bytes received @@ -85,8 +83,8 @@ -define(TAB, ?MODULE). -define(SERVER, ?MODULE). -%% @doc Start the metrics server --spec(start_link() -> {ok, pid()} | ignore | {error, term()}). +%% @doc Start the metrics server. +-spec(start_link() -> emqx_types:startlink_ret()). start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). @@ -252,7 +250,7 @@ init([]) -> % Create metrics table _ = emqx_tables:new(?TAB, [set, public, {write_concurrency, true}]), lists:foreach(fun new/1, ?BYTES_METRICS ++ ?PACKET_METRICS ++ ?MESSAGE_METRICS), - {ok, #state{}, hibernate}. + {ok, #{}, hibernate}. handle_call(Req, _From, State) -> emqx_logger:error("[Metrics] unexpected call: ~p", [Req]), @@ -266,7 +264,7 @@ handle_info(Info, State) -> emqx_logger:error("[Metrics] unexpected info: ~p", [Info]), {noreply, State}. -terminate(_Reason, #state{}) -> +terminate(_Reason, #{}) -> ok. code_change(_OldVsn, State, _Extra) -> diff --git a/src/emqx_mod_rewrite.erl b/src/emqx_mod_rewrite.erl index 2a92793eb..a9ff334ce 100644 --- a/src/emqx_mod_rewrite.erl +++ b/src/emqx_mod_rewrite.erl @@ -23,8 +23,8 @@ load(Rules0) -> Rules = compile(Rules0), - emqx_hooks:add('client.subscribe', fun ?MODULE:rewrite_subscribe/4, [Rules]), - emqx_hooks:add('client.unsubscribe',fun ?MODULE:rewrite_unsubscribe/4, [Rules]), + emqx_hooks:add('client.subscribe', fun ?MODULE:rewrite_subscribe/3, [Rules]), + emqx_hooks:add('client.unsubscribe',fun ?MODULE:rewrite_unsubscribe/3, [Rules]), emqx_hooks:add('message.publish', fun ?MODULE:rewrite_publish/2, [Rules]). rewrite_subscribe(_Credentials, TopicTable, Rules) -> diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 01fbce313..018166d20 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -188,14 +188,14 @@ session(#pstate{session = SPid}) -> SPid. parser(#pstate{packet_size = Size, proto_ver = Ver}) -> - emqx_frame:initial_state(#{packet_size => Size, version => Ver}). + emqx_frame:initial_state(#{max_packet_size => Size, version => Ver}). %%------------------------------------------------------------------------------ %% Packet Received %%------------------------------------------------------------------------------ --spec(received(emqx_mqtt_types:packet(), state()) - -> {ok, state()} | {error, term()} | {error, term(), state()}). +-spec(received(emqx_mqtt_types:packet(), state()) -> + {ok, state()} | {error, term()} | {error, term(), state()} | {stop, term(), state()}). received(?PACKET(Type), PState = #pstate{connected = false}) when Type =/= ?CONNECT -> {error, proto_not_connected, PState}; @@ -451,6 +451,7 @@ puback(?QOS_2, PacketId, {ok, _}, PState) -> %% Deliver Packet -> Client %%------------------------------------------------------------------------------ +-spec(deliver(term(), state()) -> {ok, state()} | {error, term()}). deliver({connack, ReasonCode}, PState) -> send(?CONNACK_PACKET(ReasonCode), PState); diff --git a/src/emqx_session.erl b/src/emqx_session.erl index d5b68a1f6..81299711d 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -148,6 +148,9 @@ }). -type(spid() :: pid()). +-type(attr() :: {atom(), term()}). + +-export_type([attr/0]). -define(TIMEOUT, 60000). @@ -564,7 +567,7 @@ handle_info({timeout, Timer, check_awaiting_rel}, State = #state{await_rel_timer noreply(expire_awaiting_rel(State#state{await_rel_timer = undefined})); handle_info({timeout, Timer, emit_stats}, State = #state{client_id = ClientId, stats_timer = Timer}) -> - true = emqx_sm:set_session_stats(ClientId, stats(State)), + _ = emqx_sm:set_session_stats(ClientId, stats(State)), {noreply, State#state{stats_timer = undefined}, hibernate}; handle_info({timeout, Timer, expired}, State = #state{expiry_timer = Timer}) -> diff --git a/src/emqx_sm.erl b/src/emqx_sm.erl index 0b188f986..36d416f3b 100644 --- a/src/emqx_sm.erl +++ b/src/emqx_sm.erl @@ -49,22 +49,20 @@ start_link() -> %% @doc Open a session. -spec(open_session(map()) -> {ok, pid()} | {ok, pid(), boolean()} | {error, term()}). -open_session(Attrs = #{clean_start := true, client_id := ClientId, conn_pid := ConnPid}) -> +open_session(SessAttrs = #{clean_start := true, client_id := ClientId, conn_pid := ConnPid}) -> CleanStart = fun(_) -> ok = discard_session(ClientId, ConnPid), - emqx_session_sup:start_session(Attrs) + emqx_session_sup:start_session(SessAttrs) end, emqx_sm_locker:trans(ClientId, CleanStart); -open_session(Attrs = #{clean_start := false, client_id := ClientId, conn_pid := ConnPid}) -> +open_session(SessAttrs = #{clean_start := false, client_id := ClientId, conn_pid := ConnPid}) -> ResumeStart = fun(_) -> case resume_session(ClientId, ConnPid) of {ok, SPid} -> {ok, SPid, true}; {error, not_found} -> - emqx_session_sup:start_session(Attrs); - {error, Reason} -> - {error, Reason} + emqx_session_sup:start_session(SessAttrs) end end, emqx_sm_locker:trans(ClientId, ResumeStart). @@ -113,31 +111,31 @@ close_session(SPid) when is_pid(SPid) -> %% @doc Register a session with attributes. -spec(register_session(emqx_types:client_id() | {emqx_types:client_id(), pid()}, - list(emqx_session:attribute())) -> ok). -register_session(ClientId, Attrs) when is_binary(ClientId) -> - register_session({ClientId, self()}, Attrs); + list(emqx_session:attr())) -> ok). +register_session(ClientId, SessAttrs) when is_binary(ClientId) -> + register_session({ClientId, self()}, SessAttrs); -register_session(Session = {ClientId, SPid}, Attrs) +register_session(Session = {ClientId, SPid}, SessAttrs) when is_binary(ClientId), is_pid(SPid) -> ets:insert(?SESSION_TAB, Session), - ets:insert(?SESSION_ATTRS_TAB, {Session, Attrs}), - case proplists:get_value(clean_start, Attrs, true) of - true -> ok; - false -> ets:insert(?SESSION_P_TAB, Session) - end, + ets:insert(?SESSION_ATTRS_TAB, {Session, SessAttrs}), + proplists:get_value(clean_start, SessAttrs, true) + andalso ets:insert(?SESSION_P_TAB, Session), emqx_sm_registry:register_session(Session), notify({registered, ClientId, SPid}). %% @doc Get session attrs --spec(get_session_attrs({emqx_types:client_id(), pid()}) -> list(emqx_session:attribute())). +-spec(get_session_attrs({emqx_types:client_id(), pid()}) -> list(emqx_session:attr())). get_session_attrs(Session = {ClientId, SPid}) when is_binary(ClientId), is_pid(SPid) -> safe_lookup_element(?SESSION_ATTRS_TAB, Session, []). %% @doc Set session attrs -set_session_attrs(ClientId, Attrs) when is_binary(ClientId) -> - set_session_attrs({ClientId, self()}, Attrs); -set_session_attrs(Session = {ClientId, SPid}, Attrs) when is_binary(ClientId), is_pid(SPid) -> - ets:insert(?SESSION_ATTRS_TAB, {Session, Attrs}). +-spec(set_session_attrs(emqx_types:client_id() | {emqx_types:client_id(), pid()}, + list(emqx_session:attr())) -> true). +set_session_attrs(ClientId, SessAttrs) when is_binary(ClientId) -> + set_session_attrs({ClientId, self()}, SessAttrs); +set_session_attrs(Session = {ClientId, SPid}, SessAttrs) when is_binary(ClientId), is_pid(SPid) -> + ets:insert(?SESSION_ATTRS_TAB, {Session, SessAttrs}). %% @doc Unregister a session -spec(unregister_session(emqx_types:client_id() | {emqx_types:client_id(), pid()}) -> ok). @@ -154,18 +152,15 @@ unregister_session(Session = {ClientId, SPid}) when is_binary(ClientId), is_pid( %% @doc Get session stats -spec(get_session_stats({emqx_types:client_id(), pid()}) -> list(emqx_stats:stats())). -get_session_stats(Session = {ClientId, SPid}) - when is_binary(ClientId), is_pid(SPid) -> +get_session_stats(Session = {ClientId, SPid}) when is_binary(ClientId), is_pid(SPid) -> safe_lookup_element(?SESSION_STATS_TAB, Session, []). %% @doc Set session stats -spec(set_session_stats(emqx_types:client_id() | {emqx_types:client_id(), pid()}, - emqx_stats:stats()) -> ok). + emqx_stats:stats()) -> true). set_session_stats(ClientId, Stats) when is_binary(ClientId) -> set_session_stats({ClientId, self()}, Stats); - -set_session_stats(Session = {ClientId, SPid}, Stats) - when is_binary(ClientId), is_pid(SPid) -> +set_session_stats(Session = {ClientId, SPid}, Stats) when is_binary(ClientId), is_pid(SPid) -> ets:insert(?SESSION_STATS_TAB, {Session, Stats}). %% @doc Lookup a session from registry diff --git a/src/emqx_sm_registry.erl b/src/emqx_sm_registry.erl index 74690b4b1..659b3a92b 100644 --- a/src/emqx_sm_registry.erl +++ b/src/emqx_sm_registry.erl @@ -20,7 +20,9 @@ -export([start_link/0]). -export([is_enabled/0]). + -export([register_session/1, lookup_session/1, unregister_session/1]). + %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -30,12 +32,11 @@ -define(LOCK, {?MODULE, cleanup_sessions}). -record(global_session, {sid, pid}). --record(state, {}). -type(session_pid() :: pid()). -%% @doc Start the session manager. --spec(start_link() -> {ok, pid()} | ignore | {error, term()}). +%% @doc Start the global session manager. +-spec(start_link() -> emqx_types:startlink_ret()). start_link() -> gen_server:start_link({local, ?REGISTRY}, ?MODULE, [], []). @@ -46,19 +47,18 @@ is_enabled() -> -spec(lookup_session(emqx_types:client_id()) -> list({emqx_types:client_id(), session_pid()})). lookup_session(ClientId) -> - [{ClientId, SessionPid} || #global_session{pid = SessionPid} - <- mnesia:dirty_read(?TAB, ClientId)]. + [{ClientId, SessPid} || #global_session{pid = SessPid} <- mnesia:dirty_read(?TAB, ClientId)]. -spec(register_session({emqx_types:client_id(), session_pid()}) -> ok). -register_session({ClientId, SessionPid}) when is_binary(ClientId), is_pid(SessionPid) -> - mnesia:dirty_write(?TAB, record(ClientId, SessionPid)). +register_session({ClientId, SessPid}) when is_binary(ClientId), is_pid(SessPid) -> + mnesia:dirty_write(?TAB, record(ClientId, SessPid)). -spec(unregister_session({emqx_types:client_id(), session_pid()}) -> ok). -unregister_session({ClientId, SessionPid}) when is_binary(ClientId), is_pid(SessionPid) -> - mnesia:dirty_delete_object(?TAB, record(ClientId, SessionPid)). +unregister_session({ClientId, SessPid}) when is_binary(ClientId), is_pid(SessPid) -> + mnesia:dirty_delete_object(?TAB, record(ClientId, SessPid)). -record(ClientId, SessionPid) -> - #global_session{sid = ClientId, pid = SessionPid}. +record(ClientId, SessPid) -> + #global_session{sid = ClientId, pid = SessPid}. %%------------------------------------------------------------------------------ %% gen_server callbacks @@ -72,7 +72,7 @@ init([]) -> {attributes, record_info(fields, global_session)}]), ok = ekka_mnesia:copy_table(?TAB), ekka:monitor(membership), - {ok, #state{}}. + {ok, #{}}. handle_call(Req, _From, State) -> emqx_logger:error("[Registry] unexpected call: ~p", [Req]), @@ -107,9 +107,9 @@ code_change(_OldVsn, State, _Extra) -> %%------------------------------------------------------------------------------ cleanup_sessions(Node) -> - Pat = [{#global_session{pid = '$1', _ = '_'}, - [{'==', {node, '$1'}, Node}], ['$_']}], - lists:foreach(fun(Session) -> - mnesia:delete_object(?TAB, Session) - end, mnesia:select(?TAB, Pat)). + Pat = [{#global_session{pid = '$1', _ = '_'}, [{'==', {node, '$1'}, Node}], ['$_']}], + lists:foreach(fun delete_session/1, mnesia:select(?TAB, Pat, write)). + +delete_session(Session) -> + mnesia:delete_object(?TAB, Session, write). diff --git a/src/emqx_stats.erl b/src/emqx_stats.erl index 664e04680..510b2d91a 100644 --- a/src/emqx_stats.erl +++ b/src/emqx_stats.erl @@ -31,9 +31,10 @@ code_change/3]). -record(update, {name, countdown, interval, func}). --record(state, {timer, updates :: #update{}}). +-record(state, {timer, updates :: [#update{}]}). -type(stats() :: list({atom(), non_neg_integer()})). + -export_type([stats/0]). %% Connection stats diff --git a/src/emqx_ws_connection.erl b/src/emqx_ws_connection.erl index 74014707a..fa08fa1bb 100644 --- a/src/emqx_ws_connection.erl +++ b/src/emqx_ws_connection.erl @@ -40,38 +40,61 @@ keepalive, enable_stats, stats_timer, - shutdown_reason + shutdown }). --define(INFO_KEYS, [peername, sockname]). - -define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt]). -define(WSLOG(Level, Format, Args, State), - emqx_logger:Level("WSMQTT(~s): " ++ Format, [esockd_net:format(State#state.peername) | Args])). + emqx_logger:Level("MQTT/WS(~s): " ++ Format, + [esockd_net:format(State#state.peername) | Args])). %%------------------------------------------------------------------------------ %% API %%------------------------------------------------------------------------------ %% for debug -info(WSPid) -> - call(WSPid, info). +info(WSPid) when is_pid(WSPid) -> + call(WSPid, info); + +info(#state{peername = Peername, + sockname = Sockname, + proto_state = ProtoState}) -> + ProtoInfo = emqx_protocol:info(ProtoState), + ConnInfo = [{socktype, websocket}, + {conn_state, running}, + {peername, Peername}, + {sockname, Sockname}], + lists:append([ConnInfo, ProtoInfo]). %% for dashboard -attrs(CPid) when is_pid(CPid) -> - call(CPid, attrs). +attrs(WSPid) when is_pid(WSPid) -> + call(WSPid, attrs); -stats(WSPid) -> - call(WSPid, stats). +attrs(#state{peername = Peername, + sockname = Sockname, + proto_state = ProtoState}) -> + SockAttrs = [{peername, Peername}, + {sockname, Sockname}], + ProtoAttrs = emqx_protocol:attrs(ProtoState), + lists:usort(lists:append(SockAttrs, ProtoAttrs)). -kick(WSPid) -> +stats(WSPid) when is_pid(WSPid) -> + call(WSPid, stats); + +stats(#state{proto_state = ProtoState}) -> + lists:append([wsock_stats(), + emqx_misc:proc_stats(), + emqx_protocol:stats(ProtoState) + ]). + +kick(WSPid) when is_pid(WSPid) -> call(WSPid, kick). -session(WSPid) -> +session(WSPid) when is_pid(WSPid) -> call(WSPid, session). -call(WSPid, Req) -> +call(WSPid, Req) when is_pid(WSPid) -> Mref = erlang:monitor(process, WSPid), WSPid ! {call, {self(), Mref}, Req}, receive @@ -153,41 +176,30 @@ websocket_handle({binary, Data}, State = #state{parser_state = ParserState, websocket_handle({binary, Rest}, reset_parser(State#state{proto_state = ProtoState1})); {error, Error} -> ?WSLOG(error, "Protocol error - ~p", [Error], State), - {stop, State}; - {error, Error, ProtoState1} -> - shutdown(Error, State#state{proto_state = ProtoState1}); - {stop, Reason, ProtoState1} -> - shutdown(Reason, State#state{proto_state = ProtoState1}) + stop(Error, State); + {error, Reason, ProtoState1} -> + shutdown(Reason, State#state{proto_state = ProtoState1}); + {stop, Error, ProtoState1} -> + stop(Error, State#state{proto_state = ProtoState1}) end; {error, Error} -> ?WSLOG(error, "Frame error: ~p", [Error], State), - {stop, State}; + stop(Error, State); {'EXIT', Reason} -> ?WSLOG(error, "Frame error:~p~nFrame data: ~p", [Reason, Data], State), - {stop, State} + shutdown(parse_error, State) end. -websocket_info({call, From, info}, State = #state{peername = Peername, - sockname = Sockname, - proto_state = ProtoState}) -> - ProtoInfo = emqx_protocol:info(ProtoState), - ConnInfo = [{socktype, websocket}, {conn_state, running}, - {peername, Peername}, {sockname, Sockname}], - gen_server:reply(From, lists:append([ConnInfo, ProtoInfo])), +websocket_info({call, From, info}, State) -> + gen_server:reply(From, info(State)), {ok, State}; -websocket_info({call, From, attrs}, State = #state{peername = Peername, - sockname = Sockname, - proto_state = ProtoState}) -> - SockAttrs = [{peername, Peername}, - {sockname, Sockname}], - ProtoAttrs = emqx_protocol:attrs(ProtoState), - gen_server:reply(From, lists:usort(lists:append(SockAttrs, ProtoAttrs))), +websocket_info({call, From, attrs}, State) -> + gen_server:reply(From, attrs(State)), {ok, State}; -websocket_info({call, From, stats}, State = #state{proto_state = ProtoState}) -> - Stats = lists:append([wsock_stats(), emqx_misc:proc_stats(), emqx_protocol:stats(ProtoState)]), - gen_server:reply(From, Stats), +websocket_info({call, From, stats}, State) -> + gen_server:reply(From, stats(State)), {ok, State}; websocket_info({call, From, kick}, State) -> @@ -203,15 +215,12 @@ websocket_info({deliver, PubOrAck}, State = #state{proto_state = ProtoState}) -> {ok, ProtoState1} -> {ok, ensure_stats_timer(State#state{proto_state = ProtoState1})}; {error, Reason} -> - shutdown(Reason, State); - {error, Reason, ProtoState1} -> - shutdown(Reason, State#state{proto_state = ProtoState1}) + shutdown(Reason, State) end; -websocket_info(emit_stats, State = #state{proto_state = ProtoState}) -> - Stats = lists:append([wsock_stats(), emqx_misc:proc_stats(), - emqx_protocol:stats(ProtoState)]), - emqx_cm:set_conn_stats(emqx_protocol:client_id(ProtoState), Stats), +websocket_info({timeout, Timer, emit_stats}, + State = #state{stats_timer = Timer, proto_state = ProtoState}) -> + emqx_cm:set_conn_stats(emqx_protocol:client_id(ProtoState), stats(State)), {ok, State#state{stats_timer = undefined}, hibernate}; websocket_info({keepalive, start, Interval}, State) -> @@ -254,30 +263,40 @@ websocket_info(Info, State) -> ?WSLOG(error, "unexpected info: ~p", [Info], State), {ok, State}. -terminate(SockError, _Req, #state{keepalive = Keepalive, - proto_state = ProtoState, - shutdown_reason = Reason}) -> +terminate(SockError, _Req, State = #state{keepalive = Keepalive, + proto_state = ProtoState, + shutdown = Shutdown}) -> + ?WSLOG(debug, "Terminated for ~p, sockerror: ~p", + [Shutdown, SockError], State), emqx_keepalive:cancel(Keepalive), - io:format("Websocket shutdown for ~p, sockerror: ~p~n", [Reason, SockError]), - case Reason of - undefined -> - ok; - _ -> - emqx_protocol:shutdown(Reason, ProtoState) + case {ProtoState, Shutdown} of + {undefined, _} -> ok; + {_, {shutdown, Reason}} -> + emqx_protocol:shutdown(Reason, ProtoState); + {_, Error} -> + emqx_protocol:shutdown(Error, ProtoState) end. +%%------------------------------------------------------------------------------ +%% Internal functions +%%------------------------------------------------------------------------------ + reset_parser(State = #state{proto_state = ProtoState}) -> State#state{parser_state = emqx_protocol:parser(ProtoState)}. ensure_stats_timer(State = #state{enable_stats = true, stats_timer = undefined, - idle_timeout = Timeout}) -> - State#state{stats_timer = erlang:send_after(Timeout, self(), emit_stats)}; + idle_timeout = IdleTimeout}) -> + State#state{stats_timer = emqx_misc:start_timer(IdleTimeout, emit_stats)}; ensure_stats_timer(State) -> State. shutdown(Reason, State) -> - {stop, State#state{shutdown_reason = Reason}}. + {stop, State#state{shutdown = Reason}}. + +stop(Error, State) -> + {stop, State#state{shutdown = Error}}. wsock_stats() -> [{Key, get(Key)} || Key <- ?SOCK_STATS]. +