2.0 - improve design of session and hook

This commit is contained in:
Feng Lee 2016-08-16 14:29:39 +08:00
parent c72a07dbb8
commit d0be556f33
8 changed files with 108 additions and 103 deletions

View File

@ -491,13 +491,13 @@ print(Routes = [#mqtt_route{topic = Topic} | _]) ->
Nodes = [atom_to_list(Node) || #mqtt_route{node = Node} <- Routes], Nodes = [atom_to_list(Node) || #mqtt_route{node = Node} <- Routes],
?PRINT("~s -> ~s~n", [Topic, string:join(Nodes, ",")]); ?PRINT("~s -> ~s~n", [Topic, string:join(Nodes, ",")]);
print(Subscriptions = [#mqtt_subscription{subid = ClientId} | _]) -> %% print(Subscriptions = [#mqtt_subscription{subid = ClientId} | _]) ->
TopicTable = [io_lib:format("~s:~w", [Topic, Qos]) %% TopicTable = [io_lib:format("~s:~w", [Topic, Qos])
|| #mqtt_subscription{topic = Topic, qos = Qos} <- Subscriptions], %% || #mqtt_subscription{topic = Topic, qos = Qos} <- Subscriptions],
?PRINT("~s -> ~s~n", [ClientId, string:join(TopicTable, ",")]); %% ?PRINT("~s -> ~s~n", [ClientId, string:join(TopicTable, ",")]);
print(Topics = [#mqtt_topic{}|_]) -> %% print(Topics = [#mqtt_topic{}|_]) ->
foreach(fun print/1, Topics); %% foreach(fun print/1, Topics);
print(#mqtt_plugin{name = Name, version = Ver, descr = Descr, active = Active}) -> print(#mqtt_plugin{name = Name, version = Ver, descr = Descr, active = Active}) ->
?PRINT("Plugin(~s, version=~s, description=~s, active=~s)~n", ?PRINT("Plugin(~s, version=~s, description=~s, active=~s)~n",
@ -509,8 +509,8 @@ print(#mqtt_client{client_id = ClientId, clean_sess = CleanSess, username = User
[ClientId, CleanSess, Username, emqttd_net:format(Peername), [ClientId, CleanSess, Username, emqttd_net:format(Peername),
emqttd_time:now_to_secs(ConnectedAt)]); emqttd_time:now_to_secs(ConnectedAt)]);
print(#mqtt_topic{topic = Topic, flags = Flags}) -> %% print(#mqtt_topic{topic = Topic, flags = Flags}) ->
?PRINT("~s: ~s~n", [Topic, string:join([atom_to_list(F) || F <- Flags], ",")]); %% ?PRINT("~s: ~s~n", [Topic, string:join([atom_to_list(F) || F <- Flags], ",")]);
print(#mqtt_route{topic = Topic, node = Node}) -> print(#mqtt_route{topic = Topic, node = Node}) ->
?PRINT("~s -> ~s~n", [Topic, Node]); ?PRINT("~s -> ~s~n", [Topic, Node]);

View File

@ -81,8 +81,7 @@ from_packet(#mqtt_packet_connect{client_id = ClientId,
will_msg = Msg}) -> will_msg = Msg}) ->
#mqtt_message{msgid = msgid(Qos), #mqtt_message{msgid = msgid(Qos),
topic = Topic, topic = Topic,
from = ClientId, from = {ClientId, Username},
sender = Username,
retain = Retain, retain = Retain,
qos = Qos, qos = Qos,
dup = false, dup = false,
@ -95,7 +94,7 @@ from_packet(ClientId, Packet) ->
from_packet(Username, ClientId, Packet) -> from_packet(Username, ClientId, Packet) ->
Msg = from_packet(Packet), Msg = from_packet(Packet),
Msg#mqtt_message{from = ClientId, sender = Username}. Msg#mqtt_message{from = {ClientId, Username}}.
msgid(?QOS_0) -> msgid(?QOS_0) ->
undefined; undefined;
@ -150,10 +149,10 @@ unset_flag(retain, Msg = #mqtt_message{retain = true}) ->
unset_flag(Flag, Msg) when Flag =:= dup orelse Flag =:= retain -> Msg. unset_flag(Flag, Msg) when Flag =:= dup orelse Flag =:= retain -> Msg.
%% @doc Format MQTT Message %% @doc Format MQTT Message
format(#mqtt_message{msgid = MsgId, pktid = PktId, from = From, sender = Sender, format(#mqtt_message{msgid = MsgId, pktid = PktId, from = {ClientId, Username},
qos = Qos, retain = Retain, dup = Dup, topic =Topic}) -> qos = Qos, retain = Retain, dup = Dup, topic =Topic}) ->
io_lib:format("Message(Q~p, R~p, D~p, MsgId=~p, PktId=~p, From=~s, Sender=~s, Topic=~s)", io_lib:format("Message(Q~p, R~p, D~p, MsgId=~p, PktId=~p, From=~s/~s, Topic=~s)",
[i(Qos), i(Retain), i(Dup), MsgId, PktId, From, Sender, Topic]). [i(Qos), i(Retain), i(Dup), MsgId, PktId, Username, ClientId, Topic]).
i(true) -> 1; i(true) -> 1;
i(false) -> 0; i(false) -> 0;

View File

@ -45,7 +45,7 @@ on_client_connected(ConnAck, Client = #mqtt_client{client_id = ClientId,
emqttd:publish(emqttd_message:set_flag(sys, Msg)), emqttd:publish(emqttd_message:set_flag(sys, Msg)),
{ok, Client}. {ok, Client}.
on_client_disconnected(Reason, ClientId, Opts) -> on_client_disconnected(Reason, #mqtt_client{client_id = ClientId}, Opts) ->
Json = mochijson2:encode([{clientid, ClientId}, Json = mochijson2:encode([{clientid, ClientId},
{reason, reason(Reason)}, {reason, reason(Reason)},
{ts, emqttd_time:now_to_secs()}]), {ts, emqttd_time:now_to_secs()}]),

View File

@ -146,7 +146,7 @@ process(Packet = ?CONNECT_PACKET(Var), State0) ->
State2 = maybe_set_clientid(State1), State2 = maybe_set_clientid(State1),
%% Start session %% Start session
case emqttd_sm:start_session(CleanSess, clientid(State2)) of case emqttd_sm:start_session(CleanSess, {clientid(State2), Username}) of
{ok, Session, SP} -> {ok, Session, SP} ->
%% Register the client %% Register the client
emqttd_cm:reg(client(State2)), emqttd_cm:reg(client(State2)),
@ -280,10 +280,11 @@ shutdown(conflict, #proto_state{client_id = _ClientId}) ->
%% emqttd_cm:unreg(ClientId); %% emqttd_cm:unreg(ClientId);
ignore; ignore;
shutdown(Error, State = #proto_state{client_id = ClientId, will_msg = WillMsg}) -> shutdown(Error, State = #proto_state{will_msg = WillMsg}) ->
?LOG(info, "Shutdown for ~p", [Error], State), ?LOG(info, "Shutdown for ~p", [Error], State),
send_willmsg(ClientId, WillMsg), Client = client(State),
emqttd:run_hooks('client.disconnected', [Error], ClientId), send_willmsg(Client, WillMsg),
emqttd:run_hooks('client.disconnected', [Error], Client),
%% let it down %% let it down
%% emqttd_cm:unreg(ClientId). %% emqttd_cm:unreg(ClientId).
ok. ok.
@ -301,10 +302,10 @@ maybe_set_clientid(State = #proto_state{client_id = NullId})
maybe_set_clientid(State) -> maybe_set_clientid(State) ->
State. State.
send_willmsg(_ClientId, undefined) -> send_willmsg(_Client, undefined) ->
ignore; ignore;
send_willmsg(ClientId, WillMsg) -> send_willmsg(#mqtt_client{client_id = ClientId, username = Username}, WillMsg) ->
emqttd:publish(WillMsg#mqtt_message{from = ClientId}). emqttd:publish(WillMsg#mqtt_message{from = {ClientId, Username}}).
start_keepalive(0) -> ignore; start_keepalive(0) -> ignore;

View File

@ -77,6 +77,9 @@
%% Old Client Pid that has been kickout %% Old Client Pid that has been kickout
old_client_pid :: pid(), old_client_pid :: pid(),
%% Username
username :: binary() | undefined,
%% Last packet id of the session %% Last packet id of the session
packet_id = 1, packet_id = 1,
@ -136,9 +139,9 @@
"Session(~s): " ++ Format, [State#session.client_id | Args])). "Session(~s): " ++ Format, [State#session.client_id | Args])).
%% @doc Start a session. %% @doc Start a session.
-spec(start_link(boolean(), mqtt_client_id(), pid()) -> {ok, pid()} | {error, any()}). -spec(start_link(boolean(), {mqtt_client_id(), mqtt_username()}, pid()) -> {ok, pid()} | {error, any()}).
start_link(CleanSess, ClientId, ClientPid) -> start_link(CleanSess, {ClientId, Username}, ClientPid) ->
gen_server2:start_link(?MODULE, [CleanSess, ClientId, ClientPid], []). gen_server2:start_link(?MODULE, [CleanSess, {ClientId, Username}, ClientPid], []).
%% @doc Resume a session. %% @doc Resume a session.
-spec(resume(pid(), mqtt_client_id(), pid()) -> ok). -spec(resume(pid(), mqtt_client_id(), pid()) -> ok).
@ -208,10 +211,10 @@ unsubscribe(SessPid, Topics) ->
gen_server2:cast(SessPid, {unsubscribe, Topics}). gen_server2:cast(SessPid, {unsubscribe, Topics}).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% gen_server callbacks %% gen_server Callbacks
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
init([CleanSess, ClientId, ClientPid]) -> init([CleanSess, {ClientId, Username}, ClientPid]) ->
process_flag(trap_exit, true), process_flag(trap_exit, true),
true = link(ClientPid), true = link(ClientPid),
SessEnv = emqttd_conf:session(), SessEnv = emqttd_conf:session(),
@ -219,6 +222,7 @@ init([CleanSess, ClientId, ClientPid]) ->
clean_sess = CleanSess, clean_sess = CleanSess,
client_id = ClientId, client_id = ClientId,
client_pid = ClientPid, client_pid = ClientPid,
username = Username,
subscriptions = dict:new(), subscriptions = dict:new(),
inflight_queue = [], inflight_queue = [],
max_inflight = get_value(max_inflight, SessEnv, 0), max_inflight = get_value(max_inflight, SessEnv, 0),
@ -284,68 +288,67 @@ handle_call({publish, Msg = #mqtt_message{qos = ?QOS_2, pktid = PktId}},
handle_call(Req, _From, State) -> handle_call(Req, _From, State) ->
?UNEXPECTED_REQ(Req, State). ?UNEXPECTED_REQ(Req, State).
handle_cast({subscribe, RawTopicTable, AckFun}, Session = #session{client_id = ClientId, %%TODO: 2.0 FIX
subscriptions = Subscriptions}) ->
%% TODO: Ugly...
TopicTable0 = lists:map(fun({T, Q}) ->
{T1, Opts} = emqttd_topic:strip(T),
{T1, [{qos, Q} | Opts]}
end, RawTopicTable),
case emqttd:run_hooks('client.subscribe', [ClientId], TopicTable0) of
{ok, TopicTable} ->
?LOG(info, "Subscribe ~p", [TopicTable], Session),
Subscriptions1 = lists:foldl(
fun({Topic, Opts = [{qos, Qos}|_]}, SubDict) ->
case dict:find(Topic, SubDict) of
{ok, Qos} ->
?LOG(warning, "duplicated subscribe: ~s, qos = ~w", [Topic, Qos], Session),
SubDict;
{ok, OldQos} ->
emqttd:setqos(Topic, ClientId, Qos),
?LOG(warning, "duplicated subscribe ~s, old_qos=~w, new_qos=~w", [Topic, OldQos, Qos], Session),
dict:store(Topic, Qos, SubDict);
error ->
emqttd:subscribe(Topic, ClientId, Opts),
%%TODO: the design is ugly...
%% <MQTT V3.1.1>: 3.8.4
%% Where the Topic Filter is not identical to any existing Subscriptions filter,
%% a new Subscription is created and all matching retained messages are sent.
emqttd_retainer:dispatch(Topic, self()),
dict:store(Topic, Qos, SubDict) handle_cast({subscribe, TopicTable, AckFun}, Session = #session{client_id = ClientId,
end username = Username,
end, Subscriptions, TopicTable), subscriptions = Subscriptions}) ->
AckFun([Qos || {_, Qos} <- RawTopicTable]), ?LOG(info, "Subscribe ~p", [TopicTable], Session),
emqttd:run_hooks('client.subscribe.after', [ClientId], TopicTable), {GrantedQos, Subscriptions1} =
hibernate(Session#session{subscriptions = Subscriptions1}); lists:foldl(fun({RawTopic, Qos}, {QosAcc, SubDict}) ->
{stop, TopicTable} -> {Topic, Opts} = emqttd_topic:strip(RawTopic),
?LOG(error, "Cannot subscribe: ~p", [TopicTable], Session), case emqttd:run_hooks('client.subscribe', [{ClientId, Username}], {Topic, Opts}) of
hibernate(Session) {ok, {Topic1, Opts1}} ->
end; NewQos = proplists:get_value(qos, Opts1, Qos),
{[NewQos | QosAcc], case dict:find(Topic, SubDict) of
{ok, NewQos} ->
?LOG(warning, "duplicated subscribe: ~s, qos = ~w", [Topic, NewQos], Session),
SubDict;
{ok, OldQos} ->
emqttd:setqos(Topic, ClientId, NewQos),
?LOG(warning, "duplicated subscribe ~s, old_qos=~w, new_qos=~w", [Topic, OldQos, NewQos], Session),
dict:store(Topic, NewQos, SubDict);
error ->
emqttd:subscribe(Topic1, ClientId, Opts1),
%%TODO: the design is ugly...
%% <MQTT V3.1.1>: 3.8.4
%% Where the Topic Filter is not identical to any existing Subscriptions filter,
%% a new Subscription is created and all matching retained messages are sent.
emqttd_retainer:dispatch(Topic1, self()),
dict:store(Topic1, NewQos, SubDict)
end};
{stop, _} ->
?LOG(error, "Cannot subscribe: ~p", [Topic], Session),
{[128 | QosAcc], SubDict}
end
end, {[], Subscriptions}, TopicTable),
AckFun(lists:reverse(GrantedQos)),
%%emqttd:run_hooks('client.subscribe.after', [ClientId], TopicTable),
hibernate(Session#session{subscriptions = Subscriptions1});
handle_cast({unsubscribe, RawTopics}, Session = #session{client_id = ClientId, %%TODO: 2.0 FIX
subscriptions = Subscriptions}) ->
Topics0 = lists:map(fun(Topic) -> handle_cast({unsubscribe, Topics}, Session = #session{client_id = ClientId,
{T, _Opts} = emqttd_topic:strip(Topic), T username = Username,
end, RawTopics), subscriptions = Subscriptions}) ->
case emqttd:run_hooks('client.unsubscribe', [ClientId], Topics0) of ?LOG(info, "unsubscribe ~p", [Topics], Session),
{ok, Topics} -> Subscriptions1 =
?LOG(info, "unsubscribe ~p", [Topics], Session), lists:foldl(fun(RawTopic, SubDict) ->
Subscriptions1 = lists:foldl( {Topic0, _Opts} = emqttd_topic:strip(RawTopic),
fun(Topic, SubDict) -> case emqttd:run_hooks('client.unsubscribe', [ClientId, Username], Topic0) of
case dict:find(Topic, SubDict) of {ok, Topic1} ->
{ok, _Qos} -> case dict:find(Topic1, SubDict) of
emqttd:unsubscribe(Topic, ClientId), {ok, _Qos} ->
dict:erase(Topic, SubDict); emqttd:unsubscribe(Topic1, ClientId),
error -> dict:erase(Topic1, SubDict);
error ->
SubDict
end;
{stop, _} ->
SubDict SubDict
end end
end, Subscriptions, Topics), end, Subscriptions, Topics),
hibernate(Session#session{subscriptions = Subscriptions1}); hibernate(Session#session{subscriptions = Subscriptions1});
{stop, Topics} ->
?LOG(info, "Cannot unsubscribe: ~p", [Topics], Session),
hibernate(Session)
end;
handle_cast({destroy, ClientId}, Session = #session{client_id = ClientId}) -> handle_cast({destroy, ClientId}, Session = #session{client_id = ClientId}) ->
?LOG(warning, "destroyed", [], Session), ?LOG(warning, "destroyed", [], Session),

View File

@ -29,9 +29,9 @@ start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []). supervisor:start_link({local, ?MODULE}, ?MODULE, []).
%% @doc Start a session %% @doc Start a session
-spec(start_session(boolean(), binary(), pid()) -> {ok, pid()}). -spec(start_session(boolean(), {binary(), binary() | undefined} , pid()) -> {ok, pid()}).
start_session(CleanSess, ClientId, ClientPid) -> start_session(CleanSess, {ClientId, Username}, ClientPid) ->
supervisor:start_child(?MODULE, [CleanSess, ClientId, ClientPid]). supervisor:start_child(?MODULE, [CleanSess, {ClientId, Username}, ClientPid]).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Supervisor callbacks %% Supervisor callbacks

View File

@ -32,7 +32,7 @@
%% API Function Exports %% API Function Exports
-export([start_link/2]). -export([start_link/2]).
-export([start_session/2, lookup_session/1, register_session/3, unregister_session/1]). -export([start_session/2, lookup_session/1, reg_session/3, unreg_session/1]).
-export([dispatch/3]). -export([dispatch/3]).
@ -77,10 +77,10 @@ start_link(Pool, Id) ->
gen_server2:start_link({local, ?PROC_NAME(?MODULE, Id)}, ?MODULE, [Pool, Id], []). gen_server2:start_link({local, ?PROC_NAME(?MODULE, Id)}, ?MODULE, [Pool, Id], []).
%% @doc Start a session %% @doc Start a session
-spec(start_session(boolean(), binary()) -> {ok, pid(), boolean()} | {error, any()}). -spec(start_session(boolean(), {binary(), binary() | undefined}) -> {ok, pid(), boolean()} | {error, any()}).
start_session(CleanSess, ClientId) -> start_session(CleanSess, {ClientId, Username}) ->
SM = gproc_pool:pick_worker(?POOL, ClientId), SM = gproc_pool:pick_worker(?POOL, ClientId),
call(SM, {start_session, {CleanSess, ClientId, self()}}). call(SM, {start_session, CleanSess, {ClientId, Username}, self()}).
%% @doc Lookup a Session %% @doc Lookup a Session
-spec(lookup_session(binary()) -> mqtt_session() | undefined). -spec(lookup_session(binary()) -> mqtt_session() | undefined).
@ -91,18 +91,18 @@ lookup_session(ClientId) ->
end. end.
%% @doc Register a session with info. %% @doc Register a session with info.
-spec(register_session(binary(), boolean(), [tuple()]) -> true). -spec(reg_session(binary(), boolean(), [tuple()]) -> true).
register_session(ClientId, CleanSess, Properties) -> reg_session(ClientId, CleanSess, Properties) ->
ets:insert(mqtt_local_session, {ClientId, self(), CleanSess, Properties}). ets:insert(mqtt_local_session, {ClientId, self(), CleanSess, Properties}).
%% @doc Unregister a session. %% @doc Unregister a session.
-spec(unregister_session(binary()) -> true). -spec(unreg_session(binary()) -> true).
unregister_session(ClientId) -> unreg_session(ClientId) ->
ets:delete(mqtt_local_session, ClientId). ets:delete(mqtt_local_session, ClientId).
dispatch(ClientId, Topic, Msg) -> dispatch(ClientId, Topic, Msg) ->
try ets:lookup_element(mqtt_local_session, ClientId, 2) of try ets:lookup_element(mqtt_local_session, ClientId, 2) of
Pid -> Pid ! {dispatch, Topic, Msg} Pid -> Pid ! {deliver, Topic, Msg}
catch catch
error:badarg -> io:format("Session Not Found: ~p~n", [ClientId]), ok %%TODO: How?? error:badarg -> io:format("Session Not Found: ~p~n", [ClientId]), ok %%TODO: How??
end. end.
@ -128,11 +128,11 @@ prioritise_info(_Msg, _Len, _State) ->
2. 2.
%% Persistent Session %% Persistent Session
handle_call({start_session, Client = {false, ClientId, ClientPid}}, _From, State) -> handle_call({start_session, false, {ClientId, Username}, ClientPid}, _From, State) ->
case lookup_session(ClientId) of case lookup_session(ClientId) of
undefined -> undefined ->
%% Create session locally %% Create session locally
create_session(Client, State); create_session({false, {ClientId, Username}, ClientPid}, State);
Session -> Session ->
case resume_session(Session, ClientPid) of case resume_session(Session, ClientPid) of
{ok, SessPid} -> {ok, SessPid} ->
@ -143,7 +143,8 @@ handle_call({start_session, Client = {false, ClientId, ClientPid}}, _From, State
end; end;
%% Transient Session %% Transient Session
handle_call({start_session, Client = {true, ClientId, _ClientPid}}, _From, State) -> handle_call({start_session, true, {ClientId, Username}, ClientPid}, _From, State) ->
Client = {true, {ClientId, Username}, ClientPid},
case lookup_session(ClientId) of case lookup_session(ClientId) of
undefined -> undefined ->
create_session(Client, State); create_session(Client, State);
@ -195,8 +196,8 @@ code_change(_OldVsn, State, _Extra) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Create Session Locally %% Create Session Locally
create_session({CleanSess, ClientId, ClientPid}, State) -> create_session({CleanSess, {ClientId, Username}, ClientPid}, State) ->
case create_session(CleanSess, ClientId, ClientPid) of case create_session(CleanSess, {ClientId, Username}, ClientPid) of
{ok, SessPid} -> {ok, SessPid} ->
{reply, {ok, SessPid, false}, {reply, {ok, SessPid, false},
monitor_session(ClientId, SessPid, State)}; monitor_session(ClientId, SessPid, State)};
@ -204,8 +205,8 @@ create_session({CleanSess, ClientId, ClientPid}, State) ->
{reply, {error, Error}, State} {reply, {error, Error}, State}
end. end.
create_session(CleanSess, ClientId, ClientPid) -> create_session(CleanSess, {ClientId, Username}, ClientPid) ->
case emqttd_session_sup:start_session(CleanSess, ClientId, ClientPid) of case emqttd_session_sup:start_session(CleanSess, {ClientId, Username}, ClientPid) of
{ok, SessPid} -> {ok, SessPid} ->
Session = #mqtt_session{client_id = ClientId, sess_pid = SessPid, persistent = not CleanSess}, Session = #mqtt_session{client_id = ClientId, sess_pid = SessPid, persistent = not CleanSess},
case insert_session(Session) of case insert_session(Session) of

View File

@ -17,6 +17,7 @@
-module(emqttd_topic). -module(emqttd_topic).
-import(lists, [reverse/1]). -import(lists, [reverse/1]).
-export([match/2, validate/1, triples/1, words/1, wildcard/1]). -export([match/2, validate/1, triples/1, words/1, wildcard/1]).
-export([join/1, feed_var/3, systop/1]). -export([join/1, feed_var/3, systop/1]).