Improve the session module

This commit is contained in:
Feng Lee 2018-12-20 10:17:14 +08:00 committed by Feng Lee
parent 7a1ec580b0
commit 42fc8f5811
1 changed files with 88 additions and 82 deletions

View File

@ -158,8 +158,6 @@
-export_type([attr/0]).
-define(TIMEOUT, 60000).
-define(LOG(Level, Format, Args, _State),
emqx_logger:Level("[Session] " ++ Format, Args)).
@ -261,9 +259,11 @@ subscribe(SPid, PacketId, Properties, TopicFilters) ->
publish(_SPid, _PacketId, Msg = #message{qos = ?QOS_0}) ->
%% Publish QoS0 message directly
emqx_broker:publish(Msg);
publish(_SPid, _PacketId, Msg = #message{qos = ?QOS_1}) ->
%% Publish QoS1 message directly
emqx_broker:publish(Msg);
publish(SPid, PacketId, Msg = #message{qos = ?QOS_2, timestamp = Ts}) ->
%% Register QoS2 message packet ID (and timestamp) to session, then publish
case gen_server:call(SPid, {register_publish_packet_id, PacketId, Ts}, infinity) of
@ -275,6 +275,7 @@ publish(SPid, PacketId, Msg = #message{qos = ?QOS_2, timestamp = Ts}) ->
puback(SPid, PacketId) ->
gen_server:cast(SPid, {puback, PacketId, ?RC_SUCCESS}).
-spec(puback(spid(), emqx_mqtt_types:packet_id(), emqx_mqtt_types:reason_code()) -> ok).
puback(SPid, PacketId, ReasonCode) ->
gen_server:cast(SPid, {puback, PacketId, ReasonCode}).
@ -322,7 +323,7 @@ discard(SPid, ByPid) ->
-spec(update_expiry_interval(spid(), timeout()) -> ok).
update_expiry_interval(SPid, Interval) ->
gen_server:cast(SPid, {expiry_interval, Interval}).
gen_server:cast(SPid, {update_expiry_interval, Interval}).
-spec(close(spid()) -> ok).
close(SPid) ->
@ -332,39 +333,39 @@ close(SPid) ->
%% gen_server callbacks
%%------------------------------------------------------------------------------
init([Parent, #{zone := Zone,
client_id := ClientId,
username := Username,
conn_pid := ConnPid,
clean_start := CleanStart,
expiry_interval := ExpiryInterval,
max_inflight := MaxInflight,
will_msg := WillMsg}]) ->
emqx_logger:set_metadata_client_id(ClientId),
init([Parent, #{zone := Zone,
client_id := ClientId,
username := Username,
conn_pid := ConnPid,
clean_start := CleanStart,
expiry_interval := ExpiryInterval,
max_inflight := MaxInflight,
will_msg := WillMsg}]) ->
process_flag(trap_exit, true),
true = link(ConnPid),
emqx_logger:set_metadata_client_id(ClientId),
IdleTimout = get_env(Zone, idle_timeout, 30000),
State = #state{idle_timeout = IdleTimout,
clean_start = CleanStart,
binding = binding(ConnPid),
client_id = ClientId,
username = Username,
conn_pid = ConnPid,
subscriptions = #{},
max_subscriptions = get_env(Zone, max_subscriptions, 0),
upgrade_qos = get_env(Zone, upgrade_qos, false),
inflight = emqx_inflight:new(MaxInflight),
mqueue = init_mqueue(Zone),
retry_interval = get_env(Zone, retry_interval, 0),
awaiting_rel = #{},
await_rel_timeout = get_env(Zone, await_rel_timeout),
max_awaiting_rel = get_env(Zone, max_awaiting_rel),
expiry_interval = ExpiryInterval,
enable_stats = get_env(Zone, enable_stats, true),
deliver_stats = 0,
enqueue_stats = 0,
created_at = os:timestamp(),
will_msg = WillMsg
State = #state{idle_timeout = IdleTimout,
clean_start = CleanStart,
binding = binding(ConnPid),
client_id = ClientId,
username = Username,
conn_pid = ConnPid,
subscriptions = #{},
max_subscriptions = get_env(Zone, max_subscriptions, 0),
upgrade_qos = get_env(Zone, upgrade_qos, false),
inflight = emqx_inflight:new(MaxInflight),
mqueue = init_mqueue(Zone),
retry_interval = get_env(Zone, retry_interval, 0),
awaiting_rel = #{},
await_rel_timeout = get_env(Zone, await_rel_timeout),
max_awaiting_rel = get_env(Zone, max_awaiting_rel),
expiry_interval = ExpiryInterval,
enable_stats = get_env(Zone, enable_stats, true),
deliver_stats = 0,
enqueue_stats = 0,
created_at = os:timestamp(),
will_msg = WillMsg
},
ok = emqx_sm:register_session(ClientId, self()),
true = emqx_sm:set_session_attrs(ClientId, attrs(State)),
@ -397,53 +398,56 @@ handle_call(stats, _From, State) ->
handle_call({discard, ByPid}, _From, State = #state{conn_pid = undefined}) ->
?LOG(warning, "Discarded by ~p", [ByPid], State),
{stop, {shutdown, discard}, ok, State};
{stop, discarded, ok, State};
handle_call({discard, ByPid}, _From, State = #state{client_id = ClientId, conn_pid = ConnPid}) ->
?LOG(warning, "Conn ~p is discarded by ~p", [ConnPid, ByPid], State),
ConnPid ! {shutdown, discard, {ClientId, ByPid}},
{stop, {shutdown, discard}, ok, State};
{stop, discarded, ok, State};
%% PUBLISH: This is only to register packetId to session state.
%% The actual message dispatching should be done by the caller (e.g. connection) process.
handle_call({register_publish_packet_id, PacketId, Ts}, _From,
State = #state{awaiting_rel = AwaitingRel}) ->
reply(case is_awaiting_full(State) of
false ->
case maps:is_key(PacketId, AwaitingRel) of
true ->
{{error, ?RC_PACKET_IDENTIFIER_IN_USE}, State};
false ->
State1 = State#state{awaiting_rel = maps:put(PacketId, Ts, AwaitingRel)},
{ok, ensure_await_rel_timer(State1)}
end;
true ->
emqx_metrics:trans(inc, 'messages/qos2/dropped'),
?LOG(warning, "Dropped qos2 packet ~w for too many awaiting_rel", [PacketId], State),
{{error, ?RC_RECEIVE_MAXIMUM_EXCEEDED}, State}
end);
reply(
case is_awaiting_full(State) of
false ->
case maps:is_key(PacketId, AwaitingRel) of
true ->
{{error, ?RC_PACKET_IDENTIFIER_IN_USE}, State};
false ->
State1 = State#state{awaiting_rel = maps:put(PacketId, Ts, AwaitingRel)},
{ok, ensure_await_rel_timer(State1)}
end;
true ->
?LOG(warning, "Dropped qos2 packet ~w for too many awaiting_rel", [PacketId], State),
emqx_metrics:trans(inc, 'messages/qos2/dropped'),
{{error, ?RC_RECEIVE_MAXIMUM_EXCEEDED}, State}
end);
%% PUBREC:
handle_call({pubrec, PacketId, _ReasonCode}, _From, State = #state{inflight = Inflight}) ->
reply(case emqx_inflight:contain(PacketId, Inflight) of
true ->
{ok, acked(pubrec, PacketId, State)};
false ->
emqx_metrics:trans(inc, 'packets/pubrec/missed'),
?LOG(warning, "The PUBREC PacketId ~w is not found.", [PacketId], State),
{{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}, State}
end);
reply(
case emqx_inflight:contain(PacketId, Inflight) of
true ->
{ok, acked(pubrec, PacketId, State)};
false ->
?LOG(warning, "The PUBREC PacketId ~w is not found.", [PacketId], State),
emqx_metrics:trans(inc, 'packets/pubrec/missed'),
{{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}, State}
end);
%% PUBREL:
handle_call({pubrel, PacketId, _ReasonCode}, _From, State = #state{awaiting_rel = AwaitingRel}) ->
reply(case maps:take(PacketId, AwaitingRel) of
{_Ts, AwaitingRel1} ->
{ok, State#state{awaiting_rel = AwaitingRel1}};
error ->
emqx_metrics:trans(inc, 'packets/pubrel/missed'),
?LOG(warning, "Cannot find PUBREL: ~w", [PacketId], State),
{{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}, State}
end);
reply(
case maps:take(PacketId, AwaitingRel) of
{_Ts, AwaitingRel1} ->
{ok, State#state{awaiting_rel = AwaitingRel1}};
error ->
?LOG(warning, "The PUBREL PacketId ~w is not found", [PacketId], State),
emqx_metrics:trans(inc, 'packets/pubrel/missed'),
{{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}, State}
end);
handle_call(close, _From, State) ->
{stop, normal, ok, State};
@ -494,25 +498,27 @@ handle_cast({unsubscribe, From, {PacketId, _Properties, TopicFilters}},
%% PUBACK:
handle_cast({puback, PacketId, _ReasonCode}, State = #state{inflight = Inflight}) ->
case emqx_inflight:contain(PacketId, Inflight) of
true ->
noreply(dequeue(acked(puback, PacketId, State)));
false ->
?LOG(warning, "The PUBACK PacketId ~w is not found", [PacketId], State),
emqx_metrics:trans(inc, 'packets/puback/missed'),
{noreply, State}
end;
noreply(
case emqx_inflight:contain(PacketId, Inflight) of
true ->
dequeue(acked(puback, PacketId, State));
false ->
?LOG(warning, "The PUBACK PacketId ~w is not found", [PacketId], State),
emqx_metrics:trans(inc, 'packets/puback/missed'),
State
end);
%% PUBCOMP:
handle_cast({pubcomp, PacketId, _ReasonCode}, State = #state{inflight = Inflight}) ->
case emqx_inflight:contain(PacketId, Inflight) of
true ->
noreply(dequeue(acked(pubcomp, PacketId, State)));
false ->
?LOG(warning, "The PUBCOMP PacketId ~w is not found", [PacketId], State),
emqx_metrics:trans(inc, 'packets/pubcomp/missed'),
{noreply, State}
end;
noreply(
case emqx_inflight:contain(PacketId, Inflight) of
true ->
dequeue(acked(pubcomp, PacketId, State));
false ->
?LOG(warning, "The PUBCOMP PacketId ~w is not found", [PacketId], State),
emqx_metrics:trans(inc, 'packets/pubcomp/missed'),
State
end);
%% RESUME:
handle_cast({resume, #{conn_pid := ConnPid,
@ -561,7 +567,7 @@ handle_cast({resume, #{conn_pid := ConnPid,
%% Replay delivery and Dequeue pending messages
noreply(dequeue(retry_delivery(true, State1)));
handle_cast({expiry_interval, Interval}, State) ->
handle_cast({update_expiry_interval, Interval}, State) ->
{noreply, State#state{expiry_interval = Interval}};
handle_cast(Msg, State) ->