client_id

This commit is contained in:
Feng 2015-07-02 23:22:27 +08:00
parent 7009ef944b
commit 87dafdd7b2
15 changed files with 93 additions and 90 deletions

View File

@ -83,12 +83,11 @@
%% MQTT Client %% MQTT Client
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-record(mqtt_client, { -record(mqtt_client, {
clientid :: binary() | undefined, client_id :: binary() | undefined,
username :: binary() | undefined, username :: binary() | undefined,
ipaddress :: inet:ip_address(), ipaddress :: inet:ip_address(),
client_pid :: pid(),
client_mon :: reference(),
clean_sess :: boolean(), clean_sess :: boolean(),
client_pid :: pid(),
proto_ver :: 3 | 4 proto_ver :: 3 | 4
}). }).
@ -98,7 +97,7 @@
%% MQTT Session %% MQTT Session
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-record(mqtt_session, { -record(mqtt_session, {
clientid, client_id,
session_pid, session_pid,
subscriptions = [] subscriptions = []
}). }).
@ -108,16 +107,16 @@
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% MQTT Message %% MQTT Message
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-type mqtt_msgid() :: undefined | 1..16#ffff. -type mqtt_msgid() :: binary().
-record(mqtt_message, { -record(mqtt_message, {
msgid :: mqtt_msgid(), %% Unique Message ID
topic :: binary(), %% Topic that the message is published to topic :: binary(), %% Topic that the message is published to
from :: binary() | atom(), %% ClientId of publisher from :: binary() | atom(), %% ClientId of publisher
qos = 0 :: 0 | 1 | 2, %% Message QoS qos = 0 :: 0 | 1 | 2, %% Message QoS
retain = false :: boolean(), %% Retain flag retain = false :: boolean(), %% Retain flag
dup = false :: boolean(), %% Dup flag dup = false :: boolean(), %% Dup flag
sys = false :: boolean(), %% $SYS flag sys = false :: boolean(), %% $SYS flag
msgid :: mqtt_msgid(), %% Message ID
payload :: binary(), %% Payload payload :: binary(), %% Payload
timestamp :: erlang:timestamp() %% Timestamp timestamp :: erlang:timestamp() %% Timestamp
}). }).

View File

