session
This commit is contained in:
parent
470ac34a6d
commit
08a64ee97b
|
@ -196,6 +196,11 @@
|
||||||
packet_id = PacketId},
|
packet_id = PacketId},
|
||||||
payload = Payload}).
|
payload = Payload}).
|
||||||
|
|
||||||
|
-define(PUBLISH(Qos, PacketId),
|
||||||
|
#mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH,
|
||||||
|
qos = Qos},
|
||||||
|
variable = #mqtt_packet_publish{packet_id = PacketId}}).
|
||||||
|
|
||||||
-define(PUBACK_PACKET(Type, PacketId),
|
-define(PUBACK_PACKET(Type, PacketId),
|
||||||
#mqtt_packet{header = #mqtt_packet_header{type = Type},
|
#mqtt_packet{header = #mqtt_packet_header{type = Type},
|
||||||
variable = #mqtt_packet_puback{packet_id = PacketId}}).
|
variable = #mqtt_packet_puback{packet_id = PacketId}}).
|
||||||
|
|
|
@ -170,37 +170,18 @@ handle(Packet = ?CONNECT_PACKET(Var), State0 = #proto_state{peername = Peername}
|
||||||
%% Send connack
|
%% Send connack
|
||||||
send(?CONNACK_PACKET(ReturnCode1), State3);
|
send(?CONNACK_PACKET(ReturnCode1), State3);
|
||||||
|
|
||||||
handle(Packet = ?PUBLISH_PACKET(?QOS_0, Topic, _PacketId, _Payload),
|
handle(Packet = ?PUBLISH_PACKET(_Qos, Topic, _PacketId, _Payload),
|
||||||
State = #proto_state{clientid = ClientId, session = Session}) ->
|
State = #proto_state{clientid = ClientId}) ->
|
||||||
|
|
||||||
case check_acl(publish, Topic, State) of
|
case check_acl(publish, Topic, State) of
|
||||||
allow ->
|
allow ->
|
||||||
do_publish(Session, ClientId, Packet);
|
publish(Packet, State);
|
||||||
deny ->
|
deny ->
|
||||||
lager:error("ACL Deny: ~s cannot publish to ~s", [ClientId, Topic])
|
lager:error("ACL Deny: ~s cannot publish to ~s", [ClientId, Topic])
|
||||||
end,
|
end,
|
||||||
{ok, State};
|
{ok, State};
|
||||||
|
|
||||||
handle(Packet = ?PUBLISH_PACKET(?QOS_1, Topic, PacketId, _Payload),
|
|
||||||
State = #proto_state{clientid = ClientId, session = Session}) ->
|
|
||||||
case check_acl(publish, Topic, State) of
|
|
||||||
allow ->
|
|
||||||
do_publish(Session, ClientId, Packet),
|
|
||||||
send(?PUBACK_PACKET(?PUBACK, PacketId), State);
|
|
||||||
deny ->
|
|
||||||
lager:error("ACL Deny: ~s cannot publish to ~s", [ClientId, Topic]),
|
|
||||||
{ok, State}
|
|
||||||
end;
|
|
||||||
|
|
||||||
handle(Packet = ?PUBLISH_PACKET(?QOS_2, Topic, PacketId, _Payload),
|
|
||||||
State = #proto_state{clientid = ClientId, session = Session}) ->
|
|
||||||
case check_acl(publish, Topic, State) of
|
|
||||||
allow ->
|
|
||||||
do_publish(Session, ClientId, Packet),
|
|
||||||
send(?PUBACK_PACKET(?PUBREC, PacketId), State);
|
|
||||||
deny ->
|
|
||||||
lager:error("ACL Deny: ~s cannot publish to ~s", [ClientId, Topic]),
|
|
||||||
{ok, State}
|
|
||||||
end;
|
|
||||||
|
|
||||||
handle(?PUBACK_PACKET(?PUBACK, PacketId), State = #proto_state{session = Session}) ->
|
handle(?PUBACK_PACKET(?PUBACK, PacketId), State = #proto_state{session = Session}) ->
|
||||||
emqttd_session:puback(Session, PacketId),
|
emqttd_session:puback(Session, PacketId),
|
||||||
|
@ -256,10 +237,26 @@ handle(?PACKET(?DISCONNECT), State) ->
|
||||||
% clean willmsg
|
% clean willmsg
|
||||||
{stop, normal, State#proto_state{will_msg = undefined}}.
|
{stop, normal, State#proto_state{will_msg = undefined}}.
|
||||||
|
|
||||||
do_publish(Session, ClientId, Packet) ->
|
publish(Packet = ?PUBLISH(?QOS_0, _PacketId), #proto_state{clientid = ClientId, session = Session}) ->
|
||||||
Msg = emqttd_message:from_packet(ClientId, Packet),
|
emqttd_session:publish(Session, emqttd_message:from_packet(ClientId, Packet));
|
||||||
Msg1 = emqttd_broker:foldl_hooks(client_publish, [], Msg),
|
|
||||||
emqttd_session:publish(Session, Msg1).
|
publish(Packet = ?PUBLISH(?QOS_1, PacketId), State = #proto_state{clientid = ClientId, session = Session}) ->
|
||||||
|
case emqttd_session:publish(Session, emqttd_message:from_packet(ClientId, Packet)) of
|
||||||
|
ok ->
|
||||||
|
send(?PUBACK_PACKET(?PUBACK, PacketId), State);
|
||||||
|
{error, Error} ->
|
||||||
|
%%TODO: log format...
|
||||||
|
lager:error("Client ~s: publish qos1 error ~p", [ClientId, Error])
|
||||||
|
end;
|
||||||
|
|
||||||
|
publish(Packet = ?PUBLISH(?QOS_2, PacketId), State = #proto_state{clientid = ClientId, session = Session}) ->
|
||||||
|
case emqttd_session:publish(Session, emqttd_message:from_packet(ClientId, Packet)) of
|
||||||
|
ok ->
|
||||||
|
send(?PUBACK_PACKET(?PUBREC, PacketId), State);
|
||||||
|
{error, Error} ->
|
||||||
|
%%TODO: log format...
|
||||||
|
lager:error("Client ~s: publish qos2 error ~p", [ClientId, Error])
|
||||||
|
end.
|
||||||
|
|
||||||
-spec send(mqtt_message() | mqtt_packet(), proto_state()) -> {ok, proto_state()}.
|
-spec send(mqtt_message() | mqtt_packet(), proto_state()) -> {ok, proto_state()}.
|
||||||
send(Msg, State) when is_record(Msg, mqtt_message) ->
|
send(Msg, State) when is_record(Msg, mqtt_message) ->
|
||||||
|
@ -323,7 +320,7 @@ send_willmsg(_ClientId, undefined) ->
|
||||||
ignore;
|
ignore;
|
||||||
send_willmsg(ClientId, WillMsg) ->
|
send_willmsg(ClientId, WillMsg) ->
|
||||||
lager:info("Client ~s send willmsg: ~p", [ClientId, WillMsg]),
|
lager:info("Client ~s send willmsg: ~p", [ClientId, WillMsg]),
|
||||||
emqttd_pubsub:publish(ClientId, WillMsg).
|
emqttd_pubsub:publish(WillMsg#mqtt_message{from = ClientId}).
|
||||||
|
|
||||||
start_keepalive(0) -> ignore;
|
start_keepalive(0) -> ignore;
|
||||||
|
|
||||||
|
|
|
@ -47,7 +47,7 @@
|
||||||
-export([create/1,
|
-export([create/1,
|
||||||
subscribe/1,
|
subscribe/1,
|
||||||
unsubscribe/1,
|
unsubscribe/1,
|
||||||
publish/2,
|
publish/1,
|
||||||
%local node
|
%local node
|
||||||
dispatch/2, match/1]).
|
dispatch/2, match/1]).
|
||||||
|
|
||||||
|
@ -81,7 +81,7 @@ mnesia(boot) ->
|
||||||
{ram_copies, [node()]},
|
{ram_copies, [node()]},
|
||||||
{record_name, mqtt_subscriber},
|
{record_name, mqtt_subscriber},
|
||||||
{attributes, record_info(fields, mqtt_subscriber)},
|
{attributes, record_info(fields, mqtt_subscriber)},
|
||||||
{index, [pid]},
|
{index, [subpid]},
|
||||||
{local_content, true}]);
|
{local_content, true}]);
|
||||||
|
|
||||||
mnesia(copy) ->
|
mnesia(copy) ->
|
||||||
|
@ -156,19 +156,23 @@ cast(Msg) ->
|
||||||
%% @doc Publish to cluster nodes
|
%% @doc Publish to cluster nodes
|
||||||
%% @end
|
%% @end
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
-spec publish(From :: mqtt_clientid() | atom(), Msg :: mqtt_message()) -> ok.
|
-spec publish(Msg :: mqtt_message()) -> ok.
|
||||||
publish(From, #mqtt_message{topic=Topic} = Msg) ->
|
publish(#mqtt_message{topic=Topic, from = From} = Msg) ->
|
||||||
trace(publish, From, Msg),
|
trace(publish, From, Msg),
|
||||||
|
|
||||||
|
%%TODO:call hooks here...
|
||||||
|
%%Msg1 = emqttd_broker:foldl_hooks(client_publish, [], Msg),
|
||||||
|
|
||||||
%% Retain message first. Don't create retained topic.
|
%% Retain message first. Don't create retained topic.
|
||||||
case emqttd_msg_store:retain(Msg) of
|
case emqttd_msg_store:retain(Msg) of
|
||||||
ok ->
|
ok ->
|
||||||
%TODO: why unset 'retain' flag?
|
%TODO: why unset 'retain' flag?
|
||||||
publish(From, Topic, emqttd_message:unset_flag(Msg));
|
publish(Topic, emqttd_message:unset_flag(Msg));
|
||||||
ignore ->
|
ignore ->
|
||||||
publish(From, Topic, Msg)
|
publish(Topic, Msg)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
publish(From, <<"$Q/", _/binary>> = Queue, #mqtt_message{qos = Qos} = Msg) ->
|
publish(<<"$Q/", _/binary>> = Queue, #mqtt_message{qos = Qos} = Msg) ->
|
||||||
lists:foreach(
|
lists:foreach(
|
||||||
fun(#mqtt_queue{subpid = SubPid, qos = SubQos}) ->
|
fun(#mqtt_queue{subpid = SubPid, qos = SubQos}) ->
|
||||||
Msg1 = if
|
Msg1 = if
|
||||||
|
@ -178,7 +182,7 @@ publish(From, <<"$Q/", _/binary>> = Queue, #mqtt_message{qos = Qos} = Msg) ->
|
||||||
SubPid ! {dispatch, Msg1}
|
SubPid ! {dispatch, Msg1}
|
||||||
end, mnesia:dirty_read(queue, Queue));
|
end, mnesia:dirty_read(queue, Queue));
|
||||||
|
|
||||||
publish(_From, Topic, Msg) when is_binary(Topic) ->
|
publish(Topic, Msg) when is_binary(Topic) ->
|
||||||
lists:foreach(fun(#mqtt_topic{topic=Name, node=Node}) ->
|
lists:foreach(fun(#mqtt_topic{topic=Name, node=Node}) ->
|
||||||
case Node =:= node() of
|
case Node =:= node() of
|
||||||
true -> dispatch(Name, Msg);
|
true -> dispatch(Name, Msg);
|
||||||
|
|
|
@ -21,13 +21,13 @@
|
||||||
%%%-----------------------------------------------------------------------------
|
%%%-----------------------------------------------------------------------------
|
||||||
%%% @doc
|
%%% @doc
|
||||||
%%%
|
%%%
|
||||||
%%% emqttd session for persistent client.
|
%%% Session for persistent MQTT client.
|
||||||
%%%
|
%%%
|
||||||
%%% Session State in the broker consists of:
|
%%% Session State in the broker consists of:
|
||||||
%%%
|
%%%
|
||||||
%%% 1. The Client’s subscriptions.
|
%%% 1. The Client’s subscriptions.
|
||||||
%%%
|
%%%
|
||||||
%%% 2. inflight qos1, qos2 messages sent to the client but unacked, QoS 1 and QoS 2
|
%%% 2. inflight qos1/2 messages sent to the client but unacked, QoS 1 and QoS 2
|
||||||
%%% messages which have been sent to the Client, but have not been completely
|
%%% messages which have been sent to the Client, but have not been completely
|
||||||
%%% acknowledged.
|
%%% acknowledged.
|
||||||
%%%
|
%%%
|
||||||
|
@ -59,6 +59,8 @@
|
||||||
puback/2, pubrec/2, pubrel/2, pubcomp/2,
|
puback/2, pubrec/2, pubrel/2, pubcomp/2,
|
||||||
subscribe/2, unsubscribe/2]).
|
subscribe/2, unsubscribe/2]).
|
||||||
|
|
||||||
|
-behaviour(gen_server).
|
||||||
|
|
||||||
%% gen_server Function Exports
|
%% gen_server Function Exports
|
||||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
|
||||||
terminate/2, code_change/3]).
|
terminate/2, code_change/3]).
|
||||||
|
@ -116,7 +118,7 @@
|
||||||
max_awaiting_rel = 100,
|
max_awaiting_rel = 100,
|
||||||
|
|
||||||
%% session expired after 48 hours
|
%% session expired after 48 hours
|
||||||
expired_after = 172800,
|
expired_after = 48,
|
||||||
|
|
||||||
expired_timer,
|
expired_timer,
|
||||||
|
|
||||||
|
@ -126,6 +128,7 @@
|
||||||
%% @doc Start a session.
|
%% @doc Start a session.
|
||||||
%% @end
|
%% @end
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
-spec start_link(boolean(), binary(), pid()) -> {ok, pid()} | {error, any()}.
|
||||||
start_link(CleanSess, ClientId, ClientPid) ->
|
start_link(CleanSess, ClientId, ClientPid) ->
|
||||||
gen_server:start_link(?MODULE, [CleanSess, ClientId, ClientPid], []).
|
gen_server:start_link(?MODULE, [CleanSess, ClientId, ClientPid], []).
|
||||||
|
|
||||||
|
@ -133,7 +136,8 @@ start_link(CleanSess, ClientId, ClientPid) ->
|
||||||
%% @doc Resume a session.
|
%% @doc Resume a session.
|
||||||
%% @end
|
%% @end
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
resume(Session, ClientId, ClientPid) when is_pid(Session) ->
|
-spec resume(pid(), binary(), pid()) -> ok.
|
||||||
|
resume(Session, ClientId, ClientPid) ->
|
||||||
gen_server:cast(Session, {resume, ClientId, ClientPid}).
|
gen_server:cast(Session, {resume, ClientId, ClientPid}).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
@ -141,73 +145,69 @@ resume(Session, ClientId, ClientPid) when is_pid(Session) ->
|
||||||
%% @end
|
%% @end
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
-spec destroy(Session:: pid(), ClientId :: binary()) -> ok.
|
-spec destroy(Session:: pid(), ClientId :: binary()) -> ok.
|
||||||
destroy(Session, ClientId) when is_pid(Session) ->
|
destroy(Session, ClientId) ->
|
||||||
gen_server:call(Session, {destroy, ClientId}).
|
gen_server:call(Session, {destroy, ClientId}).
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% @doc Subscribe Topics
|
||||||
|
%% @end
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
-spec subscribe(pid(), [{binary(), mqtt_qos()}]) -> {ok, [mqtt_qos()]}.
|
||||||
|
subscribe(Session, TopicTable) ->
|
||||||
|
gen_server:call(Session, {subscribe, TopicTable}).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% @doc Publish message
|
%% @doc Publish message
|
||||||
%% @end
|
%% @end
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
-spec publish(Session :: pid(), {mqtt_qos(), mqtt_message()}) -> ok.
|
-spec publish(Session :: pid(), {mqtt_qos(), mqtt_message()}) -> ok.
|
||||||
publish(Session, Msg = #mqtt_message{qos = ?QOS_0}) when is_pid(Session) ->
|
publish(_Session, Msg = #mqtt_message{qos = ?QOS_0}) ->
|
||||||
%% publish qos0 directly
|
%% publish qos0 directly
|
||||||
emqttd_pubsub:publish(Msg);
|
emqttd_pubsub:publish(Msg);
|
||||||
|
|
||||||
publish(Session, Msg = #mqtt_message{qos = ?QOS_1}) when is_pid(Session) ->
|
publish(_Session, Msg = #mqtt_message{qos = ?QOS_1}) ->
|
||||||
%% publish qos1 directly, and client will puback
|
%% publish qos1 directly, and client will puback automatically
|
||||||
emqttd_pubsub:publish(Msg);
|
emqttd_pubsub:publish(Msg);
|
||||||
|
|
||||||
publish(Session, Msg = #mqtt_message{qos = ?QOS_2}) when is_pid(Session) ->
|
publish(Session, Msg = #mqtt_message{qos = ?QOS_2}) ->
|
||||||
%% publish qos2 by session
|
%% publish qos2 by session
|
||||||
gen_server:cast(Session, {publish, Msg}).
|
gen_server:call(Session, {publish, Msg}).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% @doc PubAck message
|
%% @doc PubAck message
|
||||||
%% @end
|
%% @end
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
-spec puback(Session :: pid(), MsgId :: mqtt_packet_id()) -> ok.
|
-spec puback(pid(), mqtt_msgid()) -> ok.
|
||||||
puback(Session, MsgId) when is_pid(Session) ->
|
puback(Session, MsgId) ->
|
||||||
gen_server:cast(Session, {puback, MsgId}).
|
gen_server:cast(Session, {puback, MsgId}).
|
||||||
|
|
||||||
-spec pubrec(Session :: pid(), MsgId :: mqtt_packet_id()) -> ok.
|
-spec pubrec(pid(), mqtt_msgid()) -> ok.
|
||||||
pubrec(Session, MsgId) when is_pid(Session) ->
|
pubrec(Session, MsgId) ->
|
||||||
gen_server:cast(Session, {pubrec, MsgId}).
|
gen_server:cast(Session, {pubrec, MsgId}).
|
||||||
|
|
||||||
-spec pubrel(Session :: pid(), MsgId :: mqtt_packet_id()) -> ok.
|
-spec pubrel(pid(), mqtt_msgid()) -> ok.
|
||||||
pubrel(Session, MsgId) when is_pid(Session) ->
|
pubrel(Session, MsgId) ->
|
||||||
gen_server:cast(Session, {pubrel, MsgId}).
|
gen_server:cast(Session, {pubrel, MsgId}).
|
||||||
|
|
||||||
-spec pubcomp(Session :: pid(), MsgId :: mqtt_packet_id()) -> ok.
|
-spec pubcomp(pid(), mqtt_msgid()) -> ok.
|
||||||
pubcomp(Session, MsgId) when is_pid(Session) ->
|
pubcomp(Session, MsgId) ->
|
||||||
gen_server:cast(Session, {pubcomp, MsgId}).
|
gen_server:cast(Session, {pubcomp, MsgId}).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
|
||||||
%% @doc Subscribe Topics
|
|
||||||
%% @end
|
|
||||||
%%------------------------------------------------------------------------------
|
|
||||||
-spec subscribe(Session :: pid(), [{binary(), mqtt_qos()}]) -> {ok, [mqtt_qos()]}.
|
|
||||||
subscribe(Session, Topics) when is_pid(Session) ->
|
|
||||||
gen_server:call(Session, {subscribe, Topics}).
|
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% @doc Unsubscribe Topics
|
%% @doc Unsubscribe Topics
|
||||||
%% @end
|
%% @end
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
-spec unsubscribe(Session :: pid(), [Topic :: binary()]) -> ok.
|
-spec unsubscribe(pid(), [binary()]) -> ok.
|
||||||
unsubscribe(Session, Topics) when is_pid(Session) ->
|
unsubscribe(Session, Topics) ->
|
||||||
gen_server:call(Session, {unsubscribe, Topics}).
|
gen_server:call(Session, {unsubscribe, Topics}).
|
||||||
|
|
||||||
%%%=============================================================================
|
%%%=============================================================================
|
||||||
%%% gen_server callbacks
|
%%% gen_server callbacks
|
||||||
%%%=============================================================================
|
%%%=============================================================================
|
||||||
|
|
||||||
init([CleanSess, ClientId, ClientPid]) ->
|
init([CleanSess, ClientId, ClientPid]) ->
|
||||||
if
|
process_flag(trap_exit, true),
|
||||||
CleanSess =:= false ->
|
true = link(ClientPid),
|
||||||
process_flag(trap_exit, true),
|
|
||||||
true = link(ClientPid);
|
|
||||||
CleanSess =:= true ->
|
|
||||||
ok
|
|
||||||
end,
|
|
||||||
QEnv = emqttd:env(mqtt, queue),
|
QEnv = emqttd:env(mqtt, queue),
|
||||||
SessEnv = emqttd:env(mqtt, session),
|
SessEnv = emqttd:env(mqtt, session),
|
||||||
PendingQ = emqttd_mqueue:new(ClientId, QEnv),
|
PendingQ = emqttd_mqueue:new(ClientId, QEnv),
|
||||||
|
@ -227,25 +227,25 @@ init([CleanSess, ClientId, ClientPid]) ->
|
||||||
await_rel_timeout = emqttd_opts:g(await_rel_timeout, SessEnv),
|
await_rel_timeout = emqttd_opts:g(await_rel_timeout, SessEnv),
|
||||||
max_awaiting_rel = emqttd_opts:g(max_awaiting_rel, SessEnv),
|
max_awaiting_rel = emqttd_opts:g(max_awaiting_rel, SessEnv),
|
||||||
expired_after = emqttd_opts:g(expired_after, SessEnv) * 3600,
|
expired_after = emqttd_opts:g(expired_after, SessEnv) * 3600,
|
||||||
timestamp = os:timestamp()
|
timestamp = os:timestamp()},
|
||||||
},
|
|
||||||
{ok, Session, hibernate}.
|
{ok, Session, hibernate}.
|
||||||
|
|
||||||
handle_call({subscribe, Topics}, _From, Session = #session{clientid = ClientId, subscriptions = Subscriptions}) ->
|
handle_call({subscribe, Topics}, _From, Session = #session{clientid = ClientId,
|
||||||
|
subscriptions = Subscriptions}) ->
|
||||||
|
|
||||||
%% subscribe first and don't care if the subscriptions have been existed
|
%% subscribe first and don't care if the subscriptions have been existed
|
||||||
{ok, GrantedQos} = emqttd_pubsub:subscribe(Topics),
|
{ok, GrantedQos} = emqttd_pubsub:subscribe(Topics),
|
||||||
|
|
||||||
lager:info([{client, ClientId}], "Session ~s subscribe ~p. Granted QoS: ~p",
|
lager:info([{client, ClientId}], "Session ~s subscribe ~p, Granted QoS: ~p",
|
||||||
[ClientId, Topics, GrantedQos]),
|
[ClientId, Topics, GrantedQos]),
|
||||||
|
|
||||||
Subscriptions1 =
|
Subscriptions1 =
|
||||||
lists:foldl(fun({Topic, Qos}, Acc) ->
|
lists:foldl(fun({Topic, Qos}, Acc) ->
|
||||||
case lists:keyfind(Topic, 1, Acc) of
|
case lists:keyfind(Topic, 1, Acc) of
|
||||||
{Topic, Qos} ->
|
{Topic, Qos} ->
|
||||||
lager:warning([{client, ClientId}], "~s resubscribe ~p: qos = ~p", [ClientId, Topic, Qos]), Acc;
|
lager:warning([{client, ClientId}], "Session ~s resubscribe ~p: qos = ~p", [ClientId, Topic, Qos]), Acc;
|
||||||
{Topic, Old} ->
|
{Topic, Old} ->
|
||||||
lager:warning([{client, ClientId}], "~s resubscribe ~p: old qos=~p, new qos=~p",
|
lager:warning([{client, ClientId}], "Session ~s resubscribe ~p: old qos=~p, new qos=~p",
|
||||||
[ClientId, Topic, Old, Qos]),
|
[ClientId, Topic, Old, Qos]),
|
||||||
lists:keyreplace(Topic, 1, Acc, {Topic, Qos});
|
lists:keyreplace(Topic, 1, Acc, {Topic, Qos});
|
||||||
false ->
|
false ->
|
||||||
|
@ -263,7 +263,7 @@ handle_call({unsubscribe, Topics}, _From, Session = #session{clientid = ClientId
|
||||||
|
|
||||||
%%unsubscribe from topic tree
|
%%unsubscribe from topic tree
|
||||||
ok = emqttd_pubsub:unsubscribe(Topics),
|
ok = emqttd_pubsub:unsubscribe(Topics),
|
||||||
lager:info([{client, ClientId}], "Client ~s unsubscribe ~p.", [ClientId, Topics]),
|
lager:info([{client, ClientId}], "Session ~s unsubscribe ~p.", [ClientId, Topics]),
|
||||||
|
|
||||||
Subscriptions1 =
|
Subscriptions1 =
|
||||||
lists:foldl(fun(Topic, Acc) ->
|
lists:foldl(fun(Topic, Acc) ->
|
||||||
|
@ -277,12 +277,24 @@ handle_call({unsubscribe, Topics}, _From, Session = #session{clientid = ClientId
|
||||||
|
|
||||||
{reply, ok, Session#session{subscriptions = Subscriptions1}};
|
{reply, ok, Session#session{subscriptions = Subscriptions1}};
|
||||||
|
|
||||||
|
handle_call({publish, Message = #mqtt_message{qos = ?QOS_2, msgid = MsgId}}, _From,
|
||||||
|
Session = #session{clientid = ClientId, awaiting_rel = AwaitingRel, await_rel_timeout = Timeout}) ->
|
||||||
|
case check_awaiting_rel(Session) of
|
||||||
|
true ->
|
||||||
|
TRef = timer(Timeout, {timeout, awaiting_rel, MsgId}),
|
||||||
|
{reply, ok, Session#session{awaiting_rel = maps:put(MsgId, {Message, TRef}, AwaitingRel)}};
|
||||||
|
false ->
|
||||||
|
lager:error([{clientid, ClientId}], "Session ~s "
|
||||||
|
" dropped Qos2 message for too many awaiting_rel: ~p", [ClientId, Message]),
|
||||||
|
{reply, {error, dropped}, Session}
|
||||||
|
end;
|
||||||
|
|
||||||
handle_call({destroy, ClientId}, _From, Session = #session{clientid = ClientId}) ->
|
handle_call({destroy, ClientId}, _From, Session = #session{clientid = ClientId}) ->
|
||||||
lager:warning("Session ~s destroyed", [ClientId]),
|
lager:warning("Session ~s destroyed", [ClientId]),
|
||||||
{stop, {shutdown, destroy}, ok, Session};
|
{stop, {shutdown, destroy}, ok, Session};
|
||||||
|
|
||||||
handle_call(Req, _From, State) ->
|
handle_call(Req, _From, State) ->
|
||||||
lager:error("Unexpected Request: ~p", [Req]),
|
lager:critical("Unexpected Request: ~p", [Req]),
|
||||||
{reply, {error, badreq}, State}.
|
{reply, {error, badreq}, State}.
|
||||||
|
|
||||||
handle_cast({resume, ClientId, ClientPid}, State = #session{
|
handle_cast({resume, ClientId, ClientPid}, State = #session{
|
||||||
|
@ -331,9 +343,6 @@ handle_cast({resume, ClientId, ClientPid}, State = #session{
|
||||||
pending_queue = emqttd_queue:clear(Queue),
|
pending_queue = emqttd_queue:clear(Queue),
|
||||||
expired_timer = undefined}, hibernate};
|
expired_timer = undefined}, hibernate};
|
||||||
|
|
||||||
handle_cast({publish, Message = #mqtt_message{qos = ?QOS_2}}, Session) ->
|
|
||||||
{noreply, publish_qos2(Message, Session)};
|
|
||||||
|
|
||||||
|
|
||||||
handle_cast({puback, MsgId}, Session = #session{clientid = ClientId, inflight_queue = Q, awaiting_ack = Awaiting}) ->
|
handle_cast({puback, MsgId}, Session = #session{clientid = ClientId, inflight_queue = Q, awaiting_ack = Awaiting}) ->
|
||||||
case maps:find(MsgId, Awaiting) of
|
case maps:find(MsgId, Awaiting) of
|
||||||
|
@ -362,14 +371,14 @@ handle_cast({pubrec, MsgId}, Session = #session{clientid = ClientId,
|
||||||
{noreply, Session}
|
{noreply, Session}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
handle_cast({pubrel, MsgId}, Session = #session{clientid = ClientId, awaiting_rel = Awaiting}) ->
|
handle_cast({pubrel, MsgId}, Session = #session{clientid = ClientId, awaiting_rel = AwaitingRel}) ->
|
||||||
case maps:find(MsgId, Awaiting) of
|
case maps:find(MsgId, AwaitingRel) of
|
||||||
{ok, {Msg, TRef}} ->
|
{ok, {Msg, TRef}} ->
|
||||||
catch erlang:cancel_timer(TRef),
|
catch erlang:cancel_timer(TRef),
|
||||||
emqttd_pubsub:publish(Msg),
|
emqttd_pubsub:publish(Msg),
|
||||||
{noreply, Session#session{awaiting_rel = maps:remove(MsgId, Awaiting)}};
|
{noreply, Session#session{awaiting_rel = maps:remove(MsgId, AwaitingRel)}};
|
||||||
error ->
|
error ->
|
||||||
lager:error("Session ~s cannot find PUBREL'~p'!", [ClientId, MsgId]),
|
lager:error("Session ~s cannot find PUBREL '~p'!", [ClientId, MsgId]),
|
||||||
{noreply, Session}
|
{noreply, Session}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
|
@ -408,6 +417,10 @@ handle_info({'EXIT', ClientPid, Reason}, Session = #session{clean_sess = false,
|
||||||
TRef = timer(Expires * 1000, session_expired),
|
TRef = timer(Expires * 1000, session_expired),
|
||||||
{noreply, Session#session{expired_timer = TRef}};
|
{noreply, Session#session{expired_timer = TRef}};
|
||||||
|
|
||||||
|
handle_info({'EXIT', ClientPid, _Reason}, Session = #session{clean_sess = true, client_pid = ClientPid}) ->
|
||||||
|
%%TODO: reason...
|
||||||
|
{stop, normal, Session};
|
||||||
|
|
||||||
handle_info({'EXIT', ClientPid0, _Reason}, State = #session{client_pid = ClientPid}) ->
|
handle_info({'EXIT', ClientPid0, _Reason}, State = #session{client_pid = ClientPid}) ->
|
||||||
lager:critical("Unexpected Client EXIT: pid=~p, pid(state): ~p", [ClientPid0, ClientPid]),
|
lager:critical("Unexpected Client EXIT: pid=~p, pid(state): ~p", [ClientPid0, ClientPid]),
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
|
@ -456,19 +469,6 @@ code_change(_OldVsn, Session, _Extra) ->
|
||||||
%% @end
|
%% @end
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
publish_qos2(Message = #mqtt_message{qos = ?QOS_2,msgid = MsgId}, Session = #session{clientid = ClientId,
|
|
||||||
awaiting_rel = AwaitingRel,
|
|
||||||
await_rel_timeout = Timeout}) ->
|
|
||||||
|
|
||||||
case check_awaiting_rel(Session) of
|
|
||||||
true ->
|
|
||||||
TRef = timer(Timeout, {timeout, awaiting_rel, MsgId}),
|
|
||||||
Session#session{awaiting_rel = maps:put(MsgId, {Message, TRef}, AwaitingRel)};
|
|
||||||
false ->
|
|
||||||
lager:error([{clientid, ClientId}], "Session ~s "
|
|
||||||
" dropped Qos2 message for too many awaiting_rel: ~p", [ClientId, Message]),
|
|
||||||
Session
|
|
||||||
end.
|
|
||||||
|
|
||||||
check_awaiting_rel(#session{max_awaiting_rel = 0}) ->
|
check_awaiting_rel(#session{max_awaiting_rel = 0}) ->
|
||||||
true;
|
true;
|
||||||
|
|
Loading…
Reference in New Issue