Improve the design of MQTT session management

This commit is contained in:
Feng Lee 2018-04-08 15:16:05 +08:00
parent 39548cc399
commit bfb23ff0b2
24 changed files with 446 additions and 308 deletions

View File

@ -96,7 +96,7 @@
-type(client() :: #client{}).
-record(session,
{ client_id :: client_id(),
{ sid :: client_id(),
pid :: pid()
}).

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%% Copyright (c) 2013-2018 EMQ Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%% Copyright (c) 2013-2018 EMQ Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%% Copyright (c) 2013-2018 EMQ Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%% Copyright (c) 2013-2018 EMQ Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%% Copyright (c) 2013-2018 EMQ Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%% Copyright (c) 2013-2018 EMQ Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%% Copyright (c) 2013-2018 EMQ Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -119,7 +119,7 @@ route([{To, Node}], Delivery = #delivery{flows = Flows}) when is_atom(Node) ->
forward(Node, To, Delivery#delivery{flows = [{route, Node, To}|Flows]});
route([{To, Group}], Delivery) when is_binary(Group) ->
emqx_shared_pubsub:dispatch(Group, To, Delivery);
emqx_shared_sub:dispatch(Group, To, Delivery);
route(Routes, Delivery) ->
lists:foldl(fun(Route, Acc) -> route([Route], Acc) end, Delivery, Routes).
@ -248,7 +248,7 @@ handle_cast({From, {subscribe, Topic, Subscriber, Options}}, State) ->
[] ->
Group = proplists:get_value(share, Options),
true = do_subscribe(Group, Topic, Subscriber, Options),
emqx_shared_pubsub:subscribe(Group, Topic, subpid(Subscriber)),
emqx_shared_sub:subscribe(Group, Topic, subpid(Subscriber)),
emqx_router:add_route(From, Topic, dest(Options)),
{noreply, monitor_subscriber(Subscriber, State)};
[_] ->
@ -261,7 +261,7 @@ handle_cast({From, {unsubscribe, Topic, Subscriber}}, State) ->
[{_, Options}] ->
Group = proplists:get_value(share, Options),
true = do_unsubscribe(Group, Topic, Subscriber),
emqx_shared_pubsub:unsubscribe(Group, Topic, subpid(Subscriber)),
emqx_shared_sub:unsubscribe(Group, Topic, subpid(Subscriber)),
case ets:member(subscriber, Topic) of
false -> emqx_router:del_route(From, Topic, dest(Options));
true -> gen_server:reply(From, ok)

View File

@ -35,9 +35,9 @@ init([]) ->
%% Create the pubsub tables
create_tabs(),
%% Shared pubsub
Shared = {shared_pubsub, {emqx_shared_pubsub, start_link, []},
permanent, 5000, worker, [emqx_shared_pubsub]},
%% Shared subscription
Shared = {shared_sub, {emqx_shared_sub, start_link, []},
permanent, 5000, worker, [emqx_shared_sub]},
%% Broker helper
Helper = {broker_helper, {emqx_broker_helper, start_link, [stats_fun()]},

View File

@ -29,6 +29,11 @@
-type(env() :: {atom(), term()}).
-define(APP, emqx).
get_env(Key) ->
application:get_env(?APP, Key).
%% @doc Read the configuration of an application.
-spec(read(atom()) -> {ok, list(env())} | {error, term()}).
read(App) ->

View File

@ -1,41 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_locker).
-export([start_link/0]).
%% Lock/Unlock API based on canal-lock.
-export([lock/1, unlock/1]).
%% @doc Starts the lock server
-spec(start_link() -> {ok, pid()} | ignore | {error, any()}).
start_link() ->
canal_lock:start_link(?MODULE, 1).
%% @doc Lock a Key
-spec(lock(binary()) -> boolean()).
lock(Key) ->
case canal_lock:acquire(?MODULE, Key, 1, 1) of
{acquired, 1} -> true;
full -> false
end.
%% @doc Unlock a Key
-spec(unlock(binary()) -> ok).
unlock(Key) ->
canal_lock:release(?MODULE, Key, 1, 1).

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%% Copyright (c) 2013-2018 EMQ Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -16,7 +16,9 @@
-module(emqx_pmon).
-export([new/0, monitor/2, demonitor/2, erase/2]).
-export([new/0, monitor/2, monitor/3, demonitor/2, find/2, erase/2]).
-compile({no_auto_import,[monitor/3]}).
-type(pmon() :: {?MODULE, map()}).
@ -26,25 +28,35 @@ new() ->
{?MODULE, [maps:new()]}.
-spec(monitor(pid(), pmon()) -> pmon()).
monitor(Pid, PM = {?MODULE, [M]}) ->
monitor(Pid, PM) ->
monitor(Pid, undefined, PM).
monitor(Pid, Val, PM = {?MODULE, [M]}) ->
case maps:is_key(Pid, M) of
true ->
PM;
true -> PM;
false ->
Ref = erlang:monitor(process, Pid),
{?MODULE, [maps:put(Pid, Ref, M)]}
{?MODULE, [maps:put(Pid, {Ref, Val}, M)]}
end.
-spec(demonitor(pid(), pmon()) -> pmon()).
demonitor(Pid, PM = {?MODULE, [M]}) ->
case maps:find(Pid, M) of
{ok, Ref} ->
{ok, {Ref, _Val}} ->
erlang:demonitor(Ref, [flush]),
{?MODULE, [maps:remove(Pid, M)]};
error ->
PM
end.
-spec(find(pid(), pmon()) -> undefined | term()).
find(Pid, {?MODULE, [M]}) ->
case maps:find(Pid, M) of
{ok, {_Ref, Val}} ->
Val;
error -> undefined
end.
-spec(erase(pid(), pmon()) -> pmon()).
erase(Pid, {?MODULE, [M]}) ->
{?MODULE, [maps:remove(Pid, M)]}.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%% Copyright (c) 2013-2018 EMQ Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%% Copyright (c) 2013-2018 EMQ Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%% Copyright (c) 2013-2018 EMQ Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.

View File

@ -67,8 +67,7 @@
%% If the session is currently disconnected, the time at which the Session state
%% will be deleted.
-record(state,
{
%% Clean Start Flag
{ %% Clean Start Flag
clean_start = false :: boolean(),
%% Client Binding: local | remote
@ -136,8 +135,8 @@
%% Enable Stats
enable_stats :: boolean(),
%% Force GC Count
force_gc_count :: undefined | integer(),
%% Force GC reductions
reductions = 0 :: non_neg_integer(),
%% Ignore loop deliver?
ignore_loop_deliver = false :: boolean(),
@ -161,8 +160,8 @@
%% @doc Start a Session
-spec(start_link(map()) -> {ok, pid()} | {error, term()}).
start_link(ClientAttrs) ->
gen_server:start_link(?MODULE, ClientAttrs, [{hibernate_after, 10000}]).
start_link(Attrs) ->
gen_server:start_link(?MODULE, Attrs, [{hibernate_after, 10000}]).
%%--------------------------------------------------------------------
%% PubSub API
@ -170,71 +169,71 @@ start_link(ClientAttrs) ->
%% @doc Subscribe topics
-spec(subscribe(pid(), [{binary(), [emqx_topic:option()]}]) -> ok).
subscribe(Session, TopicTable) -> %%TODO: the ack function??...
gen_server:cast(Session, {subscribe, self(), TopicTable, fun(_) -> ok end}).
subscribe(SessionPid, TopicTable) -> %%TODO: the ack function??...
gen_server:cast(SessionPid, {subscribe, self(), TopicTable, fun(_) -> ok end}).
-spec(subscribe(pid(), mqtt_packet_id(), [{binary(), [emqx_topic:option()]}]) -> ok).
subscribe(Session, PacketId, TopicTable) -> %%TODO: the ack function??...
subscribe(SessionPid, PacketId, TopicTable) -> %%TODO: the ack function??...
From = self(),
AckFun = fun(GrantedQos) -> From ! {suback, PacketId, GrantedQos} end,
gen_server:cast(Session, {subscribe, From, TopicTable, AckFun}).
gen_server:cast(SessionPid, {subscribe, From, TopicTable, AckFun}).
%% @doc Publish Message
-spec(publish(pid(), message()) -> ok | {error, term()}).
publish(_Session, Msg = #message{qos = ?QOS_0}) ->
publish(_SessionPid, Msg = #message{qos = ?QOS_0}) ->
%% Publish QoS0 Directly
emqx_broker:publish(Msg), ok;
publish(_Session, Msg = #message{qos = ?QOS_1}) ->
publish(_SessionPid, Msg = #message{qos = ?QOS_1}) ->
%% Publish QoS1 message directly for client will PubAck automatically
emqx_broker:publish(Msg), ok;
publish(Session, Msg = #message{qos = ?QOS_2}) ->
publish(SessionPid, Msg = #message{qos = ?QOS_2}) ->
%% Publish QoS2 to Session
gen_server:call(Session, {publish, Msg}, ?TIMEOUT).
gen_server:call(SessionPid, {publish, Msg}, ?TIMEOUT).
%% @doc PubAck Message
-spec(puback(pid(), mqtt_packet_id()) -> ok).
puback(Session, PacketId) ->
gen_server:cast(Session, {puback, PacketId}).
puback(SessionPid, PacketId) ->
gen_server:cast(SessionPid, {puback, PacketId}).
-spec(pubrec(pid(), mqtt_packet_id()) -> ok).
pubrec(Session, PacketId) ->
gen_server:cast(Session, {pubrec, PacketId}).
pubrec(SessionPid, PacketId) ->
gen_server:cast(SessionPid, {pubrec, PacketId}).
-spec(pubrel(pid(), mqtt_packet_id()) -> ok).
pubrel(Session, PacketId) ->
gen_server:cast(Session, {pubrel, PacketId}).
pubrel(SessionPid, PacketId) ->
gen_server:cast(SessionPid, {pubrel, PacketId}).
-spec(pubcomp(pid(), mqtt_packet_id()) -> ok).
pubcomp(Session, PacketId) ->
gen_server:cast(Session, {pubcomp, PacketId}).
pubcomp(SessionPid, PacketId) ->
gen_server:cast(SessionPid, {pubcomp, PacketId}).
%% @doc Unsubscribe the topics
-spec(unsubscribe(pid(), [{binary(), [suboption()]}]) -> ok).
unsubscribe(Session, TopicTable) ->
gen_server:cast(Session, {unsubscribe, self(), TopicTable}).
unsubscribe(SessionPid, TopicTable) ->
gen_server:cast(SessionPid, {unsubscribe, self(), TopicTable}).
%% @doc Resume the session
-spec(resume(pid(), client_id(), pid()) -> ok).
resume(Session, ClientId, ClientPid) ->
gen_server:cast(Session, {resume, ClientId, ClientPid}).
resume(SessionPid, ClientId, ClientPid) ->
gen_server:cast(SessionPid, {resume, ClientId, ClientPid}).
%% @doc Get session state
state(Session) when is_pid(Session) ->
gen_server:call(Session, state).
state(SessionPid) when is_pid(SessionPid) ->
gen_server:call(SessionPid, state).
%% @doc Get session info
-spec(info(pid() | #state{}) -> list(tuple())).
info(Session) when is_pid(Session) ->
gen_server:call(Session, info);
info(SessionPid) when is_pid(SessionPid) ->
gen_server:call(SessionPid, info);
info(State) when is_record(State, state) ->
?record_to_proplist(state, State, ?INFO_KEYS).
-spec(stats(pid() | #state{}) -> list({atom(), non_neg_integer()})).
stats(Session) when is_pid(Session) ->
gen_server:call(Session, stats);
stats(SessionPid) when is_pid(SessionPid) ->
gen_server:call(SessionPid, stats);
stats(#state{max_subscriptions = MaxSubscriptions,
subscriptions = Subscriptions,
@ -258,8 +257,8 @@ stats(#state{max_subscriptions = MaxSubscriptions,
%% @doc Discard the session
-spec(discard(pid(), client_id()) -> ok).
discard(Session, ClientId) ->
gen_server:cast(Session, {discard, ClientId}).
discard(SessionPid, ClientId) ->
gen_server:call(SessionPid, {discard, ClientId}).
%%--------------------------------------------------------------------
%% gen_server Callbacks
@ -276,7 +275,6 @@ init(#{clean_start := CleanStart,
{ok, QEnv} = emqx:env(mqueue),
MaxInflight = get_value(max_inflight, Env, 0),
EnableStats = get_value(enable_stats, Env, false),
ForceGcCount = emqx_gc:conn_max_gc_count(),
IgnoreLoopDeliver = get_value(ignore_loop_deliver, Env, false),
MQueue = ?MQueue:new(ClientId, QEnv, emqx_alarm:alarm_fun()),
State = #state{clean_start = CleanStart,
@ -296,10 +294,9 @@ init(#{clean_start := CleanStart,
max_awaiting_rel = get_value(max_awaiting_rel, Env),
expiry_interval = get_value(expiry_interval, Env),
enable_stats = EnableStats,
force_gc_count = ForceGcCount,
ignore_loop_deliver = IgnoreLoopDeliver,
created_at = os:timestamp()},
emqx_sm:register_session(ClientId, self()),
emqx_sm:register_session(#session{sid = ClientId, pid = self()}, info(State)),
emqx_hooks:run('session.created', [ClientId, Username]),
io:format("Session started: ~p~n", [self()]),
{ok, emit_stats(State), hibernate}.
@ -310,8 +307,13 @@ init_stats(Keys) ->
binding(ClientPid) ->
case node(ClientPid) =:= node() of true -> local; false -> remote end.
handle_pre_hibernate(State) ->
{hibernate, emqx_gc:reset_conn_gc_count(#state.force_gc_count, emit_stats(State))}.
handle_call({discard, ClientPid}, _From, State = #state{client_pid = undefined}) ->
?LOG(warning, "Discarded by ~p", [ClientPid], State),
{stop, {shutdown, discard}, ok, State};
handle_call({discard, ClientPid}, _From, State = #state{client_pid = OldClientPid}) ->
?LOG(warning, " ~p kickout ~p", [ClientPid, OldClientPid], State),
{stop, {shutdown, conflict}, ok, State};
handle_call({publish, Msg = #message{qos = ?QOS_2, headers = #{packet_id := PacketId}}}, _From,
State = #state{awaiting_rel = AwaitingRel,
@ -498,16 +500,6 @@ handle_cast({resume, ClientId, ClientPid},
%% Replay delivery and Dequeue pending messages
{noreply, emit_stats(dequeue(retry_delivery(true, State1)))};
handle_cast({discard, ClientId},
State = #state{client_id = ClientId, client_pid = undefined}) ->
?LOG(warning, "Destroyed", [], State),
shutdown(discard, State);
handle_cast({discard, ClientId},
State = #state{client_id = ClientId, client_pid = OldClientPid}) ->
?LOG(warning, "kickout ~p", [OldClientPid], State),
shutdown(conflict, State);
handle_cast(Msg, State) ->
lager:error("[~s] Unexpected Cast: ~p", [?MODULE, Msg]),
{noreply, State}.
@ -563,10 +555,9 @@ handle_info(Info, State) ->
{noreply, State}.
terminate(Reason, #state{client_id = ClientId, username = Username}) ->
%% Move to emqx_sm to avoid race condition
%% emqx_stats:del_session_stats(ClientId),
emqx_hooks:run('session.terminated', [ClientId, Username, Reason]),
emqx_sm:unregister_session(ClientId).
emqx_sm:unregister_session(#session{sid = ClientId, pid = self()}).
code_change(_OldVsn, Session, _Extra) ->
{ok, Session}.
@ -574,6 +565,7 @@ code_change(_OldVsn, Session, _Extra) ->
%%--------------------------------------------------------------------
%% Kickout old client
%%--------------------------------------------------------------------
kick(_ClientId, undefined, _Pid) ->
ignore;
kick(_ClientId, Pid, Pid) ->
@ -820,7 +812,8 @@ next_msg_id(State = #state{next_msg_id = Id}) ->
emit_stats(State = #state{enable_stats = false}) ->
State;
emit_stats(State = #state{client_id = ClientId}) ->
emqx_stats:set_session_stats(ClientId, stats(State)),
Session = #session{sid = ClientId, pid = self()},
emqx_sm_stats:set_session_stats(Session, stats(State)),
State.
inc_stats(Key) -> put(Key, get(Key) + 1).
@ -836,5 +829,6 @@ shutdown(Reason, State) ->
{stop, {shutdown, Reason}, State}.
gc(State) ->
emqx_gc:maybe_force_gc(#state.force_gc_count, State).
State.
%%emqx_gc:maybe_force_gc(#state.force_gc_count, State).

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%% Copyright (c) 2013-2018 EMQ Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -14,18 +14,12 @@
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_shared_pubsub).
-module(emqx_shared_sub).
-behaviour(gen_server).
-include("emqx.hrl").
%% Mnesia bootstrap
-export([mnesia/1]).
-boot_mnesia({mnesia, [boot]}).
-copy_mnesia({mnesia, [copy]}).
%% API
-export([start_link/0]).
@ -41,32 +35,24 @@
-define(SERVER, ?MODULE).
-define(TABLE, shared_subscription).
-define(TAB, shared_subscription).
-record(state, {pmon}).
-record(shared_subscription, {group, topic, subpid}).
%%--------------------------------------------------------------------
%% Mnesia bootstrap
%%--------------------------------------------------------------------
mnesia(boot) ->
ok = ekka_mnesia:create_table(?TABLE, [
{type, bag},
{ram_copies, [node()]},
{record_name, shared_subscription},
{attributes, record_info(fields, shared_subscription)}]);
mnesia(copy) ->
ok = ekka_mnesia:copy_table(?TABLE).
%%--------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------
-spec(start_link() -> {ok, pid()} | ignore | {error, any()}).
start_link() ->
ok = ekka_mnesia:create_table(?TAB, [
{type, bag},
{ram_copies, [node()]},
{record_name, shared_subscription},
{attributes, record_info(fields, shared_subscription)}]),
ok = ekka_mnesia:copy_table(?TAB),
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
-spec(strategy() -> random | hash).
@ -113,14 +99,14 @@ subscribers(Group, Topic) ->
init([]) ->
{atomic, PMon} = mnesia:transaction(fun init_monitors/0),
mnesia:subscribe({table, ?TABLE, simple}),
mnesia:subscribe({table, ?TAB, simple}),
{ok, #state{pmon = PMon}}.
init_monitors() ->
mnesia:foldl(
fun(#shared_subscription{subpid = SubPid}, Mon) ->
Mon:monitor(SubPid)
end, emqx_pmon:new(), ?TABLE).
end, emqx_pmon:new(), ?TAB).
handle_call(Req, _From, State) ->
emqx_log:error("[Shared] Unexpected request: ~p", [Req]),
@ -156,7 +142,7 @@ handle_info(Info, State) ->
{noreply, State}.
terminate(_Reason, _State) ->
mnesia:unsubscribe({table, ?TABLE, simple}).
mnesia:unsubscribe({table, ?TAB, simple}).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%% Copyright (c) 2013-2018 EMQ Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -20,29 +20,34 @@
-include("emqx.hrl").
-export([start_link/1]).
-export([start_link/0]).
-export([open_session/1, lookup_session/1, close_session/1]).
-export([resume_session/1, discard_session/1]).
-export([register_session/1, register_session/2]).
-export([unregister_session/1, unregister_session/2]).
-export([resume_session/1, resume_session/2, discard_session/1, discard_session/2]).
-export([register_session/2, unregister_session/1]).
%% Internal functions for rpc
-export([lookup/1, dispatch/3]).
-export([dispatch/3]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-record(state, {stats, pids = #{}}).
-record(state, {pmon}).
-spec(start_link(fun()) -> {ok, pid()} | ignore | {error, term()}).
start_link(StatsFun) ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [StatsFun], []).
-define(SM, ?MODULE).
-spec(start_link() -> {ok, pid()} | ignore | {error, term()}).
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
%%--------------------------------------------------------------------
%% Open Session
%%--------------------------------------------------------------------
open_session(Attrs = #{clean_start := true,
client_id := ClientId, client_pid := ClientPid}) ->
CleanStart = fun(_) ->
discard_session(ClientId, ClientPid),
ok = discard_session(ClientId, ClientPid),
emqx_session_sup:start_session(Attrs)
end,
emqx_sm_locker:trans(ClientId, CleanStart);
@ -61,113 +66,133 @@ open_session(Attrs = #{clean_start := false,
end,
emqx_sm_locker:trans(ClientId, ResumeStart).
discard_session(ClientId) ->
%%--------------------------------------------------------------------
%% Discard Session
%%--------------------------------------------------------------------
discard_session(ClientId) when is_binary(ClientId) ->
discard_session(ClientId, self()).
discard_session(ClientId, ClientPid) ->
lists:foreach(fun({_, SessionPid}) ->
catch emqx_session:discard(SessionPid, ClientPid)
discard_session(ClientId, ClientPid) when is_binary(ClientId) ->
lists:foreach(
fun(#session{pid = SessionPid}) ->
case catch emqx_session:discard(SessionPid, ClientPid) of
{'EXIT', Error} ->
emqx_log:error("[SM] Failed to discard ~p: ~p", [SessionPid, Error]);
ok -> ok
end
end, lookup_session(ClientId)).
%%--------------------------------------------------------------------
%% Resume Session
%%--------------------------------------------------------------------
resume_session(ClientId) ->
resume_session(ClientId, self()).
resume_session(ClientId, ClientPid) ->
case lookup_session(ClientId) of
[] -> {error, not_found};
[{_, SessionPid}] ->
[#session{pid = SessionPid}] ->
ok = emqx_session:resume(SessionPid, ClientPid),
{ok, SessionPid};
[{_, SessionPid}|_More] = Sessions ->
Sessions ->
[#session{pid = SessionPid}|StaleSessions] = lists:reverse(Sessions),
emqx_log:error("[SM] More than one session found: ~p", [Sessions]),
lists:foreach(fun(#session{pid = Pid}) ->
catch emqx_session:discard(Pid, ClientPid)
end, StaleSessions),
ok = emqx_session:resume(SessionPid, ClientPid),
{ok, SessionPid}
end.
lookup_session(ClientId) ->
{ResL, _} = multicall(?MODULE, lookup, [ClientId]),
lists:append(ResL).
%%--------------------------------------------------------------------
%% Close a session
%%--------------------------------------------------------------------
close_session(ClientId) ->
lists:foreach(fun(#session{pid = SessionPid}) ->
emqx_session:close(SessionPid)
end, lookup_session(ClientId)).
close_session(#session{pid = SessionPid}) ->
emqx_session:close(SessionPid).
register_session(ClientId) ->
register_session(ClientId, self()).
%%--------------------------------------------------------------------
%% Create/Delete a session
%%--------------------------------------------------------------------
register_session(ClientId, SessionPid) ->
ets:insert(session, {ClientId, SessionPid}).
register_session(Session, Attrs) when is_record(Session, session) ->
ets:insert(session, Session),
ets:insert(session_attrs, {Session, Attrs}),
emqx_sm_registry:register_session(Session),
gen_server:cast(?MODULE, {registered, Session}).
unregister_session(ClientId) ->
unregister_session(ClientId, self()).
unregister_session(ClientId, SessionPid) ->
case ets:lookup(session, ClientId) of
[Session = {ClientId, SessionPid}] ->
unregister_session(Session) when is_record(Session, session) ->
emqx_sm_registry:unregister_session(Session),
emqx_sm_stats:del_session_stats(Session),
ets:delete(session_attrs, Session),
ets:delete(session_stats, Session),
ets:delete_object(session, Session);
_ ->
false
end.
ets:delete_object(session, Session),
gen_server:cast(?MODULE, {unregistered, Session}).
%%--------------------------------------------------------------------
%% Lookup a session from registry
%%--------------------------------------------------------------------
lookup_session(ClientId) ->
emqx_sm_registry:lookup_session(ClientId).
%%--------------------------------------------------------------------
%% Dispatch by client Id
%%--------------------------------------------------------------------
dispatch(ClientId, Topic, Msg) ->
case lookup(ClientId) of
[{_, Pid}] ->
case lookup_session_pid(ClientId) of
Pid when is_pid(Pid) ->
Pid ! {dispatch, Topic, Msg};
[] ->
undefined ->
emqx_hooks:run('message.dropped', [ClientId, Msg])
end.
lookup(ClientId) ->
ets:lookup(session, ClientId).
multicall(Mod, Fun, Args) ->
multicall(ekka:nodelist(up), Mod, Fun, Args).
multicall([Node], Mod, Fun, Args) when Node == node() ->
Res = erlang:apply(Mod, Fun, Args), [Res];
multicall(Nodes, Mod, Fun, Args) ->
{ResL, _} = emqx_rpc:multicall(Nodes, Mod, Fun, Args),
ResL.
lookup_session_pid(ClientId) ->
try ets:lookup_element(session, ClientId, #session.pid)
catch error:badarg ->
undefined
end.
%%--------------------------------------------------------------------
%% gen_server callbacks
%%--------------------------------------------------------------------
init([StatsFun]) ->
{ok, sched_stats(StatsFun, #state{pids = #{}})}.
sched_stats(Fun, State) ->
{ok, TRef} = timer:send_interval(timer:seconds(1), stats),
State#state{stats = #{func => Fun, timer => TRef}}.
init([]) ->
_ = emqx_tables:create(session, [public, set, {keypos, 2},
{read_concurrency, true},
{write_concurrency, true}]),
_ = emqx_tables:create(session_attrs, [public, set,
{write_concurrency, true}]),
{ok, #state{pmon = emqx_pmon:new()}}.
handle_call(Req, _From, State) ->
emqx_log:error("[SM] Unexpected request: ~p", [Req]),
{reply, ignore, State}.
handle_cast({registered, ClientId, SessionPid},
State = #state{pids = Pids}) ->
_ = erlang:monitor(process, SessionPid),
{noreply, State#state{pids = maps:put(SessionPid, ClientId, Pids)}};
handle_cast({registered, #session{sid = ClientId, pid = SessionPid}},
State = #state{pmon = PMon}) ->
{noreply, State#state{pmon = PMon:monitor(SessionPid, ClientId)}};
handle_cast({unregistered, #session{sid = _ClientId, pid = SessionPid}},
State = #state{pmon = PMon}) ->
{noreply, State#state{pmon = PMon:erase(SessionPid)}};
handle_cast(Msg, State) ->
emqx_log:error("[SM] Unexpected msg: ~p", [Msg]),
{noreply, State}.
handle_info(stats, State) ->
{noreply, setstats(State), hibernate};
handle_info({'DOWN', _MRef, process, DownPid, _Reason},
State = #state{pids = Pids}) ->
case maps:find(DownPid, Pids) of
State = #state{pmon = PMon}) ->
case PMon:find(DownPid) of
{ok, ClientId} ->
unregister_session(ClientId, DownPid),
{noreply, State#state{pids = maps:remove(DownPid, Pids)}};
error ->
emqx_log:error("[SM] Session ~p not found", [DownPid]),
case ets:lookup(session, ClientId) of
[] -> ok;
_ -> unregister_session(#session{sid = ClientId, pid = DownPid})
end,
{noreply, State};
undefined ->
{noreply, State}
end;
@ -175,16 +200,9 @@ handle_info(Info, State) ->
emqx_log:error("[SM] Unexpected info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, _State = #state{stats = #{timer := TRef}}) ->
timer:cancel(TRef).
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
setstats(State = #state{stats = #{func := Fun}}) ->
Fun(ets:info(session, size)), State.

109
src/emqx_sm_registry.erl Normal file
View File

@ -0,0 +1,109 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_sm_registry).
-behaviour(gen_server).
-include("emqx.hrl").
%% API
-export([start_link/0]).
-export([register_session/1, lookup_session/1, unregister_session/1]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-define(SERVER, ?MODULE).
-define(TAB, session_registry).
-define(LOCK, {?MODULE, cleanup_sessions}).
-record(state, {}).
%%--------------------------------------------------------------------
%% API
%%--------------------------------------------------------------------
start_link() ->
ok = ekka_mnesia:create_table(?TAB, [
{type, bag},
{ram_copies, [node()]},
{record_name, session},
{attributes, record_info(fields, session)}]),
ok = ekka_mnesia:copy_table(?TAB),
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
-spec(lookup_session(client_id()) -> list(session())).
lookup_session(ClientId) ->
mnesia:dirty_read(?TAB, ClientId).
-spec(register_session(session()) -> ok).
register_session(Session) when is_record(Session, session) ->
mnesia:dirty_write(?TAB, Session).
-spec(unregister_session(session()) -> ok).
unregister_session(Session) when is_record(Session, session) ->
mnesia:dirty_delete_object(?TAB, Session).
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
init([]) ->
ekka:monitor(membership),
{ok, #state{}}.
handle_call(_Request, _From, State) ->
Reply = ok,
{reply, Reply, State}.
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info({membership, {mnesia, down, Node}}, State) ->
global:trans({?LOCK, self()},
fun() ->
mnesia:transaction(fun cleanup_sessions/1, [Node])
end),
{noreply, State};
handle_info({membership, _Event}, State) ->
{noreply, State};
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
cleanup_sessions(Node) ->
Pat = [{#session{pid = '$1', _ = '_'},
[{'==', {node, '$1'}, Node}], ['$_']}],
lists:foreach(fun(Session) ->
mnesia:delete_object(?TAB, Session)
end, mnesia:select(?TAB, Pat)).

72
src/emqx_sm_stats.erl Normal file
View File

@ -0,0 +1,72 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_sm_stats).
-behaviour(gen_statem).
-include("emqx.hrl").
%% API
-export([start_link/0]).
-export([set_session_stats/2, get_session_stats/1, del_session_stats/1]).
%% gen_statem callbacks
-export([init/1, callback_mode/0, handle_event/4, terminate/3, code_change/4]).
-define(TAB, session_stats).
-record(state, {statsfun}).
start_link() ->
gen_statem:start_link({local, ?MODULE}, ?MODULE, [], []).
-spec(set_session_stats(session(), emqx_stats:stats()) -> true).
set_session_stats(Session, Stats) when is_record(Session, session) ->
ets:insert(?TAB, {Session, [{'$ts', emqx_time:now_secs()}|Stats]}).
-spec(get_session_stats(session()) -> emqx_stats:stats()).
get_session_stats(Session) ->
case ets:lookup(?TAB, Session) of
[{_, Stats}] -> Stats;
[] -> []
end.
-spec(del_session_stats(session()) -> true).
del_session_stats(Session) ->
ets:delete(?TAB, Session).
init([]) ->
_ = emqx_tables:create(?TAB, [public, {write_concurrency, true}]),
StatsFun = emqx_stats:statsfun('sessions/count', 'sessions/max'),
{ok, idle, #state{statsfun = StatsFun}, timer:seconds(1)}.
callback_mode() -> handle_event_function.
handle_event(timeout, _Timeout, idle, State = #state{statsfun = StatsFun}) ->
case ets:info(session, size) of
undefined -> ok;
Size -> StatsFun(Size)
end,
{next_state, idle, State, timer:seconds(1)}.
terminate(_Reason, _StateName, _State) ->
ok.
code_change(_OldVsn, StateName, State, _Extra) ->
{ok, StateName, State}.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%% Copyright (c) 2013-2018 EMQ Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -26,15 +26,12 @@ start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
init([]) ->
lists:foreach(fun create_tab/1, [session, session_stats, session_attrs]),
Childs = [child(M) || M <- [emqx_sm_locker,
emqx_sm_registry,
emqx_sm_stats,
emqx_sm]],
{ok, {{one_for_all, 10, 3600}, Childs}}.
StatsFun = emqx_stats:statsfun('sessions/count', 'sessions/max'),
SM = {emqx_sm, {emqx_sm, start_link, [StatsFun]},
permanent, 5000, worker, [emqx_sm]},
{ok, {{one_for_all, 10, 3600}, [SM]}}.
create_tab(Tab) ->
emqx_tables:create(Tab, [public, ordered_set, named_table, {write_concurrency, true}]).
child(M) ->
{M, {M, start_link, []}, permanent, 5000, worker, [M]}.

View File

@ -26,8 +26,7 @@
-export([all/0]).
%% Client and Session Stats
-export([set_client_stats/2, get_client_stats/1, del_client_stats/1,
set_session_stats/2, get_session_stats/1, del_session_stats/1]).
-export([set_client_stats/2, get_client_stats/1, del_client_stats/1]).
%% Statistics API.
-export([statsfun/1, statsfun/2, getstats/0, getstat/1, setstat/2, setstat/3]).
@ -40,6 +39,8 @@
-type(stats() :: list({atom(), non_neg_integer()})).
-export_type([stats/0]).
-define(STATS_TAB, mqtt_stats).
-define(CLIENT_STATS_TAB, mqtt_client_stats).
-define(SESSION_STATS_TAB, mqtt_session_stats).
@ -101,20 +102,6 @@ get_client_stats(ClientId) ->
del_client_stats(ClientId) ->
ets:delete(?CLIENT_STATS_TAB, ClientId).
-spec(set_session_stats(binary(), stats()) -> true).
set_session_stats(ClientId, Stats) ->
ets:insert(?SESSION_STATS_TAB, {ClientId, [{'$ts', emqx_time:now_secs()}|Stats]}).
-spec(get_session_stats(binary()) -> stats()).
get_session_stats(ClientId) ->
case ets:lookup(?SESSION_STATS_TAB, ClientId) of
[{_, Stats}] -> Stats;
[] -> []
end.
-spec(del_session_stats(binary()) -> true).
del_session_stats(ClientId) ->
ets:delete(?SESSION_STATS_TAB, ClientId).
all() -> ets:tab2list(?STATS_TAB).

View File

@ -57,7 +57,6 @@ init([]) ->
{ok, {{one_for_all, 10, 3600},
[?CHILD(emqx_ctl, worker),
?CHILD(emqx_hooks, worker),
?CHILD(emqx_locker, worker),
?CHILD(emqx_stats, worker),
?CHILD(emqx_metrics, worker),
?CHILD(emqx_sys, worker),