Misc fix for the MQTT Version 5.0

This commit is contained in:
Feng Lee 2018-03-30 17:18:08 +08:00
parent c79fa29587
commit 2eed46310c
22 changed files with 429 additions and 451 deletions

View File

@ -1,5 +1,5 @@
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. %% Copyright (c) 2013-2018 EMQ Inc. All Rights Reserved.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License. %% you may not use this file except in compliance with the License.
@ -18,97 +18,118 @@
%% Banner %% Banner
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-define(COPYRIGHT, "Copyright (c) 2013-2018 EMQ Enterprise, Inc."). -define(COPYRIGHT, "Copyright (c) 2013-2018 EMQ Inc.").
-define(LICENSE_MESSAGE, "Licensed under the Apache License, Version 2.0"). -define(LICENSE_MESSAGE, "Licensed under the Apache License, Version 2.0").
-define(PROTOCOL_VERSION, "MQTT/5.0"). -define(PROTOCOL_VERSION, "MQTT/5.0").
%%-define(ERTS_MINIMUM, "9.0"). -define(ERTS_MINIMUM_REQUIRED, "9.2").
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Sys/Queue/Share Topics' Prefix %% Topics' prefix: $SYS | $queue | $share
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-define(SYSTOP, <<"$SYS/">>). %% System Topic %% System Topic
-define(SYSTOP, <<"$SYS/">>).
-define(QUEUE, <<"$queue/">>). %% Queue Topic %% Queue Topic
-define(QUEUE, <<"$queue/">>).
-define(SHARE, <<"$share/">>). %% Shared Topic %% Shared Topic
-define(SHARE, <<"$share/">>).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Client and Session %% Topic, subscription and subscriber
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-type(qos() :: integer()).
-type(topic() :: binary()). -type(topic() :: binary()).
-type(subscriber() :: pid() | binary() | {binary(), pid()}). -type(suboption() :: {qos, qos()}
| {share, '$queue'}
| {share, binary()}
| {atom(), term()}).
-type(suboption() :: {qos, non_neg_integer()} | {share, {'$queue' | binary()}}). -record(subscription,
{ subid :: binary() | atom(),
topic :: topic(),
subopts :: list(suboption())
}).
-type(subscription() :: #subscription{}).
-type(subscriber() :: binary() | pid() | {binary(), pid()}).
%%--------------------------------------------------------------------
%% Client and session
%%--------------------------------------------------------------------
-type(protocol() :: mqtt | 'mqtt-sn' | coap | stomp | none | atom()).
-type(peername() :: {inet:ip_address(), inet:port_number()}).
-type(client_id() :: binary() | atom()). -type(client_id() :: binary() | atom()).
-type(protocol() :: mqtt | 'mqtt-sn' | coap | stomp | atom()). -type(username() :: binary() | atom()).
-type(client() :: #{zone := atom(), -type(mountpoint() :: binary()).
node := atom(),
clientid := client_id(),
protocol := protocol(),
connector => atom(),
peername => {inet:ip_address(), inet:port_number()},
username => binary(),
atom() => term()}).
-type(session() :: #{client_id := client_id(), -type(connector() :: atom()).
clean_start := boolean(),
expiry_interval := non_neg_integer()}).
-record(session, {client_id, sess_pid}). -type(zone() :: atom()).
-record(client,
{ id :: client_id(),
pid :: pid(),
zone :: zone(),
node :: node(),
username :: username(),
peername :: peername(),
protocol :: protocol(),
connector :: connector(),
mountpoint :: mountpoint(),
attributes :: #{atom() => term()}
}).
-type(client() :: #client{}).
-record(session,
{ client_id :: client_id(),
pid :: pid()
}).
-type(session() :: #session{}).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Message and Delivery %% Message and delivery
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-type(message_id() :: binary() | undefined). -type(message_id() :: binary() | undefined).
%% -type(protocol() :: mqtt | 'mqtt-sn' | coap | stomp | atom()). -type(message_flag() :: sys | dup | retain | atom()).
-type(message_from() :: #{zone := atom(), -type(message_flags() :: #{message_flag() => boolean()}).
node := atom(),
clientid := binary(),
connector => atom(),
peername => {inet:ip_address(), inet:port_number()},
username => binary(),
atom() => term()}).
-type(message_flags() :: #{dup => boolean(), %% Dup flag
qos => 0 | 1 | 2, %% QoS
sys => boolean(), %% $SYS flag
retain => boolean(), %% Retain flag
durable => boolean(), %% Durable flag
atom() => boolean()}).
-type(message_headers() :: #{packet_id => pos_integer(), -type(message_headers() :: #{packet_id => pos_integer(),
priority => pos_integer(), priority => non_neg_integer(),
expiry => integer(), %% Time to live ttl => pos_integer(),
atom() => term()}). atom() => term()}).
-type(payload() :: binary()).
%% See 'Application Message' in MQTT Version 5.0 %% See 'Application Message' in MQTT Version 5.0
-record(message, -record(message,
{ id :: message_id(), %% Global unique id { id :: message_id(), %% Global unique id
from :: message_from(), %% Message from qos :: qos(), %% Message QoS
from :: atom() | client(), %% Message from
sender :: pid(), %% The pid of the sender/publisher sender :: pid(), %% The pid of the sender/publisher
packet_id, flags :: message_flags(), %% Message flags
dup :: boolean(), %% Dup flag
qos :: 0 | 1 | 2, %% QoS
sys :: boolean(), %% $SYS flag
retain :: boolean(), %% Retain flag
flags = [], %% :: message_flags(), %% Message flags
headers :: message_headers(), %% Message headers headers :: message_headers(), %% Message headers
protocol :: protocol(),
topic :: binary(), %% Message topic topic :: binary(), %% Message topic
properties :: map(), %% Message user properties properties :: map(), %% Message user properties
payload :: binary(), %% Message payload payload :: payload(), %% Message payload
timestamp :: erlang:timestamp() %% Timestamp timestamp :: erlang:timestamp() %% Timestamp
}). }).
@ -127,62 +148,16 @@
-type(pubsub() :: publish | subscribe). -type(pubsub() :: publish | subscribe).
-define(PS(PS), (PS =:= publish orelse PS =:= subscribe)). -define(PS(I), (I =:= publish orelse I =:= subscribe)).
%%--------------------------------------------------------------------
%% Subscription
%%--------------------------------------------------------------------
-record(subscription,
{ subid :: binary() | atom(),
topic :: binary(),
subopts :: list()
}).
-type(subscription() :: #subscription{}).
%%--------------------------------------------------------------------
%% MQTT Client
%%--------------------------------------------------------------------
-type(ws_header_key() :: atom() | binary() | string()).
-type(ws_header_val() :: atom() | binary() | string() | integer()).
-record(mqtt_client,
{ client_id :: binary() | undefined,
client_pid :: pid(),
username :: binary() | undefined,
peername :: {inet:ip_address(), inet:port_number()},
clean_sess :: boolean(),
proto_ver :: 3 | 4,
keepalive = 0,
will_topic :: undefined | binary(),
ws_initial_headers :: list({ws_header_key(), ws_header_val()}),
mountpoint :: undefined | binary(),
connected_at :: erlang:timestamp(),
%%TODO: Headers
headers = [] :: list()
}).
-type(mqtt_client() :: #mqtt_client{}).
%%--------------------------------------------------------------------
%% MQTT Session
%%--------------------------------------------------------------------
-record(mqtt_session,
{ client_id :: binary(),
sess_pid :: pid(),
clean_sess :: boolean()
}).
-type(mqtt_session() :: #mqtt_session{}).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Route %% Route
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-record(route, { topic :: binary(), dest :: {binary(), node()} | node() }). -record(route,
{ topic :: topic(),
dest :: {binary(), node()} | node()
}).
-type(route() :: #route{}). -type(route() :: #route{}).
@ -227,7 +202,15 @@
%% Plugin %% Plugin
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-record(plugin, { name, version, descr, active = false }). -record(plugin,
{ name :: atom(),
version :: string(),
dir :: string(),
descr :: string(),
vendor :: string(),
active :: boolean(),
info :: map()
}).
-type(plugin() :: #plugin{}). -type(plugin() :: #plugin{}).
@ -235,7 +218,14 @@
%% Command %% Command
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-record(command, { name, action, args = [], opts = [], usage, descr }). -record(command,
{ name,
action,
args = [],
opts = [],
usage,
descr
}).
-type(command() :: #command{}). -type(command() :: #command{}).

View File

@ -77,6 +77,26 @@
-define(MAX_CLIENTID_LEN, 1024). -define(MAX_CLIENTID_LEN, 1024).
%%--------------------------------------------------------------------
%% MQTT Client
%%--------------------------------------------------------------------
-record(mqtt_client,
{ client_id :: binary() | undefined,
client_pid :: pid(),
username :: binary() | undefined,
peername :: {inet:ip_address(), inet:port_number()},
clean_sess :: boolean(),
proto_ver :: 3 | 4,
keepalive = 0,
will_topic :: undefined | binary(),
mountpoint :: undefined | binary(),
connected_at :: erlang:timestamp(),
attributes :: map()
}).
-type(mqtt_client() :: #mqtt_client{}).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% MQTT Control Packet Types %% MQTT Control Packet Types
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -278,12 +298,12 @@
-define(CONNACK_PACKET(ReasonCode), -define(CONNACK_PACKET(ReasonCode),
#mqtt_packet{header = #mqtt_packet_header{type = ?CONNACK}, #mqtt_packet{header = #mqtt_packet_header{type = ?CONNACK},
variable = #mqtt_packet_connack{reason_code = ReturnCode}}). variable = #mqtt_packet_connack{reason_code = ReasonCode}}).
-define(CONNACK_PACKET(ReasonCode, SessPresent), -define(CONNACK_PACKET(ReasonCode, SessPresent),
#mqtt_packet{header = #mqtt_packet_header{type = ?CONNACK}, #mqtt_packet{header = #mqtt_packet_header{type = ?CONNACK},
variable = #mqtt_packet_connack{ack_flags = SessPresent, variable = #mqtt_packet_connack{ack_flags = SessPresent,
reason_code = ReturnCode}}). reason_code = ReasonCode}}).
-define(CONNACK_PACKET(ReasonCode, SessPresent, Properties), -define(CONNACK_PACKET(ReasonCode, SessPresent, Properties),
#mqtt_packet{header = #mqtt_packet_header{type = ?CONNACK}, #mqtt_packet{header = #mqtt_packet_header{type = ?CONNACK},

View File

@ -30,7 +30,7 @@
-define(SERVER, ?MODULE). -define(SERVER, ?MODULE).
-define(ACCESS_CONTROL_TAB, mqtt_access_control). -define(TAB, access_control).
-type(password() :: undefined | binary()). -type(password() :: undefined | binary()).
@ -45,9 +45,10 @@
start_link() -> start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
%% @doc Authenticate MQTT Client. %% @doc Authenticate Client.
-spec(auth(Client :: mqtt_client(), Password :: password()) -> ok | {ok, boolean()} | {error, term()}). -spec(auth(Client :: client(), Password :: password())
auth(Client, Password) when is_record(Client, mqtt_client) -> -> ok | {ok, boolean()} | {error, term()}).
auth(Client, Password) when is_record(Client, client) ->
auth(Client, Password, lookup_mods(auth)). auth(Client, Password, lookup_mods(auth)).
auth(_Client, _Password, []) -> auth(_Client, _Password, []) ->
case emqx:env(allow_anonymous, false) of case emqx:env(allow_anonymous, false) of
@ -65,7 +66,7 @@ auth(Client, Password, [{Mod, State, _Seq} | Mods]) ->
%% @doc Check ACL %% @doc Check ACL
-spec(check_acl(Client, PubSub, Topic) -> allow | deny when -spec(check_acl(Client, PubSub, Topic) -> allow | deny when
Client :: mqtt_client(), Client :: client(),
PubSub :: pubsub(), PubSub :: pubsub(),
Topic :: binary()). Topic :: binary()).
check_acl(Client, PubSub, Topic) when ?PS(PubSub) -> check_acl(Client, PubSub, Topic) when ?PS(PubSub) ->
@ -102,7 +103,7 @@ unregister_mod(Type, Mod) when Type =:= auth; Type =:= acl ->
%% @doc Lookup authentication or ACL modules. %% @doc Lookup authentication or ACL modules.
-spec(lookup_mods(auth | acl) -> list()). -spec(lookup_mods(auth | acl) -> list()).
lookup_mods(Type) -> lookup_mods(Type) ->
case ets:lookup(?ACCESS_CONTROL_TAB, tab_key(Type)) of case ets:lookup(?TAB, tab_key(Type)) of
[] -> []; [] -> [];
[{_, Mods}] -> Mods [{_, Mods}] -> Mods
end. end.
@ -111,14 +112,15 @@ tab_key(auth) -> auth_modules;
tab_key(acl) -> acl_modules. tab_key(acl) -> acl_modules.
%% @doc Stop access control server. %% @doc Stop access control server.
stop() -> gen_server:call(?MODULE, stop). stop() ->
gen_server:call(?MODULE, stop).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% gen_server Callbacks %% gen_server Callbacks
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
init([]) -> init([]) ->
ets:new(?ACCESS_CONTROL_TAB, [set, named_table, protected, {read_concurrency, true}]), _ = ets:new(?TAB, [set, named_table, protected, {read_concurrency, true}]),
{ok, #state{}}. {ok, #state{}}.
handle_call({register_mod, Type, Mod, Opts, Seq}, _From, State) -> handle_call({register_mod, Type, Mod, Opts, Seq}, _From, State) ->
@ -130,7 +132,7 @@ handle_call({register_mod, Type, Mod, Opts, Seq}, _From, State) ->
NewMods = lists:sort(fun({_, _, Seq1}, {_, _, Seq2}) -> NewMods = lists:sort(fun({_, _, Seq1}, {_, _, Seq2}) ->
Seq1 >= Seq2 Seq1 >= Seq2
end, [{Mod, ModState, Seq} | Mods]), end, [{Mod, ModState, Seq} | Mods]),
ets:insert(?ACCESS_CONTROL_TAB, {tab_key(Type), NewMods}), ets:insert(?TAB, {tab_key(Type), NewMods}),
ok; ok;
{error, Error} -> {error, Error} ->
{error, Error}; {error, Error};
@ -145,7 +147,7 @@ handle_call({unregister_mod, Type, Mod}, _From, State) ->
false -> false ->
{reply, {error, not_found}, State}; {reply, {error, not_found}, State};
_ -> _ ->
ets:insert(?ACCESS_CONTROL_TAB, {tab_key(Type), lists:keydelete(Mod, 1, Mods)}), _ = ets:insert(?TAB, {tab_key(Type), lists:keydelete(Mod, 1, Mods)}),
{reply, ok, State} {reply, ok, State}
end; end;
@ -172,7 +174,8 @@ code_change(_OldVsn, State, _Extra) ->
%% Internal functions %% Internal functions
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
if_existed(false, Fun) -> Fun(); if_existed(false, Fun) ->
Fun();
if_existed(_Mod, _Fun) -> {error, already_existed}. if_existed(_Mod, _Fun) ->
{error, already_existed}.

View File

@ -82,7 +82,7 @@ bin(B) when is_binary(B) ->
B. B.
%% @doc Match access rule %% @doc Match access rule
-spec(match(mqtt_client(), topic(), rule()) -> {matched, allow} | {matched, deny} | nomatch). -spec(match(client(), topic(), rule()) -> {matched, allow} | {matched, deny} | nomatch).
match(_Client, _Topic, {AllowDeny, all}) when (AllowDeny =:= allow) orelse (AllowDeny =:= deny) -> match(_Client, _Topic, {AllowDeny, all}) when (AllowDeny =:= allow) orelse (AllowDeny =:= deny) ->
{matched, AllowDeny}; {matched, AllowDeny};
match(Client, Topic, {AllowDeny, Who, _PubSub, TopicFilters}) match(Client, Topic, {AllowDeny, Who, _PubSub, TopicFilters})
@ -99,13 +99,13 @@ match_who(_Client, {user, all}) ->
true; true;
match_who(_Client, {client, all}) -> match_who(_Client, {client, all}) ->
true; true;
match_who(#mqtt_client{client_id = ClientId}, {client, ClientId}) -> match_who(#client{id = ClientId}, {client, ClientId}) ->
true; true;
match_who(#mqtt_client{username = Username}, {user, Username}) -> match_who(#client{username = Username}, {user, Username}) ->
true; true;
match_who(#mqtt_client{peername = undefined}, {ipaddr, _Tup}) -> match_who(#client{peername = undefined}, {ipaddr, _Tup}) ->
false; false;
match_who(#mqtt_client{peername = {IP, _}}, {ipaddr, CIDR}) -> match_who(#client{peername = {IP, _}}, {ipaddr, CIDR}) ->
esockd_cidr:match(IP, CIDR); esockd_cidr:match(IP, CIDR);
match_who(Client, {'and', Conds}) when is_list(Conds) -> match_who(Client, {'and', Conds}) when is_list(Conds) ->
lists:foldl(fun(Who, Allow) -> lists:foldl(fun(Who, Allow) ->
@ -137,13 +137,13 @@ feed_var(Client, Pattern) ->
feed_var(Client, Pattern, []). feed_var(Client, Pattern, []).
feed_var(_Client, [], Acc) -> feed_var(_Client, [], Acc) ->
lists:reverse(Acc); lists:reverse(Acc);
feed_var(Client = #mqtt_client{client_id = undefined}, [<<"%c">>|Words], Acc) -> feed_var(Client = #client{id = undefined}, [<<"%c">>|Words], Acc) ->
feed_var(Client, Words, [<<"%c">>|Acc]); feed_var(Client, Words, [<<"%c">>|Acc]);
feed_var(Client = #mqtt_client{client_id = ClientId}, [<<"%c">>|Words], Acc) -> feed_var(Client = #client{id = ClientId}, [<<"%c">>|Words], Acc) ->
feed_var(Client, Words, [ClientId |Acc]); feed_var(Client, Words, [ClientId |Acc]);
feed_var(Client = #mqtt_client{username = undefined}, [<<"%u">>|Words], Acc) -> feed_var(Client = #client{username = undefined}, [<<"%u">>|Words], Acc) ->
feed_var(Client, Words, [<<"%u">>|Acc]); feed_var(Client, Words, [<<"%u">>|Acc]);
feed_var(Client = #mqtt_client{username = Username}, [<<"%u">>|Words], Acc) -> feed_var(Client = #client{username = Username}, [<<"%u">>|Words], Acc) ->
feed_var(Client, Words, [Username|Acc]); feed_var(Client, Words, [Username|Acc]);
feed_var(Client, [W|Words], Acc) -> feed_var(Client, [W|Words], Acc) ->
feed_var(Client, Words, [W|Acc]). feed_var(Client, Words, [W|Acc]).

View File

@ -77,7 +77,7 @@ filter(_PubSub, {_AllowDeny, _Who, _, _Topics}) ->
%% @doc Check ACL %% @doc Check ACL
-spec(check_acl({Client, PubSub, Topic}, State) -> allow | deny | ignore when -spec(check_acl({Client, PubSub, Topic}, State) -> allow | deny | ignore when
Client :: mqtt_client(), Client :: client(),
PubSub :: pubsub(), PubSub :: pubsub(),
Topic :: binary(), Topic :: binary(),
State :: #state{}). State :: #state{}).

View File

@ -26,9 +26,10 @@
-callback(init(AclOpts :: list()) -> {ok, State :: any()}). -callback(init(AclOpts :: list()) -> {ok, State :: any()}).
-callback(check_acl({Client :: mqtt_client(), -callback(check_acl({Client :: client(),
PubSub :: pubsub(), PubSub :: pubsub(),
Topic :: binary()}, State :: any()) -> allow | deny | ignore). Topic :: topic()}, State :: any())
-> allow | deny | ignore).
-callback(reload_acl(State :: any()) -> ok | {error, term()}). -callback(reload_acl(State :: any()) -> ok | {error, term()}).

View File

@ -30,7 +30,7 @@
-callback(init(AuthOpts :: list()) -> {ok, State :: any()}). -callback(init(AuthOpts :: list()) -> {ok, State :: any()}).
-callback(check(Client :: mqtt_client(), -callback(check(Client :: client(),
Password :: binary(), Password :: binary(),
State :: any()) State :: any())
-> ok | {ok, boolean()} | ignore | {error, string()}). -> ok | {ok, boolean()} | ignore | {error, string()}).

View File

@ -20,110 +20,93 @@
-include("emqx.hrl"). -include("emqx.hrl").
%% API Exports -export([start_link/1]).
-export([start_link/3]).
-export([lookup/1, lookup_proc/1, reg/1, unreg/1]). -export([lookup/1, reg/1, unreg/1]).
%% gen_server Function Exports
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]). terminate/2, code_change/3]).
-record(state, {pool, id, statsfun, monitors}). -record(state, {stats_fun, stats_timer, monitors}).
-define(POOL, ?MODULE). -define(SERVER, ?MODULE).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% API %% API
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% @doc Start Client Manager %% @doc Start the client manager
-spec(start_link(atom(), pos_integer(), fun()) -> {ok, pid()} | ignore | {error, term()}). -spec(start_link(fun()) -> {ok, pid()} | ignore | {error, term()}).
start_link(Pool, Id, StatsFun) -> start_link(StatsFun) ->
gen_server:start_link(?MODULE, [Pool, Id, StatsFun], []). gen_server:start_link({local, ?SERVER}, ?MODULE, [StatsFun], []).
%% @doc Lookup Client by ClientId %% @doc Lookup ClientPid by ClientId
-spec(lookup(binary()) -> mqtt_client() | undefined). -spec(lookup(client_id()) -> pid() | undefined).
lookup(ClientId) when is_binary(ClientId) -> lookup(ClientId) when is_binary(ClientId) ->
case ets:lookup(mqtt_client, ClientId) of [Client] -> Client; [] -> undefined end. try ets:lookup_element(client, ClientId, 2)
%% @doc Lookup client pid by clientId
-spec(lookup_proc(binary()) -> pid() | undefined).
lookup_proc(ClientId) when is_binary(ClientId) ->
try ets:lookup_element(mqtt_client, ClientId, #mqtt_client.client_pid)
catch catch
error:badarg -> undefined error:badarg -> undefined
end. end.
%% @doc Register ClientId with Pid. %% @doc Register a clientId
-spec(reg(mqtt_client()) -> ok). -spec(reg(client_id()) -> ok).
reg(Client = #mqtt_client{client_id = ClientId}) -> reg(ClientId) ->
gen_server:call(pick(ClientId), {reg, Client}, 120000). gen_server:cast(?SERVER, {reg, ClientId, self()}).
%% @doc Unregister clientId with pid. %% @doc Unregister clientId with pid.
-spec(unreg(binary()) -> ok). -spec(unreg(client_id()) -> ok).
unreg(ClientId) when is_binary(ClientId) -> unreg(ClientId) ->
gen_server:cast(pick(ClientId), {unreg, ClientId, self()}). gen_server:cast(?SERVER, {unreg, ClientId, self()}).
pick(ClientId) -> gproc_pool:pick_worker(?POOL, ClientId).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% gen_server callbacks %% gen_server callbacks
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
init([Pool, Id, StatsFun]) -> init([StatsFun]) ->
gproc_pool:connect_worker(Pool, {Pool, Id}), {ok, Ref} = timer:send_interval(timer:seconds(1), stats),
{ok, #state{pool = Pool, id = Id, statsfun = StatsFun, monitors = dict:new()}}. {ok, #state{stats_fun = StatsFun, stats_timer = Ref, monitors = dict:new()}}.
handle_call({reg, Client = #mqtt_client{client_id = ClientId,
client_pid = Pid}}, _From, State) ->
case lookup_proc(ClientId) of
Pid ->
{reply, ok, State};
_ ->
ets:insert(mqtt_client, Client),
{reply, ok, setstats(monitor_client(ClientId, Pid, State))}
end;
handle_call(Req, _From, State) -> handle_call(Req, _From, State) ->
lager:error("[MQTT-CM] Unexpected Call: ~p", [Req]), emqx_log:error("[CM] Unexpected request: ~p", [Req]),
{reply, ignore, State}. {reply, ignore, State}.
handle_cast({reg, ClientId, Pid}, State) ->
_ = ets:insert(client, {ClientId, Pid}),
{noreply, monitor_client(ClientId, Pid, State)};
handle_cast({unreg, ClientId, Pid}, State) -> handle_cast({unreg, ClientId, Pid}, State) ->
case lookup_proc(ClientId) of case lookup(ClientId) of
Pid -> Pid -> remove_client({ClientId, Pid});
ets:delete(mqtt_client, ClientId), _ -> ok
{noreply, setstats(State)}; end,
_ -> {noreply, State};
{noreply, State}
end;
handle_cast(Msg, State) -> handle_cast(Msg, State) ->
lager:error("[MQTT-CM] Unexpected Cast: ~p", [Msg]), emqx_log:error("[CM] Unexpected msg: ~p", [Msg]),
{noreply, State}. {noreply, State}.
handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) -> handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) ->
case dict:find(MRef, State#state.monitors) of case dict:find(MRef, State#state.monitors) of
{ok, {ClientId, DownPid}} -> {ok, ClientId} ->
case lookup_proc(ClientId) of case lookup(ClientId) of
DownPid -> DownPid -> remove_client({ClientId, DownPid});
emqx_stats:del_client_stats(ClientId), _ -> ok
ets:delete(mqtt_client, ClientId);
_ ->
ignore
end, end,
{noreply, setstats(erase_monitor(MRef, State))}; {noreply, erase_monitor(MRef, State)};
error -> error ->
lager:error("MRef of client ~p not found", [DownPid]), emqx_log:error("[CM] down client ~p not found", [DownPid]),
{noreply, State} {noreply, State}
end; end;
handle_info(stats, State) ->
{noreply, setstats(State), hibernate};
handle_info(Info, State) -> handle_info(Info, State) ->
lager:error("[CM] Unexpected Info: ~p", [Info]), lager:error("[CM] Unexpected Info: ~p", [Info]),
{noreply, State}. {noreply, State}.
terminate(_Reason, #state{pool = Pool, id = Id}) -> terminate(_Reason, _State = #state{stats_timer = TRef}) ->
gproc_pool:disconnect_worker(Pool, {Pool, Id}). timer:cancel(TRef).
code_change(_OldVsn, State, _Extra) -> code_change(_OldVsn, State, _Extra) ->
{ok, State}. {ok, State}.
@ -132,14 +115,19 @@ code_change(_OldVsn, State, _Extra) ->
%% Internal functions %% Internal functions
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
remove_client(Client) ->
ets:delete_object(client, Client),
ets:delete(client_stats, Client),
ets:delete(client_attrs, Client).
monitor_client(ClientId, Pid, State = #state{monitors = Monitors}) -> monitor_client(ClientId, Pid, State = #state{monitors = Monitors}) ->
MRef = erlang:monitor(process, Pid), MRef = erlang:monitor(process, Pid),
State#state{monitors = dict:store(MRef, {ClientId, Pid}, Monitors)}. State#state{monitors = dict:store(MRef, ClientId, Monitors)}.
erase_monitor(MRef, State = #state{monitors = Monitors}) -> erase_monitor(MRef, State = #state{monitors = Monitors}) ->
erlang:demonitor(MRef, [flush]), erlang:demonitor(MRef),
State#state{monitors = dict:erase(MRef, Monitors)}. State#state{monitors = dict:erase(MRef, Monitors)}.
setstats(State = #state{statsfun = StatsFun}) -> setstats(State = #state{stats_fun = StatsFun}) ->
StatsFun(ets:info(mqtt_client, size)), State. StatsFun(ets:info(client, size)), State.

View File

@ -18,33 +18,24 @@
-behaviour(supervisor). -behaviour(supervisor).
%% API
-export([start_link/0]). -export([start_link/0]).
%% Supervisor callbacks
-export([init/1]). -export([init/1]).
-define(TAB, mqtt_client).
start_link() -> start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []). supervisor:start_link({local, ?MODULE}, ?MODULE, []).
init([]) -> init([]) ->
%% Create client table %% Create table
create_client_tab(), lists:foreach(fun create_tab/1, [client, client_stats, client_attrs]),
%% CM Pool Sup StatsFun = emqx_stats:statsfun('clients/count', 'clients/max'),
MFA = {emqx_cm, start_link, [emqx_stats:statsfun('clients/count', 'clients/max')]},
PoolSup = emqx_pool_sup:spec([emqx_cm, hash, erlang:system_info(schedulers), MFA]), CM = {emqx_cm, {emqx_cm, start_link, [StatsFun]},
permanent, 5000, worker, [emqx_cm]},
{ok, {{one_for_all, 10, 3600}, [PoolSup]}}. {ok, {{one_for_all, 10, 3600}, [CM]}}.
create_client_tab() -> create_tab(Tab) ->
case ets:info(?TAB, name) of emqx_tables:create(Tab, [public, ordered_set, named_table, {write_concurrency, true}]).
undefined ->
ets:new(?TAB, [ordered_set, named_table, public,
{keypos, 2}, {write_concurrency, true}]);
_ ->
ok
end.

View File

@ -202,8 +202,8 @@ handle_info({suback, PacketId, GrantedQos}, State) ->
end, State); end, State);
%% Fastlane %% Fastlane
handle_info({dispatch, _Topic, Message}, State) -> handle_info({dispatch, _Topic, Msg}, State) ->
handle_info({deliver, Message#message{qos = ?QOS_0}}, State); handle_info({deliver, emqx_message:set_flag(qos, ?QOS_0, Msg)}, State);
handle_info({deliver, Message}, State) -> handle_info({deliver, Message}, State) ->
with_proto( with_proto(

View File

@ -18,139 +18,62 @@
-include("emqx.hrl"). -include("emqx.hrl").
-include("emqx_mqtt.hrl"). -export([make/3]).
-export([make/3, make/4, from_packet/1, from_packet/2, from_packet/3, -export([get_flag/2, get_flag/3, set_flag/2, unset_flag/2]).
to_packet/1]).
-export([set_flag/1, set_flag/2, unset_flag/1, unset_flag/2]). -export([get_header/2, get_header/3, set_header/3]).
-export([format/1]). -export([get_user_property/2, get_user_property/3, set_user_property/3]).
-type(msg_from() :: atom() | {binary(), undefined | binary()}). %% Create a default message
-spec(make(atom() | client(), topic(), payload()) -> message()).
%% @doc Make a message make(From, Topic, Payload) when is_atom(From); is_record(From, client) ->
-spec(make(msg_from(), binary(), binary()) -> message()). #message{id = msgid(),
make(From, Topic, Payload) -> qos = 0,
make(From, ?QOS_0, Topic, Payload). from = From,
sender = self(),
-spec(make(msg_from(), mqtt_qos(), binary(), binary()) -> message()). flags = #{},
make(From, Qos, Topic, Payload) -> headers = #{},
#message{id = msgid(), topic = Topic,
from = From, properties = #{},
qos = ?QOS_I(Qos), payload = Payload,
topic = Topic, timestamp = os:timestamp()}.
payload = Payload,
timestamp = os:timestamp()}.
%% @doc Message from Packet
-spec(from_packet(mqtt_packet()) -> message()).
from_packet(#mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH,
retain = Retain,
qos = Qos,
dup = Dup},
variable = #mqtt_packet_publish{topic_name = Topic,
packet_id = PacketId},
payload = Payload}) ->
#message{id = msgid(),
packet_id = PacketId,
qos = Qos,
retain = Retain,
dup = Dup,
topic = Topic,
payload = Payload,
timestamp = os:timestamp()};
from_packet(#mqtt_packet_connect{will_flag = false}) ->
undefined;
from_packet(#mqtt_packet_connect{client_id = ClientId,
username = Username,
will_retain = Retain,
will_qos = Qos,
will_topic = Topic,
will_msg = Msg}) ->
#message{id = msgid(),
topic = Topic,
from = {ClientId, Username},
retain = Retain,
qos = Qos,
dup = false,
payload = Msg,
timestamp = os:timestamp()}.
from_packet(ClientId, Packet) ->
Msg = from_packet(Packet),
Msg#message{from = ClientId}.
from_packet(Username, ClientId, Packet) ->
Msg = from_packet(Packet),
Msg#message{from = {ClientId, Username}}.
msgid() -> emqx_guid:gen(). msgid() -> emqx_guid:gen().
%% @doc Message to Packet %% @doc Get flag
-spec(to_packet(message()) -> mqtt_packet()). get_flag(Flag, Msg) ->
to_packet(#message{packet_id = PkgId, get_flag(Flag, Msg, false).
qos = Qos, get_flag(Flag, #message{flags = Flags}, Default) ->
retain = Retain, maps:get(Flag, Flags, Default).
dup = Dup,
topic = Topic,
payload = Payload}) ->
#mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, %% @doc Set flag
qos = Qos, -spec(set_flag(message_flag(), message()) -> message()).
retain = Retain, set_flag(Flag, Msg = #message{flags = Flags}) ->
dup = Dup}, Msg#message{flags = maps:put(Flag, true, Flags)}.
variable = #mqtt_packet_publish{topic_name = Topic,
packet_id = if
Qos =:= ?QOS_0 -> undefined;
true -> PkgId
end
},
payload = Payload}.
%% @doc set dup, retain flag %% @doc Unset flag
-spec(set_flag(message()) -> message()). -spec(unset_flag(message_flag(), message()) -> message()).
set_flag(Msg) -> unset_flag(Flag, Msg = #message{flags = Flags}) ->
Msg#message{dup = true, retain = true}. Msg#message{flags = maps:remove(Flag, Flags)}.
-spec(set_flag(atom(), message()) -> message()). %% @doc Get header
set_flag(dup, Msg = #message{dup = false}) -> get_header(Hdr, Msg) ->
Msg#message{dup = true}; get_header(Hdr, Msg, undefined).
set_flag(sys, Msg = #message{sys = false}) -> get_header(Hdr, #message{headers = Headers}, Default) ->
Msg#message{sys = true}; maps:get(Hdr, Headers, Default).
set_flag(retain, Msg = #message{retain = false}) ->
Msg#message{retain = true};
set_flag(Flag, Msg) when Flag =:= dup;
Flag =:= retain;
Flag =:= sys -> Msg.
%% @doc Unset dup, retain flag %% @doc Set header
-spec(unset_flag(message()) -> message()). set_header(Hdr, Val, Msg = #message{headers = Headers}) ->
unset_flag(Msg) -> Msg#message{headers = maps:put(Hdr, Val, Headers)}.
Msg#message{dup = false, retain = false}.
-spec(unset_flag(dup | retain | atom(), message()) -> message()). %% @doc Get user property
unset_flag(dup, Msg = #message{dup = true}) -> get_user_property(Key, Msg) ->
Msg#message{dup = false}; get_user_property(Key, Msg, undefined).
unset_flag(retain, Msg = #message{retain = true}) -> get_user_property(Key, #message{properties = Props}, Default) ->
Msg#message{retain = false}; maps:get(Key, Props, Default).
unset_flag(Flag, Msg) when Flag =:= dup orelse Flag =:= retain -> Msg.
%% @doc Format MQTT Message set_user_property(Key, Val, Msg = #message{properties = Props}) ->
format(#message{id = MsgId, packet_id = PktId, from = {ClientId, Username}, Msg#message{properties = maps:put(Key, Val, Props)}.
qos = Qos, retain = Retain, dup = Dup, topic =Topic}) ->
io_lib:format("Message(Q~p, R~p, D~p, MsgId=~p, PktId=~p, From=~s/~s, Topic=~s)",
[i(Qos), i(Retain), i(Dup), MsgId, PktId, Username, ClientId, Topic]);
%% TODO:...
format(#message{id = MsgId, packet_id = PktId, from = From,
qos = Qos, retain = Retain, dup = Dup, topic =Topic}) ->
io_lib:format("Message(Q~p, R~p, D~p, MsgId=~p, PktId=~p, From=~s, Topic=~s)",
[i(Qos), i(Retain), i(Dup), MsgId, PktId, From, Topic]).
i(true) -> 1;
i(false) -> 0;
i(I) when is_integer(I) -> I.

View File

@ -28,44 +28,55 @@ load(Env) ->
emqx:hook('client.connected', fun ?MODULE:on_client_connected/3, [Env]), emqx:hook('client.connected', fun ?MODULE:on_client_connected/3, [Env]),
emqx:hook('client.disconnected', fun ?MODULE:on_client_disconnected/3, [Env]). emqx:hook('client.disconnected', fun ?MODULE:on_client_disconnected/3, [Env]).
on_client_connected(ConnAck, Client = #mqtt_client{client_id = ClientId, on_client_connected(ConnAck, Client = #client{id = ClientId,
username = Username, username = Username,
peername = {IpAddr, _}, peername = {IpAddr, _}
clean_sess = CleanSess, %%clean_sess = CleanSess,
proto_ver = ProtoVer}, Env) -> %%proto_ver = ProtoVer
Payload = mochijson2:encode([{clientid, ClientId}, }, Env) ->
case catch emqx_json:encode([{clientid, ClientId},
{username, Username}, {username, Username},
{ipaddress, iolist_to_binary(emqx_net:ntoa(IpAddr))}, {ipaddress, iolist_to_binary(emqx_net:ntoa(IpAddr))},
{clean_sess, CleanSess}, %%{clean_sess, CleanSess}, %%TODO:: fixme later
{protocol, ProtoVer}, %%{protocol, ProtoVer},
{connack, ConnAck}, {connack, ConnAck},
{ts, emqx_time:now_secs()}]), {ts, emqx_time:now_secs()}]) of
Msg = message(qos(Env), topic(connected, ClientId), Payload), Payload when is_binary(Payload) ->
emqx:publish(emqx_message:set_flag(sys, Msg)), Msg = message(qos(Env), topic(connected, ClientId), Payload),
emqx:publish(emqx_message:set_flag(sys, Msg));
{'EXIT', Reason} ->
emqx_log:error("[Presence Module] json error: ~p", [Reason])
end,
{ok, Client}. {ok, Client}.
on_client_disconnected(Reason, #mqtt_client{client_id = ClientId, on_client_disconnected(Reason, #client{id = ClientId,
username = Username}, Env) -> username = Username}, Env) ->
Payload = mochijson2:encode([{clientid, ClientId}, case catch emqx_json:encode([{clientid, ClientId},
{username, Username}, {username, Username},
{reason, reason(Reason)}, {reason, reason(Reason)},
{ts, emqx_time:now_secs()}]), {ts, emqx_time:now_secs()}]) of
Msg = message(qos(Env), topic(disconnected, ClientId), Payload), Payload when is_binary(Payload) ->
emqx:publish(emqx_message:set_flag(sys, Msg)), ok. Msg = message(qos(Env), topic(disconnected, ClientId), Payload),
emqx:publish(emqx_message:set_flag(sys, Msg));
{'EXIT', Reason} ->
emqx_log:error("[Presence Module] json error: ~p", [Reason])
end, ok.
unload(_Env) -> unload(_Env) ->
emqx:unhook('client.connected', fun ?MODULE:on_client_connected/3), emqx:unhook('client.connected', fun ?MODULE:on_client_connected/3),
emqx:unhook('client.disconnected', fun ?MODULE:on_client_disconnected/3). emqx:unhook('client.disconnected', fun ?MODULE:on_client_disconnected/3).
message(Qos, Topic, Payload) -> message(Qos, Topic, Payload) ->
emqx_message:make(presence, Qos, Topic, iolist_to_binary(Payload)). Msg = emqx_message:make(presence, Topic, iolist_to_binary(Payload)),
emqx_message:set_header(qos, Qos, Msg).
topic(connected, ClientId) -> topic(connected, ClientId) ->
emqx_topic:systop(list_to_binary(["clients/", ClientId, "/connected"])); emqx_topic:systop(list_to_binary(["clients/", ClientId, "/connected"]));
topic(disconnected, ClientId) -> topic(disconnected, ClientId) ->
emqx_topic:systop(list_to_binary(["clients/", ClientId, "/disconnected"])). emqx_topic:systop(list_to_binary(["clients/", ClientId, "/disconnected"])).
qos(Env) -> proplists:get_value(qos, Env, 0). qos(Env) ->
proplists:get_value(qos, Env, 0).
reason(Reason) when is_atom(Reason) -> Reason; reason(Reason) when is_atom(Reason) -> Reason;
reason({Error, _}) when is_atom(Error) -> Error; reason({Error, _}) when is_atom(Error) -> Error;

View File

@ -33,9 +33,9 @@
load(Topics) -> load(Topics) ->
emqx:hook('client.connected', fun ?MODULE:on_client_connected/3, [Topics]). emqx:hook('client.connected', fun ?MODULE:on_client_connected/3, [Topics]).
on_client_connected(?CONNACK_ACCEPT, Client = #mqtt_client{client_id = ClientId, on_client_connected(?CONNACK_ACCEPT, Client = #client{id = ClientId,
client_pid = ClientPid, pid = ClientPid,
username = Username}, Topics) -> username = Username}, Topics) ->
Replace = fun(Topic) -> rep(<<"%u">>, Username, rep(<<"%c">>, ClientId, Topic)) end, Replace = fun(Topic) -> rep(<<"%u">>, Username, rep(<<"%c">>, ClientId, Topic)) end,
TopicTable = [{Replace(Topic), Qos} || {Topic, Qos} <- Topics], TopicTable = [{Replace(Topic), Qos} || {Topic, Qos} <- Topics],
@ -49,7 +49,7 @@ unload(_) ->
emqx:unhook('client.connected', fun ?MODULE:on_client_connected/3). emqx:unhook('client.connected', fun ?MODULE:on_client_connected/3).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Internal Functions %% Internal functions
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
rep(<<"%c">>, ClientId, Topic) -> rep(<<"%c">>, ClientId, Topic) ->

View File

@ -20,11 +20,12 @@
-include("emqx_mqtt.hrl"). -include("emqx_mqtt.hrl").
%% API
-export([protocol_name/1, type_name/1, connack_name/1]). -export([protocol_name/1, type_name/1, connack_name/1]).
-export([format/1]). -export([format/1]).
-export([to_message/1, from_message/1]).
%% @doc Protocol name of version %% @doc Protocol name of version
-spec(protocol_name(mqtt_vsn()) -> binary()). -spec(protocol_name(mqtt_vsn()) -> binary()).
protocol_name(?MQTT_PROTO_V3) -> <<"MQIsdp">>; protocol_name(?MQTT_PROTO_V3) -> <<"MQIsdp">>;
@ -45,6 +46,49 @@ connack_name(?CONNACK_SERVER) -> 'CONNACK_SERVER';
connack_name(?CONNACK_CREDENTIALS) -> 'CONNACK_CREDENTIALS'; connack_name(?CONNACK_CREDENTIALS) -> 'CONNACK_CREDENTIALS';
connack_name(?CONNACK_AUTH) -> 'CONNACK_AUTH'. connack_name(?CONNACK_AUTH) -> 'CONNACK_AUTH'.
%% @doc From Message to Packet
-spec(from_message(message()) -> mqtt_packet()).
from_message(Msg = #message{qos = Qos,
topic = Topic,
payload = Payload}) ->
Dup = emqx_message:get_flag(dup, Msg, false),
Retain = emqx_message:get_flag(retain, Msg, false),
PacketId = emqx_message:get_header(packet_id, Msg),
#mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH,
qos = Qos,
retain = Retain,
dup = Dup},
variable = #mqtt_packet_publish{topic_name = Topic,
packet_id = PacketId},
payload = Payload}.
%% @doc Message from Packet
-spec(to_message(mqtt_packet()) -> message()).
to_message(#mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH,
retain = Retain,
qos = Qos,
dup = Dup},
variable = #mqtt_packet_publish{topic_name = Topic,
packet_id = PacketId,
properties = Props},
payload = Payload}) ->
Msg = emqx_message:make(undefined, Topic, Payload),
Msg#message{qos = Qos,
flags = #{dup => Dup, retain => Retain},
headers = #{packet_id => PacketId},
properties = Props};
to_message(#mqtt_packet_connect{will_flag = false}) ->
undefined;
to_message(#mqtt_packet_connect{will_retain = Retain,
will_qos = Qos,
will_topic = Topic,
will_props = Props,
will_msg = Payload}) ->
Msg = emqx_message:make(undefined, Topic, Payload),
Msg#message{flags = #{retain => Retain},
headers = #{qos => Qos},
properties = Props}.
%% @doc Format packet %% @doc Format packet
-spec(format(mqtt_packet()) -> iolist()). -spec(format(mqtt_packet()) -> iolist()).
format(#mqtt_packet{header = Header, variable = Variable, payload = Payload}) -> format(#mqtt_packet{header = Header, variable = Variable, payload = Payload}) ->

View File

@ -122,17 +122,11 @@ client(#proto_state{client_id = ClientId,
WillMsg =:= undefined -> undefined; WillMsg =:= undefined -> undefined;
true -> WillMsg#message.topic true -> WillMsg#message.topic
end, end,
#mqtt_client{client_id = ClientId, #client{id = ClientId,
client_pid = ClientPid, pid = ClientPid,
username = Username, username = Username,
peername = Peername, peername = Peername,
clean_sess = CleanSess, mountpoint = MountPoint}.
proto_ver = ProtoVer,
keepalive = Keepalive,
will_topic = WillTopic,
ws_initial_headers = WsInitialHeaders,
mountpoint = MountPoint,
connected_at = Time}.
session(#proto_state{session = Session}) -> session(#proto_state{session = Session}) ->
Session. Session.
@ -220,12 +214,14 @@ process(?CONNECT_PACKET(Var), State0) ->
%% Start session %% Start session
case emqx_sm:open_session(#{clean_start => CleanSess, case emqx_sm:open_session(#{clean_start => CleanSess,
client_id => clientid(State2), client_id => clientid(State2),
username => Username}) of username => Username,
client_pid => self()}) of
{ok, Session} -> %% TODO:... {ok, Session} -> %% TODO:...
SP = true, %% TODO:... SP = true, %% TODO:...
%% Register the client %% TODO: Register the client
emqx_cm:reg(client(State2)), emqx_cm:reg(clientid(State2)),
%%emqx_cm:reg(client(State2)),
%% Start keepalive %% Start keepalive
start_keepalive(KeepAlive, State2), start_keepalive(KeepAlive, State2),
%% Emit Stats %% Emit Stats
@ -245,7 +241,7 @@ process(?CONNECT_PACKET(Var), State0) ->
%% Run hooks %% Run hooks
emqx_hooks:run('client.connected', [ReturnCode1], client(State3)), emqx_hooks:run('client.connected', [ReturnCode1], client(State3)),
%%TODO: Send Connack %%TODO: Send Connack
%% send(?CONNACK_PACKET(ReturnCode1, sp(SessPresent)), State3), send(?CONNACK_PACKET(ReturnCode1, sp(SessPresent)), State3),
%% stop if authentication failure %% stop if authentication failure
stop_if_auth_failure(ReturnCode1, State3); stop_if_auth_failure(ReturnCode1, State3);
@ -330,8 +326,9 @@ publish(Packet = ?PUBLISH_PACKET(?QOS_0, _PacketId),
username = Username, username = Username,
mountpoint = MountPoint, mountpoint = MountPoint,
session = Session}) -> session = Session}) ->
Msg = emqx_message:from_packet(Username, ClientId, Packet), Msg = emqx_packet:to_message(Packet),
emqx_session:publish(Session, mount(replvar(MountPoint, State), Msg)); Msg1 = Msg#message{from = #client{id = ClientId, username = Username}},
emqx_session:publish(Session, mount(replvar(MountPoint, State), Msg1));
publish(Packet = ?PUBLISH_PACKET(?QOS_1, _PacketId), State) -> publish(Packet = ?PUBLISH_PACKET(?QOS_1, _PacketId), State) ->
with_puback(?PUBACK, Packet, State); with_puback(?PUBACK, Packet, State);
@ -344,8 +341,10 @@ with_puback(Type, Packet = ?PUBLISH_PACKET(_Qos, PacketId),
username = Username, username = Username,
mountpoint = MountPoint, mountpoint = MountPoint,
session = Session}) -> session = Session}) ->
Msg = emqx_message:from_packet(Username, ClientId, Packet), %% TODO: ...
case emqx_session:publish(Session, mount(replvar(MountPoint, State), Msg)) of Msg = emqx_packet:to_message(Packet),
Msg1 = Msg#message{from = #client{id = ClientId, username = Username}},
case emqx_session:publish(Session, mount(replvar(MountPoint, State), Msg1)) of
ok -> ok ->
send(?PUBACK_PACKET(Type, PacketId), State); send(?PUBACK_PACKET(Type, PacketId), State);
{error, Error} -> {error, Error} ->
@ -359,7 +358,7 @@ send(Msg, State = #proto_state{client_id = ClientId,
is_bridge = IsBridge}) is_bridge = IsBridge})
when is_record(Msg, message) -> when is_record(Msg, message) ->
emqx_hooks:run('message.delivered', [ClientId, Username], Msg), emqx_hooks:run('message.delivered', [ClientId, Username], Msg),
send(emqx_message:to_packet(unmount(MountPoint, clean_retain(IsBridge, Msg))), State); send(emqx_packet:from_message(unmount(MountPoint, clean_retain(IsBridge, Msg))), State);
send(Packet = ?PACKET(Type), State = #proto_state{sendfun = SendFun, stats_data = Stats}) -> send(Packet = ?PACKET(Type), State = #proto_state{sendfun = SendFun, stats_data = Stats}) ->
trace(send, Packet, State), trace(send, Packet, State),
@ -421,7 +420,7 @@ shutdown(Error, State = #proto_state{will_msg = WillMsg}) ->
ok. ok.
willmsg(Packet, State = #proto_state{mountpoint = MountPoint}) when is_record(Packet, mqtt_packet_connect) -> willmsg(Packet, State = #proto_state{mountpoint = MountPoint}) when is_record(Packet, mqtt_packet_connect) ->
case emqx_message:from_packet(Packet) of case emqx_packet:to_message(Packet) of
undefined -> undefined; undefined -> undefined;
Msg -> mount(replvar(MountPoint, State), Msg) Msg -> mount(replvar(MountPoint, State), Msg)
end. end.
@ -438,8 +437,8 @@ maybe_set_clientid(State) ->
send_willmsg(_Client, undefined) -> send_willmsg(_Client, undefined) ->
ignore; ignore;
send_willmsg(#mqtt_client{client_id = ClientId, username = Username}, WillMsg) -> send_willmsg(Client, WillMsg) ->
emqx_broker:publish(WillMsg#message{from = {ClientId, Username}}). emqx_broker:publish(WillMsg#message{from = Client}).
start_keepalive(0, _State) -> ignore; start_keepalive(0, _State) -> ignore;
@ -507,12 +506,13 @@ validate_packet(_Packet) ->
validate_topics(_Type, []) -> validate_topics(_Type, []) ->
{error, empty_topics}; {error, empty_topics};
validate_topics(Type, TopicTable = [{_Topic, _Qos}|_]) validate_topics(Type, TopicTable = [{_Topic, _SubOpts}|_])
when Type =:= name orelse Type =:= filter -> when Type =:= name orelse Type =:= filter ->
Valid = fun(Topic, Qos) -> Valid = fun(Topic, Qos) ->
emqx_topic:validate({Type, Topic}) and validate_qos(Qos) emqx_topic:validate({Type, Topic}) and validate_qos(Qos)
end, end,
case [Topic || {Topic, Qos} <- TopicTable, not Valid(Topic, Qos)] of case [Topic || {Topic, SubOpts} <- TopicTable,
not Valid(Topic, proplists:get_value(qos, SubOpts))] of
[] -> ok; [] -> ok;
_ -> {error, badtopic} _ -> {error, badtopic}
end; end;
@ -531,9 +531,10 @@ validate_qos(_) ->
false. false.
parse_topic_table(TopicTable) -> parse_topic_table(TopicTable) ->
lists:map(fun({Topic0, Qos}) -> lists:map(fun({Topic0, SubOpts}) ->
{Topic, Opts} = emqx_topic:parse(Topic0), {Topic, Opts} = emqx_topic:parse(Topic0),
{Topic, [{qos, Qos}|Opts]} %%TODO:
{Topic, lists:usort(lists:umerge(Opts, SubOpts))}
end, TopicTable). end, TopicTable).
parse_topics(Topics) -> parse_topics(Topics) ->
@ -570,10 +571,10 @@ sp(false) -> 0.
%% The retained flag should be propagated for bridge. %% The retained flag should be propagated for bridge.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
clean_retain(false, Msg = #message{retain = true, headers = Headers}) -> clean_retain(false, Msg = #message{flags = #{retain := true}, headers = Headers}) ->
case lists:member(retained, Headers) of case lists:member(retained, Headers) of
true -> Msg; true -> Msg;
false -> Msg#message{retain = false} false -> emqx_message:set_flag(retain, false, Msg)
end; end;
clean_retain(_IsBridge, Msg) -> clean_retain(_IsBridge, Msg) ->
Msg. Msg.

View File

@ -16,9 +16,12 @@
-module(emqx_rpc). -module(emqx_rpc).
-export([cast/4]). -export([call/4, cast/4]).
call(Node, Mod, Fun, Args) ->
rpc:call(Node, Mod, Fun, Args).
cast(Node, Mod, Fun, Args) -> cast(Node, Mod, Fun, Args) ->
%%TODO: not right
emqx_metrics:inc('messages/forward'), emqx_metrics:inc('messages/forward'),
rpc:cast(Node, Mod, Fun, Args). rpc:cast(Node, Mod, Fun, Args).

View File

@ -90,6 +90,7 @@ serialize_variable(?SUBSCRIBE, #mqtt_packet_subscribe{packet_id = PacketId,
serialize_variable(?SUBACK, #mqtt_packet_suback{packet_id = PacketId, serialize_variable(?SUBACK, #mqtt_packet_suback{packet_id = PacketId,
properties = Properties, properties = Properties,
reason_codes = ReasonCodes}, undefined) -> reason_codes = ReasonCodes}, undefined) ->
io:format("SubAck ReasonCodes: ~p~n", [ReasonCodes]),
{<<PacketId:16/big, (serialize_properties(Properties))/binary>>, << <<Code>> || Code <- ReasonCodes >>}; {<<PacketId:16/big, (serialize_properties(Properties))/binary>>, << <<Code>> || Code <- ReasonCodes >>};
serialize_variable(?UNSUBSCRIBE, #mqtt_packet_unsubscribe{packet_id = PacketId, serialize_variable(?UNSUBSCRIBE, #mqtt_packet_unsubscribe{packet_id = PacketId,

View File

@ -29,7 +29,7 @@
-import(proplists, [get_value/2, get_value/3]). -import(proplists, [get_value/2, get_value/3]).
%% Session API %% Session API
-export([start_link/3, resume/3, destroy/2]). -export([start_link/1, resume/3, discard/2]).
%% Management and Monitor API %% Management and Monitor API
-export([state/1, info/1, stats/1]). -export([state/1, info/1, stats/1]).
@ -42,9 +42,6 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]). terminate/2, code_change/3]).
%% TODO: gen_server Message Priorities
-export([handle_pre_hibernate/1]).
-define(MQueue, emqx_mqueue). -define(MQueue, emqx_mqueue).
%% A stateful interaction between a Client and a Server. Some Sessions %% A stateful interaction between a Client and a Server. Some Sessions
@ -71,8 +68,8 @@
%% will be deleted. %% will be deleted.
-record(state, -record(state,
{ {
%% Clean Session Flag %% Clean Start Flag
clean_sess = false :: boolean(), clean_start = false :: boolean(),
%% Client Binding: local | remote %% Client Binding: local | remote
binding = local :: local | remote, binding = local :: local | remote,
@ -150,9 +147,9 @@
-define(TIMEOUT, 60000). -define(TIMEOUT, 60000).
-define(INFO_KEYS, [clean_sess, client_id, username, client_pid, binding, created_at]). -define(INFO_KEYS, [clean_start, client_id, username, client_pid, binding, created_at]).
-define(STATE_KEYS, [clean_sess, client_id, username, binding, client_pid, old_client_pid, -define(STATE_KEYS, [clean_start, client_id, username, binding, client_pid, old_client_pid,
next_msg_id, max_subscriptions, subscriptions, upgrade_qos, inflight, next_msg_id, max_subscriptions, subscriptions, upgrade_qos, inflight,
max_inflight, retry_interval, mqueue, awaiting_rel, max_awaiting_rel, max_inflight, retry_interval, mqueue, awaiting_rel, max_awaiting_rel,
await_rel_timeout, expiry_interval, enable_stats, force_gc_count, await_rel_timeout, expiry_interval, enable_stats, force_gc_count,
@ -163,10 +160,9 @@
"Session(~s): " ++ Format, [State#state.client_id | Args])). "Session(~s): " ++ Format, [State#state.client_id | Args])).
%% @doc Start a Session %% @doc Start a Session
-spec(start_link(boolean(), {mqtt_client_id(), mqtt_username()}, pid()) -> {ok, pid()} | {error, term()}). -spec(start_link(map()) -> {ok, pid()} | {error, term()}).
start_link(CleanSess, {ClientId, Username}, ClientPid) -> start_link(ClientAttrs) ->
gen_server:start_link(?MODULE, [CleanSess, {ClientId, Username}, ClientPid], gen_server:start_link(?MODULE, ClientAttrs, [{hibernate_after, 10000}]).
[{hibernate_after, 10000}]).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% PubSub API %% PubSub API
@ -215,12 +211,12 @@ pubcomp(Session, PacketId) ->
gen_server:cast(Session, {pubcomp, PacketId}). gen_server:cast(Session, {pubcomp, PacketId}).
%% @doc Unsubscribe the topics %% @doc Unsubscribe the topics
-spec(unsubscribe(pid(), [{binary(), [emqx_topic:option()]}]) -> ok). -spec(unsubscribe(pid(), [{binary(), [suboption()]}]) -> ok).
unsubscribe(Session, TopicTable) -> unsubscribe(Session, TopicTable) ->
gen_server:cast(Session, {unsubscribe, self(), TopicTable}). gen_server:cast(Session, {unsubscribe, self(), TopicTable}).
%% @doc Resume the session %% @doc Resume the session
-spec(resume(pid(), mqtt_client_id(), pid()) -> ok). -spec(resume(pid(), client_id(), pid()) -> ok).
resume(Session, ClientId, ClientPid) -> resume(Session, ClientId, ClientPid) ->
gen_server:cast(Session, {resume, ClientId, ClientPid}). gen_server:cast(Session, {resume, ClientId, ClientPid}).
@ -260,16 +256,19 @@ stats(#state{max_subscriptions = MaxSubscriptions,
{deliver_msg, get(deliver_msg)}, {deliver_msg, get(deliver_msg)},
{enqueue_msg, get(enqueue_msg)}]). {enqueue_msg, get(enqueue_msg)}]).
%% @doc Destroy the session %% @doc Discard the session
-spec(destroy(pid(), mqtt_client_id()) -> ok). -spec(discard(pid(), client_id()) -> ok).
destroy(Session, ClientId) -> discard(Session, ClientId) ->
gen_server:cast(Session, {destroy, ClientId}). gen_server:cast(Session, {discard, ClientId}).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% gen_server Callbacks %% gen_server Callbacks
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
init([CleanSess, {ClientId, Username}, ClientPid]) -> init(#{clean_start := CleanStart,
client_id := ClientId,
username := Username,
client_pid := ClientPid}) ->
process_flag(trap_exit, true), process_flag(trap_exit, true),
true = link(ClientPid), true = link(ClientPid),
init_stats([deliver_msg, enqueue_msg]), init_stats([deliver_msg, enqueue_msg]),
@ -280,7 +279,7 @@ init([CleanSess, {ClientId, Username}, ClientPid]) ->
ForceGcCount = emqx_gc:conn_max_gc_count(), ForceGcCount = emqx_gc:conn_max_gc_count(),
IgnoreLoopDeliver = get_value(ignore_loop_deliver, Env, false), IgnoreLoopDeliver = get_value(ignore_loop_deliver, Env, false),
MQueue = ?MQueue:new(ClientId, QEnv, emqx_alarm:alarm_fun()), MQueue = ?MQueue:new(ClientId, QEnv, emqx_alarm:alarm_fun()),
State = #state{clean_sess = CleanSess, State = #state{clean_start = CleanStart,
binding = binding(ClientPid), binding = binding(ClientPid),
client_id = ClientId, client_id = ClientId,
client_pid = ClientPid, client_pid = ClientPid,
@ -300,8 +299,9 @@ init([CleanSess, {ClientId, Username}, ClientPid]) ->
force_gc_count = ForceGcCount, force_gc_count = ForceGcCount,
ignore_loop_deliver = IgnoreLoopDeliver, ignore_loop_deliver = IgnoreLoopDeliver,
created_at = os:timestamp()}, created_at = os:timestamp()},
emqx_sm:register_session(ClientId, CleanSess, info(State)), %%emqx_sm:register_session(ClientId, info(State)),
emqx_hooks:run('session.created', [ClientId, Username]), emqx_hooks:run('session.created', [ClientId, Username]),
io:format("Session started: ~p~n", [self()]),
{ok, emit_stats(State), hibernate}. {ok, emit_stats(State), hibernate}.
init_stats(Keys) -> init_stats(Keys) ->
@ -313,7 +313,7 @@ binding(ClientPid) ->
handle_pre_hibernate(State) -> handle_pre_hibernate(State) ->
{hibernate, emqx_gc:reset_conn_gc_count(#state.force_gc_count, emit_stats(State))}. {hibernate, emqx_gc:reset_conn_gc_count(#state.force_gc_count, emit_stats(State))}.
handle_call({publish, Msg = #message{qos = ?QOS_2, packet_id = PacketId}}, _From, handle_call({publish, Msg = #message{qos = ?QOS_2, headers = #{packet_id := PacketId}}}, _From,
State = #state{awaiting_rel = AwaitingRel, State = #state{awaiting_rel = AwaitingRel,
await_rel_timer = Timer, await_rel_timer = Timer,
await_rel_timeout = Timeout}) -> await_rel_timeout = Timeout}) ->
@ -350,6 +350,7 @@ handle_cast({subscribe, From, TopicTable, AckFun},
?LOG(info, "Subscribe ~p", [TopicTable], State), ?LOG(info, "Subscribe ~p", [TopicTable], State),
{GrantedQos, Subscriptions1} = {GrantedQos, Subscriptions1} =
lists:foldl(fun({Topic, Opts}, {QosAcc, SubMap}) -> lists:foldl(fun({Topic, Opts}, {QosAcc, SubMap}) ->
io:format("SubOpts: ~p~n", [Opts]),
Fastlane = lists:member(fastlane, Opts), Fastlane = lists:member(fastlane, Opts),
NewQos = if Fastlane == true -> ?QOS_0; true -> get_value(qos, Opts) end, NewQos = if Fastlane == true -> ?QOS_0; true -> get_value(qos, Opts) end,
SubMap1 = SubMap1 =
@ -373,6 +374,7 @@ handle_cast({subscribe, From, TopicTable, AckFun},
end, end,
{[NewQos|QosAcc], SubMap1} {[NewQos|QosAcc], SubMap1}
end, {[], Subscriptions}, TopicTable), end, {[], Subscriptions}, TopicTable),
io:format("GrantedQos: ~p~n", [GrantedQos]),
AckFun(lists:reverse(GrantedQos)), AckFun(lists:reverse(GrantedQos)),
{noreply, emit_stats(State#state{subscriptions = Subscriptions1}), hibernate}; {noreply, emit_stats(State#state{subscriptions = Subscriptions1}), hibernate};
@ -456,7 +458,7 @@ handle_cast({pubcomp, PacketId}, State = #state{inflight = Inflight}) ->
handle_cast({resume, ClientId, ClientPid}, handle_cast({resume, ClientId, ClientPid},
State = #state{client_id = ClientId, State = #state{client_id = ClientId,
client_pid = OldClientPid, client_pid = OldClientPid,
clean_sess = CleanSess, clean_start = CleanStart,
retry_timer = RetryTimer, retry_timer = RetryTimer,
await_rel_timer = AwaitTimer, await_rel_timer = AwaitTimer,
expiry_timer = ExpireTimer}) -> expiry_timer = ExpireTimer}) ->
@ -477,7 +479,7 @@ handle_cast({resume, ClientId, ClientPid},
State1 = State#state{client_pid = ClientPid, State1 = State#state{client_pid = ClientPid,
binding = binding(ClientPid), binding = binding(ClientPid),
old_client_pid = OldClientPid, old_client_pid = OldClientPid,
clean_sess = false, clean_start = false,
retry_timer = undefined, retry_timer = undefined,
awaiting_rel = #{}, awaiting_rel = #{},
await_rel_timer = undefined, await_rel_timer = undefined,
@ -485,22 +487,23 @@ handle_cast({resume, ClientId, ClientPid},
%% Clean Session: true -> false? %% Clean Session: true -> false?
if if
CleanSess =:= true -> CleanStart =:= true ->
?LOG(error, "CleanSess changed to false.", [], State1), ?LOG(error, "CleanSess changed to false.", [], State1);
emqx_sm:register_session(ClientId, false, info(State1)); %%TODO::
CleanSess =:= false -> %%emqx_sm:register_session(ClientId, info(State1));
CleanStart =:= false ->
ok ok
end, end,
%% Replay delivery and Dequeue pending messages %% Replay delivery and Dequeue pending messages
{noreply, emit_stats(dequeue(retry_delivery(true, State1)))}; {noreply, emit_stats(dequeue(retry_delivery(true, State1)))};
handle_cast({destroy, ClientId}, handle_cast({discard, ClientId},
State = #state{client_id = ClientId, client_pid = undefined}) -> State = #state{client_id = ClientId, client_pid = undefined}) ->
?LOG(warning, "Destroyed", [], State), ?LOG(warning, "Destroyed", [], State),
shutdown(destroy, State); shutdown(discard, State);
handle_cast({destroy, ClientId}, handle_cast({discard, ClientId},
State = #state{client_id = ClientId, client_pid = OldClientPid}) -> State = #state{client_id = ClientId, client_pid = OldClientPid}) ->
?LOG(warning, "kickout ~p", [OldClientPid], State), ?LOG(warning, "kickout ~p", [OldClientPid], State),
shutdown(conflict, State); shutdown(conflict, State);
@ -533,11 +536,11 @@ handle_info({timeout, _Timer, expired}, State) ->
shutdown(expired, State); shutdown(expired, State);
handle_info({'EXIT', ClientPid, _Reason}, handle_info({'EXIT', ClientPid, _Reason},
State = #state{clean_sess = true, client_pid = ClientPid}) -> State = #state{clean_start= true, client_pid = ClientPid}) ->
{stop, normal, State}; {stop, normal, State};
handle_info({'EXIT', ClientPid, Reason}, handle_info({'EXIT', ClientPid, Reason},
State = #state{clean_sess = false, State = #state{clean_start = false,
client_pid = ClientPid, client_pid = ClientPid,
expiry_interval = Interval}) -> expiry_interval = Interval}) ->
?LOG(info, "Client ~p EXIT for ~p", [ClientPid, Reason], State), ?LOG(info, "Client ~p EXIT for ~p", [ClientPid, Reason], State),
@ -604,7 +607,7 @@ retry_delivery(Force, [{Type, Msg, Ts} | Msgs], Now,
if if
Force orelse (Diff >= Interval) -> Force orelse (Diff >= Interval) ->
case {Type, Msg} of case {Type, Msg} of
{publish, Msg = #message{packet_id = PacketId}} -> {publish, Msg = #message{headers = #{packet_id := PacketId}}} ->
redeliver(Msg, State), redeliver(Msg, State),
Inflight1 = Inflight:update(PacketId, {publish, Msg, Now}), Inflight1 = Inflight:update(PacketId, {publish, Msg, Now}),
retry_delivery(Force, Msgs, Now, State#state{inflight = Inflight1}); retry_delivery(Force, Msgs, Now, State#state{inflight = Inflight1});
@ -687,7 +690,7 @@ dispatch(Msg = #message{qos = QoS},
true -> true ->
enqueue_msg(Msg, State); enqueue_msg(Msg, State);
false -> false ->
Msg1 = Msg#message{packet_id = MsgId}, Msg1 = emqx_message:set_header(packet_id, MsgId, Msg),
deliver(Msg1, State), deliver(Msg1, State),
await(Msg1, next_msg_id(State)) await(Msg1, next_msg_id(State))
end. end.
@ -701,7 +704,7 @@ enqueue_msg(Msg, State = #state{mqueue = Q}) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
redeliver(Msg = #message{qos = QoS}, State) -> redeliver(Msg = #message{qos = QoS}, State) ->
deliver(Msg#message{dup = if QoS =:= ?QOS2 -> false; true -> true end}, State); deliver(if QoS =:= ?QOS2 -> Msg; true -> emqx_message:set_flag(dup, Msg) end, State);
redeliver({pubrel, PacketId}, #state{client_pid = Pid}) -> redeliver({pubrel, PacketId}, #state{client_pid = Pid}) ->
Pid ! {redeliver, {?PUBREL, PacketId}}. Pid ! {redeliver, {?PUBREL, PacketId}}.
@ -715,7 +718,7 @@ deliver(Msg, #state{client_pid = Pid, binding = remote}) ->
%% Awaiting ACK for QoS1/QoS2 Messages %% Awaiting ACK for QoS1/QoS2 Messages
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
await(Msg = #message{packet_id = PacketId}, await(Msg = #message{headers = #{packet_id := PacketId}},
State = #state{inflight = Inflight, State = #state{inflight = Inflight,
retry_timer = RetryTimer, retry_timer = RetryTimer,
retry_interval = Interval}) -> retry_interval = Interval}) ->
@ -797,9 +800,8 @@ tune_qos(Topic, Msg = #message{qos = PubQoS},
%% Reset Dup %% Reset Dup
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
reset_dup(Msg = #message{dup = true}) -> reset_dup(Msg) ->
Msg#message{dup = false}; emqx_message:unset_flag(dup, Msg).
reset_dup(Msg) -> Msg.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Next Msg Id %% Next Msg Id

View File

@ -24,7 +24,7 @@
-export([open_session/1, lookup_session/1, close_session/1]). -export([open_session/1, lookup_session/1, close_session/1]).
-export([resume_session/1, discard_session/1]). -export([resume_session/1, discard_session/1]).
-export([register_session/1, unregister_session/2]). -export([register_session/1, unregister_session/1, unregister_session/2]).
%% lock_session/1, create_session/1, unlock_session/1, %% lock_session/1, create_session/1, unlock_session/1,
@ -42,17 +42,19 @@ start_link(StatsFun) ->
open_session(Session = #{client_id := ClientId, clean_start := true}) -> open_session(Session = #{client_id := ClientId, clean_start := true}) ->
with_lock(ClientId, with_lock(ClientId,
fun() -> fun() ->
case rpc:multicall(ekka:nodelist(), ?MODULE, discard_session, [ClientId]) of io:format("Nodelist: ~p~n", [ekka_membership:nodelist()]),
case rpc:multicall(ekka_membership:nodelist(), ?MODULE, discard_session, [ClientId]) of
{_Res, []} -> ok; {_Res, []} -> ok;
{_Res, BadNodes} -> emqx_log:error("[SM] Bad nodes found when lock a session: ~p", [BadNodes]) {_Res, BadNodes} -> emqx_log:error("[SM] Bad nodes found when lock a session: ~p", [BadNodes])
end, end,
{ok, emqx_session_sup:start_session(Session)} io:format("Begin to start session: ~p~n", [Session]),
emqx_session_sup:start_session(Session)
end); end);
open_session(Session = #{client_id := ClientId, clean_start := false}) -> open_session(Session = #{client_id := ClientId, clean_start := false}) ->
with_lock(ClientId, with_lock(ClientId,
fun() -> fun() ->
{ResL, _BadNodes} = emqx_rpc:multicall(ekka:nodelist(), ?MODULE, lookup_session, [ClientId]), {ResL, _BadNodes} = rpc:multicall(ekka_membership:nodelist(), ?MODULE, lookup_session, [ClientId]),
case lists:flatten([Pid || Pid <- ResL, Pid =/= undefined]) of case lists:flatten([Pid || Pid <- ResL, Pid =/= undefined]) of
[] -> [] ->
{ok, emqx_session_sup:start_session(Session)}; {ok, emqx_session_sup:start_session(Session)};
@ -61,7 +63,7 @@ open_session(Session = #{client_id := ClientId, clean_start := false}) ->
ok -> {ok, SessPid}; ok -> {ok, SessPid};
{error, Reason} -> {error, Reason} ->
emqx_log:error("[SM] Failed to resume session: ~p, ~p", [Session, Reason]), emqx_log:error("[SM] Failed to resume session: ~p, ~p", [Session, Reason]),
{ok, emqx_session_sup:start_session(Session)} emqx_session_sup:start_session(Session)
end end
end end
end). end).
@ -109,6 +111,9 @@ with_lock(ClientId, Fun) ->
register_session(ClientId) -> register_session(ClientId) ->
ets:insert(session, {ClientId, self()}). ets:insert(session, {ClientId, self()}).
unregister_session(ClientId) ->
unregister_session(ClientId, self()).
unregister_session(ClientId, Pid) -> unregister_session(ClientId, Pid) ->
case ets:lookup(session, ClientId) of case ets:lookup(session, ClientId) of
[{_, Pid}] -> [{_, Pid}] ->

View File

@ -24,10 +24,10 @@
%% @doc Lock a clientid %% @doc Lock a clientid
-spec(lock(client_id()) -> boolean() | {error, term()}). -spec(lock(client_id()) -> boolean() | {error, term()}).
lock(ClientId) -> lock(ClientId) ->
emqx_rpc:call(ekka:leader(), emqx_sm_locker, lock, [ClientId]). rpc:call(ekka_membership:leader(), emqx_locker, lock, [ClientId]).
%% @doc Unlock a clientid %% @doc Unlock a clientid
-spec(unlock(client_id()) -> ok). -spec(unlock(client_id()) -> ok).
unlock(ClientId) -> unlock(ClientId) ->
emqx_rpc:call(ekka:leader(), emqx_locker, unlock, [ClientId]). rpc:call(ekka_membership:leader(), emqx_locker, unlock, [ClientId]).

View File

@ -26,20 +26,15 @@ start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []). supervisor:start_link({local, ?MODULE}, ?MODULE, []).
init([]) -> init([]) ->
%% Create tables lists:foreach(fun create_tab/1, [session, session_stats, session_attrs]),
create_tabs(),
StatsFun = emqx_stats:statsfun('sessions/count', 'sessions/max'), StatsFun = emqx_stats:statsfun('sessions/count', 'sessions/max'),
SM = {emqx_sm, {emqx_sm, start_link, [StatsFun]}, SM = {emqx_sm, {emqx_sm, start_link, [StatsFun]},
permanent, 5000, worker, [emqx_sm]}, permanent, 5000, worker, [emqx_sm]},
{ok, {{one_for_all, 0, 3600}, [SM]}}. {ok, {{one_for_all, 10, 3600}, [SM]}}.
create_tabs() ->
lists:foreach(fun create_tab/1, [session, session_stats, session_attrs]).
create_tab(Tab) -> create_tab(Tab) ->
emqx_tables:create(Tab, [public, ordered_set, named_table, emqx_tables:create(Tab, [public, ordered_set, named_table, {write_concurrency, true}]).
{write_concurrency, true}]).

View File

@ -51,7 +51,8 @@ start_link() ->
trace(publish, From, _Msg) when is_atom(From) -> trace(publish, From, _Msg) when is_atom(From) ->
%% Dont' trace '$SYS' publish %% Dont' trace '$SYS' publish
ignore; ignore;
trace(publish, {ClientId, Username}, #message{topic = Topic, payload = Payload}) -> trace(publish, #client{id = ClientId, username = Username},
#message{topic = Topic, payload = Payload}) ->
lager:info([{client, ClientId}, {topic, Topic}], lager:info([{client, ClientId}, {topic, Topic}],
"~s/~s PUBLISH to ~s: ~p", [ClientId, Username, Topic, Payload]); "~s/~s PUBLISH to ~s: ~p", [ClientId, Username, Topic, Payload]);
trace(publish, From, #message{topic = Topic, payload = Payload}) trace(publish, From, #message{topic = Topic, payload = Payload})
@ -59,7 +60,6 @@ trace(publish, From, #message{topic = Topic, payload = Payload})
lager:info([{client, From}, {topic, Topic}], lager:info([{client, From}, {topic, Topic}],
"~s PUBLISH to ~s: ~p", [From, Topic, Payload]). "~s PUBLISH to ~s: ~p", [From, Topic, Payload]).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Start/Stop Trace %% Start/Stop Trace
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------