From 42fc8f5811d551aebee9578ed16da5ad90eae043 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Thu, 20 Dec 2018 10:17:14 +0800 Subject: [PATCH] Improve the session module --- src/emqx_session.erl | 170 ++++++++++++++++++++++--------------------- 1 file changed, 88 insertions(+), 82 deletions(-) diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 9117dd1b8..9c28884c9 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -158,8 +158,6 @@ -export_type([attr/0]). --define(TIMEOUT, 60000). - -define(LOG(Level, Format, Args, _State), emqx_logger:Level("[Session] " ++ Format, Args)). @@ -261,9 +259,11 @@ subscribe(SPid, PacketId, Properties, TopicFilters) -> publish(_SPid, _PacketId, Msg = #message{qos = ?QOS_0}) -> %% Publish QoS0 message directly emqx_broker:publish(Msg); + publish(_SPid, _PacketId, Msg = #message{qos = ?QOS_1}) -> %% Publish QoS1 message directly emqx_broker:publish(Msg); + publish(SPid, PacketId, Msg = #message{qos = ?QOS_2, timestamp = Ts}) -> %% Register QoS2 message packet ID (and timestamp) to session, then publish case gen_server:call(SPid, {register_publish_packet_id, PacketId, Ts}, infinity) of @@ -275,6 +275,7 @@ publish(SPid, PacketId, Msg = #message{qos = ?QOS_2, timestamp = Ts}) -> puback(SPid, PacketId) -> gen_server:cast(SPid, {puback, PacketId, ?RC_SUCCESS}). +-spec(puback(spid(), emqx_mqtt_types:packet_id(), emqx_mqtt_types:reason_code()) -> ok). puback(SPid, PacketId, ReasonCode) -> gen_server:cast(SPid, {puback, PacketId, ReasonCode}). @@ -322,7 +323,7 @@ discard(SPid, ByPid) -> -spec(update_expiry_interval(spid(), timeout()) -> ok). update_expiry_interval(SPid, Interval) -> - gen_server:cast(SPid, {expiry_interval, Interval}). + gen_server:cast(SPid, {update_expiry_interval, Interval}). -spec(close(spid()) -> ok). close(SPid) -> @@ -332,39 +333,39 @@ close(SPid) -> %% gen_server callbacks %%------------------------------------------------------------------------------ -init([Parent, #{zone := Zone, - client_id := ClientId, - username := Username, - conn_pid := ConnPid, - clean_start := CleanStart, - expiry_interval := ExpiryInterval, - max_inflight := MaxInflight, - will_msg := WillMsg}]) -> - emqx_logger:set_metadata_client_id(ClientId), +init([Parent, #{zone := Zone, + client_id := ClientId, + username := Username, + conn_pid := ConnPid, + clean_start := CleanStart, + expiry_interval := ExpiryInterval, + max_inflight := MaxInflight, + will_msg := WillMsg}]) -> process_flag(trap_exit, true), true = link(ConnPid), + emqx_logger:set_metadata_client_id(ClientId), IdleTimout = get_env(Zone, idle_timeout, 30000), - State = #state{idle_timeout = IdleTimout, - clean_start = CleanStart, - binding = binding(ConnPid), - client_id = ClientId, - username = Username, - conn_pid = ConnPid, - subscriptions = #{}, - max_subscriptions = get_env(Zone, max_subscriptions, 0), - upgrade_qos = get_env(Zone, upgrade_qos, false), - inflight = emqx_inflight:new(MaxInflight), - mqueue = init_mqueue(Zone), - retry_interval = get_env(Zone, retry_interval, 0), - awaiting_rel = #{}, - await_rel_timeout = get_env(Zone, await_rel_timeout), - max_awaiting_rel = get_env(Zone, max_awaiting_rel), - expiry_interval = ExpiryInterval, - enable_stats = get_env(Zone, enable_stats, true), - deliver_stats = 0, - enqueue_stats = 0, - created_at = os:timestamp(), - will_msg = WillMsg + State = #state{idle_timeout = IdleTimout, + clean_start = CleanStart, + binding = binding(ConnPid), + client_id = ClientId, + username = Username, + conn_pid = ConnPid, + subscriptions = #{}, + max_subscriptions = get_env(Zone, max_subscriptions, 0), + upgrade_qos = get_env(Zone, upgrade_qos, false), + inflight = emqx_inflight:new(MaxInflight), + mqueue = init_mqueue(Zone), + retry_interval = get_env(Zone, retry_interval, 0), + awaiting_rel = #{}, + await_rel_timeout = get_env(Zone, await_rel_timeout), + max_awaiting_rel = get_env(Zone, max_awaiting_rel), + expiry_interval = ExpiryInterval, + enable_stats = get_env(Zone, enable_stats, true), + deliver_stats = 0, + enqueue_stats = 0, + created_at = os:timestamp(), + will_msg = WillMsg }, ok = emqx_sm:register_session(ClientId, self()), true = emqx_sm:set_session_attrs(ClientId, attrs(State)), @@ -397,53 +398,56 @@ handle_call(stats, _From, State) -> handle_call({discard, ByPid}, _From, State = #state{conn_pid = undefined}) -> ?LOG(warning, "Discarded by ~p", [ByPid], State), - {stop, {shutdown, discard}, ok, State}; + {stop, discarded, ok, State}; handle_call({discard, ByPid}, _From, State = #state{client_id = ClientId, conn_pid = ConnPid}) -> ?LOG(warning, "Conn ~p is discarded by ~p", [ConnPid, ByPid], State), ConnPid ! {shutdown, discard, {ClientId, ByPid}}, - {stop, {shutdown, discard}, ok, State}; + {stop, discarded, ok, State}; %% PUBLISH: This is only to register packetId to session state. %% The actual message dispatching should be done by the caller (e.g. connection) process. handle_call({register_publish_packet_id, PacketId, Ts}, _From, State = #state{awaiting_rel = AwaitingRel}) -> - reply(case is_awaiting_full(State) of - false -> - case maps:is_key(PacketId, AwaitingRel) of - true -> - {{error, ?RC_PACKET_IDENTIFIER_IN_USE}, State}; - false -> - State1 = State#state{awaiting_rel = maps:put(PacketId, Ts, AwaitingRel)}, - {ok, ensure_await_rel_timer(State1)} - end; - true -> - emqx_metrics:trans(inc, 'messages/qos2/dropped'), - ?LOG(warning, "Dropped qos2 packet ~w for too many awaiting_rel", [PacketId], State), - {{error, ?RC_RECEIVE_MAXIMUM_EXCEEDED}, State} - end); + reply( + case is_awaiting_full(State) of + false -> + case maps:is_key(PacketId, AwaitingRel) of + true -> + {{error, ?RC_PACKET_IDENTIFIER_IN_USE}, State}; + false -> + State1 = State#state{awaiting_rel = maps:put(PacketId, Ts, AwaitingRel)}, + {ok, ensure_await_rel_timer(State1)} + end; + true -> + ?LOG(warning, "Dropped qos2 packet ~w for too many awaiting_rel", [PacketId], State), + emqx_metrics:trans(inc, 'messages/qos2/dropped'), + {{error, ?RC_RECEIVE_MAXIMUM_EXCEEDED}, State} + end); %% PUBREC: handle_call({pubrec, PacketId, _ReasonCode}, _From, State = #state{inflight = Inflight}) -> - reply(case emqx_inflight:contain(PacketId, Inflight) of - true -> - {ok, acked(pubrec, PacketId, State)}; - false -> - emqx_metrics:trans(inc, 'packets/pubrec/missed'), - ?LOG(warning, "The PUBREC PacketId ~w is not found.", [PacketId], State), - {{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}, State} - end); + reply( + case emqx_inflight:contain(PacketId, Inflight) of + true -> + {ok, acked(pubrec, PacketId, State)}; + false -> + ?LOG(warning, "The PUBREC PacketId ~w is not found.", [PacketId], State), + emqx_metrics:trans(inc, 'packets/pubrec/missed'), + {{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}, State} + end); %% PUBREL: handle_call({pubrel, PacketId, _ReasonCode}, _From, State = #state{awaiting_rel = AwaitingRel}) -> - reply(case maps:take(PacketId, AwaitingRel) of - {_Ts, AwaitingRel1} -> - {ok, State#state{awaiting_rel = AwaitingRel1}}; - error -> - emqx_metrics:trans(inc, 'packets/pubrel/missed'), - ?LOG(warning, "Cannot find PUBREL: ~w", [PacketId], State), - {{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}, State} - end); + reply( + case maps:take(PacketId, AwaitingRel) of + {_Ts, AwaitingRel1} -> + {ok, State#state{awaiting_rel = AwaitingRel1}}; + error -> + ?LOG(warning, "The PUBREL PacketId ~w is not found", [PacketId], State), + emqx_metrics:trans(inc, 'packets/pubrel/missed'), + {{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}, State} + end); handle_call(close, _From, State) -> {stop, normal, ok, State}; @@ -494,25 +498,27 @@ handle_cast({unsubscribe, From, {PacketId, _Properties, TopicFilters}}, %% PUBACK: handle_cast({puback, PacketId, _ReasonCode}, State = #state{inflight = Inflight}) -> - case emqx_inflight:contain(PacketId, Inflight) of - true -> - noreply(dequeue(acked(puback, PacketId, State))); - false -> - ?LOG(warning, "The PUBACK PacketId ~w is not found", [PacketId], State), - emqx_metrics:trans(inc, 'packets/puback/missed'), - {noreply, State} - end; + noreply( + case emqx_inflight:contain(PacketId, Inflight) of + true -> + dequeue(acked(puback, PacketId, State)); + false -> + ?LOG(warning, "The PUBACK PacketId ~w is not found", [PacketId], State), + emqx_metrics:trans(inc, 'packets/puback/missed'), + State + end); %% PUBCOMP: handle_cast({pubcomp, PacketId, _ReasonCode}, State = #state{inflight = Inflight}) -> - case emqx_inflight:contain(PacketId, Inflight) of - true -> - noreply(dequeue(acked(pubcomp, PacketId, State))); - false -> - ?LOG(warning, "The PUBCOMP PacketId ~w is not found", [PacketId], State), - emqx_metrics:trans(inc, 'packets/pubcomp/missed'), - {noreply, State} - end; + noreply( + case emqx_inflight:contain(PacketId, Inflight) of + true -> + dequeue(acked(pubcomp, PacketId, State)); + false -> + ?LOG(warning, "The PUBCOMP PacketId ~w is not found", [PacketId], State), + emqx_metrics:trans(inc, 'packets/pubcomp/missed'), + State + end); %% RESUME: handle_cast({resume, #{conn_pid := ConnPid, @@ -561,7 +567,7 @@ handle_cast({resume, #{conn_pid := ConnPid, %% Replay delivery and Dequeue pending messages noreply(dequeue(retry_delivery(true, State1))); -handle_cast({expiry_interval, Interval}, State) -> +handle_cast({update_expiry_interval, Interval}, State) -> {noreply, State#state{expiry_interval = Interval}}; handle_cast(Msg, State) ->