Fix errors found by dialyzer

This commit is contained in:
Feng Lee 2018-08-31 16:46:51 +08:00
parent 3045ec10ab
commit ea1ae70833
16 changed files with 200 additions and 196 deletions

View File

@ -66,41 +66,36 @@ is_running(Node) ->
%% PubSub API
%%--------------------------------------------------------------------
-spec(subscribe(emqx_topic:topic() | string()) -> ok | {error, term()}).
-spec(subscribe(emqx_topic:topic() | string()) -> ok).
subscribe(Topic) ->
emqx_broker:subscribe(iolist_to_binary(Topic)).
-spec(subscribe(emqx_topic:topic() | string(), emqx_types:subscriber() | string())
-> ok | {error, term()}).
subscribe(Topic, Sub) when is_list(Sub)->
emqx_broker:subscribe(iolist_to_binary(Topic), list_to_subid(Sub));
subscribe(Topic, Subscriber) when is_tuple(Subscriber) ->
{SubPid, SubId} = Subscriber,
emqx_broker:subscribe(iolist_to_binary(Topic), SubPid, SubId).
-spec(subscribe(emqx_topic:topic() | string(), emqx_types:subid() | pid()) -> ok).
subscribe(Topic, SubId) when is_atom(SubId); is_binary(SubId)->
emqx_broker:subscribe(iolist_to_binary(Topic), SubId);
subscribe(Topic, SubPid) when is_pid(SubPid) ->
emqx_broker:subscribe(iolist_to_binary(Topic), SubPid).
-spec(subscribe(emqx_topic:topic() | string(), emqx_types:subscriber() | string(),
emqx_topic:subopts()) -> ok | {error, term()}).
subscribe(Topic, Sub, Options) when is_list(Sub)->
emqx_broker:subscribe(iolist_to_binary(Topic), list_to_subid(Sub), Options);
subscribe(Topic, Subscriber, Options) when is_tuple(Subscriber)->
{SubPid, SubId} = Subscriber,
emqx_broker:subscribe(iolist_to_binary(Topic), SubPid, SubId, Options).
-spec(subscribe(emqx_topic:topic() | string(), emqx_types:subid() | pid(),
emqx_types:subopts()) -> ok).
subscribe(Topic, SubId, Options) when is_atom(SubId); is_binary(SubId)->
emqx_broker:subscribe(iolist_to_binary(Topic), SubId, Options);
subscribe(Topic, SubPid, Options) when is_pid(SubPid)->
emqx_broker:subscribe(iolist_to_binary(Topic), SubPid, Options).
-spec(publish(emqx_types:message()) -> {ok, emqx_types:deliver_results()}).
publish(Msg) ->
emqx_broker:publish(Msg).
-spec(unsubscribe(emqx_topic:topic() | string()) -> ok | {error, term()}).
-spec(unsubscribe(emqx_topic:topic() | string()) -> ok).
unsubscribe(Topic) ->
emqx_broker:unsubscribe(iolist_to_binary(Topic)).
-spec(unsubscribe(emqx_topic:topic() | string(), emqx_types:subscriber() | string())
-> ok | {error, term()}).
unsubscribe(Topic, Sub) when is_list(Sub) ->
emqx_broker:unsubscribe(iolist_to_binary(Topic), list_to_subid(Sub));
unsubscribe(Topic, Subscriber) when is_tuple(Subscriber) ->
{SubPid, SubId} = Subscriber,
emqx_broker:unsubscribe(iolist_to_binary(Topic), SubPid, SubId).
-spec(unsubscribe(emqx_topic:topic() | string(), emqx_types:subid() | pid()) -> ok).
unsubscribe(Topic, SubId) when is_atom(SubId); is_binary(SubId) ->
emqx_broker:unsubscribe(iolist_to_binary(Topic), SubId);
unsubscribe(Topic, SubPid) when is_pid(SubPid) ->
emqx_broker:unsubscribe(iolist_to_binary(Topic), SubPid).
%%--------------------------------------------------------------------
%% PubSub management API
@ -109,12 +104,12 @@ unsubscribe(Topic, Subscriber) when is_tuple(Subscriber) ->
-spec(get_subopts(emqx_topic:topic() | string(), emqx_types:subscriber())
-> emqx_types:subopts()).
get_subopts(Topic, Subscriber) ->
emqx_broker:get_subopts(iolist_to_binary(Topic), list_to_subid(Subscriber)).
emqx_broker:get_subopts(iolist_to_binary(Topic), Subscriber).
-spec(set_subopts(emqx_topic:topic() | string(), emqx_types:subscriber(),
emqx_types:subopts()) -> ok).
set_subopts(Topic, Subscriber, Options) when is_list(Options) ->
emqx_broker:set_subopts(iolist_to_binary(Topic), list_to_subid(Subscriber), Options).
emqx_types:subopts()) -> boolean()).
set_subopts(Topic, Subscriber, Options) when is_map(Options) ->
emqx_broker:set_subopts(iolist_to_binary(Topic), Subscriber, Options).
-spec(topics() -> list(emqx_topic:topic())).
topics() -> emqx_router:topics().
@ -127,16 +122,11 @@ subscribers(Topic) ->
subscriptions(Subscriber) ->
emqx_broker:subscriptions(Subscriber).
-spec(subscribed(emqx_topic:topic() | string(), emqx_types:subscriber()) -> boolean()).
subscribed(Topic, Subscriber) ->
emqx_broker:subscribed(iolist_to_binary(Topic), list_to_subid(Subscriber)).
list_to_subid(SubId) when is_binary(SubId) ->
SubId;
list_to_subid(SubId) when is_list(SubId) ->
iolist_to_binary(SubId);
list_to_subid(SubPid) when is_pid(SubPid) ->
SubPid.
-spec(subscribed(emqx_topic:topic() | string(), pid() | emqx_types:subid()) -> boolean()).
subscribed(Topic, SubPid) when is_pid(SubPid) ->
emqx_broker:subscribed(iolist_to_binary(Topic), SubPid);
subscribed(Topic, SubId) when is_atom(SubId); is_binary(SubId) ->
emqx_broker:subscribed(iolist_to_binary(Topic), SubId).
%%--------------------------------------------------------------------
%% Hooks API

