Merge branch 'emq20' of github.com:emqtt/emqttd into emq20
This commit is contained in:
commit
36a33feda8
|
@ -1,3 +1,8 @@
|
|||
==============
|
||||
|
||||
.. _coap:
|
||||
|
||||
=============
|
||||
CoAP Protocol
|
||||
==============
|
||||
=============
|
||||
|
||||
|
||||
|
|
|
@ -0,0 +1,10 @@
|
|||
|
||||
.. _mqtt_sn:
|
||||
|
||||
TODO:...
|
||||
|
||||
================
|
||||
MQTT-SN Protocol
|
||||
================
|
||||
|
||||
|
|
@ -44,41 +44,13 @@
|
|||
|
||||
-define(PUBSUB(PS), (PS =:= publish orelse PS =:= subscribe)).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% MQTT Topic
|
||||
%%--------------------------------------------------------------------
|
||||
-record(mqtt_topic, {
|
||||
topic :: binary(),
|
||||
flags :: [retained | static]
|
||||
}).
|
||||
|
||||
-type(mqtt_topic() :: #mqtt_topic{}).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% MQTT Route
|
||||
%%--------------------------------------------------------------------
|
||||
-record(mqtt_route, {
|
||||
topic :: binary(),
|
||||
node :: node()
|
||||
}).
|
||||
|
||||
-type(mqtt_route() :: #mqtt_route{}).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% MQTT Subscription
|
||||
%%--------------------------------------------------------------------
|
||||
-record(mqtt_subscription, {
|
||||
subid :: binary() | atom() | pid(),
|
||||
topic :: binary(),
|
||||
qos = 0 :: 0 | 1 | 2
|
||||
}).
|
||||
|
||||
-type(mqtt_subscription() :: #mqtt_subscription{}).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% MQTT Client
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
-type(ws_header_key() :: atom() | binary() | string()).
|
||||
-type(ws_header_val() :: atom() | binary() | string() | integer()).
|
||||
|
||||
-record(mqtt_client, {
|
||||
client_id :: binary() | undefined,
|
||||
client_pid :: pid(),
|
||||
|
@ -88,9 +60,7 @@
|
|||
proto_ver :: 3 | 4,
|
||||
keepalive = 0,
|
||||
will_topic :: undefined | binary(),
|
||||
token :: binary() | undefined, %% auth token
|
||||
cookie :: binary() | undefined, %% auth cookie
|
||||
%%ws_initial_headers :: list({ws_header_key(), ws_header_val()}),
|
||||
ws_initial_headers :: list({ws_header_key(), ws_header_val()}),
|
||||
connected_at :: erlang:timestamp()
|
||||
}).
|
||||
|
||||
|
@ -117,6 +87,7 @@
|
|||
-record(mqtt_message, {
|
||||
msgid :: mqtt_msgid(), %% Global unique message ID
|
||||
pktid :: mqtt_pktid(), %% PacketId
|
||||
from :: {binary(), undefined | binary()}, %% ClientId and Username
|
||||
topic :: binary(), %% Topic that the message is published to
|
||||
qos = 0 :: 0 | 1 | 2, %% Message QoS
|
||||
flags = [] :: [retain | dup | sys], %% Message Flags
|
||||
|
@ -135,13 +106,22 @@
|
|||
%%--------------------------------------------------------------------
|
||||
-record(mqtt_delivery, {
|
||||
sender :: pid(), %% Pid of the sender/publisher
|
||||
from :: binary(),
|
||||
message :: mqtt_message(), %% Message
|
||||
message :: mqtt_message(), %% Message
|
||||
flows :: list()
|
||||
}).
|
||||
|
||||
-type(mqtt_delivery() :: #mqtt_delivery{}).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% MQTT Route
|
||||
%%--------------------------------------------------------------------
|
||||
-record(mqtt_route, {
|
||||
topic :: binary(),
|
||||
node :: node()
|
||||
}).
|
||||
|
||||
-type(mqtt_route() :: #mqtt_route{}).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% MQTT Alarm
|
||||
%%--------------------------------------------------------------------
|
||||
|
|
|
@ -26,7 +26,7 @@
|
|||
{?MQTT_PROTO_V31, <<"MQIsdp">>},
|
||||
{?MQTT_PROTO_V311, <<"MQTT">>}]).
|
||||
|
||||
-type mqtt_vsn() :: ?MQTT_PROTO_V31 | ?MQTT_PROTO_V311.
|
||||
-type(mqtt_vsn() :: ?MQTT_PROTO_V31 | ?MQTT_PROTO_V311).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% MQTT QoS
|
||||
|
@ -41,11 +41,11 @@
|
|||
|
||||
-define(IS_QOS(I), (I >= ?QOS0 andalso I =< ?QOS2)).
|
||||
|
||||
-type mqtt_qos() :: ?QOS0 | ?QOS1 | ?QOS2.
|
||||
-type(mqtt_qos() :: ?QOS0 | ?QOS1 | ?QOS2).
|
||||
|
||||
-type mqtt_qos_name() :: qos0 | at_most_once |
|
||||
-type(mqtt_qos_name() :: qos0 | at_most_once |
|
||||
qos1 | at_least_once |
|
||||
qos2 | exactly_once.
|
||||
qos2 | exactly_once).
|
||||
|
||||
-define(QOS_I(Name),
|
||||
begin
|
||||
|
@ -102,7 +102,7 @@
|
|||
'PINGRESP',
|
||||
'DISCONNECT']).
|
||||
|
||||
-type mqtt_packet_type() :: ?RESERVED..?DISCONNECT.
|
||||
-type(mqtt_packet_type() :: ?RESERVED..?DISCONNECT).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% MQTT Connect Return Codes
|
||||
|
@ -114,7 +114,7 @@
|
|||
-define(CONNACK_CREDENTIALS, 4). %% Username or password is malformed
|
||||
-define(CONNACK_AUTH, 5). %% Client is not authorized to connect
|
||||
|
||||
-type mqtt_connack() :: ?CONNACK_ACCEPT..?CONNACK_AUTH.
|
||||
-type(mqtt_connack() :: ?CONNACK_ACCEPT..?CONNACK_AUTH).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% MQTT Parser and Serializer
|
||||
|
@ -135,8 +135,9 @@
|
|||
%%--------------------------------------------------------------------
|
||||
%% MQTT Packets
|
||||
%%--------------------------------------------------------------------
|
||||
-type mqtt_client_id() :: binary().
|
||||
-type mqtt_packet_id() :: 1..16#ffff | undefined.
|
||||
-type(mqtt_client_id() :: binary()).
|
||||
-type(mqtt_username() :: binary() | undefined).
|
||||
-type(mqtt_packet_id() :: 1..16#ffff | undefined).
|
||||
|
||||
-record(mqtt_packet_connect, {
|
||||
client_id = <<>> :: mqtt_client_id(),
|
||||
|
|
|
@ -491,13 +491,13 @@ print(Routes = [#mqtt_route{topic = Topic} | _]) ->
|
|||
Nodes = [atom_to_list(Node) || #mqtt_route{node = Node} <- Routes],
|
||||
?PRINT("~s -> ~s~n", [Topic, string:join(Nodes, ",")]);
|
||||
|
||||
print(Subscriptions = [#mqtt_subscription{subid = ClientId} | _]) ->
|
||||
TopicTable = [io_lib:format("~s:~w", [Topic, Qos])
|
||||
|| #mqtt_subscription{topic = Topic, qos = Qos} <- Subscriptions],
|
||||
?PRINT("~s -> ~s~n", [ClientId, string:join(TopicTable, ",")]);
|
||||
%% print(Subscriptions = [#mqtt_subscription{subid = ClientId} | _]) ->
|
||||
%% TopicTable = [io_lib:format("~s:~w", [Topic, Qos])
|
||||
%% || #mqtt_subscription{topic = Topic, qos = Qos} <- Subscriptions],
|
||||
%% ?PRINT("~s -> ~s~n", [ClientId, string:join(TopicTable, ",")]);
|
||||
|
||||
print(Topics = [#mqtt_topic{}|_]) ->
|
||||
foreach(fun print/1, Topics);
|
||||
%% print(Topics = [#mqtt_topic{}|_]) ->
|
||||
%% foreach(fun print/1, Topics);
|
||||
|
||||
print(#mqtt_plugin{name = Name, version = Ver, descr = Descr, active = Active}) ->
|
||||
?PRINT("Plugin(~s, version=~s, description=~s, active=~s)~n",
|
||||
|
@ -509,8 +509,8 @@ print(#mqtt_client{client_id = ClientId, clean_sess = CleanSess, username = User
|
|||
[ClientId, CleanSess, Username, emqttd_net:format(Peername),
|
||||
emqttd_time:now_to_secs(ConnectedAt)]);
|
||||
|
||||
print(#mqtt_topic{topic = Topic, flags = Flags}) ->
|
||||
?PRINT("~s: ~s~n", [Topic, string:join([atom_to_list(F) || F <- Flags], ",")]);
|
||||
%% print(#mqtt_topic{topic = Topic, flags = Flags}) ->
|
||||
%% ?PRINT("~s: ~s~n", [Topic, string:join([atom_to_list(F) || F <- Flags], ",")]);
|
||||
|
||||
print(#mqtt_route{topic = Topic, node = Node}) ->
|
||||
?PRINT("~s -> ~s~n", [Topic, Node]);
|
||||
|
|
|
@ -81,8 +81,7 @@ from_packet(#mqtt_packet_connect{client_id = ClientId,
|
|||
will_msg = Msg}) ->
|
||||
#mqtt_message{msgid = msgid(Qos),
|
||||
topic = Topic,
|
||||
from = ClientId,
|
||||
sender = Username,
|
||||
from = {ClientId, Username},
|
||||
retain = Retain,
|
||||
qos = Qos,
|
||||
dup = false,
|
||||
|
@ -95,7 +94,7 @@ from_packet(ClientId, Packet) ->
|
|||
|
||||
from_packet(Username, ClientId, Packet) ->
|
||||
Msg = from_packet(Packet),
|
||||
Msg#mqtt_message{from = ClientId, sender = Username}.
|
||||
Msg#mqtt_message{from = {ClientId, Username}}.
|
||||
|
||||
msgid(?QOS_0) ->
|
||||
undefined;
|
||||
|
@ -150,10 +149,10 @@ unset_flag(retain, Msg = #mqtt_message{retain = true}) ->
|
|||
unset_flag(Flag, Msg) when Flag =:= dup orelse Flag =:= retain -> Msg.
|
||||
|
||||
%% @doc Format MQTT Message
|
||||
format(#mqtt_message{msgid = MsgId, pktid = PktId, from = From, sender = Sender,
|
||||
format(#mqtt_message{msgid = MsgId, pktid = PktId, from = {ClientId, Username},
|
||||
qos = Qos, retain = Retain, dup = Dup, topic =Topic}) ->
|
||||
io_lib:format("Message(Q~p, R~p, D~p, MsgId=~p, PktId=~p, From=~s, Sender=~s, Topic=~s)",
|
||||
[i(Qos), i(Retain), i(Dup), MsgId, PktId, From, Sender, Topic]).
|
||||
io_lib:format("Message(Q~p, R~p, D~p, MsgId=~p, PktId=~p, From=~s/~s, Topic=~s)",
|
||||
[i(Qos), i(Retain), i(Dup), MsgId, PktId, Username, ClientId, Topic]).
|
||||
|
||||
i(true) -> 1;
|
||||
i(false) -> 0;
|
||||
|
|
|
@ -45,7 +45,7 @@ on_client_connected(ConnAck, Client = #mqtt_client{client_id = ClientId,
|
|||
emqttd:publish(emqttd_message:set_flag(sys, Msg)),
|
||||
{ok, Client}.
|
||||
|
||||
on_client_disconnected(Reason, ClientId, Opts) ->
|
||||
on_client_disconnected(Reason, #mqtt_client{client_id = ClientId}, Opts) ->
|
||||
Json = mochijson2:encode([{clientid, ClientId},
|
||||
{reason, reason(Reason)},
|
||||
{ts, emqttd_time:now_to_secs()}]),
|
||||
|
|
|
@ -40,13 +40,13 @@ load(Opts) ->
|
|||
emqttd:hook('message.publish', fun ?MODULE:rewrite_publish/2, [Sections])
|
||||
end.
|
||||
|
||||
rewrite_subscribe(_ClientId, TopicTable, Sections) ->
|
||||
lager:info("Rewrite subscribe: ~p", [TopicTable]),
|
||||
{ok, [{match_topic(Topic, Sections), Qos} || {Topic, Qos} <- TopicTable]}.
|
||||
rewrite_subscribe({_ClientId, _Username}, {Topic, Opts}, Sections) ->
|
||||
lager:info("Rewrite subscribe: ~p", [{Topic, Opts}]),
|
||||
{ok, {match_topic(Topic, Sections), Opts}}.
|
||||
|
||||
rewrite_unsubscribe(_ClientId, Topics, Sections) ->
|
||||
lager:info("Rewrite unsubscribe: ~p", [Topics]),
|
||||
{ok, [match_topic(Topic, Sections) || Topic <- Topics]}.
|
||||
rewrite_unsubscribe({_ClientId, _Username}, {Topic, Opts}, Sections) ->
|
||||
lager:info("Rewrite unsubscribe: ~p", [{Topic, Opts}]),
|
||||
{ok, {match_topic(Topic, Sections), Opts}}.
|
||||
|
||||
rewrite_publish(Message=#mqtt_message{topic = Topic}, Sections) ->
|
||||
%%TODO: this will not work if the client is always online.
|
||||
|
|
|
@ -146,7 +146,7 @@ process(Packet = ?CONNECT_PACKET(Var), State0) ->
|
|||
State2 = maybe_set_clientid(State1),
|
||||
|
||||
%% Start session
|
||||
case emqttd_sm:start_session(CleanSess, clientid(State2)) of
|
||||
case emqttd_sm:start_session(CleanSess, {clientid(State2), Username}) of
|
||||
{ok, Session, SP} ->
|
||||
%% Register the client
|
||||
emqttd_cm:reg(client(State2)),
|
||||
|
@ -247,9 +247,9 @@ with_puback(Type, Packet = ?PUBLISH_PACKET(_Qos, PacketId),
|
|||
end.
|
||||
|
||||
-spec(send(mqtt_message() | mqtt_packet(), proto_state()) -> {ok, proto_state()}).
|
||||
send(Msg, State = #proto_state{client_id = ClientId})
|
||||
send(Msg, State = #proto_state{client_id = ClientId, username = Username})
|
||||
when is_record(Msg, mqtt_message) ->
|
||||
emqttd:run_hooks('message.delivered', [ClientId], Msg),
|
||||
emqttd:run_hooks('message.delivered', [{ClientId, Username}], Msg),
|
||||
send(emqttd_message:to_packet(Msg), State);
|
||||
|
||||
send(Packet, State = #proto_state{sendfun = SendFun})
|
||||
|
@ -280,10 +280,11 @@ shutdown(conflict, #proto_state{client_id = _ClientId}) ->
|
|||
%% emqttd_cm:unreg(ClientId);
|
||||
ignore;
|
||||
|
||||
shutdown(Error, State = #proto_state{client_id = ClientId, will_msg = WillMsg}) ->
|
||||
shutdown(Error, State = #proto_state{will_msg = WillMsg}) ->
|
||||
?LOG(info, "Shutdown for ~p", [Error], State),
|
||||
send_willmsg(ClientId, WillMsg),
|
||||
emqttd:run_hooks('client.disconnected', [Error], ClientId),
|
||||
Client = client(State),
|
||||
send_willmsg(Client, WillMsg),
|
||||
emqttd:run_hooks('client.disconnected', [Error], Client),
|
||||
%% let it down
|
||||
%% emqttd_cm:unreg(ClientId).
|
||||
ok.
|
||||
|
@ -301,10 +302,10 @@ maybe_set_clientid(State = #proto_state{client_id = NullId})
|
|||
maybe_set_clientid(State) ->
|
||||
State.
|
||||
|
||||
send_willmsg(_ClientId, undefined) ->
|
||||
send_willmsg(_Client, undefined) ->
|
||||
ignore;
|
||||
send_willmsg(ClientId, WillMsg) ->
|
||||
emqttd:publish(WillMsg#mqtt_message{from = ClientId}).
|
||||
send_willmsg(#mqtt_client{client_id = ClientId, username = Username}, WillMsg) ->
|
||||
emqttd:publish(WillMsg#mqtt_message{from = {ClientId, Username}}).
|
||||
|
||||
start_keepalive(0) -> ignore;
|
||||
|
||||
|
|
|
@ -77,6 +77,9 @@
|
|||
%% Old Client Pid that has been kickout
|
||||
old_client_pid :: pid(),
|
||||
|
||||
%% Username
|
||||
username :: binary() | undefined,
|
||||
|
||||
%% Last packet id of the session
|
||||
packet_id = 1,
|
||||
|
||||
|
@ -136,9 +139,9 @@
|
|||
"Session(~s): " ++ Format, [State#session.client_id | Args])).
|
||||
|
||||
%% @doc Start a session.
|
||||
-spec(start_link(boolean(), mqtt_client_id(), pid()) -> {ok, pid()} | {error, any()}).
|
||||
start_link(CleanSess, ClientId, ClientPid) ->
|
||||
gen_server2:start_link(?MODULE, [CleanSess, ClientId, ClientPid], []).
|
||||
-spec(start_link(boolean(), {mqtt_client_id(), mqtt_username()}, pid()) -> {ok, pid()} | {error, any()}).
|
||||
start_link(CleanSess, {ClientId, Username}, ClientPid) ->
|
||||
gen_server2:start_link(?MODULE, [CleanSess, {ClientId, Username}, ClientPid], []).
|
||||
|
||||
%% @doc Resume a session.
|
||||
-spec(resume(pid(), mqtt_client_id(), pid()) -> ok).
|
||||
|
@ -208,10 +211,10 @@ unsubscribe(SessPid, Topics) ->
|
|||
gen_server2:cast(SessPid, {unsubscribe, Topics}).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% gen_server callbacks
|
||||
%% gen_server Callbacks
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
init([CleanSess, ClientId, ClientPid]) ->
|
||||
init([CleanSess, {ClientId, Username}, ClientPid]) ->
|
||||
process_flag(trap_exit, true),
|
||||
true = link(ClientPid),
|
||||
SessEnv = emqttd_conf:session(),
|
||||
|
@ -219,6 +222,7 @@ init([CleanSess, ClientId, ClientPid]) ->
|
|||
clean_sess = CleanSess,
|
||||
client_id = ClientId,
|
||||
client_pid = ClientPid,
|
||||
username = Username,
|
||||
subscriptions = dict:new(),
|
||||
inflight_queue = [],
|
||||
max_inflight = get_value(max_inflight, SessEnv, 0),
|
||||
|
@ -232,7 +236,7 @@ init([CleanSess, ClientId, ClientPid]) ->
|
|||
expired_after = get_value(expired_after, SessEnv) * 60,
|
||||
collect_interval = get_value(collect_interval, SessEnv, 0),
|
||||
timestamp = os:timestamp()},
|
||||
emqttd_sm:register_session(ClientId, CleanSess, sess_info(Session)),
|
||||
emqttd_sm:reg_session(ClientId, CleanSess, sess_info(Session)),
|
||||
%% Start statistics
|
||||
{ok, start_collector(Session), hibernate}.
|
||||
|
||||
|
@ -284,68 +288,67 @@ handle_call({publish, Msg = #mqtt_message{qos = ?QOS_2, pktid = PktId}},
|
|||
handle_call(Req, _From, State) ->
|
||||
?UNEXPECTED_REQ(Req, State).
|
||||
|
||||
handle_cast({subscribe, RawTopicTable, AckFun}, Session = #session{client_id = ClientId,
|
||||
subscriptions = Subscriptions}) ->
|
||||
%% TODO: Ugly...
|
||||
TopicTable0 = lists:map(fun({T, Q}) ->
|
||||
{T1, Opts} = emqttd_topic:strip(T),
|
||||
{T1, [{qos, Q} | Opts]}
|
||||
end, RawTopicTable),
|
||||
case emqttd:run_hooks('client.subscribe', [ClientId], TopicTable0) of
|
||||
{ok, TopicTable} ->
|
||||
?LOG(info, "Subscribe ~p", [TopicTable], Session),
|
||||
Subscriptions1 = lists:foldl(
|
||||
fun({Topic, Opts = [{qos, Qos}|_]}, SubDict) ->
|
||||
case dict:find(Topic, SubDict) of
|
||||
{ok, Qos} ->
|
||||
?LOG(warning, "duplicated subscribe: ~s, qos = ~w", [Topic, Qos], Session),
|
||||
SubDict;
|
||||
{ok, OldQos} ->
|
||||
emqttd:setqos(Topic, ClientId, Qos),
|
||||
?LOG(warning, "duplicated subscribe ~s, old_qos=~w, new_qos=~w", [Topic, OldQos, Qos], Session),
|
||||
dict:store(Topic, Qos, SubDict);
|
||||
error ->
|
||||
emqttd:subscribe(Topic, ClientId, Opts),
|
||||
%%TODO: the design is ugly...
|
||||
%% <MQTT V3.1.1>: 3.8.4
|
||||
%% Where the Topic Filter is not identical to any existing Subscription’s filter,
|
||||
%% a new Subscription is created and all matching retained messages are sent.
|
||||
emqttd_retainer:dispatch(Topic, self()),
|
||||
%%TODO: 2.0 FIX
|
||||
|
||||
dict:store(Topic, Qos, SubDict)
|
||||
end
|
||||
end, Subscriptions, TopicTable),
|
||||
AckFun([Qos || {_, Qos} <- RawTopicTable]),
|
||||
emqttd:run_hooks('client.subscribe.after', [ClientId], TopicTable),
|
||||
hibernate(Session#session{subscriptions = Subscriptions1});
|
||||
{stop, TopicTable} ->
|
||||
?LOG(error, "Cannot subscribe: ~p", [TopicTable], Session),
|
||||
hibernate(Session)
|
||||
end;
|
||||
handle_cast({subscribe, TopicTable, AckFun}, Session = #session{client_id = ClientId,
|
||||
username = Username,
|
||||
subscriptions = Subscriptions}) ->
|
||||
?LOG(info, "Subscribe ~p", [TopicTable], Session),
|
||||
{GrantedQos, Subscriptions1} =
|
||||
lists:foldl(fun({RawTopic, Qos}, {QosAcc, SubDict}) ->
|
||||
{Topic, Opts} = emqttd_topic:strip(RawTopic),
|
||||
case emqttd:run_hooks('client.subscribe', [{ClientId, Username}], {Topic, Opts}) of
|
||||
{ok, {Topic1, Opts1}} ->
|
||||
NewQos = proplists:get_value(qos, Opts1, Qos),
|
||||
{[NewQos | QosAcc], case dict:find(Topic, SubDict) of
|
||||
{ok, NewQos} ->
|
||||
?LOG(warning, "duplicated subscribe: ~s, qos = ~w", [Topic, NewQos], Session),
|
||||
SubDict;
|
||||
{ok, OldQos} ->
|
||||
emqttd:setqos(Topic, ClientId, NewQos),
|
||||
?LOG(warning, "duplicated subscribe ~s, old_qos=~w, new_qos=~w", [Topic, OldQos, NewQos], Session),
|
||||
dict:store(Topic, NewQos, SubDict);
|
||||
error ->
|
||||
emqttd:subscribe(Topic1, ClientId, Opts1),
|
||||
%%TODO: the design is ugly...
|
||||
%% <MQTT V3.1.1>: 3.8.4
|
||||
%% Where the Topic Filter is not identical to any existing Subscription’s filter,
|
||||
%% a new Subscription is created and all matching retained messages are sent.
|
||||
emqttd_retainer:dispatch(Topic1, self()),
|
||||
dict:store(Topic1, NewQos, SubDict)
|
||||
end};
|
||||
{stop, _} ->
|
||||
?LOG(error, "Cannot subscribe: ~p", [Topic], Session),
|
||||
{[128 | QosAcc], SubDict}
|
||||
end
|
||||
end, {[], Subscriptions}, TopicTable),
|
||||
AckFun(lists:reverse(GrantedQos)),
|
||||
%%emqttd:run_hooks('client.subscribe.after', [ClientId], TopicTable),
|
||||
hibernate(Session#session{subscriptions = Subscriptions1});
|
||||
|
||||
handle_cast({unsubscribe, RawTopics}, Session = #session{client_id = ClientId,
|
||||
subscriptions = Subscriptions}) ->
|
||||
Topics0 = lists:map(fun(Topic) ->
|
||||
{T, _Opts} = emqttd_topic:strip(Topic), T
|
||||
end, RawTopics),
|
||||
case emqttd:run_hooks('client.unsubscribe', [ClientId], Topics0) of
|
||||
{ok, Topics} ->
|
||||
?LOG(info, "unsubscribe ~p", [Topics], Session),
|
||||
Subscriptions1 = lists:foldl(
|
||||
fun(Topic, SubDict) ->
|
||||
case dict:find(Topic, SubDict) of
|
||||
{ok, _Qos} ->
|
||||
emqttd:unsubscribe(Topic, ClientId),
|
||||
dict:erase(Topic, SubDict);
|
||||
error ->
|
||||
%%TODO: 2.0 FIX
|
||||
|
||||
handle_cast({unsubscribe, Topics}, Session = #session{client_id = ClientId,
|
||||
username = Username,
|
||||
subscriptions = Subscriptions}) ->
|
||||
?LOG(info, "unsubscribe ~p", [Topics], Session),
|
||||
Subscriptions1 =
|
||||
lists:foldl(fun(RawTopic, SubDict) ->
|
||||
{Topic0, _Opts} = emqttd_topic:strip(RawTopic),
|
||||
case emqttd:run_hooks('client.unsubscribe', [ClientId, Username], Topic0) of
|
||||
{ok, Topic1} ->
|
||||
case dict:find(Topic1, SubDict) of
|
||||
{ok, _Qos} ->
|
||||
emqttd:unsubscribe(Topic1, ClientId),
|
||||
dict:erase(Topic1, SubDict);
|
||||
error ->
|
||||
SubDict
|
||||
end;
|
||||
{stop, _} ->
|
||||
SubDict
|
||||
end
|
||||
end, Subscriptions, Topics),
|
||||
hibernate(Session#session{subscriptions = Subscriptions1});
|
||||
{stop, Topics} ->
|
||||
?LOG(info, "Cannot unsubscribe: ~p", [Topics], Session),
|
||||
hibernate(Session)
|
||||
end;
|
||||
hibernate(Session#session{subscriptions = Subscriptions1});
|
||||
|
||||
handle_cast({destroy, ClientId}, Session = #session{client_id = ClientId}) ->
|
||||
?LOG(warning, "destroyed", [], Session),
|
||||
|
@ -391,8 +394,7 @@ handle_cast({resume, ClientId, ClientPid}, Session = #session{client_id = C
|
|||
if
|
||||
CleanSess =:= true ->
|
||||
?LOG(warning, "CleanSess changed to false.", [], Session),
|
||||
%% emqttd_sm:unregister_session(CleanSess, ClientId),
|
||||
emqttd_sm:register_session(ClientId, false, sess_info(Session1));
|
||||
emqttd_sm:reg_session(ClientId, false, sess_info(Session1));
|
||||
CleanSess =:= false ->
|
||||
ok
|
||||
end,
|
||||
|
@ -506,7 +508,7 @@ handle_info({timeout, awaiting_comp, PktId}, Session = #session{awaiting_comp =
|
|||
end;
|
||||
|
||||
handle_info(collect_info, Session = #session{clean_sess = CleanSess, client_id = ClientId}) ->
|
||||
emqttd_sm:register_session(ClientId, CleanSess, sess_info(Session)),
|
||||
emqttd_sm:reg_session(ClientId, CleanSess, sess_info(Session)),
|
||||
hibernate(start_collector(Session));
|
||||
|
||||
handle_info({'EXIT', ClientPid, _Reason}, Session = #session{clean_sess = true,
|
||||
|
@ -539,7 +541,7 @@ handle_info(Info, Session) ->
|
|||
|
||||
terminate(_Reason, #session{client_id = ClientId}) ->
|
||||
emqttd:subscriber_down(ClientId),
|
||||
emqttd_sm:unregister_session(ClientId).
|
||||
emqttd_sm:unreg_session(ClientId).
|
||||
|
||||
code_change(_OldVsn, Session, _Extra) ->
|
||||
{ok, Session}.
|
||||
|
@ -656,11 +658,12 @@ await(#mqtt_message{pktid = PktId}, Session = #session{awaiting_ack = Awaiting
|
|||
Session#session{awaiting_ack = Awaiting1}.
|
||||
|
||||
acked(PktId, Session = #session{client_id = ClientId,
|
||||
username = Username,
|
||||
inflight_queue = InflightQ,
|
||||
awaiting_ack = Awaiting}) ->
|
||||
case lists:keyfind(PktId, 1, InflightQ) of
|
||||
{_, Msg} ->
|
||||
emqttd:run_hooks('message.acked', [ClientId], Msg);
|
||||
emqttd:run_hooks('message.acked', [{ClientId, Username}], Msg);
|
||||
false ->
|
||||
?LOG(error, "Cannot find acked pktid: ~p", [PktId], Session)
|
||||
end,
|
||||
|
|
|
@ -29,9 +29,9 @@ start_link() ->
|
|||
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
|
||||
|
||||
%% @doc Start a session
|
||||
-spec(start_session(boolean(), binary(), pid()) -> {ok, pid()}).
|
||||
start_session(CleanSess, ClientId, ClientPid) ->
|
||||
supervisor:start_child(?MODULE, [CleanSess, ClientId, ClientPid]).
|
||||
-spec(start_session(boolean(), {binary(), binary() | undefined} , pid()) -> {ok, pid()}).
|
||||
start_session(CleanSess, {ClientId, Username}, ClientPid) ->
|
||||
supervisor:start_child(?MODULE, [CleanSess, {ClientId, Username}, ClientPid]).
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Supervisor callbacks
|
||||
|
|
|
@ -32,7 +32,7 @@
|
|||
%% API Function Exports
|
||||
-export([start_link/2]).
|
||||
|
||||
-export([start_session/2, lookup_session/1, register_session/3, unregister_session/1]).
|
||||
-export([start_session/2, lookup_session/1, reg_session/3, unreg_session/1]).
|
||||
|
||||
-export([dispatch/3]).
|
||||
|
||||
|
@ -77,10 +77,10 @@ start_link(Pool, Id) ->
|
|||
gen_server2:start_link({local, ?PROC_NAME(?MODULE, Id)}, ?MODULE, [Pool, Id], []).
|
||||
|
||||
%% @doc Start a session
|
||||
-spec(start_session(boolean(), binary()) -> {ok, pid(), boolean()} | {error, any()}).
|
||||
start_session(CleanSess, ClientId) ->
|
||||
-spec(start_session(boolean(), {binary(), binary() | undefined}) -> {ok, pid(), boolean()} | {error, any()}).
|
||||
start_session(CleanSess, {ClientId, Username}) ->
|
||||
SM = gproc_pool:pick_worker(?POOL, ClientId),
|
||||
call(SM, {start_session, {CleanSess, ClientId, self()}}).
|
||||
call(SM, {start_session, CleanSess, {ClientId, Username}, self()}).
|
||||
|
||||
%% @doc Lookup a Session
|
||||
-spec(lookup_session(binary()) -> mqtt_session() | undefined).
|
||||
|
@ -91,13 +91,13 @@ lookup_session(ClientId) ->
|
|||
end.
|
||||
|
||||
%% @doc Register a session with info.
|
||||
-spec(register_session(binary(), boolean(), [tuple()]) -> true).
|
||||
register_session(ClientId, CleanSess, Properties) ->
|
||||
-spec(reg_session(binary(), boolean(), [tuple()]) -> true).
|
||||
reg_session(ClientId, CleanSess, Properties) ->
|
||||
ets:insert(mqtt_local_session, {ClientId, self(), CleanSess, Properties}).
|
||||
|
||||
%% @doc Unregister a session.
|
||||
-spec(unregister_session(binary()) -> true).
|
||||
unregister_session(ClientId) ->
|
||||
-spec(unreg_session(binary()) -> true).
|
||||
unreg_session(ClientId) ->
|
||||
ets:delete(mqtt_local_session, ClientId).
|
||||
|
||||
dispatch(ClientId, Topic, Msg) ->
|
||||
|
@ -128,11 +128,11 @@ prioritise_info(_Msg, _Len, _State) ->
|
|||
2.
|
||||
|
||||
%% Persistent Session
|
||||
handle_call({start_session, Client = {false, ClientId, ClientPid}}, _From, State) ->
|
||||
handle_call({start_session, false, {ClientId, Username}, ClientPid}, _From, State) ->
|
||||
case lookup_session(ClientId) of
|
||||
undefined ->
|
||||
%% Create session locally
|
||||
create_session(Client, State);
|
||||
create_session({false, {ClientId, Username}, ClientPid}, State);
|
||||
Session ->
|
||||
case resume_session(Session, ClientPid) of
|
||||
{ok, SessPid} ->
|
||||
|
@ -143,7 +143,8 @@ handle_call({start_session, Client = {false, ClientId, ClientPid}}, _From, State
|
|||
end;
|
||||
|
||||
%% Transient Session
|
||||
handle_call({start_session, Client = {true, ClientId, _ClientPid}}, _From, State) ->
|
||||
handle_call({start_session, true, {ClientId, Username}, ClientPid}, _From, State) ->
|
||||
Client = {true, {ClientId, Username}, ClientPid},
|
||||
case lookup_session(ClientId) of
|
||||
undefined ->
|
||||
create_session(Client, State);
|
||||
|
@ -195,8 +196,8 @@ code_change(_OldVsn, State, _Extra) ->
|
|||
%%--------------------------------------------------------------------
|
||||
|
||||
%% Create Session Locally
|
||||
create_session({CleanSess, ClientId, ClientPid}, State) ->
|
||||
case create_session(CleanSess, ClientId, ClientPid) of
|
||||
create_session({CleanSess, {ClientId, Username}, ClientPid}, State) ->
|
||||
case create_session(CleanSess, {ClientId, Username}, ClientPid) of
|
||||
{ok, SessPid} ->
|
||||
{reply, {ok, SessPid, false},
|
||||
monitor_session(ClientId, SessPid, State)};
|
||||
|
@ -204,8 +205,8 @@ create_session({CleanSess, ClientId, ClientPid}, State) ->
|
|||
{reply, {error, Error}, State}
|
||||
end.
|
||||
|
||||
create_session(CleanSess, ClientId, ClientPid) ->
|
||||
case emqttd_session_sup:start_session(CleanSess, ClientId, ClientPid) of
|
||||
create_session(CleanSess, {ClientId, Username}, ClientPid) ->
|
||||
case emqttd_session_sup:start_session(CleanSess, {ClientId, Username}, ClientPid) of
|
||||
{ok, SessPid} ->
|
||||
Session = #mqtt_session{client_id = ClientId, sess_pid = SessPid, persistent = not CleanSess},
|
||||
case insert_session(Session) of
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
-module(emqttd_topic).
|
||||
|
||||
-import(lists, [reverse/1]).
|
||||
|
||||
-export([match/2, validate/1, triples/1, words/1, wildcard/1]).
|
||||
|
||||
-export([join/1, feed_var/3, systop/1]).
|
||||
|
|
Loading…
Reference in New Issue