@ -122,11 +122,11 @@
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% MQTT Packets %% MQTT Packets
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-type mqtt_clientid() :: binary(). -type mqtt_client_id() :: binary().
-type mqtt_packet_id() :: 1..16#ffff | undefined. -type mqtt_packet_id() :: 1..16#ffff | undefined.
-record(mqtt_packet_connect, { -record(mqtt_packet_connect, {
clientid = <<>> :: mqtt_clientid(), client_id = <<>> :: mqtt_client_id(),
proto_ver = ?MQTT_PROTO_V311 :: mqtt_vsn(), proto_ver = ?MQTT_PROTO_V311 :: mqtt_vsn(),
proto_name = <<"MQTT">> :: binary(), proto_name = <<"MQTT">> :: binary(),
will_retain = false :: boolean(), will_retain = false :: boolean(),

View File

@ -97,7 +97,7 @@ check_acl(Client, PubSub, Topic) when PubSub =:= publish orelse PubSub =:= subsc
[] -> allow; [] -> allow;
AclMods -> check_acl(Client, PubSub, Topic, AclMods) AclMods -> check_acl(Client, PubSub, Topic, AclMods)
end. end.
check_acl(#mqtt_client{clientid = ClientId}, PubSub, Topic, []) -> check_acl(#mqtt_client{client_id = ClientId}, PubSub, Topic, []) ->
lager:error("ACL: nomatch when ~s ~s ~s", [ClientId, PubSub, Topic]), lager:error("ACL: nomatch when ~s ~s ~s", [ClientId, PubSub, Topic]),
allow; allow;
check_acl(Client, PubSub, Topic, [{M, State}|AclMods]) -> check_acl(Client, PubSub, Topic, [{M, State}|AclMods]) ->

View File

@ -110,7 +110,7 @@ match_who(_Client, {user, all}) ->
true; true;
match_who(_Client, {client, all}) -> match_who(_Client, {client, all}) ->
true; true;
match_who(#mqtt_client{clientid = ClientId}, {client, ClientId}) -> match_who(#mqtt_client{client_id = ClientId}, {client, ClientId}) ->
true; true;
match_who(#mqtt_client{username = Username}, {user, Username}) -> match_who(#mqtt_client{username = Username}, {user, Username}) ->
true; true;
@ -145,9 +145,9 @@ feed_var(Client, Pattern) ->
feed_var(Client, Pattern, []). feed_var(Client, Pattern, []).
feed_var(_Client, [], Acc) -> feed_var(_Client, [], Acc) ->
lists:reverse(Acc); lists:reverse(Acc);
feed_var(Client = #mqtt_client{clientid = undefined}, [<<"$c">>|Words], Acc) -> feed_var(Client = #mqtt_client{client_id = undefined}, [<<"$c">>|Words], Acc) ->
feed_var(Client, Words, [<<"$c">>|Acc]); feed_var(Client, Words, [<<"$c">>|Acc]);
feed_var(Client = #mqtt_client{clientid = ClientId}, [<<"$c">>|Words], Acc) -> feed_var(Client = #mqtt_client{client_id = ClientId}, [<<"$c">>|Words], Acc) ->
feed_var(Client, Words, [ClientId |Acc]); feed_var(Client, Words, [ClientId |Acc]);
feed_var(Client = #mqtt_client{username = undefined}, [<<"$u">>|Words], Acc) -> feed_var(Client = #mqtt_client{username = undefined}, [<<"$u">>|Words], Acc) ->
feed_var(Client, Words, [<<"$u">>|Acc]); feed_var(Client, Words, [<<"$u">>|Acc]);

View File

@ -41,7 +41,7 @@
-define(AUTH_CLIENTID_TAB, mqtt_auth_clientid). -define(AUTH_CLIENTID_TAB, mqtt_auth_clientid).
-record(?AUTH_CLIENTID_TAB, {clientid, ipaddr, password}). -record(?AUTH_CLIENTID_TAB, {client_id, ipaddr, password}).
%%%============================================================================= %%%=============================================================================
%%% API %%% API
@ -52,7 +52,7 @@
%% @end %% @end
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
add_clientid(ClientId) when is_binary(ClientId) -> add_clientid(ClientId) when is_binary(ClientId) ->
R = #mqtt_auth_clientid{clientid = ClientId}, R = #mqtt_auth_clientid{client_id = ClientId},
mnesia:transaction(fun() -> mnesia:write(R) end). mnesia:transaction(fun() -> mnesia:write(R) end).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
@ -60,7 +60,7 @@ add_clientid(ClientId) when is_binary(ClientId) ->
%% @end %% @end
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
add_clientid(ClientId, Password) -> add_clientid(ClientId, Password) ->
R = #mqtt_auth_clientid{clientid = ClientId, password = Password}, R = #mqtt_auth_clientid{client_id = ClientId, password = Password},
mnesia:transaction(fun() -> mnesia:write(R) end). mnesia:transaction(fun() -> mnesia:write(R) end).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
@ -99,15 +99,15 @@ init(Opts) ->
end, end,
{ok, Opts}. {ok, Opts}.
check(#mqtt_client{clientid = undefined}, _Password, []) -> check(#mqtt_client{client_id = undefined}, _Password, []) ->
{error, "ClientId undefined"}; {error, "ClientId undefined"};
check(#mqtt_client{clientid = ClientId, ipaddress = IpAddress}, _Password, []) -> check(#mqtt_client{client_id = ClientId, ipaddress = IpAddress}, _Password, []) ->
check_clientid_only(ClientId, IpAddress); check_clientid_only(ClientId, IpAddress);
check(#mqtt_client{clientid = ClientId, ipaddress = IpAddress}, _Password, [{password, no}|_]) -> check(#mqtt_client{client_id = ClientId, ipaddress = IpAddress}, _Password, [{password, no}|_]) ->
check_clientid_only(ClientId, IpAddress); check_clientid_only(ClientId, IpAddress);
check(_Client, undefined, [{password, yes}|_]) -> check(_Client, undefined, [{password, yes}|_]) ->
{error, "Password undefined"}; {error, "Password undefined"};
check(#mqtt_client{clientid = ClientId}, Password, [{password, yes}|_]) -> check(#mqtt_client{client_id = ClientId}, Password, [{password, yes}|_]) ->
case mnesia:dirty_read(?AUTH_CLIENTID_TAB, ClientId) of case mnesia:dirty_read(?AUTH_CLIENTID_TAB, ClientId) of
[] -> {error, "ClientId Not Found"}; [] -> {error, "ClientId Not Found"};
[#?AUTH_CLIENTID_TAB{password = Password}] -> ok; %% TODO: plaintext?? [#?AUTH_CLIENTID_TAB{password = Password}] -> ok; %% TODO: plaintext??
@ -129,11 +129,11 @@ load(Fd, {ok, Line}, Clients) when is_list(Line) ->
case string:tokens(Line, " ") of case string:tokens(Line, " ") of
[ClientIdS] -> [ClientIdS] ->
ClientId = list_to_binary(string:strip(ClientIdS, right, $\n)), ClientId = list_to_binary(string:strip(ClientIdS, right, $\n)),
[#mqtt_auth_clientid{clientid = ClientId} | Clients]; [#mqtt_auth_clientid{client_id = ClientId} | Clients];
[ClientId, IpAddr0] -> [ClientId, IpAddr0] ->
IpAddr = string:strip(IpAddr0, right, $\n), IpAddr = string:strip(IpAddr0, right, $\n),
Range = esockd_access:range(IpAddr), Range = esockd_access:range(IpAddr),
[#mqtt_auth_clientid{clientid = list_to_binary(ClientId), [#mqtt_auth_clientid{client_id = list_to_binary(ClientId),
ipaddr = {IpAddr, Range}}|Clients]; ipaddr = {IpAddr, Range}}|Clients];
BadLine -> BadLine ->
lager:error("BadLine in clients.config: ~s", [BadLine]), lager:error("BadLine in clients.config: ~s", [BadLine]),

View File

@ -45,7 +45,7 @@ load(Opts) ->
{?MODULE, client_connected, [Topics]}), {?MODULE, client_connected, [Topics]}),
{ok, #state{topics = Topics}}. {ok, #state{topics = Topics}}.
client_connected(?CONNACK_ACCEPT, #mqtt_client{clientid = ClientId, client_pid = ClientPid}, Topics) -> client_connected(?CONNACK_ACCEPT, #mqtt_client{client_id = ClientId, client_pid = ClientPid}, Topics) ->
F = fun(Topic) -> emqttd_topic:feed_var(<<"$c">>, ClientId, Topic) end, F = fun(Topic) -> emqttd_topic:feed_var(<<"$c">>, ClientId, Topic) end,
ClientPid ! {subscribe, [{F(Topic), Qos} || {Topic, Qos} <- Topics]}; ClientPid ! {subscribe, [{F(Topic), Qos} || {Topic, Qos} <- Topics]};

View File

@ -39,7 +39,7 @@ load(Opts) ->
emqttd_broker:hook(client_disconnected, {?MODULE, client_disconnected}, {?MODULE, client_disconnected, [Opts]}), emqttd_broker:hook(client_disconnected, {?MODULE, client_disconnected}, {?MODULE, client_disconnected, [Opts]}),
{ok, Opts}. {ok, Opts}.
client_connected(ConnAck, #mqtt_client{clientid = ClientId, client_connected(ConnAck, #mqtt_client{client_id = ClientId,
username = Username, username = Username,
ipaddress = IpAddress, ipaddress = IpAddress,
clean_sess = CleanSess, clean_sess = CleanSess,

View File

@ -96,7 +96,7 @@ format_variable(#mqtt_packet_connect{
will_flag = WillFlag, will_flag = WillFlag,
clean_sess = CleanSess, clean_sess = CleanSess,
keep_alive = KeepAlive, keep_alive = KeepAlive,
clientid = ClientId, client_id = ClientId,
will_topic = WillTopic, will_topic = WillTopic,
will_msg = WillMsg, will_msg = WillMsg,
username = Username, username = Username,

View File

@ -112,7 +112,7 @@ parse_frame(Bin, #mqtt_packet_header{type = Type, qos = Qos} = Header, Length)
will_flag = bool(WillFlag), will_flag = bool(WillFlag),
clean_sess = bool(CleanSession), clean_sess = bool(CleanSession),
keep_alive = KeepAlive, keep_alive = KeepAlive,
clientid = ClientId, client_id = ClientId,
will_topic = WillTopic, will_topic = WillTopic,
will_msg = WillMsg, will_msg = WillMsg,
username = UserName, username = UserName,

View File

@ -47,7 +47,7 @@
proto_ver, proto_ver,
proto_name, proto_name,
username, username,
clientid, client_id,
clean_sess, clean_sess,
session, session,
will_msg, will_msg,
@ -70,25 +70,25 @@ init(Peername, SendFun, Opts) ->
info(#proto_state{proto_ver = ProtoVer, info(#proto_state{proto_ver = ProtoVer,
proto_name = ProtoName, proto_name = ProtoName,
clientid = ClientId, client_id = ClientId,
clean_sess = CleanSess, clean_sess = CleanSess,
will_msg = WillMsg}) -> will_msg = WillMsg}) ->
[{proto_ver, ProtoVer}, [{proto_ver, ProtoVer},
{proto_name, ProtoName}, {proto_name, ProtoName},
{clientid, ClientId}, {client_id, ClientId},
{clean_sess, CleanSess}, {clean_sess, CleanSess},
{will_msg, WillMsg}]. {will_msg, WillMsg}].
clientid(#proto_state{clientid = ClientId}) -> clientid(#proto_state{client_id = ClientId}) ->
ClientId. ClientId.
client(#proto_state{peername = {Addr, _Port}, client(#proto_state{peername = {Addr, _Port},
clientid = ClientId, client_id = ClientId,
username = Username, username = Username,
clean_sess = CleanSess, clean_sess = CleanSess,
proto_ver = ProtoVer, proto_ver = ProtoVer,
client_pid = Pid}) -> client_pid = Pid}) ->
#mqtt_client{clientid = ClientId, #mqtt_client{client_id = ClientId,
username = Username, username = Username,
ipaddress = Addr, ipaddress = Addr,
clean_sess = CleanSess, clean_sess = CleanSess,
@ -126,12 +126,12 @@ handle(Packet = ?CONNECT_PACKET(Var), State0 = #proto_state{peername = Peername}
password = Password, password = Password,
clean_sess = CleanSess, clean_sess = CleanSess,
keep_alive = KeepAlive, keep_alive = KeepAlive,
clientid = ClientId} = Var, client_id = ClientId} = Var,
State1 = State0#proto_state{proto_ver = ProtoVer, State1 = State0#proto_state{proto_ver = ProtoVer,
proto_name = ProtoName, proto_name = ProtoName,
username = Username, username = Username,
clientid = ClientId, client_id = ClientId,
clean_sess = CleanSess}, clean_sess = CleanSess},
trace(recv, Packet, State1), trace(recv, Packet, State1),
@ -142,7 +142,7 @@ handle(Packet = ?CONNECT_PACKET(Var), State0 = #proto_state{peername = Peername}
case emqttd_access_control:auth(client(State1), Password) of case emqttd_access_control:auth(client(State1), Password) of
ok -> ok ->
%% Generate clientId if null %% Generate clientId if null
State2 = State1#proto_state{clientid = clientid(ClientId, State1)}, State2 = State1#proto_state{client_id = clientid(ClientId, State1)},
%%Starting session %%Starting session
{ok, Session} = emqttd_sm:start_session(CleanSess, clientid(State2)), {ok, Session} = emqttd_sm:start_session(CleanSess, clientid(State2)),
@ -167,7 +167,7 @@ handle(Packet = ?CONNECT_PACKET(Var), State0 = #proto_state{peername = Peername}
send(?CONNACK_PACKET(ReturnCode1), State3); send(?CONNACK_PACKET(ReturnCode1), State3);
handle(Packet = ?PUBLISH_PACKET(_Qos, Topic, _PacketId, _Payload), handle(Packet = ?PUBLISH_PACKET(_Qos, Topic, _PacketId, _Payload),
State = #proto_state{clientid = ClientId}) -> State = #proto_state{client_id = ClientId}) ->
case check_acl(publish, Topic, State) of case check_acl(publish, Topic, State) of
allow -> allow ->
@ -199,7 +199,7 @@ handle(?PUBACK_PACKET(?PUBCOMP, PacketId), State = #proto_state{session = Sessio
handle(?SUBSCRIBE_PACKET(PacketId, []), State) -> handle(?SUBSCRIBE_PACKET(PacketId, []), State) ->
send(?SUBACK_PACKET(PacketId, []), State); send(?SUBACK_PACKET(PacketId, []), State);
handle(?SUBSCRIBE_PACKET(PacketId, TopicTable), State = #proto_state{clientid = ClientId, session = Session}) -> handle(?SUBSCRIBE_PACKET(PacketId, TopicTable), State = #proto_state{client_id = ClientId, session = Session}) ->
AllowDenies = [check_acl(subscribe, Topic, State) || {Topic, _Qos} <- TopicTable], AllowDenies = [check_acl(subscribe, Topic, State) || {Topic, _Qos} <- TopicTable],
case lists:member(deny, AllowDenies) of case lists:member(deny, AllowDenies) of
true -> true ->
@ -233,10 +233,10 @@ handle(?PACKET(?DISCONNECT), State) ->
% clean willmsg % clean willmsg
{stop, normal, State#proto_state{will_msg = undefined}}. {stop, normal, State#proto_state{will_msg = undefined}}.
publish(Packet = ?PUBLISH(?QOS_0, _PacketId), #proto_state{clientid = ClientId, session = Session}) -> publish(Packet = ?PUBLISH(?QOS_0, _PacketId), #proto_state{client_id = ClientId, session = Session}) ->
emqttd_session:publish(Session, emqttd_message:from_packet(ClientId, Packet)); emqttd_session:publish(Session, emqttd_message:from_packet(ClientId, Packet));
publish(Packet = ?PUBLISH(?QOS_1, PacketId), State = #proto_state{clientid = ClientId, session = Session}) -> publish(Packet = ?PUBLISH(?QOS_1, PacketId), State = #proto_state{client_id = ClientId, session = Session}) ->
case emqttd_session:publish(Session, emqttd_message:from_packet(ClientId, Packet)) of case emqttd_session:publish(Session, emqttd_message:from_packet(ClientId, Packet)) of
ok -> ok ->
send(?PUBACK_PACKET(?PUBACK, PacketId), State); send(?PUBACK_PACKET(?PUBACK, PacketId), State);
@ -245,7 +245,7 @@ publish(Packet = ?PUBLISH(?QOS_1, PacketId), State = #proto_state{clientid = Cli
lager:error("Client ~s: publish qos1 error ~p", [ClientId, Error]) lager:error("Client ~s: publish qos1 error ~p", [ClientId, Error])
end; end;
publish(Packet = ?PUBLISH(?QOS_2, PacketId), State = #proto_state{clientid = ClientId, session = Session}) -> publish(Packet = ?PUBLISH(?QOS_2, PacketId), State = #proto_state{client_id = ClientId, session = Session}) ->
case emqttd_session:publish(Session, emqttd_message:from_packet(ClientId, Packet)) of case emqttd_session:publish(Session, emqttd_message:from_packet(ClientId, Packet)) of
ok -> ok ->
send(?PUBACK_PACKET(?PUBREC, PacketId), State); send(?PUBACK_PACKET(?PUBREC, PacketId), State);
@ -267,11 +267,11 @@ send(Packet, State = #proto_state{sendfun = SendFun, peername = Peername}) when
SendFun(Data), SendFun(Data),
{ok, State}. {ok, State}.
trace(recv, Packet, #proto_state{peername = Peername, clientid = ClientId}) -> trace(recv, Packet, #proto_state{peername = Peername, client_id = ClientId}) ->
lager:info([{client, ClientId}], "RECV from ~s@~s: ~s", lager:info([{client, ClientId}], "RECV from ~s@~s: ~s",
[ClientId, emqttd_net:format(Peername), emqttd_packet:format(Packet)]); [ClientId, emqttd_net:format(Peername), emqttd_packet:format(Packet)]);
trace(send, Packet, #proto_state{peername = Peername, clientid = ClientId}) -> trace(send, Packet, #proto_state{peername = Peername, client_id = ClientId}) ->
lager:info([{client, ClientId}], "SEND to ~s@~s: ~s", lager:info([{client, ClientId}], "SEND to ~s@~s: ~s",
[ClientId, emqttd_net:format(Peername), emqttd_packet:format(Packet)]). [ClientId, emqttd_net:format(Peername), emqttd_packet:format(Packet)]).
@ -282,10 +282,10 @@ redeliver({?PUBREL, PacketId}, State) ->
shutdown(duplicate_id, _State) -> shutdown(duplicate_id, _State) ->
quiet; %% quiet; %%
shutdown(_, #proto_state{clientid = undefined}) -> shutdown(_, #proto_state{client_id = undefined}) ->
ignore; ignore;
shutdown(Error, #proto_state{peername = Peername, clientid = ClientId, will_msg = WillMsg}) -> shutdown(Error, #proto_state{peername = Peername, client_id = ClientId, will_msg = WillMsg}) ->
lager:info([{client, ClientId}], "Client ~s@~s: shutdown ~p", lager:info([{client, ClientId}], "Client ~s@~s: shutdown ~p",
[ClientId, emqttd_net:format(Peername), Error]), [ClientId, emqttd_net:format(Peername), Error]),
send_willmsg(ClientId, WillMsg), send_willmsg(ClientId, WillMsg),
@ -333,16 +333,16 @@ validate_connect(Connect = #mqtt_packet_connect{}, ProtoState) ->
validate_protocol(#mqtt_packet_connect{proto_ver = Ver, proto_name = Name}) -> validate_protocol(#mqtt_packet_connect{proto_ver = Ver, proto_name = Name}) ->
lists:member({Ver, Name}, ?PROTOCOL_NAMES). lists:member({Ver, Name}, ?PROTOCOL_NAMES).
validate_clientid(#mqtt_packet_connect{clientid = ClientId}, #proto_state{max_clientid_len = MaxLen}) validate_clientid(#mqtt_packet_connect{client_id = ClientId}, #proto_state{max_clientid_len = MaxLen})
when ( size(ClientId) >= 1 ) andalso ( size(ClientId) =< MaxLen ) -> when ( size(ClientId) >= 1 ) andalso ( size(ClientId) =< MaxLen ) ->
true; true;
%% MQTT3.1.1 allow null clientId. %% MQTT3.1.1 allow null clientId.
validate_clientid(#mqtt_packet_connect{proto_ver =?MQTT_PROTO_V311, clientid = ClientId}, _ProtoState) validate_clientid(#mqtt_packet_connect{proto_ver =?MQTT_PROTO_V311, client_id = ClientId}, _ProtoState)
when size(ClientId) =:= 0 -> when size(ClientId) =:= 0 ->
true; true;
validate_clientid(#mqtt_packet_connect {proto_ver = Ver, clean_sess = CleanSess, clientid = ClientId}, _ProtoState) -> validate_clientid(#mqtt_packet_connect {proto_ver = Ver, clean_sess = CleanSess, client_id = ClientId}, _ProtoState) ->
lager:warning("Invalid ClientId: ~s, ProtoVer: ~p, CleanSess: ~s", [ClientId, Ver, CleanSess]), lager:warning("Invalid ClientId: ~s, ProtoVer: ~p, CleanSess: ~s", [ClientId, Ver, CleanSess]),
false. false.

View File

@ -60,7 +60,7 @@ serialise_header(#mqtt_packet_header{type = Type,
VariableBin/binary, VariableBin/binary,
PayloadBin/binary>>. PayloadBin/binary>>.
serialise_variable(?CONNECT, #mqtt_packet_connect{clientid = ClientId, serialise_variable(?CONNECT, #mqtt_packet_connect{client_id = ClientId,
proto_ver = ProtoVer, proto_ver = ProtoVer,
proto_name = ProtoName, proto_name = ProtoName,
will_retain = WillRetain, will_retain = WillRetain,

View File

@ -73,7 +73,7 @@
clean_sess = true, clean_sess = true,
%% ClientId: Identifier of Session %% ClientId: Identifier of Session
clientid :: binary(), client_id :: binary(),
%% Client Pid linked with session %% Client Pid linked with session
client_pid :: pid(), client_pid :: pid(),
@ -133,7 +133,7 @@
%% @doc Start a session. %% @doc Start a session.
%% @end %% @end
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec start_link(boolean(), mqtt_clientid(), pid()) -> {ok, pid()} | {error, any()}. -spec start_link(boolean(), mqtt_client_id(), pid()) -> {ok, pid()} | {error, any()}.
start_link(CleanSess, ClientId, ClientPid) -> start_link(CleanSess, ClientId, ClientPid) ->
gen_server:start_link(?MODULE, [CleanSess, ClientId, ClientPid], []). gen_server:start_link(?MODULE, [CleanSess, ClientId, ClientPid], []).
@ -141,7 +141,7 @@ start_link(CleanSess, ClientId, ClientPid) ->
%% @doc Resume a session. %% @doc Resume a session.
%% @end %% @end
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec resume(pid(), mqtt_clientid(), pid()) -> ok. -spec resume(pid(), mqtt_client_id(), pid()) -> ok.
resume(Session, ClientId, ClientPid) -> resume(Session, ClientId, ClientPid) ->
gen_server:cast(Session, {resume, ClientId, ClientPid}). gen_server:cast(Session, {resume, ClientId, ClientPid}).
@ -149,7 +149,7 @@ resume(Session, ClientId, ClientPid) ->
%% @doc Destroy a session. %% @doc Destroy a session.
%% @end %% @end
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec destroy(pid(), mqtt_clientid()) -> ok. -spec destroy(pid(), mqtt_client_id()) -> ok.
destroy(Session, ClientId) -> destroy(Session, ClientId) ->
gen_server:call(Session, {destroy, ClientId}). gen_server:call(Session, {destroy, ClientId}).
@ -217,7 +217,7 @@ init([CleanSess, ClientId, ClientPid]) ->
SessEnv = emqttd:env(mqtt, session), SessEnv = emqttd:env(mqtt, session),
Session = #session{ Session = #session{
clean_sess = CleanSess, clean_sess = CleanSess,
clientid = ClientId, client_id = ClientId,
client_pid = ClientPid, client_pid = ClientPid,
subscriptions = [], subscriptions = [],
inflight_queue = [], inflight_queue = [],
@ -234,7 +234,7 @@ init([CleanSess, ClientId, ClientPid]) ->
timestamp = os:timestamp()}, timestamp = os:timestamp()},
{ok, Session, hibernate}. {ok, Session, hibernate}.
handle_call({subscribe, Topics}, _From, Session = #session{clientid = ClientId, handle_call({subscribe, Topics}, _From, Session = #session{client_id = ClientId,
subscriptions = Subscriptions}) -> subscriptions = Subscriptions}) ->
%% subscribe first and don't care if the subscriptions have been existed %% subscribe first and don't care if the subscriptions have been existed
@ -264,7 +264,7 @@ handle_call({subscribe, Topics}, _From, Session = #session{clientid = ClientId,
end, Subscriptions, Topics), end, Subscriptions, Topics),
{reply, {ok, GrantedQos}, Session#session{subscriptions = Subscriptions1}}; {reply, {ok, GrantedQos}, Session#session{subscriptions = Subscriptions1}};
handle_call({unsubscribe, Topics}, _From, Session = #session{clientid = ClientId, handle_call({unsubscribe, Topics}, _From, Session = #session{client_id = ClientId,
subscriptions = Subscriptions}) -> subscriptions = Subscriptions}) ->
%% unsubscribe from topic tree %% unsubscribe from topic tree
@ -285,7 +285,7 @@ handle_call({unsubscribe, Topics}, _From, Session = #session{clientid = ClientId
{reply, ok, Session#session{subscriptions = Subscriptions1}}; {reply, ok, Session#session{subscriptions = Subscriptions1}};
handle_call({publish, Msg = #mqtt_message{qos = ?QOS_2, msgid = MsgId}}, _From, handle_call({publish, Msg = #mqtt_message{qos = ?QOS_2, msgid = MsgId}}, _From,
Session = #session{clientid = ClientId, Session = #session{client_id = ClientId,
awaiting_rel = AwaitingRel, awaiting_rel = AwaitingRel,
await_rel_timeout = Timeout}) -> await_rel_timeout = Timeout}) ->
case check_awaiting_rel(Session) of case check_awaiting_rel(Session) of
@ -294,12 +294,12 @@ handle_call({publish, Msg = #mqtt_message{qos = ?QOS_2, msgid = MsgId}}, _From,
AwaitingRel1 = maps:put(MsgId, {Msg, TRef}, AwaitingRel), AwaitingRel1 = maps:put(MsgId, {Msg, TRef}, AwaitingRel),
{reply, ok, Session#session{awaiting_rel = AwaitingRel1}}; {reply, ok, Session#session{awaiting_rel = AwaitingRel1}};
false -> false ->
lager:critical([{clientid, ClientId}], "Session ~s dropped Qos2 message " lager:critical([{client, ClientId}], "Session ~s dropped Qos2 message "
"for too many awaiting_rel: ~p", [ClientId, Msg]), "for too many awaiting_rel: ~p", [ClientId, Msg]),
{reply, {error, dropped}, Session} {reply, {error, dropped}, Session}
end; end;
handle_call({destroy, ClientId}, _From, Session = #session{clientid = ClientId}) -> handle_call({destroy, ClientId}, _From, Session = #session{client_id = ClientId}) ->
lager:warning("Session ~s destroyed", [ClientId]), lager:warning("Session ~s destroyed", [ClientId]),
{stop, {shutdown, destroy}, ok, Session}; {stop, {shutdown, destroy}, ok, Session};
@ -309,7 +309,7 @@ handle_call(Req, _From, State) ->
handle_cast({resume, ClientId, ClientPid}, Session) -> handle_cast({resume, ClientId, ClientPid}, Session) ->
#session{clientid = ClientId, #session{client_id = ClientId,
client_pid = OldClientPid, client_pid = OldClientPid,
inflight_queue = InflightQ, inflight_queue = InflightQ,
awaiting_ack = AwaitingAck, awaiting_ack = AwaitingAck,
@ -349,7 +349,7 @@ handle_cast({resume, ClientId, ClientPid}, Session) ->
{noreply, dequeue(Session2), hibernate}; {noreply, dequeue(Session2), hibernate};
%% PUBRAC %% PUBRAC
handle_cast({puback, MsgId}, Session = #session{clientid = ClientId, awaiting_ack = Awaiting}) -> handle_cast({puback, MsgId}, Session = #session{client_id = ClientId, awaiting_ack = Awaiting}) ->
case maps:find(MsgId, Awaiting) of case maps:find(MsgId, Awaiting) of
{ok, {_, TRef}} -> {ok, {_, TRef}} ->
cancel_timer(TRef), cancel_timer(TRef),
@ -361,7 +361,7 @@ handle_cast({puback, MsgId}, Session = #session{clientid = ClientId, awaiting_ac
end; end;
%% PUBREC %% PUBREC
handle_cast({pubrec, MsgId}, Session = #session{clientid = ClientId, handle_cast({pubrec, MsgId}, Session = #session{client_id = ClientId,
awaiting_ack = AwaitingAck, awaiting_ack = AwaitingAck,
awaiting_comp = AwaitingComp, awaiting_comp = AwaitingComp,
await_rel_timeout = Timeout}) -> await_rel_timeout = Timeout}) ->
@ -377,7 +377,7 @@ handle_cast({pubrec, MsgId}, Session = #session{clientid = ClientId,
end; end;
%% PUBREL %% PUBREL
handle_cast({pubrel, MsgId}, Session = #session{clientid = ClientId, handle_cast({pubrel, MsgId}, Session = #session{client_id = ClientId,
awaiting_rel = AwaitingRel}) -> awaiting_rel = AwaitingRel}) ->
case maps:find(MsgId, AwaitingRel) of case maps:find(MsgId, AwaitingRel) of
{ok, {Msg, TRef}} -> {ok, {Msg, TRef}} ->
@ -390,7 +390,7 @@ handle_cast({pubrel, MsgId}, Session = #session{clientid = ClientId,
end; end;
%% PUBCOMP %% PUBCOMP
handle_cast({pubcomp, MsgId}, Session = #session{clientid = ClientId, awaiting_comp = AwaitingComp}) -> handle_cast({pubcomp, MsgId}, Session = #session{client_id = ClientId, awaiting_comp = AwaitingComp}) ->
case maps:find(MsgId, AwaitingComp) of case maps:find(MsgId, AwaitingComp) of
{ok, TRef} -> {ok, TRef} ->
cancel_timer(TRef), cancel_timer(TRef),
@ -417,7 +417,7 @@ handle_info({dispatch, Msg = #mqtt_message{qos = ?QOS_0}},
{noreply, Session}; {noreply, Session};
handle_info({dispatch, Msg = #mqtt_message{qos = QoS}}, handle_info({dispatch, Msg = #mqtt_message{qos = QoS}},
Session = #session{clientid = ClientId, message_queue = MsgQ}) Session = #session{client_id = ClientId, message_queue = MsgQ})
when QoS =:= ?QOS_1 orelse QoS =:= ?QOS_2 -> when QoS =:= ?QOS_1 orelse QoS =:= ?QOS_2 ->
case check_inflight(Session) of case check_inflight(Session) of
@ -433,7 +433,7 @@ handle_info({timeout, awaiting_ack, MsgId}, Session = #session{client_pid = unde
%% just remove awaiting %% just remove awaiting
{noreply, Session#session{awaiting_ack = maps:remove(MsgId, AwaitingAck)}}; {noreply, Session#session{awaiting_ack = maps:remove(MsgId, AwaitingAck)}};
handle_info({timeout, awaiting_ack, MsgId}, Session = #session{clientid = ClientId, handle_info({timeout, awaiting_ack, MsgId}, Session = #session{client_id = ClientId,
inflight_queue = InflightQ, inflight_queue = InflightQ,
awaiting_ack = AwaitingAck}) -> awaiting_ack = AwaitingAck}) ->
case maps:find(MsgId, AwaitingAck) of case maps:find(MsgId, AwaitingAck) of
@ -451,7 +451,7 @@ handle_info({timeout, awaiting_ack, MsgId}, Session = #session{clientid = Client
{noreply, Session} {noreply, Session}
end; end;
handle_info({timeout, awaiting_rel, MsgId}, Session = #session{clientid = ClientId, handle_info({timeout, awaiting_rel, MsgId}, Session = #session{client_id = ClientId,
awaiting_rel = AwaitingRel}) -> awaiting_rel = AwaitingRel}) ->
case maps:find(MsgId, AwaitingRel) of case maps:find(MsgId, AwaitingRel) of
{ok, {Msg, _TRef}} -> {ok, {Msg, _TRef}} ->
@ -463,7 +463,7 @@ handle_info({timeout, awaiting_rel, MsgId}, Session = #session{clientid = Client
{noreply, Session} {noreply, Session}
end; end;
handle_info({timeout, awaiting_comp, MsgId}, Session = #session{clientid = ClientId, handle_info({timeout, awaiting_comp, MsgId}, Session = #session{client_id = ClientId,
awaiting_comp = Awaiting}) -> awaiting_comp = Awaiting}) ->
case maps:find(MsgId, Awaiting) of case maps:find(MsgId, Awaiting) of
{ok, _TRef} -> {ok, _TRef} ->
@ -481,25 +481,25 @@ handle_info({'EXIT', ClientPid, _Reason}, Session = #session{clean_sess = true,
{stop, normal, Session}; {stop, normal, Session};
handle_info({'EXIT', ClientPid, Reason}, Session = #session{clean_sess = false, handle_info({'EXIT', ClientPid, Reason}, Session = #session{clean_sess = false,
clientid = ClientId, client_id = ClientId,
client_pid = ClientPid, client_pid = ClientPid,
expired_after = Expires}) -> expired_after = Expires}) ->
lager:info("Session ~s unlink with client ~p: reason=~p", [ClientId, ClientPid, Reason]), lager:info("Session ~s unlink with client ~p: reason=~p", [ClientId, ClientPid, Reason]),
TRef = timer(Expires, session_expired), TRef = timer(Expires, session_expired),
{noreply, Session#session{client_pid = undefined, expired_timer = TRef}, hibernate}; {noreply, Session#session{client_pid = undefined, expired_timer = TRef}, hibernate};
handle_info({'EXIT', Pid, _Reason}, Session = #session{clientid = ClientId, handle_info({'EXIT', Pid, _Reason}, Session = #session{client_id = ClientId,
client_pid = ClientPid}) -> client_pid = ClientPid}) ->
lager:error("Session ~s received unexpected EXIT:" lager:error("Session ~s received unexpected EXIT:"
" client_pid=~p, exit_pid=~p", [ClientId, ClientPid, Pid]), " client_pid=~p, exit_pid=~p", [ClientId, ClientPid, Pid]),
{noreply, Session}; {noreply, Session};
handle_info(session_expired, Session = #session{clientid = ClientId}) -> handle_info(session_expired, Session = #session{client_id = ClientId}) ->
lager:error("Session ~s expired, shutdown now!", [ClientId]), lager:error("Session ~s expired, shutdown now!", [ClientId]),
{stop, {shutdown, expired}, Session}; {stop, {shutdown, expired}, Session};
handle_info(Info, Session = #session{clientid = ClientId}) -> handle_info(Info, Session = #session{client_id = ClientId}) ->
lager:critical("Session ~s received unexpected info: ~p", [ClientId, Info]), lager:critical("Session ~s received unexpected info: ~p", [ClientId, Info]),
{noreply, Session}. {noreply, Session}.

View File

@ -34,6 +34,7 @@
%%% %%%
%%% @end %%% @end
%%%----------------------------------------------------------------------------- %%%-----------------------------------------------------------------------------
-module(emqttd_sm). -module(emqttd_sm).
-author("Feng Lee <feng@emqtt.io>"). -author("Feng Lee <feng@emqtt.io>").
@ -67,12 +68,11 @@
%% @doc Start a session manager %% @doc Start a session manager
%% @end %% @end
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec start_link(Id, SessStatsFun) -> {ok, pid()} | ignore | {error, any()} when -spec start_link(Id, StatsFun) -> {ok, pid()} | ignore | {error, any()} when
Id :: pos_integer(), Id :: pos_integer(),
%ClientStatsFun :: fun(), StatsFun :: {fun(), fun()}.
SessStatsFun :: fun(). start_link(Id, StatsFun) ->
start_link(Id, SessStatsFun) -> gen_server:start_link(?MODULE, [Id, StatsFun], []).
gen_server:start_link(?MODULE, [Id, SessStatsFun], []).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc Pool name. %% @doc Pool name.
@ -103,7 +103,7 @@ start_session(CleanSess, ClientId) ->
-spec lookup_session(binary()) -> pid() | undefined. -spec lookup_session(binary()) -> pid() | undefined.
lookup_session(ClientId) -> lookup_session(ClientId) ->
case ets:lookup(?SESSION_TAB, ClientId) of case ets:lookup(?SESSION_TAB, ClientId) of
[{_, SessPid, _}] -> SessPid; [{_Clean, _, SessPid, _}] -> SessPid;
[] -> undefined [] -> undefined
end. end.
@ -129,7 +129,7 @@ init([Id, StatsFun]) ->
handle_call({start_session, {false, ClientId, ClientPid}}, _From, State) -> handle_call({start_session, {false, ClientId, ClientPid}}, _From, State) ->
Reply = Reply =
case ets:lookup(?SESSION_TAB, ClientId) of case ets:lookup(?SESSION_TAB, ClientId) of
[{_, SessPid, _MRef}] -> [{_Clean, _, SessPid, _MRef}] ->
emqttd_session:resume(SessPid, ClientId, ClientPid), emqttd_session:resume(SessPid, ClientId, ClientPid),
{ok, SessPid}; {ok, SessPid};
[] -> [] ->
@ -139,7 +139,7 @@ handle_call({start_session, {false, ClientId, ClientPid}}, _From, State) ->
handle_call({start_session, {true, ClientId, ClientPid}}, _From, State) -> handle_call({start_session, {true, ClientId, ClientPid}}, _From, State) ->
case ets:lookup(?SESSION_TAB, ClientId) of case ets:lookup(?SESSION_TAB, ClientId) of
[{_, SessPid, MRef}] -> [{_Clean, _, SessPid, MRef}] ->
erlang:demonitor(MRef, [flush]), erlang:demonitor(MRef, [flush]),
emqttd_session:destroy(SessPid, ClientId); emqttd_session:destroy(SessPid, ClientId);
[] -> [] ->
@ -149,7 +149,7 @@ handle_call({start_session, {true, ClientId, ClientPid}}, _From, State) ->
handle_call({destroy_session, ClientId}, _From, State) -> handle_call({destroy_session, ClientId}, _From, State) ->
case ets:lookup(?SESSION_TAB, ClientId) of case ets:lookup(?SESSION_TAB, ClientId) of
[{_, SessPid, MRef}] -> [{_Clean, _, SessPid, MRef}] ->
emqttd_session:destroy(SessPid, ClientId), emqttd_session:destroy(SessPid, ClientId),
erlang:demonitor(MRef, [flush]), erlang:demonitor(MRef, [flush]),
ets:delete(?SESSION_TAB, ClientId); ets:delete(?SESSION_TAB, ClientId);
@ -165,7 +165,7 @@ handle_cast(_Msg, State) ->
{noreply, State}. {noreply, State}.
handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) -> handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) ->
ets:match_delete(?SESSION_TAB, {'_', DownPid, MRef}), ets:match_delete(?SESSION_TAB, {'_', '_', DownPid, MRef}),
{noreply, setstats(State)}; {noreply, setstats(State)};
handle_info(_Info, State) -> handle_info(_Info, State) ->
@ -184,13 +184,14 @@ code_change(_OldVsn, State, _Extra) ->
new_session(CleanSess, ClientId, ClientPid) -> new_session(CleanSess, ClientId, ClientPid) ->
case emqttd_session_sup:start_session(CleanSess, ClientId, ClientPid) of case emqttd_session_sup:start_session(CleanSess, ClientId, ClientPid) of
{ok, SessPid} -> {ok, SessPid} ->
ets:insert(?SESSION_TAB, {ClientId, SessPid, erlang:monitor(process, SessPid)}), MRef = erlang:monitor(process, SessPid),
ets:insert(?SESSION_TAB, {CleanSess, ClientId, SessPid, MRef}),
{ok, SessPid}; {ok, SessPid};
{error, Error} -> {error, Error} ->
{error, Error} {error, Error}
end. end.
setstats(State = #state{statsfun = StatsFun}) -> setstats(State = #state{statsfun = {CFun, SFun}}) ->
StatsFun(ets:info(?SESSION_TAB, size)), State. CFun(ets:info(?SESSION_TAB, size)),
SFun(ets:select_count(?SESSION_TAB, [{{true, '_', '_', '_'}, [], [true]}])),
State.

View File

@ -24,6 +24,7 @@
%%% %%%
%%% @end %%% @end
%%%----------------------------------------------------------------------------- %%%-----------------------------------------------------------------------------
-module(emqttd_sm_sup). -module(emqttd_sm_sup).
-author("Feng Lee <feng@emqtt.io>"). -author("Feng Lee <feng@emqtt.io>").
@ -42,19 +43,20 @@ start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []). supervisor:start_link({local, ?MODULE}, ?MODULE, []).
init([]) -> init([]) ->
ets:new(emqttd_sm:table(), [set, named_table, public, ets:new(emqttd_sm:table(), [set, named_table, public, {keypos, 2},
{write_concurrency, true}]), {write_concurrency, true}]),
Schedulers = erlang:system_info(schedulers), Schedulers = erlang:system_info(schedulers),
gproc_pool:new(emqttd_sm:pool(), hash, [{size, Schedulers}]), gproc_pool:new(emqttd_sm:pool(), hash, [{size, Schedulers}]),
%%ClientStatsFun = emqttd_stats:statsfun('clients/count', 'clients/max'),
SessStatsFun = emqttd_stats:statsfun('sessions/count', 'sessions/max'),
Children = lists:map( Children = lists:map(
fun(I) -> fun(I) ->
Name = {emqttd_sm, I}, Name = {emqttd_sm, I},
gproc_pool:add_worker(emqttd_sm:pool(), Name, I), gproc_pool:add_worker(emqttd_sm:pool(), Name, I),
{Name, {emqttd_sm, start_link, [I, SessStatsFun]}, {Name, {emqttd_sm, start_link, [I, statsfun()]},
permanent, 10000, worker, [emqttd_sm]} permanent, 10000, worker, [emqttd_sm]}
end, lists:seq(1, Schedulers)), end, lists:seq(1, Schedulers)),
{ok, {{one_for_all, 10, 100}, Children}}. {ok, {{one_for_all, 10, 100}, Children}}.
statsfun() ->
{emqttd_stats:statsfun('clients/count', 'clients/max'),
emqttd_stats:statsfun('sessions/count', 'sessions/max')}.

View File

@ -124,6 +124,7 @@ setstat(Stat, Val) ->
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% @doc Set stats with max %% @doc Set stats with max
%% TODO: this is wrong...
%% @end %% @end
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec setstats(Stat :: atom(), MaxStat :: atom(), Val :: pos_integer()) -> boolean(). -spec setstats(Stat :: atom(), MaxStat :: atom(), Val :: pos_integer()) -> boolean().