View File

@ -25,6 +25,8 @@
-define(ACL_RULE_TAB, emqx_acl_rule).
-type(state() :: #{acl_file := string()}).
%%------------------------------------------------------------------------------
%% API
%%------------------------------------------------------------------------------
@ -95,7 +97,7 @@ match(Credentials, Topic, [Rule|Rules]) ->
{matched, AllowDeny}
end.
-spec(reload_acl(#{}) -> ok | {error, term()}).
-spec(reload_acl(state()) -> ok | {error, term()}).
reload_acl(#{acl_file := AclFile}) ->
case catch load_rules_from_file(AclFile) of
true ->

View File

@ -28,10 +28,11 @@
-define(ALARM_MGR, ?MODULE).
-record(state, {alarms}).
start_link() ->
start_with(fun(Pid) -> gen_event:add_handler(Pid, ?MODULE, []) end).
start_with(
fun(Pid) ->
gen_event:add_handler(Pid, ?MODULE, [])
end).
start_with(Fun) ->
case gen_event:start_link({local, ?ALARM_MGR}) of
@ -73,42 +74,42 @@ delete_alarm_handler(Module) when is_atom(Module) ->
%% Default Alarm handler
%%------------------------------------------------------------------------------
init(_) -> {ok, #state{alarms = []}}.
init(_) -> {ok, #{alarms => []}}.
handle_event({set_alarm, Alarm = #alarm{timestamp = undefined}}, State)->
handle_event({set_alarm, Alarm#alarm{timestamp = os:timestamp()}}, State);
handle_event({set_alarm, Alarm = #alarm{id = AlarmId}}, State = #state{alarms = Alarms}) ->
handle_event({set_alarm, Alarm = #alarm{id = AlarmId}}, State = #{alarms := Alarms}) ->
case encode_alarm(Alarm) of
{ok, Json} ->
emqx_broker:safe_publish(alarm_msg(alert, AlarmId, Json));
{error, Reason} ->
emqx_logger:error("[AlarmMgr] Failed to encode alarm: ~p", [Reason])
end,
{ok, State#state{alarms = [Alarm|Alarms]}};
{ok, State#{alarms := [Alarm|Alarms]}};
handle_event({clear_alarm, AlarmId}, State = #state{alarms = Alarms}) ->
case emqx_json:safe_encode([{id, AlarmId}, {ts, emqx_time:now_secs()}]) of
handle_event({clear_alarm, AlarmId}, State = #{alarms := Alarms}) ->
case emqx_json:safe_encode([{id, AlarmId}, {ts, os:system_time(second)}]) of
{ok, Json} ->
emqx_broker:safe_publish(alarm_msg(clear, AlarmId, Json));
{error, Reason} ->
emqx_logger:error("[AlarmMgr] Failed to encode clear: ~p", [Reason])
end,
{ok, State#state{alarms = lists:keydelete(AlarmId, 2, Alarms)}, hibernate};
{ok, State#{alarms := lists:keydelete(AlarmId, 2, Alarms)}, hibernate};
handle_event(Event, State)->
error_logger:error("[AlarmMgr] unexpected event: ~p", [Event]),
emqx_logger:error("[AlarmMgr] unexpected event: ~p", [Event]),
{ok, State}.
handle_info(Info, State) ->
error_logger:error("[AlarmMgr] unexpected info: ~p", [Info]),
emqx_logger:error("[AlarmMgr] unexpected info: ~p", [Info]),
{ok, State}.
handle_call(get_alarms, State = #state{alarms = Alarms}) ->
handle_call(get_alarms, State = #{alarms := Alarms}) ->
{ok, Alarms, State};
handle_call(Req, State) ->
error_logger:error("[AlarmMgr] unexpected call: ~p", [Req]),
emqx_logger:error("[AlarmMgr] unexpected call: ~p", [Req]),
{ok, ignored, State}.
terminate(swap, State) ->
@ -132,8 +133,8 @@ encode_alarm(#alarm{id = AlarmId, severity = Severity, title = Title,
alarm_msg(Type, AlarmId, Json) ->
Msg = emqx_message:make(?ALARM_MGR, topic(Type, AlarmId), Json),
emqx_message:set_headers(#{'Content-Type' => <<"application/json">>},
emqx_message:set_flags(#{sys => true}, Msg)).
emqx_message:set_headers( #{'Content-Type' => <<"application/json">>},
emqx_message:set_flag(sys, Msg)).
topic(alert, AlarmId) ->
emqx_topic:systop(<<"alarms/", AlarmId/binary, "/alert">>);

View File

@ -133,7 +133,8 @@ handle_info(start, State = #state{options = Options,
Subs = get_value(subscriptions, Options, []),
[emqx_client:subscribe(ClientPid, {i2b(Topic), Qos}) || {Topic, Qos} <- Subs],
ForwardRules = string:tokens(get_value(forward_rule, Options, ""), ","),
[emqx_broker:subscribe(i2b(Topic)) || Topic <- ForwardRules, emqx_topic:validate({filter, i2b(Topic)})],
[emqx_broker:subscribe(i2b(Topic)) || Topic <- ForwardRules,
emqx_topic:validate({filter, i2b(Topic)})],
{noreply, State#state{client_pid = ClientPid}};
{error,_} ->
erlang:send_after(ReconnectTime, self(), start),

View File

@ -260,9 +260,9 @@ subscription(Topic, Subscriber) ->
-spec(subscribed(emqx_topic:topic(), pid() | emqx_types:subid() | emqx_types:subscriber()) -> boolean()).
subscribed(Topic, SubPid) when is_binary(Topic), is_pid(SubPid) ->
length(ets:match_object(?SUBOPTION, {{Topic, {SubPid, '_'}}, '_'}, 1)) == 1;
length(ets:match_object(?SUBOPTION, {{Topic, {SubPid, '_'}}, '_'}, 1)) >= 1;
subscribed(Topic, SubId) when is_binary(Topic), ?is_subid(SubId) ->
length(ets:match_object(?SUBOPTION, {{Topic, {'_', SubId}}, '_'}, 1)) == 1;
length(ets:match_object(?SUBOPTION, {{Topic, {'_', SubId}}, '_'}, 1)) >= 1;
subscribed(Topic, {SubPid, SubId}) when is_binary(Topic), is_pid(SubPid), ?is_subid(SubId) ->
ets:member(?SUBOPTION, {Topic, {SubPid, SubId}}).

View File

@ -186,7 +186,7 @@ with_owner(Options) ->
connect(Client) ->
gen_statem:call(Client, connect, infinity).
-spec(subscribe(client(), topic() | {topic(), qos() | [subopt()]})
-spec(subscribe(client(), topic() | {topic(), qos() | [subopt()]} | [{topic(), qos()}])
-> subscribe_ret()).
subscribe(Client, Topic) when is_binary(Topic) ->
subscribe(Client, {Topic, ?QOS_0});

View File

@ -202,20 +202,19 @@ handle_info({deliver, PubOrAck}, State = #state{proto_state = ProtoState}) ->
{ok, ProtoState1} ->
{noreply, maybe_gc(ensure_stats_timer(State#state{proto_state = ProtoState1}))};
{error, Reason} ->
shutdown(Reason, State);
{error, Reason, ProtoState1} ->
shutdown(Reason, State#state{proto_state = ProtoState1})
shutdown(Reason, State)
end;
handle_info(emit_stats, State = #state{proto_state = ProtoState}) ->
handle_info({timeout, Timer, emit_stats},
State = #state{stats_timer = Timer, proto_state = ProtoState}) ->
emqx_cm:set_conn_stats(emqx_protocol:client_id(ProtoState), stats(State)),
{noreply, State#state{stats_timer = undefined}, hibernate};
handle_info(timeout, State) ->
shutdown(idle_timeout, State);
handle_info({shutdown, Error}, State) ->
shutdown(Error, State);
handle_info({shutdown, Reason}, State) ->
shutdown(Reason, State);
handle_info({shutdown, discard, {ClientId, ByPid}}, State) ->
?LOG(warning, "discarded by ~s:~p", [ClientId, ByPid], State),
@ -311,13 +310,13 @@ handle_packet(Data, State = #state{proto_state = ProtoState,
{ok, ProtoState1} ->
NewState = State#state{proto_state = ProtoState1},
handle_packet(Rest, inc_publish_cnt(Type, reset_parser(NewState)));
{error, Error} ->
?LOG(error, "Protocol error - ~p", [Error], State),
shutdown(Error, State);
{error, Error, ProtoState1} ->
shutdown(Error, State#state{proto_state = ProtoState1});
{stop, Reason, ProtoState1} ->
stop(Reason, State#state{proto_state = ProtoState1})
{error, Reason} ->
?LOG(error, "Process packet error - ~p", [Reason], State),
shutdown(Reason, State);
{error, Reason, ProtoState1} ->
shutdown(Reason, State#state{proto_state = ProtoState1});
{stop, Error, ProtoState1} ->
stop(Error, State#state{proto_state = ProtoState1})
end;
{error, Error} ->
?LOG(error, "Framing error - ~p", [Error], State),
@ -373,7 +372,7 @@ run_socket(State = #state{transport = Transport, socket = Socket}) ->
ensure_stats_timer(State = #state{enable_stats = true,
stats_timer = undefined,
idle_timeout = IdleTimeout}) ->
State#state{stats_timer = erlang:send_after(IdleTimeout, self(), emit_stats)};
State#state{stats_timer = emqx_misc:start_timer(IdleTimeout, emit_stats)};
ensure_stats_timer(State) -> State.
shutdown(Reason, State) ->

View File

@ -60,8 +60,8 @@ init([Pool, Id, Node, Topic, Options]) ->
case net_kernel:connect_node(Node) of
true ->
true = erlang:monitor_node(Node, true),
Share = iolist_to_binary(["$bridge:", atom_to_list(Node), ":", Topic]),
emqx_broker:subscribe(Topic, self(), [{share, Share}, {qos, ?QOS_0}]),
Group = iolist_to_binary(["$bridge:", atom_to_list(Node), ":", Topic]),
emqx_broker:subscribe(Topic, self(), #{share => Group, qos => ?QOS_0}),
State = parse_opts(Options, #state{node = Node, subtopic = Topic}),
MQueue = emqx_mqueue:init(#{type => simple,
max_len => State#state.max_queue_len,
@ -86,11 +86,6 @@ parse_opts([{ping_down_interval, Interval} | Opts], State) ->
parse_opts([_Opt | Opts], State) ->
parse_opts(Opts, State).
qname(Node, Topic) when is_atom(Node) ->
qname(atom_to_list(Node), Topic);
qname(Node, Topic) ->
iolist_to_binary(["Bridge:", Node, ":", Topic]).
handle_call(Req, _From, State) ->
emqx_logger:error("[Bridge] unexpected call: ~p", [Req]),
{reply, ignored, State}.
@ -104,7 +99,7 @@ handle_info({dispatch, _Topic, Msg}, State = #state{mqueue = Q, status = down})
{noreply, State#state{mqueue = emqx_mqueue:in(Msg, Q)}};
handle_info({dispatch, _Topic, Msg}, State = #state{node = Node, status = up}) ->
ok = emqx_rpc:cast(Node, emqx_broker, publish, [transform(Msg, State)]),
emqx_rpc:cast(Node, emqx_broker, publish, [transform(Msg, State)]),
{noreply, State};
handle_info({nodedown, Node}, State = #state{node = Node, ping_down_interval = Interval}) ->
@ -157,7 +152,6 @@ dequeue(State = #state{mqueue = MQ}) ->
dequeue(State#state{mqueue = MQ1})
end.
transform(Msg = #message{topic = Topic}, #state{topic_prefix = Prefix,
topic_suffix = Suffix}) ->
transform(Msg = #message{topic = Topic}, #state{topic_prefix = Prefix, topic_suffix = Suffix}) ->
Msg#message{topic = <<Prefix/binary, Topic/binary, Suffix/binary>>}.

View File

@ -26,8 +26,6 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
-record(state, {}).
%% Bytes sent and received of Broker
-define(BYTES_METRICS, [
{counter, 'bytes/received'}, % Total bytes received
@ -85,8 +83,8 @@
-define(TAB, ?MODULE).
-define(SERVER, ?MODULE).
%% @doc Start the metrics server
-spec(start_link() -> {ok, pid()} | ignore | {error, term()}).
%% @doc Start the metrics server.
-spec(start_link() -> emqx_types:startlink_ret()).
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
@ -252,7 +250,7 @@ init([]) ->
% Create metrics table
_ = emqx_tables:new(?TAB, [set, public, {write_concurrency, true}]),
lists:foreach(fun new/1, ?BYTES_METRICS ++ ?PACKET_METRICS ++ ?MESSAGE_METRICS),
{ok, #state{}, hibernate}.
{ok, #{}, hibernate}.
handle_call(Req, _From, State) ->
emqx_logger:error("[Metrics] unexpected call: ~p", [Req]),
@ -266,7 +264,7 @@ handle_info(Info, State) ->
emqx_logger:error("[Metrics] unexpected info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, #state{}) ->
terminate(_Reason, #{}) ->
ok.
code_change(_OldVsn, State, _Extra) ->

View File

@ -23,8 +23,8 @@
load(Rules0) ->
Rules = compile(Rules0),
emqx_hooks:add('client.subscribe', fun ?MODULE:rewrite_subscribe/4, [Rules]),
emqx_hooks:add('client.unsubscribe',fun ?MODULE:rewrite_unsubscribe/4, [Rules]),
emqx_hooks:add('client.subscribe', fun ?MODULE:rewrite_subscribe/3, [Rules]),
emqx_hooks:add('client.unsubscribe',fun ?MODULE:rewrite_unsubscribe/3, [Rules]),
emqx_hooks:add('message.publish', fun ?MODULE:rewrite_publish/2, [Rules]).
rewrite_subscribe(_Credentials, TopicTable, Rules) ->

View File

@ -188,14 +188,14 @@ session(#pstate{session = SPid}) ->
SPid.
parser(#pstate{packet_size = Size, proto_ver = Ver}) ->
emqx_frame:initial_state(#{packet_size => Size, version => Ver}).
emqx_frame:initial_state(#{max_packet_size => Size, version => Ver}).
%%------------------------------------------------------------------------------
%% Packet Received
%%------------------------------------------------------------------------------
-spec(received(emqx_mqtt_types:packet(), state())
-> {ok, state()} | {error, term()} | {error, term(), state()}).
-spec(received(emqx_mqtt_types:packet(), state()) ->
{ok, state()} | {error, term()} | {error, term(), state()} | {stop, term(), state()}).
received(?PACKET(Type), PState = #pstate{connected = false}) when Type =/= ?CONNECT ->
{error, proto_not_connected, PState};
@ -451,6 +451,7 @@ puback(?QOS_2, PacketId, {ok, _}, PState) ->
%% Deliver Packet -> Client
%%------------------------------------------------------------------------------
-spec(deliver(term(), state()) -> {ok, state()} | {error, term()}).
deliver({connack, ReasonCode}, PState) ->
send(?CONNACK_PACKET(ReasonCode), PState);

View File

@ -148,6 +148,9 @@
}).
-type(spid() :: pid()).
-type(attr() :: {atom(), term()}).
-export_type([attr/0]).
-define(TIMEOUT, 60000).
@ -564,7 +567,7 @@ handle_info({timeout, Timer, check_awaiting_rel}, State = #state{await_rel_timer
noreply(expire_awaiting_rel(State#state{await_rel_timer = undefined}));
handle_info({timeout, Timer, emit_stats}, State = #state{client_id = ClientId, stats_timer = Timer}) ->
true = emqx_sm:set_session_stats(ClientId, stats(State)),
_ = emqx_sm:set_session_stats(ClientId, stats(State)),
{noreply, State#state{stats_timer = undefined}, hibernate};
handle_info({timeout, Timer, expired}, State = #state{expiry_timer = Timer}) ->

View File

@ -49,22 +49,20 @@ start_link() ->
%% @doc Open a session.
-spec(open_session(map()) -> {ok, pid()} | {ok, pid(), boolean()} | {error, term()}).
open_session(Attrs = #{clean_start := true, client_id := ClientId, conn_pid := ConnPid}) ->
open_session(SessAttrs = #{clean_start := true, client_id := ClientId, conn_pid := ConnPid}) ->
CleanStart = fun(_) ->
ok = discard_session(ClientId, ConnPid),
emqx_session_sup:start_session(Attrs)
emqx_session_sup:start_session(SessAttrs)
end,
emqx_sm_locker:trans(ClientId, CleanStart);
open_session(Attrs = #{clean_start := false, client_id := ClientId, conn_pid := ConnPid}) ->
open_session(SessAttrs = #{clean_start := false, client_id := ClientId, conn_pid := ConnPid}) ->
ResumeStart = fun(_) ->
case resume_session(ClientId, ConnPid) of
{ok, SPid} ->
{ok, SPid, true};
{error, not_found} ->
emqx_session_sup:start_session(Attrs);
{error, Reason} ->
{error, Reason}
emqx_session_sup:start_session(SessAttrs)
end
end,
emqx_sm_locker:trans(ClientId, ResumeStart).
@ -113,31 +111,31 @@ close_session(SPid) when is_pid(SPid) ->
%% @doc Register a session with attributes.
-spec(register_session(emqx_types:client_id() | {emqx_types:client_id(), pid()},
list(emqx_session:attribute())) -> ok).
register_session(ClientId, Attrs) when is_binary(ClientId) ->
register_session({ClientId, self()}, Attrs);
list(emqx_session:attr())) -> ok).
register_session(ClientId, SessAttrs) when is_binary(ClientId) ->
register_session({ClientId, self()}, SessAttrs);
register_session(Session = {ClientId, SPid}, Attrs)
register_session(Session = {ClientId, SPid}, SessAttrs)
when is_binary(ClientId), is_pid(SPid) ->
ets:insert(?SESSION_TAB, Session),
ets:insert(?SESSION_ATTRS_TAB, {Session, Attrs}),
case proplists:get_value(clean_start, Attrs, true) of
true -> ok;
false -> ets:insert(?SESSION_P_TAB, Session)
end,
ets:insert(?SESSION_ATTRS_TAB, {Session, SessAttrs}),
proplists:get_value(clean_start, SessAttrs, true)
andalso ets:insert(?SESSION_P_TAB, Session),
emqx_sm_registry:register_session(Session),
notify({registered, ClientId, SPid}).
%% @doc Get session attrs
-spec(get_session_attrs({emqx_types:client_id(), pid()}) -> list(emqx_session:attribute())).
-spec(get_session_attrs({emqx_types:client_id(), pid()}) -> list(emqx_session:attr())).
get_session_attrs(Session = {ClientId, SPid}) when is_binary(ClientId), is_pid(SPid) ->
safe_lookup_element(?SESSION_ATTRS_TAB, Session, []).
%% @doc Set session attrs
set_session_attrs(ClientId, Attrs) when is_binary(ClientId) ->
set_session_attrs({ClientId, self()}, Attrs);
set_session_attrs(Session = {ClientId, SPid}, Attrs) when is_binary(ClientId), is_pid(SPid) ->
ets:insert(?SESSION_ATTRS_TAB, {Session, Attrs}).
-spec(set_session_attrs(emqx_types:client_id() | {emqx_types:client_id(), pid()},
list(emqx_session:attr())) -> true).
set_session_attrs(ClientId, SessAttrs) when is_binary(ClientId) ->
set_session_attrs({ClientId, self()}, SessAttrs);
set_session_attrs(Session = {ClientId, SPid}, SessAttrs) when is_binary(ClientId), is_pid(SPid) ->
ets:insert(?SESSION_ATTRS_TAB, {Session, SessAttrs}).
%% @doc Unregister a session
-spec(unregister_session(emqx_types:client_id() | {emqx_types:client_id(), pid()}) -> ok).
@ -154,18 +152,15 @@ unregister_session(Session = {ClientId, SPid}) when is_binary(ClientId), is_pid(
%% @doc Get session stats
-spec(get_session_stats({emqx_types:client_id(), pid()}) -> list(emqx_stats:stats())).
get_session_stats(Session = {ClientId, SPid})
when is_binary(ClientId), is_pid(SPid) ->
get_session_stats(Session = {ClientId, SPid}) when is_binary(ClientId), is_pid(SPid) ->
safe_lookup_element(?SESSION_STATS_TAB, Session, []).
%% @doc Set session stats
-spec(set_session_stats(emqx_types:client_id() | {emqx_types:client_id(), pid()},
emqx_stats:stats()) -> ok).
emqx_stats:stats()) -> true).
set_session_stats(ClientId, Stats) when is_binary(ClientId) ->
set_session_stats({ClientId, self()}, Stats);
set_session_stats(Session = {ClientId, SPid}, Stats)
when is_binary(ClientId), is_pid(SPid) ->
set_session_stats(Session = {ClientId, SPid}, Stats) when is_binary(ClientId), is_pid(SPid) ->
ets:insert(?SESSION_STATS_TAB, {Session, Stats}).
%% @doc Lookup a session from registry

View File

@ -20,7 +20,9 @@
-export([start_link/0]).
-export([is_enabled/0]).
-export([register_session/1, lookup_session/1, unregister_session/1]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
@ -30,12 +32,11 @@
-define(LOCK, {?MODULE, cleanup_sessions}).
-record(global_session, {sid, pid}).
-record(state, {}).
-type(session_pid() :: pid()).
%% @doc Start the session manager.
-spec(start_link() -> {ok, pid()} | ignore | {error, term()}).
%% @doc Start the global session manager.
-spec(start_link() -> emqx_types:startlink_ret()).
start_link() ->
gen_server:start_link({local, ?REGISTRY}, ?MODULE, [], []).
@ -46,19 +47,18 @@ is_enabled() ->
-spec(lookup_session(emqx_types:client_id())
-> list({emqx_types:client_id(), session_pid()})).
lookup_session(ClientId) ->
[{ClientId, SessionPid} || #global_session{pid = SessionPid}
<- mnesia:dirty_read(?TAB, ClientId)].
[{ClientId, SessPid} || #global_session{pid = SessPid} <- mnesia:dirty_read(?TAB, ClientId)].
-spec(register_session({emqx_types:client_id(), session_pid()}) -> ok).
register_session({ClientId, SessionPid}) when is_binary(ClientId), is_pid(SessionPid) ->
mnesia:dirty_write(?TAB, record(ClientId, SessionPid)).
register_session({ClientId, SessPid}) when is_binary(ClientId), is_pid(SessPid) ->
mnesia:dirty_write(?TAB, record(ClientId, SessPid)).
-spec(unregister_session({emqx_types:client_id(), session_pid()}) -> ok).
unregister_session({ClientId, SessionPid}) when is_binary(ClientId), is_pid(SessionPid) ->
mnesia:dirty_delete_object(?TAB, record(ClientId, SessionPid)).
unregister_session({ClientId, SessPid}) when is_binary(ClientId), is_pid(SessPid) ->
mnesia:dirty_delete_object(?TAB, record(ClientId, SessPid)).
record(ClientId, SessionPid) ->
#global_session{sid = ClientId, pid = SessionPid}.
record(ClientId, SessPid) ->
#global_session{sid = ClientId, pid = SessPid}.
%%------------------------------------------------------------------------------
%% gen_server callbacks
@ -72,7 +72,7 @@ init([]) ->
{attributes, record_info(fields, global_session)}]),
ok = ekka_mnesia:copy_table(?TAB),
ekka:monitor(membership),
{ok, #state{}}.
{ok, #{}}.
handle_call(Req, _From, State) ->
emqx_logger:error("[Registry] unexpected call: ~p", [Req]),
@ -107,9 +107,9 @@ code_change(_OldVsn, State, _Extra) ->
%%------------------------------------------------------------------------------
cleanup_sessions(Node) ->
Pat = [{#global_session{pid = '$1', _ = '_'},
[{'==', {node, '$1'}, Node}], ['$_']}],
lists:foreach(fun(Session) ->
mnesia:delete_object(?TAB, Session)
end, mnesia:select(?TAB, Pat)).
Pat = [{#global_session{pid = '$1', _ = '_'}, [{'==', {node, '$1'}, Node}], ['$_']}],
lists:foreach(fun delete_session/1, mnesia:select(?TAB, Pat, write)).
delete_session(Session) ->
mnesia:delete_object(?TAB, Session, write).

View File

@ -31,9 +31,10 @@
code_change/3]).
-record(update, {name, countdown, interval, func}).
-record(state, {timer, updates :: #update{}}).
-record(state, {timer, updates :: [#update{}]}).
-type(stats() :: list({atom(), non_neg_integer()})).
-export_type([stats/0]).
%% Connection stats

View File

@ -40,38 +40,61 @@
keepalive,
enable_stats,
stats_timer,
shutdown_reason
shutdown
}).
-define(INFO_KEYS, [peername, sockname]).
-define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt]).
-define(WSLOG(Level, Format, Args, State),
emqx_logger:Level("WSMQTT(~s): " ++ Format, [esockd_net:format(State#state.peername) | Args])).
emqx_logger:Level("MQTT/WS(~s): " ++ Format,
[esockd_net:format(State#state.peername) | Args])).
%%------------------------------------------------------------------------------
%% API
%%------------------------------------------------------------------------------
%% for debug
info(WSPid) ->
call(WSPid, info).
info(WSPid) when is_pid(WSPid) ->
call(WSPid, info);
info(#state{peername = Peername,
sockname = Sockname,
proto_state = ProtoState}) ->
ProtoInfo = emqx_protocol:info(ProtoState),
ConnInfo = [{socktype, websocket},
{conn_state, running},
{peername, Peername},
{sockname, Sockname}],
lists:append([ConnInfo, ProtoInfo]).
%% for dashboard
attrs(CPid) when is_pid(CPid) ->
call(CPid, attrs).
attrs(WSPid) when is_pid(WSPid) ->
call(WSPid, attrs);
stats(WSPid) ->
call(WSPid, stats).
attrs(#state{peername = Peername,
sockname = Sockname,
proto_state = ProtoState}) ->
SockAttrs = [{peername, Peername},
{sockname, Sockname}],
ProtoAttrs = emqx_protocol:attrs(ProtoState),
lists:usort(lists:append(SockAttrs, ProtoAttrs)).
kick(WSPid) ->
stats(WSPid) when is_pid(WSPid) ->
call(WSPid, stats);
stats(#state{proto_state = ProtoState}) ->
lists:append([wsock_stats(),
emqx_misc:proc_stats(),
emqx_protocol:stats(ProtoState)
]).
kick(WSPid) when is_pid(WSPid) ->
call(WSPid, kick).
session(WSPid) ->
session(WSPid) when is_pid(WSPid) ->
call(WSPid, session).
call(WSPid, Req) ->
call(WSPid, Req) when is_pid(WSPid) ->
Mref = erlang:monitor(process, WSPid),
WSPid ! {call, {self(), Mref}, Req},
receive
@ -153,41 +176,30 @@ websocket_handle({binary, Data}, State = #state{parser_state = ParserState,
websocket_handle({binary, Rest}, reset_parser(State#state{proto_state = ProtoState1}));
{error, Error} ->
?WSLOG(error, "Protocol error - ~p", [Error], State),
{stop, State};
{error, Error, ProtoState1} ->
shutdown(Error, State#state{proto_state = ProtoState1});
{stop, Reason, ProtoState1} ->
shutdown(Reason, State#state{proto_state = ProtoState1})
stop(Error, State);
{error, Reason, ProtoState1} ->
shutdown(Reason, State#state{proto_state = ProtoState1});
{stop, Error, ProtoState1} ->
stop(Error, State#state{proto_state = ProtoState1})
end;
{error, Error} ->
?WSLOG(error, "Frame error: ~p", [Error], State),
{stop, State};
stop(Error, State);
{'EXIT', Reason} ->
?WSLOG(error, "Frame error:~p~nFrame data: ~p", [Reason, Data], State),
{stop, State}
shutdown(parse_error, State)
end.
websocket_info({call, From, info}, State = #state{peername = Peername,
sockname = Sockname,
proto_state = ProtoState}) ->
ProtoInfo = emqx_protocol:info(ProtoState),
ConnInfo = [{socktype, websocket}, {conn_state, running},
{peername, Peername}, {sockname, Sockname}],
gen_server:reply(From, lists:append([ConnInfo, ProtoInfo])),
websocket_info({call, From, info}, State) ->
gen_server:reply(From, info(State)),
{ok, State};
websocket_info({call, From, attrs}, State = #state{peername = Peername,
sockname = Sockname,
proto_state = ProtoState}) ->
SockAttrs = [{peername, Peername},
{sockname, Sockname}],
ProtoAttrs = emqx_protocol:attrs(ProtoState),
gen_server:reply(From, lists:usort(lists:append(SockAttrs, ProtoAttrs))),
websocket_info({call, From, attrs}, State) ->
gen_server:reply(From, attrs(State)),
{ok, State};
websocket_info({call, From, stats}, State = #state{proto_state = ProtoState}) ->
Stats = lists:append([wsock_stats(), emqx_misc:proc_stats(), emqx_protocol:stats(ProtoState)]),
gen_server:reply(From, Stats),
websocket_info({call, From, stats}, State) ->
gen_server:reply(From, stats(State)),
{ok, State};
websocket_info({call, From, kick}, State) ->
@ -203,15 +215,12 @@ websocket_info({deliver, PubOrAck}, State = #state{proto_state = ProtoState}) ->
{ok, ProtoState1} ->
{ok, ensure_stats_timer(State#state{proto_state = ProtoState1})};
{error, Reason} ->
shutdown(Reason, State);
{error, Reason, ProtoState1} ->
shutdown(Reason, State#state{proto_state = ProtoState1})
shutdown(Reason, State)
end;
websocket_info(emit_stats, State = #state{proto_state = ProtoState}) ->
Stats = lists:append([wsock_stats(), emqx_misc:proc_stats(),
emqx_protocol:stats(ProtoState)]),
emqx_cm:set_conn_stats(emqx_protocol:client_id(ProtoState), Stats),
websocket_info({timeout, Timer, emit_stats},
State = #state{stats_timer = Timer, proto_state = ProtoState}) ->
emqx_cm:set_conn_stats(emqx_protocol:client_id(ProtoState), stats(State)),
{ok, State#state{stats_timer = undefined}, hibernate};
websocket_info({keepalive, start, Interval}, State) ->
@ -254,30 +263,40 @@ websocket_info(Info, State) ->
?WSLOG(error, "unexpected info: ~p", [Info], State),
{ok, State}.
terminate(SockError, _Req, #state{keepalive = Keepalive,
terminate(SockError, _Req, State = #state{keepalive = Keepalive,
proto_state = ProtoState,
shutdown_reason = Reason}) ->
shutdown = Shutdown}) ->
?WSLOG(debug, "Terminated for ~p, sockerror: ~p",
[Shutdown, SockError], State),
emqx_keepalive:cancel(Keepalive),
io:format("Websocket shutdown for ~p, sockerror: ~p~n", [Reason, SockError]),
case Reason of
undefined ->
ok;
_ ->
emqx_protocol:shutdown(Reason, ProtoState)
case {ProtoState, Shutdown} of
{undefined, _} -> ok;
{_, {shutdown, Reason}} ->
emqx_protocol:shutdown(Reason, ProtoState);
{_, Error} ->
emqx_protocol:shutdown(Error, ProtoState)
end.
%%------------------------------------------------------------------------------
%% Internal functions
%%------------------------------------------------------------------------------
reset_parser(State = #state{proto_state = ProtoState}) ->
State#state{parser_state = emqx_protocol:parser(ProtoState)}.
ensure_stats_timer(State = #state{enable_stats = true,
stats_timer = undefined,
idle_timeout = Timeout}) ->
State#state{stats_timer = erlang:send_after(Timeout, self(), emit_stats)};
idle_timeout = IdleTimeout}) ->
State#state{stats_timer = emqx_misc:start_timer(IdleTimeout, emit_stats)};
ensure_stats_timer(State) ->
State.
shutdown(Reason, State) ->
{stop, State#state{shutdown_reason = Reason}}.
{stop, State#state{shutdown = Reason}}.
stop(Error, State) ->
{stop, State#state{shutdown = Error}}.
wsock_stats() ->
[{Key, get(Key)} || Key <- ?SOCK_STATS].