commit
8718e4197b
|
@ -24,7 +24,6 @@
|
|||
%%%
|
||||
%%% @end
|
||||
%%%-----------------------------------------------------------------------------
|
||||
|
||||
-module(emqttd).
|
||||
|
||||
-author("Feng Lee <feng@emqtt.io>").
|
||||
|
|
|
@ -24,7 +24,6 @@
|
|||
%%%
|
||||
%%% @end
|
||||
%%%-----------------------------------------------------------------------------
|
||||
|
||||
-module(emqttd_acl_internal).
|
||||
|
||||
-author("Feng Lee <feng@emqtt.io>").
|
||||
|
|
|
@ -24,7 +24,6 @@
|
|||
%%%
|
||||
%%% @end
|
||||
%%%-----------------------------------------------------------------------------
|
||||
|
||||
-module(emqttd_alarm).
|
||||
|
||||
-author("Feng Lee <feng@emqtt.io>").
|
||||
|
|
|
@ -24,7 +24,6 @@
|
|||
%%%
|
||||
%%% @end
|
||||
%%%-----------------------------------------------------------------------------
|
||||
|
||||
-module(emqttd_bridge).
|
||||
|
||||
-author("Feng Lee <feng@emqtt.io>").
|
||||
|
|
|
@ -24,7 +24,6 @@
|
|||
%%%
|
||||
%%% @end
|
||||
%%%-----------------------------------------------------------------------------
|
||||
|
||||
-module(emqttd_bridge_sup).
|
||||
|
||||
-author("Feng Lee <feng@emqtt.io>").
|
||||
|
|
|
@ -24,7 +24,6 @@
|
|||
%%%
|
||||
%%% @end
|
||||
%%%-----------------------------------------------------------------------------
|
||||
|
||||
-module(emqttd_cli).
|
||||
|
||||
-author("Feng Lee <feng@emqtt.io>").
|
||||
|
|
|
@ -76,8 +76,8 @@ subscribe(CPid, TopicTable) ->
|
|||
unsubscribe(CPid, Topics) ->
|
||||
gen_server:cast(CPid, {unsubscribe, Topics}).
|
||||
|
||||
init([Connection0, MqttEnv]) ->
|
||||
{ok, Connection} = Connection0:wait(),
|
||||
init([OriginConn, MqttEnv]) ->
|
||||
{ok, Connection} = OriginConn:wait(),
|
||||
{PeerHost, PeerPort, PeerName} =
|
||||
case Connection:peername() of
|
||||
{ok, Peer = {Host, Port}} ->
|
||||
|
@ -125,7 +125,7 @@ handle_call(info, _From, State = #client_state{connection = Connection,
|
|||
ProtoInfo = emqttd_protocol:info(ProtoState),
|
||||
{ok, SockStats} = Connection:getstat(?SOCK_STATS),
|
||||
{reply, lists:append([ClientInfo, [{proto_info, ProtoInfo},
|
||||
{sock_stats, SockStats}]]), State};
|
||||
{sock_stats, SockStats}]]), State};
|
||||
|
||||
handle_call(kick, _From, State) ->
|
||||
{stop, {shutdown, kick}, ok, State};
|
||||
|
@ -173,7 +173,7 @@ handle_info({shutdown, conflict, {ClientId, NewPid}}, State) ->
|
|||
shutdown(conflict, State);
|
||||
|
||||
handle_info(activate_sock, State) ->
|
||||
noreply(run_socket(State#client_state{conn_state = running}));
|
||||
hibernate(run_socket(State#client_state{conn_state = running}));
|
||||
|
||||
handle_info({inet_async, _Sock, _Ref, {ok, Data}}, State) ->
|
||||
Size = size(Data),
|
||||
|
@ -185,7 +185,7 @@ handle_info({inet_async, _Sock, _Ref, {error, Reason}}, State) ->
|
|||
shutdown(Reason, State);
|
||||
|
||||
handle_info({inet_reply, _Sock, ok}, State) ->
|
||||
noreply(State);
|
||||
hibernate(State);
|
||||
|
||||
handle_info({inet_reply, _Sock, {error, Reason}}, State) ->
|
||||
shutdown(Reason, State);
|
||||
|
@ -199,12 +199,12 @@ handle_info({keepalive, start, Interval}, State = #client_state{connection = Con
|
|||
end
|
||||
end,
|
||||
KeepAlive = emqttd_keepalive:start(StatFun, Interval, {keepalive, check}),
|
||||
noreply(State#client_state{keepalive = KeepAlive});
|
||||
hibernate(State#client_state{keepalive = KeepAlive});
|
||||
|
||||
handle_info({keepalive, check}, State = #client_state{keepalive = KeepAlive}) ->
|
||||
case emqttd_keepalive:check(KeepAlive) of
|
||||
{ok, KeepAlive1} ->
|
||||
noreply(State#client_state{keepalive = KeepAlive1});
|
||||
hibernate(State#client_state{keepalive = KeepAlive1});
|
||||
{error, timeout} ->
|
||||
?LOG(debug, "Keepalive timeout", [], State),
|
||||
shutdown(keepalive_timeout, State);
|
||||
|
@ -240,21 +240,21 @@ code_change(_OldVsn, State, _Extra) ->
|
|||
|
||||
with_proto_state(Fun, State = #client_state{proto_state = ProtoState}) ->
|
||||
{ok, ProtoState1} = Fun(ProtoState),
|
||||
noreply(State#client_state{proto_state = ProtoState1}).
|
||||
hibernate(State#client_state{proto_state = ProtoState1}).
|
||||
|
||||
with_session(Fun, State = #client_state{proto_state = ProtoState}) ->
|
||||
Fun(emqttd_protocol:session(ProtoState)),
|
||||
noreply(State).
|
||||
hibernate(State).
|
||||
|
||||
%% receive and parse tcp data
|
||||
received(<<>>, State) ->
|
||||
noreply(State);
|
||||
hibernate(State);
|
||||
|
||||
received(Bytes, State = #client_state{parser_fun = ParserFun,
|
||||
packet_opts = PacketOpts,
|
||||
proto_state = ProtoState}) ->
|
||||
case catch ParserFun(Bytes) of
|
||||
{more, NewParser} ->
|
||||
{more, NewParser} ->
|
||||
noreply(run_socket(State#client_state{parser_fun = NewParser}));
|
||||
{ok, Packet, Rest} ->
|
||||
emqttd_metrics:received(Packet),
|
||||
|
@ -300,6 +300,9 @@ run_socket(State = #client_state{connection = Connection}) ->
|
|||
State#client_state{await_recv = true}.
|
||||
|
||||
noreply(State) ->
|
||||
{noreply, State}.
|
||||
|
||||
hibernate(State) ->
|
||||
{noreply, State, hibernate}.
|
||||
|
||||
shutdown(Reason, State) ->
|
||||
|
|
|
@ -24,7 +24,6 @@
|
|||
%%%
|
||||
%%% @end
|
||||
%%%-----------------------------------------------------------------------------
|
||||
|
||||
-module(emqttd_ctl).
|
||||
|
||||
-author("Feng Lee <feng@emqtt.io>").
|
||||
|
|
|
@ -24,7 +24,6 @@
|
|||
%%%
|
||||
%%% @end
|
||||
%%%-----------------------------------------------------------------------------
|
||||
|
||||
-module(emqttd_dist).
|
||||
|
||||
-import(lists, [concat/1]).
|
||||
|
|
|
@ -35,7 +35,6 @@
|
|||
%%%
|
||||
%%% @end
|
||||
%%%-----------------------------------------------------------------------------
|
||||
|
||||
-module(emqttd_guid).
|
||||
|
||||
-author("Feng Lee <feng@emqtt.io>").
|
||||
|
|
|
@ -24,7 +24,6 @@
|
|||
%%%
|
||||
%%% @end
|
||||
%%%-----------------------------------------------------------------------------
|
||||
|
||||
-module(emqttd_http).
|
||||
|
||||
-author("Feng Lee <feng@emqtt.io>").
|
||||
|
|
|
@ -23,7 +23,6 @@
|
|||
%%%
|
||||
%%% @end
|
||||
%%%-----------------------------------------------------------------------------
|
||||
|
||||
-module(emqttd_keepalive).
|
||||
|
||||
-author("Feng Lee <feng@emqtt.io>").
|
||||
|
|
|
@ -24,7 +24,6 @@
|
|||
%%%
|
||||
%%% @end
|
||||
%%%-----------------------------------------------------------------------------
|
||||
|
||||
-module(emqttd_metrics).
|
||||
|
||||
-author("Feng Lee <feng@emqtt.io>").
|
||||
|
|
|
@ -24,7 +24,6 @@
|
|||
%%%
|
||||
%%% @end
|
||||
%%%-----------------------------------------------------------------------------
|
||||
|
||||
-module(emqttd_mnesia).
|
||||
|
||||
-author("Feng Lee <feng@emqtt.io>").
|
||||
|
|
|
@ -24,7 +24,6 @@
|
|||
%%%
|
||||
%%% @end
|
||||
%%%-----------------------------------------------------------------------------
|
||||
|
||||
-module(emqttd_mod_autosub).
|
||||
|
||||
-author("Feng Lee <feng@emqtt.io>").
|
||||
|
|
|
@ -24,7 +24,6 @@
|
|||
%%%
|
||||
%%% @end
|
||||
%%%-----------------------------------------------------------------------------
|
||||
|
||||
-module(emqttd_mod_presence).
|
||||
|
||||
-author("Feng Lee <feng@emqtt.io>").
|
||||
|
|
|
@ -24,7 +24,6 @@
|
|||
%%%
|
||||
%%% @end
|
||||
%%%-----------------------------------------------------------------------------
|
||||
|
||||
-module(emqttd_mod_rewrite).
|
||||
|
||||
-author("Feng Lee <feng@emqtt.io>").
|
||||
|
|
|
@ -24,7 +24,6 @@
|
|||
%%%
|
||||
%%% @end
|
||||
%%%-----------------------------------------------------------------------------
|
||||
|
||||
-module(emqttd_mod_sup).
|
||||
|
||||
-author("Feng Lee <feng@emqtt.io>").
|
||||
|
|
|
@ -47,7 +47,6 @@
|
|||
%%%
|
||||
%%% @end
|
||||
%%%-----------------------------------------------------------------------------
|
||||
|
||||
-module(emqttd_mqueue).
|
||||
|
||||
-author("Feng Lee <feng@emqtt.io>").
|
||||
|
|
|
@ -24,7 +24,6 @@
|
|||
%%%
|
||||
%%% @end
|
||||
%%%-----------------------------------------------------------------------------
|
||||
|
||||
-module(emqttd_opts).
|
||||
|
||||
-author("Feng Lee <feng@emqtt.io>").
|
||||
|
|
|
@ -24,7 +24,6 @@
|
|||
%%%
|
||||
%%% @end
|
||||
%%%-----------------------------------------------------------------------------
|
||||
|
||||
-module(emqttd_packet).
|
||||
|
||||
-author("Feng Lee <feng@emqtt.io>").
|
||||
|
|
|
@ -24,7 +24,6 @@
|
|||
%%%
|
||||
%%% @end
|
||||
%%%-----------------------------------------------------------------------------
|
||||
|
||||
-module(emqttd_plugins).
|
||||
|
||||
-author("Feng Lee <feng@emqtt.io>").
|
||||
|
|
|
@ -24,7 +24,6 @@
|
|||
%%%
|
||||
%%% @end
|
||||
%%%-----------------------------------------------------------------------------
|
||||
|
||||
-module(emqttd_pooler_sup).
|
||||
|
||||
-author("Feng Lee <feng@emqtt.io>").
|
||||
|
|
|
@ -24,7 +24,6 @@
|
|||
%%%
|
||||
%%% @end
|
||||
%%%-----------------------------------------------------------------------------
|
||||
|
||||
-module(emqttd_pubsub_sup).
|
||||
|
||||
-author("Feng Lee <feng@emqtt.io>").
|
||||
|
|
|
@ -21,11 +21,11 @@
|
|||
%%%-----------------------------------------------------------------------------
|
||||
%%% @doc
|
||||
%%% MQTT retained message storage.
|
||||
%%%
|
||||
%%% TODO: should match topic tree
|
||||
%%%
|
||||
%%% @end
|
||||
%%%-----------------------------------------------------------------------------
|
||||
|
||||
%% TODO: should match topic tree
|
||||
-module(emqttd_retained).
|
||||
|
||||
-author("Feng Lee <feng@emqtt.io>").
|
||||
|
|
|
@ -24,7 +24,6 @@
|
|||
%%%
|
||||
%%% @end
|
||||
%%%-----------------------------------------------------------------------------
|
||||
|
||||
-module(emqttd_serialiser).
|
||||
|
||||
-author("Feng Lee <feng@emqtt.io>").
|
||||
|
|
|
@ -174,7 +174,7 @@ destroy(SessPid, ClientId) ->
|
|||
%%------------------------------------------------------------------------------
|
||||
-spec subscribe(pid(), [{binary(), mqtt_qos()}]) -> ok.
|
||||
subscribe(SessPid, TopicTable) ->
|
||||
subscribe(SessPid, TopicTable, fun(_) -> ok end).
|
||||
gen_server2:cast(SessPid, {subscribe, TopicTable, fun(_) -> ok end}).
|
||||
|
||||
-spec subscribe(pid(), mqtt_packet_id(), [{binary(), mqtt_qos()}]) -> ok.
|
||||
subscribe(SessPid, PacketId, TopicTable) ->
|
||||
|
@ -289,26 +289,24 @@ prioritise_info(Msg, _Len, _State) ->
|
|||
end.
|
||||
|
||||
handle_call(info, _From, State) ->
|
||||
{reply, sess_info(State), State};
|
||||
{reply, sess_info(State), State, hibernate};
|
||||
|
||||
handle_call({publish, Msg = #mqtt_message{qos = ?QOS_2, pktid = PktId}}, _From,
|
||||
Session = #session{client_id = ClientId,
|
||||
awaiting_rel = AwaitingRel,
|
||||
await_rel_timeout = Timeout}) ->
|
||||
handle_call({publish, Msg = #mqtt_message{qos = ?QOS_2, pktid = PktId}},
|
||||
_From, Session = #session{awaiting_rel = AwaitingRel,
|
||||
await_rel_timeout = Timeout}) ->
|
||||
case check_awaiting_rel(Session) of
|
||||
true ->
|
||||
TRef = timer(Timeout, {timeout, awaiting_rel, PktId}),
|
||||
AwaitingRel1 = maps:put(PktId, {Msg, TRef}, AwaitingRel),
|
||||
{reply, ok, Session#session{awaiting_rel = AwaitingRel1}};
|
||||
false ->
|
||||
lager:critical([{client, ClientId}], "Session(~s) dropped Qos2 message "
|
||||
"for too many awaiting_rel: ~p", [ClientId, Msg]),
|
||||
{reply, {error, dropped}, Session}
|
||||
?LOG(critical, "Dropped Qos2 message for too many awaiting_rel: ~p", [Msg], Session),
|
||||
{reply, {error, dropped}, Session, hibernate}
|
||||
end;
|
||||
|
||||
handle_call(Req, _From, State) ->
|
||||
?LOG(critical, "Unexpected Request: ~p", [Req], State),
|
||||
{reply, {error, unsupported_req}, State}.
|
||||
{reply, {error, unsupported_req}, State, hibernate}.
|
||||
|
||||
handle_cast({subscribe, TopicTable0, AckFun}, Session = #session{client_id = ClientId,
|
||||
subscriptions = Subscriptions}) ->
|
||||
|
@ -318,7 +316,7 @@ handle_cast({subscribe, TopicTable0, AckFun}, Session = #session{client_id = Cli
|
|||
case TopicTable -- Subscriptions of
|
||||
[] ->
|
||||
AckFun([Qos || {_, Qos} <- TopicTable]),
|
||||
noreply(Session);
|
||||
hibernate(Session);
|
||||
_ ->
|
||||
%% subscribe first and don't care if the subscriptions have been existed
|
||||
{ok, GrantedQos} = emqttd_pubsub:subscribe(TopicTable),
|
||||
|
@ -347,10 +345,10 @@ handle_cast({subscribe, TopicTable0, AckFun}, Session = #session{client_id = Cli
|
|||
[{Topic, Qos} | Acc]
|
||||
end
|
||||
end, Subscriptions, TopicTable),
|
||||
noreply(Session#session{subscriptions = Subscriptions1})
|
||||
hibernate(Session#session{subscriptions = Subscriptions1})
|
||||
end;
|
||||
|
||||
handle_cast({unsubscribe, Topics0}, Session = #session{client_id = ClientId,
|
||||
handle_cast({unsubscribe, Topics0}, Session = #session{client_id = ClientId,
|
||||
subscriptions = Subscriptions}) ->
|
||||
|
||||
Topics = emqttd_broker:foldl_hooks('client.unsubscribe', [ClientId], Topics0),
|
||||
|
@ -358,7 +356,7 @@ handle_cast({unsubscribe, Topics0}, Session = #session{client_id = ClientId,
|
|||
%% unsubscribe from topic tree
|
||||
ok = emqttd_pubsub:unsubscribe(Topics),
|
||||
|
||||
lager:info([{client, ClientId}], "Session(~s) unsubscribe ~p", [ClientId, Topics]),
|
||||
?LOG(info, "unsubscribe ~p", [Topics], Session),
|
||||
|
||||
Subscriptions1 =
|
||||
lists:foldl(fun(Topic, Acc) ->
|
||||
|
@ -370,11 +368,11 @@ handle_cast({unsubscribe, Topics0}, Session = #session{client_id = ClientId,
|
|||
end
|
||||
end, Subscriptions, Topics),
|
||||
|
||||
noreply(Session#session{subscriptions = Subscriptions1});
|
||||
hibernate(Session#session{subscriptions = Subscriptions1});
|
||||
|
||||
handle_cast({destroy, ClientId}, Session = #session{client_id = ClientId}) ->
|
||||
?LOG(warning, "destroyed", [], Session),
|
||||
{stop, {shutdown, destroy}, Session};
|
||||
shutdown(destroy, Session);
|
||||
|
||||
handle_cast({resume, ClientId, ClientPid}, Session = #session{client_id = ClientId,
|
||||
client_pid = OldClientPid,
|
||||
|
@ -428,17 +426,17 @@ handle_cast({resume, ClientId, ClientPid}, Session = #session{client_id = C
|
|||
end, Session1, lists:reverse(InflightQ)),
|
||||
|
||||
%% Dequeue pending messages
|
||||
noreply(dequeue(Session2));
|
||||
hibernate(dequeue(Session2));
|
||||
|
||||
%% PUBACK
|
||||
handle_cast({puback, PktId}, Session = #session{awaiting_ack = AwaitingAck}) ->
|
||||
case maps:find(PktId, AwaitingAck) of
|
||||
{ok, TRef} ->
|
||||
cancel_timer(TRef),
|
||||
noreply(dequeue(acked(PktId, Session)));
|
||||
hibernate(dequeue(acked(PktId, Session)));
|
||||
error ->
|
||||
?LOG(warning, "Cannot find PUBACK: ~p", [PktId], Session),
|
||||
noreply(Session)
|
||||
hibernate(Session)
|
||||
end;
|
||||
|
||||
%% PUBREC
|
||||
|
@ -451,10 +449,10 @@ handle_cast({pubrec, PktId}, Session = #session{awaiting_ack = AwaitingAck,
|
|||
TRef1 = timer(Timeout, {timeout, awaiting_comp, PktId}),
|
||||
AwaitingComp1 = maps:put(PktId, TRef1, AwaitingComp),
|
||||
Session1 = acked(PktId, Session#session{awaiting_comp = AwaitingComp1}),
|
||||
noreply(dequeue(Session1));
|
||||
hibernate(dequeue(Session1));
|
||||
error ->
|
||||
?LOG(error, "Cannot find PUBREC: ~p", [PktId], Session),
|
||||
noreply(Session)
|
||||
hibernate(Session)
|
||||
end;
|
||||
|
||||
%% PUBREL
|
||||
|
@ -463,10 +461,10 @@ handle_cast({pubrel, PktId}, Session = #session{awaiting_rel = AwaitingRel}) ->
|
|||
{ok, {Msg, TRef}} ->
|
||||
cancel_timer(TRef),
|
||||
emqttd_pubsub:publish(Msg),
|
||||
noreply(Session#session{awaiting_rel = maps:remove(PktId, AwaitingRel)});
|
||||
hibernate(Session#session{awaiting_rel = maps:remove(PktId, AwaitingRel)});
|
||||
error ->
|
||||
?LOG(error, "Cannot find PUBREL: ~p", [PktId], Session),
|
||||
noreply(Session)
|
||||
hibernate(Session)
|
||||
end;
|
||||
|
||||
%% PUBCOMP
|
||||
|
@ -474,27 +472,27 @@ handle_cast({pubcomp, PktId}, Session = #session{awaiting_comp = AwaitingComp})
|
|||
case maps:find(PktId, AwaitingComp) of
|
||||
{ok, TRef} ->
|
||||
cancel_timer(TRef),
|
||||
noreply(Session#session{awaiting_comp = maps:remove(PktId, AwaitingComp)});
|
||||
hibernate(Session#session{awaiting_comp = maps:remove(PktId, AwaitingComp)});
|
||||
error ->
|
||||
?LOG(error, "Cannot find PUBCOMP: ~p", [PktId], Session),
|
||||
noreply(Session)
|
||||
hibernate(Session)
|
||||
end;
|
||||
|
||||
handle_cast(Msg, State) ->
|
||||
?LOG(critical, "Unexpected Msg: ~p", [Msg], State),
|
||||
noreply(State).
|
||||
hibernate(State).
|
||||
|
||||
%% Queue messages when client is offline
|
||||
handle_info({dispatch, Msg}, Session = #session{client_pid = undefined,
|
||||
message_queue = Q})
|
||||
when is_record(Msg, mqtt_message) ->
|
||||
noreply(Session#session{message_queue = emqttd_mqueue:in(Msg, Q)});
|
||||
hibernate(Session#session{message_queue = emqttd_mqueue:in(Msg, Q)});
|
||||
|
||||
%% Dispatch qos0 message directly to client
|
||||
handle_info({dispatch, Msg = #mqtt_message{qos = ?QOS_0}},
|
||||
Session = #session{client_pid = ClientPid}) ->
|
||||
ClientPid ! {deliver, Msg},
|
||||
noreply(Session);
|
||||
hibernate(Session);
|
||||
|
||||
handle_info({dispatch, Msg = #mqtt_message{qos = QoS}},
|
||||
Session = #session{message_queue = MsgQ})
|
||||
|
@ -504,13 +502,13 @@ handle_info({dispatch, Msg = #mqtt_message{qos = QoS}},
|
|||
true ->
|
||||
noreply(deliver(Msg, Session));
|
||||
false ->
|
||||
noreply(Session#session{message_queue = emqttd_mqueue:in(Msg, MsgQ)})
|
||||
hibernate(Session#session{message_queue = emqttd_mqueue:in(Msg, MsgQ)})
|
||||
end;
|
||||
|
||||
handle_info({timeout, awaiting_ack, PktId}, Session = #session{client_pid = undefined,
|
||||
awaiting_ack = AwaitingAck}) ->
|
||||
%% just remove awaiting
|
||||
noreply(Session#session{awaiting_ack = maps:remove(PktId, AwaitingAck)});
|
||||
hibernate(Session#session{awaiting_ack = maps:remove(PktId, AwaitingAck)});
|
||||
|
||||
handle_info({timeout, awaiting_ack, PktId}, Session = #session{inflight_queue = InflightQ,
|
||||
awaiting_ack = AwaitingAck}) ->
|
||||
|
@ -518,39 +516,39 @@ handle_info({timeout, awaiting_ack, PktId}, Session = #session{inflight_queue =
|
|||
{ok, _TRef} ->
|
||||
case lists:keyfind(PktId, 1, InflightQ) of
|
||||
{_, Msg} ->
|
||||
noreply(redeliver(Msg, Session));
|
||||
hibernate(redeliver(Msg, Session));
|
||||
false ->
|
||||
?LOG(error, "AwaitingAck timeout but Cannot find PktId: ~p", [PktId], Session),
|
||||
noreply(dequeue(Session))
|
||||
hibernate(dequeue(Session))
|
||||
end;
|
||||
error ->
|
||||
?LOG(error, "Cannot find AwaitingAck: ~p", [PktId], Session),
|
||||
noreply(Session)
|
||||
hibernate(Session)
|
||||
end;
|
||||
|
||||
handle_info({timeout, awaiting_rel, PktId}, Session = #session{awaiting_rel = AwaitingRel}) ->
|
||||
case maps:find(PktId, AwaitingRel) of
|
||||
{ok, {_Msg, _TRef}} ->
|
||||
?LOG(error, "AwaitingRel Timout: ~p, Drop Message!", [PktId], Session),
|
||||
noreply(Session#session{awaiting_rel = maps:remove(PktId, AwaitingRel)});
|
||||
?LOG(warning, "AwaitingRel Timout: ~p, Drop Message!", [PktId], Session),
|
||||
hibernate(Session#session{awaiting_rel = maps:remove(PktId, AwaitingRel)});
|
||||
error ->
|
||||
?LOG(error, "Cannot find AwaitingRel: ~p", [PktId], Session),
|
||||
noreply(Session)
|
||||
hibernate(Session)
|
||||
end;
|
||||
|
||||
handle_info({timeout, awaiting_comp, PktId}, Session = #session{awaiting_comp = Awaiting}) ->
|
||||
case maps:find(PktId, Awaiting) of
|
||||
{ok, _TRef} ->
|
||||
?LOG(error, "Awaiting PUBCOMP Timout: ~p", [PktId], Session),
|
||||
noreply(Session#session{awaiting_comp = maps:remove(PktId, Awaiting)});
|
||||
?LOG(warning, "Awaiting PUBCOMP Timout: ~p", [PktId], Session),
|
||||
hibernate(Session#session{awaiting_comp = maps:remove(PktId, Awaiting)});
|
||||
error ->
|
||||
?LOG(error, "Cannot find Awaiting PUBCOMP: ~p", [PktId], Session),
|
||||
noreply(Session)
|
||||
hibernate(Session)
|
||||
end;
|
||||
|
||||
handle_info(collect_info, Session = #session{clean_sess = CleanSess, client_id = ClientId}) ->
|
||||
emqttd_sm:register_session(CleanSess, ClientId, sess_info(Session)),
|
||||
noreply(start_collector(Session));
|
||||
hibernate(start_collector(Session));
|
||||
|
||||
handle_info({'EXIT', ClientPid, _Reason}, Session = #session{clean_sess = true,
|
||||
client_pid = ClientPid}) ->
|
||||
|
@ -561,22 +559,21 @@ handle_info({'EXIT', ClientPid, Reason}, Session = #session{clean_sess = fals
|
|||
expired_after = Expires}) ->
|
||||
?LOG(info, "Client ~p EXIT for ~p", [ClientPid, Reason], Session),
|
||||
TRef = timer(Expires, expired),
|
||||
erlang:garbage_collect(), %%TODO: ???
|
||||
noreply(Session#session{client_pid = undefined, expired_timer = TRef});
|
||||
hibernate(Session#session{client_pid = undefined, expired_timer = TRef});
|
||||
|
||||
handle_info({'EXIT', Pid, Reason}, Session = #session{client_pid = ClientPid}) ->
|
||||
|
||||
?LOG(error, "Unexpected EXIT: client_pid=~p, exit_pid=~p, reason=~p",
|
||||
[ClientPid, Pid, Reason], Session),
|
||||
noreply(Session);
|
||||
hibernate(Session);
|
||||
|
||||
handle_info(expired, Session) ->
|
||||
?LOG(info, "expired, shutdown now.", [], Session),
|
||||
{stop, {shutdown, expired}, Session};
|
||||
shutdown(expired, Session);
|
||||
|
||||
handle_info(Info, Session) ->
|
||||
?LOG(critical, "Unexpected info: ~p", [Info], Session),
|
||||
{noreply, Session}.
|
||||
hibernate(Session).
|
||||
|
||||
terminate(_Reason, #session{clean_sess = CleanSess, client_id = ClientId}) ->
|
||||
emqttd_sm:unregister_session(CleanSess, ClientId).
|
||||
|
@ -693,8 +690,14 @@ cancel_timer(Ref) ->
|
|||
catch erlang:cancel_timer(Ref).
|
||||
|
||||
noreply(State) ->
|
||||
{noreply, State}.
|
||||
|
||||
hibernate(State) ->
|
||||
{noreply, State, hibernate}.
|
||||
|
||||
shutdown(Reason, State) ->
|
||||
{stop, {shutdown, Reason}, State}.
|
||||
|
||||
start_collector(Session = #session{collect_interval = 0}) ->
|
||||
Session;
|
||||
|
||||
|
|
|
@ -24,7 +24,6 @@
|
|||
%%%
|
||||
%%% @end
|
||||
%%%-----------------------------------------------------------------------------
|
||||
|
||||
-module(emqttd_sm_helper).
|
||||
|
||||
-author("Feng Lee <feng@emqtt.io>").
|
||||
|
|
|
@ -24,7 +24,6 @@
|
|||
%%%
|
||||
%%% @end
|
||||
%%%-----------------------------------------------------------------------------
|
||||
|
||||
-module(emqttd_sm_sup).
|
||||
|
||||
-author("Feng Lee <feng@emqtt.io>").
|
||||
|
|
|
@ -24,7 +24,6 @@
|
|||
%%%
|
||||
%%% @end
|
||||
%%%-----------------------------------------------------------------------------
|
||||
|
||||
-module(emqttd_stats).
|
||||
|
||||
-author("Feng Lee <feng@emqtt.io>").
|
||||
|
|
|
@ -24,7 +24,6 @@
|
|||
%%%
|
||||
%%% @end
|
||||
%%%-----------------------------------------------------------------------------
|
||||
|
||||
-module(emqttd_sup).
|
||||
|
||||
-author("Feng Lee <feng@emqtt.io>").
|
||||
|
|
|
@ -24,7 +24,6 @@
|
|||
%%%
|
||||
%%% @end
|
||||
%%%-----------------------------------------------------------------------------
|
||||
|
||||
-module(emqttd_sysmon).
|
||||
|
||||
-author("Feng Lee <feng@emqtt.io>").
|
||||
|
|
|
@ -24,7 +24,6 @@
|
|||
%%%
|
||||
%%% @end
|
||||
%%%-----------------------------------------------------------------------------
|
||||
|
||||
-module(emqttd_topic).
|
||||
|
||||
-author("Feng Lee <feng@emqtt.io>").
|
||||
|
|
|
@ -24,7 +24,6 @@
|
|||
%%%
|
||||
%%% @end
|
||||
%%%-----------------------------------------------------------------------------
|
||||
|
||||
-module(emqttd_trace).
|
||||
|
||||
-author("Feng Lee <feng@emqtt.io>").
|
||||
|
|
|
@ -1,30 +1,29 @@
|
|||
%%------------------------------------------------------------------------------
|
||||
%%%-----------------------------------------------------------------------------
|
||||
%%% Copyright (c) 2012-2015 eMQTT.IO, All Rights Reserved.
|
||||
%%
|
||||
%% Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
%% of this software and associated documentation files (the "Software"), to deal
|
||||
%% in the Software without restriction, including without limitation the rights
|
||||
%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
%% copies of the Software, and to permit persons to whom the Software is
|
||||
%% furnished to do so, subject to the following conditions:
|
||||
%%
|
||||
%% The above copyright notice and this permission notice shall be included in all
|
||||
%% copies or substantial portions of the Software.
|
||||
%%
|
||||
%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
%% SOFTWARE.
|
||||
%%------------------------------------------------------------------------------
|
||||
%%%
|
||||
%%% Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
%%% of this software and associated documentation files (the "Software"), to deal
|
||||
%%% in the Software without restriction, including without limitation the rights
|
||||
%%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
%%% copies of the Software, and to permit persons to whom the Software is
|
||||
%%% furnished to do so, subject to the following conditions:
|
||||
%%%
|
||||
%%% The above copyright notice and this permission notice shall be included in all
|
||||
%%% copies or substantial portions of the Software.
|
||||
%%%
|
||||
%%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
%%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
%%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
%%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
%%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
%%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
%%% SOFTWARE.
|
||||
%%%-----------------------------------------------------------------------------
|
||||
%%% @doc
|
||||
%%% emqttd erlang vm.
|
||||
%%%
|
||||
%%% @end
|
||||
%%%-----------------------------------------------------------------------------
|
||||
|
||||
-module(emqttd_vm).
|
||||
|
||||
-author('huangdan').
|
||||
|
|
Loading…
Reference in New Issue