diff --git a/src/emqx_session.erl b/src/emqx_session.erl index ab9096f23..7d4362465 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -62,6 +62,9 @@ -import(emqx_zone, [get_env/2, get_env/3]). -record(state, { + %% Idle timeout + idle_timeout :: pos_integer(), + %% Clean Start Flag clean_start = false :: boolean(), @@ -134,10 +137,10 @@ %% Stats timer stats_timer :: reference() | undefined, - %% TODO: + %% Deliver stats deliver_stats = 0, - %% TODO: + %% Enqueue stats enqueue_stats = 0, %% Created at @@ -150,11 +153,10 @@ emqx_logger:Level([{client, State#state.client_id}], "Session(~s): " ++ Format, [State#state.client_id | Args])). -%% @doc Start a session --spec(start_link(SessAttrs :: map()) -> {ok, pid()} | {error, term()}). +%% @doc Start a session proc. +-spec(start_link(SessAttrs :: map()) -> {ok, pid()}). start_link(SessAttrs) -> - IdleTimeout = maps:get(idle_timeout, SessAttrs, 30000), - gen_server:start_link(?MODULE, SessAttrs, [{hibernate_after, IdleTimeout}]). + proc_lib:start_link(?MODULE, init, [[self(), SessAttrs]]). %% @doc Get session info -spec(info(pid() | #state{}) -> list({atom(), term()})). @@ -309,16 +311,18 @@ close(SPid) -> %% gen_server callbacks %%------------------------------------------------------------------------------ -init(#{zone := Zone, - client_id := ClientId, - username := Username, - conn_pid := ConnPid, - clean_start := CleanStart, - conn_props := ConnProps}) -> +init([Parent, #{zone := Zone, + client_id := ClientId, + username := Username, + conn_pid := ConnPid, + clean_start := CleanStart, + conn_props := ConnProps}]) -> process_flag(trap_exit, true), true = link(ConnPid), MaxInflight = get_env(Zone, max_inflight), - State = #state{clean_start = CleanStart, + IdleTimout = get_env(Zone, idle_timeout, 30000), + State = #state{idle_timeout = IdleTimout, + clean_start = CleanStart, binding = binding(ConnPid), client_id = ClientId, username = Username, @@ -327,7 +331,7 @@ init(#{zone := Zone, max_subscriptions = get_env(Zone, max_subscriptions, 0), upgrade_qos = get_env(Zone, upgrade_qos, false), inflight = emqx_inflight:new(MaxInflight), - mqueue = init_mqueue(Zone, ClientId), + mqueue = init_mqueue(Zone), retry_interval = get_env(Zone, retry_interval, 0), awaiting_rel = #{}, await_rel_timeout = get_env(Zone, await_rel_timeout), @@ -337,20 +341,23 @@ init(#{zone := Zone, deliver_stats = 0, enqueue_stats = 0, created_at = os:timestamp()}, - emqx_sm:register_session(ClientId, [{zone, Zone} | attrs(State)]), + emqx_sm:register_session(ClientId, attrs(State)), emqx_sm:set_session_stats(ClientId, stats(State)), emqx_hooks:run('session.created', [#{client_id => ClientId}, info(State)]), - {ok, State}. + ok = proc_lib:init_ack(Parent, {ok, self()}), + gen_server:enter_loop(?MODULE, [{hibernate_after, IdleTimout}], State). expire_interval(_Zone, #{'Session-Expiry-Interval' := I}) -> I * 1000; expire_interval(Zone, _ConnProps) -> %% Maybe v3.1.1 get_env(Zone, session_expiry_interval, 0). -init_mqueue(Zone, ClientId) -> - emqx_mqueue:new(ClientId, #{type => simple, - max_len => get_env(Zone, max_mqueue_len), - store_qos0 => get_env(Zone, mqueue_store_qos0)}). +init_mqueue(Zone) -> + emqx_mqueue:init(#{type => get_env(Zone, mqueue_type, simple), + max_len => get_env(Zone, max_mqueue_len, 1000), + priorities => get_env(Zone, mqueue_priorities, ""), + store_qos0 => get_env(Zone, mqueue_store_qos0, true) + }). binding(ConnPid) -> case node(ConnPid) =:= node() of true -> local; false -> remote end. @@ -366,43 +373,43 @@ handle_call({discard, ConnPid}, _From, State = #state{conn_pid = OldConnPid}) -> %% PUBLISH: handle_call({publish, PacketId, Msg = #message{qos = ?QOS_2}}, _From, State = #state{awaiting_rel = AwaitingRel}) -> - case is_awaiting_full(State) of - false -> - case maps:is_key(PacketId, AwaitingRel) of - true -> - reply({error, ?RC_PACKET_IDENTIFIER_IN_USE}, State); - false -> - State1 = State#state{awaiting_rel = maps:put(PacketId, Msg, AwaitingRel)}, - reply(emqx_broker:publish(Msg), ensure_await_rel_timer(State1)) - end; - true -> - ?LOG(warning, "Dropped QoS2 Message for too many awaiting_rel: ~p", [Msg], State), - emqx_metrics:inc('messages/qos2/dropped'), - reply({error, ?RC_RECEIVE_MAXIMUM_EXCEEDED}, State) - end; + reply(case is_awaiting_full(State) of + false -> + case maps:is_key(PacketId, AwaitingRel) of + true -> + {{error, ?RC_PACKET_IDENTIFIER_IN_USE}, State}; + false -> + State1 = State#state{awaiting_rel = maps:put(PacketId, Msg, AwaitingRel)}, + {emqx_broker:publish(Msg), ensure_await_rel_timer(State1)} + end; + true -> + emqx_metrics:inc('messages/qos2/dropped'), + ?LOG(warning, "Dropped message for too many awaiting_rel: ~p", + [emqx_message:format(Msg)], State), + {{error, ?RC_RECEIVE_MAXIMUM_EXCEEDED}, State} + end); %% PUBREC: handle_call({pubrec, PacketId, _ReasonCode}, _From, State = #state{inflight = Inflight}) -> - case emqx_inflight:contain(PacketId, Inflight) of - true -> - reply(ok, acked(pubrec, PacketId, State)); - false -> - ?LOG(warning, "The PUBREC PacketId is not found: ~w", [PacketId], State), - emqx_metrics:inc('packets/pubrec/missed'), - reply({error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}, State) - end; + reply(case emqx_inflight:contain(PacketId, Inflight) of + true -> + {ok, acked(pubrec, PacketId, State)}; + false -> + emqx_metrics:inc('packets/pubrec/missed'), + ?LOG(warning, "The PUBREC PacketId ~w is not found.", [PacketId], State), + {{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}, State} + end); %% PUBREL: -handle_call({pubrel, PacketId, _ReasonCode}, _From, - State = #state{awaiting_rel = AwaitingRel}) -> - case maps:take(PacketId, AwaitingRel) of - {_, AwaitingRel1} -> - reply(ok, State#state{awaiting_rel = AwaitingRel1}); - error -> - ?LOG(warning, "Cannot find PUBREL: ~p", [PacketId], State), - emqx_metrics:inc('packets/pubrel/missed'), - reply({error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}, State) - end; +handle_call({pubrel, PacketId, _ReasonCode}, _From, State = #state{awaiting_rel = AwaitingRel}) -> + reply(case maps:take(PacketId, AwaitingRel) of + {_, AwaitingRel1} -> + {ok, State#state{awaiting_rel = AwaitingRel1}}; + error -> + emqx_metrics:inc('packets/pubrel/missed'), + ?LOG(warning, "Cannot find PUBREL: ~w", [PacketId], State), + {{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}, State} + end); handle_call(info, _From, State) -> reply(info(State), State); @@ -439,7 +446,7 @@ handle_cast({subscribe, FromPid, {PacketId, _Properties, TopicFilters}}, end} end, {[], Subscriptions}, TopicFilters), suback(FromPid, PacketId, ReasonCodes), - {noreply, State#state{subscriptions = Subscriptions1}}; + noreply(State#state{subscriptions = Subscriptions1}); %% UNSUBSCRIBE: handle_cast({unsubscribe, From, {PacketId, _Properties, TopicFilters}}, @@ -456,15 +463,15 @@ handle_cast({unsubscribe, From, {PacketId, _Properties, TopicFilters}}, end end, {[], Subscriptions}, TopicFilters), unsuback(From, PacketId, ReasonCodes), - {noreply, State#state{subscriptions = Subscriptions1}}; + noreply(State#state{subscriptions = Subscriptions1}); %% PUBACK: handle_cast({puback, PacketId, _ReasonCode}, State = #state{inflight = Inflight}) -> case emqx_inflight:contain(PacketId, Inflight) of true -> - {noreply, dequeue(acked(puback, PacketId, State))}; + noreply(dequeue(acked(puback, PacketId, State))); false -> - ?LOG(warning, "The PUBACK PacketId is not found: ~w", [PacketId], State), + ?LOG(warning, "The PUBACK PacketId ~w is not found", [PacketId], State), emqx_metrics:inc('packets/puback/missed'), {noreply, State} end; @@ -473,9 +480,9 @@ handle_cast({puback, PacketId, _ReasonCode}, State = #state{inflight = Inflight} handle_cast({pubcomp, PacketId, _ReasonCode}, State = #state{inflight = Inflight}) -> case emqx_inflight:contain(PacketId, Inflight) of true -> - {noreply, dequeue(acked(pubcomp, PacketId, State))}; + noreply(dequeue(acked(pubcomp, PacketId, State))); false -> - ?LOG(warning, "The PUBCOMP PacketId is not found: ~w", [PacketId], State), + ?LOG(warning, "The PUBCOMP PacketId ~w is not found", [PacketId], State), emqx_metrics:inc('packets/pubcomp/missed'), {noreply, State} end; @@ -494,7 +501,7 @@ handle_cast({resume, ConnPid}, State = #state{client_id = ClientId, lists:foreach(fun emqx_misc:cancel_timer/1, [RetryTimer, AwaitTimer, ExpireTimer]), case kick(ClientId, OldConnPid, ConnPid) of - ok -> ?LOG(warning, "connection ~p kickout ~p", [ConnPid, OldConnPid], State); + ok -> ?LOG(warning, "Connection ~p kickout ~p", [ConnPid, OldConnPid], State); ignore -> ok end, @@ -509,13 +516,13 @@ handle_cast({resume, ConnPid}, State = #state{client_id = ClientId, await_rel_timer = undefined, expiry_timer = undefined}, - %% Clean Session: true -> false? - CleanStart andalso emqx_sm:set_session_attrs(ClientId, info(State1)), + %% Clean Session: true -> false??? + CleanStart andalso emqx_sm:set_session_attrs(ClientId, attrs(State1)), emqx_hooks:run('session.resumed', [#{client_id => ClientId}, attrs(State)]), %% Replay delivery and Dequeue pending messages - {noreply, ensure_stats_timer(dequeue(retry_delivery(true, State1)))}; + noreply(dequeue(retry_delivery(true, State1))); handle_cast(Msg, State) -> emqx_logger:error("[Session] unexpected cast: ~p", [Msg]), @@ -524,63 +531,68 @@ handle_cast(Msg, State) -> %% Batch dispatch handle_info({dispatch, Topic, Msgs}, State) when is_list(Msgs) -> {noreply, lists:foldl(fun(Msg, NewState) -> - element(2, handle_info({dispatch, Topic, Msg}, NewState)) + element(2, handle_info({dispatch, Topic, Msg}, NewState)) end, State, Msgs)}; %% Dispatch message handle_info({dispatch, Topic, Msg}, State = #state{subscriptions = SubMap}) when is_record(Msg, message) -> - {noreply, case maps:find(Topic, SubMap) of - {ok, #{nl := Nl, qos := QoS, subid := SubId}} -> - run_dispatch_steps([{nl, Nl},{qos, QoS}, {subid, SubId}], Msg, State); - {ok, #{nl := Nl, qos := QoS}} -> - run_dispatch_steps([{nl, Nl},{qos, QoS}], Msg, State); - error -> - dispatch(reset_dup(Msg), State) - end}; + noreply(case maps:find(Topic, SubMap) of + {ok, #{nl := Nl, qos := QoS, subid := SubId}} -> + run_dispatch_steps([{nl, Nl},{qos, QoS}, {subid, SubId}], Msg, State); + {ok, #{nl := Nl, qos := QoS}} -> + run_dispatch_steps([{nl, Nl},{qos, QoS}], Msg, State); + error -> + dispatch(emqx_message:unset_flag(dup, Msg), State) + end); %% Do nothing if the client has been disconnected. -handle_info({timeout, _Timer, retry_delivery}, State = #state{conn_pid = undefined}) -> - {noreply, ensure_stats_timer(State#state{retry_timer = undefined})}; +handle_info({timeout, Timer, retry_delivery}, State = #state{conn_pid = undefined, retry_timer = Timer}) -> + noreply(State#state{retry_timer = undefined}); -handle_info({timeout, _Timer, retry_delivery}, State) -> - {noreply, ensure_stats_timer(retry_delivery(false, State#state{retry_timer = undefined}))}; +handle_info({timeout, Timer, retry_delivery}, State = #state{retry_timer = Timer}) -> + noreply(retry_delivery(false, State#state{retry_timer = undefined})); -handle_info({timeout, _Timer, check_awaiting_rel}, State) -> - {noreply, ensure_stats_timer(expire_awaiting_rel(State#state{await_rel_timer = undefined}))}; +handle_info({timeout, Timer, check_awaiting_rel}, State = #state{await_rel_timer = Timer}) -> + noreply(expire_awaiting_rel(State#state{await_rel_timer = undefined})); -handle_info({timeout, _Timer, expired}, State) -> - ?LOG(info, "Expired, shutdown now.", [], State), +handle_info({timeout, Timer, emit_stats}, State = #state{client_id = ClientId, stats_timer = Timer}) -> + true = emqx_sm:set_session_stats(ClientId, stats(State)), + {noreply, State#state{stats_timer = undefined}, hibernate}; + +handle_info({timeout, Timer, expired}, State = #state{expiry_timer = Timer}) -> + ?LOG(info, "expired, shutdown now:(", [], State), shutdown(expired, State); handle_info({'EXIT', ConnPid, Reason}, State = #state{clean_start = true, conn_pid = ConnPid}) -> - {stop, Reason, State}; + {stop, Reason, State#state{conn_pid = undefined}}; handle_info({'EXIT', ConnPid, Reason}, State = #state{expiry_interval = 0, conn_pid = ConnPid}) -> - {stop, Reason, State}; + {stop, Reason, State#state{conn_pid = undefined}}; handle_info({'EXIT', ConnPid, _Reason}, State = #state{clean_start = false, conn_pid = ConnPid}) -> {noreply, ensure_expire_timer(State#state{conn_pid = undefined})}; handle_info({'EXIT', OldPid, _Reason}, State = #state{old_conn_pid = OldPid}) -> %% ignore - {noreply, State#state{old_conn_pid = undefined}, hibernate}; + {noreply, State#state{old_conn_pid = undefined}}; handle_info({'EXIT', Pid, Reason}, State = #state{conn_pid = ConnPid}) -> - ?LOG(error, "unexpected EXIT: conn_pid=~p, exit_pid=~p, reason=~p", + ?LOG(error, "Unexpected EXIT: conn_pid=~p, exit_pid=~p, reason=~p", [ConnPid, Pid, Reason], State), {noreply, State}; -handle_info(emit_stats, State = #state{client_id = ClientId}) -> - emqx_sm:set_session_stats(ClientId, stats(State)), - {noreply, State#state{stats_timer = undefined}, hibernate}; - handle_info(Info, State) -> emqx_logger:error("[Session] unexpected info: ~p", [Info]), {noreply, State}. -terminate(Reason, #state{client_id = ClientId}) -> +terminate(Reason, #state{client_id = ClientId, conn_pid = ConnPid}) -> emqx_hooks:run('session.terminated', [#{client_id => ClientId}, Reason]), - %%TODO: notify conn_pid to shutdown? + %% Ensure to shutdown the connection + if + ConnPid =/= undefined -> + ConnPid ! {shutdown, Reason}; + true -> ok + end, emqx_sm:unregister_session(ClientId). code_change(_OldVsn, State, _Extra) -> @@ -611,7 +623,7 @@ kick(ClientId, OldPid, Pid) -> unlink(OldPid), OldPid ! {shutdown, conflict, {ClientId, Pid}}, %% Clean noproc - receive {'EXIT', OldPid, _} -> ok after 0 -> ok end. + receive {'EXIT', OldPid, _} -> ok after 1 -> ok end. %%------------------------------------------------------------------------------ %% Replay or Retry Delivery @@ -622,30 +634,37 @@ retry_delivery(Force, State = #state{inflight = Inflight}) -> case emqx_inflight:is_empty(Inflight) of true -> State; false -> - Msgs = lists:sort(sortfun(inflight), emqx_inflight:values(Inflight)), - retry_delivery(Force, Msgs, os:timestamp(), State) + InflightMsgs = lists:sort(sortfun(inflight), emqx_inflight:values(Inflight)), + retry_delivery(Force, InflightMsgs, os:timestamp(), State) end. -retry_delivery(_Force, [], _Now, State = #state{retry_interval = Interval}) -> - State#state{retry_timer = emqx_misc:start_timer(Interval, retry_delivery)}; +retry_delivery(_Force, [], _Now, State) -> + %% Retry again... + ensure_retry_timer(State); retry_delivery(Force, [{Type, Msg0, Ts} | Msgs], Now, State = #state{inflight = Inflight, retry_interval = Interval}) -> - Diff = timer:now_diff(Now, Ts) div 1000, %% micro -> ms + %% Microseconds -> MilliSeconds + Diff = timer:now_diff(Now, Ts) div 1000, if Force orelse (Diff >= Interval) -> - case {Type, Msg0} of - {publish, {PacketId, Msg}} -> - redeliver({PacketId, Msg}, State), - Inflight1 = emqx_inflight:update(PacketId, {publish, {PacketId, Msg}, Now}, Inflight), - retry_delivery(Force, Msgs, Now, State#state{inflight = Inflight1}); - {pubrel, PacketId} -> - redeliver({pubrel, PacketId}, State), - Inflight1 = emqx_inflight:update(PacketId, {pubrel, PacketId, Now}, Inflight), - retry_delivery(Force, Msgs, Now, State#state{inflight = Inflight1}) - end; + Inflight1 = case {Type, Msg0} of + {publish, {PacketId, Msg}} -> + case emqx_message:is_expired(Msg) of + true -> + emqx_metrics:inc('messages/expired'), + emqx_inflight:delete(PacketId, Inflight); + false -> + redeliver({PacketId, Msg}, State), + emqx_inflight:update(PacketId, {publish, {PacketId, Msg}, Now}, Inflight) + end; + {pubrel, PacketId} -> + redeliver({pubrel, PacketId}, State), + emqx_inflight:update(PacketId, {pubrel, PacketId, Now}, Inflight) + end, + retry_delivery(Force, Msgs, Now, State#state{inflight = Inflight1}); true -> - State#state{retry_timer = emqx_misc:start_timer(Interval - Diff, retry_delivery)} + ensure_retry_timer(Interval - Diff, State) end. %%------------------------------------------------------------------------------ @@ -662,16 +681,16 @@ expire_awaiting_rel(State = #state{awaiting_rel = AwaitingRel}) -> expire_awaiting_rel([], _Now, State) -> State#state{await_rel_timer = undefined}; -expire_awaiting_rel([{PacketId, Msg = #message{timestamp = TS}} | Msgs], - Now, State = #state{awaiting_rel = AwaitingRel, - await_rel_timeout = Timeout}) -> +expire_awaiting_rel([{PacketId, Msg = #message{timestamp = TS}} | Msgs], Now, + State = #state{awaiting_rel = AwaitingRel, await_rel_timeout = Timeout}) -> case (timer:now_diff(Now, TS) div 1000) of Diff when Diff >= Timeout -> - ?LOG(warning, "Dropped Qos2 Message for await_rel_timeout: ~p", [Msg], State), emqx_metrics:inc('messages/qos2/dropped'), + ?LOG(warning, "Dropped message for await_rel_timeout: ~p", + [emqx_message:format(Msg)], State), expire_awaiting_rel(Msgs, Now, State#state{awaiting_rel = maps:remove(PacketId, AwaitingRel)}); Diff -> - State#state{await_rel_timer = emqx_misc:start_timer(Timeout - Diff, check_awaiting_rel)} + ensure_await_rel_timer(Timeout - Diff, State) end. %%------------------------------------------------------------------------------ @@ -728,12 +747,10 @@ dispatch(Msg = #message{qos = ?QOS0}, State) -> dispatch(Msg = #message{qos = QoS}, State = #state{next_pkt_id = PacketId, inflight = Inflight}) when QoS =:= ?QOS1 orelse QoS =:= ?QOS2 -> case emqx_inflight:is_full(Inflight) of - true -> - enqueue_msg(Msg, State); + true -> enqueue_msg(Msg, State); false -> deliver(PacketId, Msg, State), - %% TODO inc_stats?? - await(PacketId, Msg, next_pkt_id(inc_stats(deliver, State))) + await(PacketId, Msg, inc_stats(deliver, next_pkt_id(State))) end. enqueue_msg(Msg, State = #state{mqueue = Q}) -> @@ -760,15 +777,10 @@ deliver(PacketId, Msg, #state{conn_pid = ConnPid, binding = remote}) -> %% Awaiting ACK for QoS1/QoS2 Messages %%------------------------------------------------------------------------------ -await(PacketId, Msg, State = #state{inflight = Inflight, - retry_timer = RetryTimer, - retry_interval = Interval}) -> - %% Start retry timer if the Inflight is still empty - State1 = case RetryTimer == undefined of - true -> State#state{retry_timer = emqx_misc:start_timer(Interval, retry_delivery)}; - false -> State - end, - State1#state{inflight = emqx_inflight:insert(PacketId, {publish, {PacketId, Msg}, os:timestamp()}, Inflight)}. +await(PacketId, Msg, State = #state{inflight = Inflight}) -> + Inflight1 = emqx_inflight:insert( + PacketId, {publish, {PacketId, Msg}, os:timestamp()}, Inflight), + ensure_retry_timer(State#state{inflight = Inflight1}). acked(puback, PacketId, State = #state{client_id = ClientId, inflight = Inflight}) -> case emqx_inflight:lookup(PacketId, Inflight) of @@ -776,7 +788,7 @@ acked(puback, PacketId, State = #state{client_id = ClientId, inflight = Infligh emqx_hooks:run('message.acked', [#{client_id =>ClientId}], Msg), State#state{inflight = emqx_inflight:delete(PacketId, Inflight)}; none -> - ?LOG(warning, "Duplicated PUBACK Packet: ~p", [PacketId], State), + ?LOG(warning, "Duplicated PUBACK PacketId ~w", [PacketId], State), State end; @@ -786,10 +798,10 @@ acked(pubrec, PacketId, State = #state{client_id = ClientId, inflight = Infligh emqx_hooks:run('message.acked', [ClientId], Msg), State#state{inflight = emqx_inflight:update(PacketId, {pubrel, PacketId, os:timestamp()}, Inflight)}; {value, {pubrel, PacketId, _Ts}} -> - ?LOG(warning, "Duplicated PUBREC Packet: ~p", [PacketId], State), + ?LOG(warning, "Duplicated PUBREC PacketId ~w", [PacketId], State), State; none -> - ?LOG(warning, "Unexpected PUBREC Packet: ~p", [PacketId], State), + ?LOG(warning, "Unexpected PUBREC PacketId ~w", [PacketId], State), State end; @@ -819,28 +831,42 @@ dequeue2(State = #state{mqueue = Q}) -> dequeue(dispatch(Msg, State#state{mqueue = Q1})) end. - %%------------------------------------------------------------------------------ %% Ensure timers ensure_await_rel_timer(State = #state{await_rel_timer = undefined, await_rel_timeout = Timeout}) -> - State#state{await_rel_timer = emqx_misc:start_timer(Timeout, check_awaiting_rel)}; + ensure_await_rel_timer(Timeout, State); ensure_await_rel_timer(State) -> State. +ensure_await_rel_timer(Timeout, State = #state{await_rel_timer = undefined}) -> + State#state{await_rel_timer = emqx_misc:start_timer(Timeout, check_awaiting_rel)}; +ensure_await_rel_timer(_Timeout, State) -> + State. + +ensure_retry_timer(State = #state{retry_timer = undefined, retry_interval = Interval}) -> + ensure_retry_timer(Interval, State); +ensure_retry_timer(State) -> + State. + +ensure_retry_timer(Interval, State = #state{retry_timer = undefined}) -> + State#state{retry_timer = emqx_misc:start_timer(Interval, retry_delivery)}; +ensure_retry_timer(_Timeout, State) -> + State. + ensure_expire_timer(State = #state{expiry_interval = Interval}) when Interval > 0 -> State#state{expiry_timer = emqx_misc:start_timer(Interval, expired)}; ensure_expire_timer(State) -> State. -%%------------------------------------------------------------------------------ -%% Reset Dup - -reset_dup(Msg) -> - emqx_message:unset_flag(dup, Msg). +ensure_stats_timer(State = #state{enable_stats = true, stats_timer = undefined, + idle_timeout = IdleTimeout}) -> + State#state{stats_timer = emqx_misc:start_timer(IdleTimeout, emit_stats)}; +ensure_stats_timer(State) -> + State. %%------------------------------------------------------------------------------ -%% Next Msg Id +%% Next Packet Id next_pkt_id(State = #state{next_pkt_id = 16#FFFF}) -> State#state{next_pkt_id = 1}; @@ -849,26 +875,28 @@ next_pkt_id(State = #state{next_pkt_id = Id}) -> State#state{next_pkt_id = Id + 1}. %%------------------------------------------------------------------------------ -%% Ensure stats timer - -ensure_stats_timer(State = #state{enable_stats = true, stats_timer = undefined}) -> - State#state{stats_timer = erlang:send_after(30000, self(), emit_stats)}; -ensure_stats_timer(State) -> - State. +%% Inc stats inc_stats(deliver, State = #state{deliver_stats = I}) -> State#state{deliver_stats = I + 1}; inc_stats(enqueue, State = #state{enqueue_stats = I}) -> State#state{enqueue_stats = I + 1}. -%%-------------------------------------------------------------------- +%%------------------------------------------------------------------------------ %% Helper functions +reply({Reply, State}) -> + reply(Reply, State). + reply(Reply, State) -> - {reply, Reply, State}. + {reply, Reply, ensure_stats_timer(State)}. + +noreply(State) -> + {noreply, ensure_stats_timer(State)}. shutdown(Reason, State) -> {stop, {shutdown, Reason}, State}. -%% TODO: maybe_gc(State) -> State. +%% TODO: GC Policy and Shutdown Policy +%% maybe_gc(State) -> State.