Support message ttl and expiration
This commit is contained in:
parent
1f2bbe3eb8
commit
7b5f2577d3
|
@ -62,6 +62,9 @@
|
|||
-import(emqx_zone, [get_env/2, get_env/3]).
|
||||
|
||||
-record(state, {
|
||||
%% Idle timeout
|
||||
idle_timeout :: pos_integer(),
|
||||
|
||||
%% Clean Start Flag
|
||||
clean_start = false :: boolean(),
|
||||
|
||||
|
@ -134,10 +137,10 @@
|
|||
%% Stats timer
|
||||
stats_timer :: reference() | undefined,
|
||||
|
||||
%% TODO:
|
||||
%% Deliver stats
|
||||
deliver_stats = 0,
|
||||
|
||||
%% TODO:
|
||||
%% Enqueue stats
|
||||
enqueue_stats = 0,
|
||||
|
||||
%% Created at
|
||||
|
@ -150,11 +153,10 @@
|
|||
emqx_logger:Level([{client, State#state.client_id}],
|
||||
"Session(~s): " ++ Format, [State#state.client_id | Args])).
|
||||
|
||||
%% @doc Start a session
|
||||
-spec(start_link(SessAttrs :: map()) -> {ok, pid()} | {error, term()}).
|
||||
%% @doc Start a session proc.
|
||||
-spec(start_link(SessAttrs :: map()) -> {ok, pid()}).
|
||||
start_link(SessAttrs) ->
|
||||
IdleTimeout = maps:get(idle_timeout, SessAttrs, 30000),
|
||||
gen_server:start_link(?MODULE, SessAttrs, [{hibernate_after, IdleTimeout}]).
|
||||
proc_lib:start_link(?MODULE, init, [[self(), SessAttrs]]).
|
||||
|
||||
%% @doc Get session info
|
||||
-spec(info(pid() | #state{}) -> list({atom(), term()})).
|
||||
|
@ -309,16 +311,18 @@ close(SPid) ->
|
|||
%% gen_server callbacks
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
init(#{zone := Zone,
|
||||
client_id := ClientId,
|
||||
username := Username,
|
||||
conn_pid := ConnPid,
|
||||
clean_start := CleanStart,
|
||||
conn_props := ConnProps}) ->
|
||||
init([Parent, #{zone := Zone,
|
||||
client_id := ClientId,
|
||||
username := Username,
|
||||
conn_pid := ConnPid,
|
||||
clean_start := CleanStart,
|
||||
conn_props := ConnProps}]) ->
|
||||
process_flag(trap_exit, true),
|
||||
true = link(ConnPid),
|
||||
MaxInflight = get_env(Zone, max_inflight),
|
||||
State = #state{clean_start = CleanStart,
|
||||
IdleTimout = get_env(Zone, idle_timeout, 30000),
|
||||
State = #state{idle_timeout = IdleTimout,
|
||||
clean_start = CleanStart,
|
||||
binding = binding(ConnPid),
|
||||
client_id = ClientId,
|
||||
username = Username,
|
||||
|
@ -327,7 +331,7 @@ init(#{zone := Zone,
|
|||
max_subscriptions = get_env(Zone, max_subscriptions, 0),
|
||||
upgrade_qos = get_env(Zone, upgrade_qos, false),
|
||||
inflight = emqx_inflight:new(MaxInflight),
|
||||
mqueue = init_mqueue(Zone, ClientId),
|
||||
mqueue = init_mqueue(Zone),
|
||||
retry_interval = get_env(Zone, retry_interval, 0),
|
||||
awaiting_rel = #{},
|
||||
await_rel_timeout = get_env(Zone, await_rel_timeout),
|
||||
|
@ -337,20 +341,23 @@ init(#{zone := Zone,
|
|||
deliver_stats = 0,
|
||||
enqueue_stats = 0,
|
||||
created_at = os:timestamp()},
|
||||
emqx_sm:register_session(ClientId, [{zone, Zone} | attrs(State)]),
|
||||
emqx_sm:register_session(ClientId, attrs(State)),
|
||||
emqx_sm:set_session_stats(ClientId, stats(State)),
|
||||
emqx_hooks:run('session.created', [#{client_id => ClientId}, info(State)]),
|
||||
{ok, State}.
|
||||
ok = proc_lib:init_ack(Parent, {ok, self()}),
|
||||
gen_server:enter_loop(?MODULE, [{hibernate_after, IdleTimout}], State).
|
||||
|
||||
expire_interval(_Zone, #{'Session-Expiry-Interval' := I}) ->
|
||||
I * 1000;
|
||||
expire_interval(Zone, _ConnProps) -> %% Maybe v3.1.1
|
||||
get_env(Zone, session_expiry_interval, 0).
|
||||
|
||||
init_mqueue(Zone, ClientId) ->
|
||||
emqx_mqueue:new(ClientId, #{type => simple,
|
||||
max_len => get_env(Zone, max_mqueue_len),
|
||||
store_qos0 => get_env(Zone, mqueue_store_qos0)}).
|
||||
init_mqueue(Zone) ->
|
||||
emqx_mqueue:init(#{type => get_env(Zone, mqueue_type, simple),
|
||||
max_len => get_env(Zone, max_mqueue_len, 1000),
|
||||
priorities => get_env(Zone, mqueue_priorities, ""),
|
||||
store_qos0 => get_env(Zone, mqueue_store_qos0, true)
|
||||
}).
|
||||
|
||||
binding(ConnPid) ->
|
||||
case node(ConnPid) =:= node() of true -> local; false -> remote end.
|
||||
|
@ -366,43 +373,43 @@ handle_call({discard, ConnPid}, _From, State = #state{conn_pid = OldConnPid}) ->
|
|||
%% PUBLISH:
|
||||
handle_call({publish, PacketId, Msg = #message{qos = ?QOS_2}}, _From,
|
||||
State = #state{awaiting_rel = AwaitingRel}) ->
|
||||
case is_awaiting_full(State) of
|
||||
false ->
|
||||
case maps:is_key(PacketId, AwaitingRel) of
|
||||
true ->
|
||||
reply({error, ?RC_PACKET_IDENTIFIER_IN_USE}, State);
|
||||
false ->
|
||||
State1 = State#state{awaiting_rel = maps:put(PacketId, Msg, AwaitingRel)},
|
||||
reply(emqx_broker:publish(Msg), ensure_await_rel_timer(State1))
|
||||
end;
|
||||
true ->
|
||||
?LOG(warning, "Dropped QoS2 Message for too many awaiting_rel: ~p", [Msg], State),
|
||||
emqx_metrics:inc('messages/qos2/dropped'),
|
||||
reply({error, ?RC_RECEIVE_MAXIMUM_EXCEEDED}, State)
|
||||
end;
|
||||
reply(case is_awaiting_full(State) of
|
||||
false ->
|
||||
case maps:is_key(PacketId, AwaitingRel) of
|
||||
true ->
|
||||
{{error, ?RC_PACKET_IDENTIFIER_IN_USE}, State};
|
||||
false ->
|
||||
State1 = State#state{awaiting_rel = maps:put(PacketId, Msg, AwaitingRel)},
|
||||
{emqx_broker:publish(Msg), ensure_await_rel_timer(State1)}
|
||||
end;
|
||||
true ->
|
||||
emqx_metrics:inc('messages/qos2/dropped'),
|
||||
?LOG(warning, "Dropped message for too many awaiting_rel: ~p",
|
||||
[emqx_message:format(Msg)], State),
|
||||
{{error, ?RC_RECEIVE_MAXIMUM_EXCEEDED}, State}
|
||||
end);
|
||||
|
||||
%% PUBREC:
|
||||
handle_call({pubrec, PacketId, _ReasonCode}, _From, State = #state{inflight = Inflight}) ->
|
||||
case emqx_inflight:contain(PacketId, Inflight) of
|
||||
true ->
|
||||
reply(ok, acked(pubrec, PacketId, State));
|
||||
false ->
|
||||
?LOG(warning, "The PUBREC PacketId is not found: ~w", [PacketId], State),
|
||||
emqx_metrics:inc('packets/pubrec/missed'),
|
||||
reply({error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}, State)
|
||||
end;
|
||||
reply(case emqx_inflight:contain(PacketId, Inflight) of
|
||||
true ->
|
||||
{ok, acked(pubrec, PacketId, State)};
|
||||
false ->
|
||||
emqx_metrics:inc('packets/pubrec/missed'),
|
||||
?LOG(warning, "The PUBREC PacketId ~w is not found.", [PacketId], State),
|
||||
{{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}, State}
|
||||
end);
|
||||
|
||||
%% PUBREL:
|
||||
handle_call({pubrel, PacketId, _ReasonCode}, _From,
|
||||
State = #state{awaiting_rel = AwaitingRel}) ->
|
||||
case maps:take(PacketId, AwaitingRel) of
|
||||
{_, AwaitingRel1} ->
|
||||
reply(ok, State#state{awaiting_rel = AwaitingRel1});
|
||||
error ->
|
||||
?LOG(warning, "Cannot find PUBREL: ~p", [PacketId], State),
|
||||
emqx_metrics:inc('packets/pubrel/missed'),
|
||||
reply({error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}, State)
|
||||
end;
|
||||
handle_call({pubrel, PacketId, _ReasonCode}, _From, State = #state{awaiting_rel = AwaitingRel}) ->
|
||||
reply(case maps:take(PacketId, AwaitingRel) of
|
||||
{_, AwaitingRel1} ->
|
||||
{ok, State#state{awaiting_rel = AwaitingRel1}};
|
||||
error ->
|
||||
emqx_metrics:inc('packets/pubrel/missed'),
|
||||
?LOG(warning, "Cannot find PUBREL: ~w", [PacketId], State),
|
||||
{{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}, State}
|
||||
end);
|
||||
|
||||
handle_call(info, _From, State) ->
|
||||
reply(info(State), State);
|
||||
|
@ -439,7 +446,7 @@ handle_cast({subscribe, FromPid, {PacketId, _Properties, TopicFilters}},
|
|||
end}
|
||||
end, {[], Subscriptions}, TopicFilters),
|
||||
suback(FromPid, PacketId, ReasonCodes),
|
||||
{noreply, State#state{subscriptions = Subscriptions1}};
|
||||
noreply(State#state{subscriptions = Subscriptions1});
|
||||
|
||||
%% UNSUBSCRIBE:
|
||||
handle_cast({unsubscribe, From, {PacketId, _Properties, TopicFilters}},
|
||||
|
@ -456,15 +463,15 @@ handle_cast({unsubscribe, From, {PacketId, _Properties, TopicFilters}},
|
|||
end
|
||||
end, {[], Subscriptions}, TopicFilters),
|
||||
unsuback(From, PacketId, ReasonCodes),
|
||||
{noreply, State#state{subscriptions = Subscriptions1}};
|
||||
noreply(State#state{subscriptions = Subscriptions1});
|
||||
|
||||
%% PUBACK:
|
||||
handle_cast({puback, PacketId, _ReasonCode}, State = #state{inflight = Inflight}) ->
|
||||
case emqx_inflight:contain(PacketId, Inflight) of
|
||||
true ->
|
||||
{noreply, dequeue(acked(puback, PacketId, State))};
|
||||
noreply(dequeue(acked(puback, PacketId, State)));
|
||||
false ->
|
||||
?LOG(warning, "The PUBACK PacketId is not found: ~w", [PacketId], State),
|
||||
?LOG(warning, "The PUBACK PacketId ~w is not found", [PacketId], State),
|
||||
emqx_metrics:inc('packets/puback/missed'),
|
||||
{noreply, State}
|
||||
end;
|
||||
|
@ -473,9 +480,9 @@ handle_cast({puback, PacketId, _ReasonCode}, State = #state{inflight = Inflight}
|
|||
handle_cast({pubcomp, PacketId, _ReasonCode}, State = #state{inflight = Inflight}) ->
|
||||
case emqx_inflight:contain(PacketId, Inflight) of
|
||||
true ->
|
||||
{noreply, dequeue(acked(pubcomp, PacketId, State))};
|
||||
noreply(dequeue(acked(pubcomp, PacketId, State)));
|
||||
false ->
|
||||
?LOG(warning, "The PUBCOMP PacketId is not found: ~w", [PacketId], State),
|
||||
?LOG(warning, "The PUBCOMP PacketId ~w is not found", [PacketId], State),
|
||||
emqx_metrics:inc('packets/pubcomp/missed'),
|
||||
{noreply, State}
|
||||
end;
|
||||
|
@ -494,7 +501,7 @@ handle_cast({resume, ConnPid}, State = #state{client_id = ClientId,
|
|||
lists:foreach(fun emqx_misc:cancel_timer/1, [RetryTimer, AwaitTimer, ExpireTimer]),
|
||||
|
||||
case kick(ClientId, OldConnPid, ConnPid) of
|
||||
ok -> ?LOG(warning, "connection ~p kickout ~p", [ConnPid, OldConnPid], State);
|
||||
ok -> ?LOG(warning, "Connection ~p kickout ~p", [ConnPid, OldConnPid], State);
|
||||
ignore -> ok
|
||||
end,
|
||||
|
||||
|
@ -509,13 +516,13 @@ handle_cast({resume, ConnPid}, State = #state{client_id = ClientId,
|
|||
await_rel_timer = undefined,
|
||||
expiry_timer = undefined},
|
||||
|
||||
%% Clean Session: true -> false?
|
||||
CleanStart andalso emqx_sm:set_session_attrs(ClientId, info(State1)),
|
||||
%% Clean Session: true -> false???
|
||||
CleanStart andalso emqx_sm:set_session_attrs(ClientId, attrs(State1)),
|
||||
|
||||
emqx_hooks:run('session.resumed', [#{client_id => ClientId}, attrs(State)]),
|
||||
|
||||
%% Replay delivery and Dequeue pending messages
|
||||
{noreply, ensure_stats_timer(dequeue(retry_delivery(true, State1)))};
|
||||
noreply(dequeue(retry_delivery(true, State1)));
|
||||
|
||||
handle_cast(Msg, State) ->
|
||||
emqx_logger:error("[Session] unexpected cast: ~p", [Msg]),
|
||||
|
@ -524,63 +531,68 @@ handle_cast(Msg, State) ->
|
|||
%% Batch dispatch
|
||||
handle_info({dispatch, Topic, Msgs}, State) when is_list(Msgs) ->
|
||||
{noreply, lists:foldl(fun(Msg, NewState) ->
|
||||
element(2, handle_info({dispatch, Topic, Msg}, NewState))
|
||||
element(2, handle_info({dispatch, Topic, Msg}, NewState))
|
||||
end, State, Msgs)};
|
||||
|
||||
%% Dispatch message
|
||||
handle_info({dispatch, Topic, Msg}, State = #state{subscriptions = SubMap}) when is_record(Msg, message) ->
|
||||
{noreply, case maps:find(Topic, SubMap) of
|
||||
{ok, #{nl := Nl, qos := QoS, subid := SubId}} ->
|
||||
run_dispatch_steps([{nl, Nl},{qos, QoS}, {subid, SubId}], Msg, State);
|
||||
{ok, #{nl := Nl, qos := QoS}} ->
|
||||
run_dispatch_steps([{nl, Nl},{qos, QoS}], Msg, State);
|
||||
error ->
|
||||
dispatch(reset_dup(Msg), State)
|
||||
end};
|
||||
noreply(case maps:find(Topic, SubMap) of
|
||||
{ok, #{nl := Nl, qos := QoS, subid := SubId}} ->
|
||||
run_dispatch_steps([{nl, Nl},{qos, QoS}, {subid, SubId}], Msg, State);
|
||||
{ok, #{nl := Nl, qos := QoS}} ->
|
||||
run_dispatch_steps([{nl, Nl},{qos, QoS}], Msg, State);
|
||||
error ->
|
||||
dispatch(emqx_message:unset_flag(dup, Msg), State)
|
||||
end);
|
||||
|
||||
%% Do nothing if the client has been disconnected.
|
||||
handle_info({timeout, _Timer, retry_delivery}, State = #state{conn_pid = undefined}) ->
|
||||
{noreply, ensure_stats_timer(State#state{retry_timer = undefined})};
|
||||
handle_info({timeout, Timer, retry_delivery}, State = #state{conn_pid = undefined, retry_timer = Timer}) ->
|
||||
noreply(State#state{retry_timer = undefined});
|
||||
|
||||
handle_info({timeout, _Timer, retry_delivery}, State) ->
|
||||
{noreply, ensure_stats_timer(retry_delivery(false, State#state{retry_timer = undefined}))};
|
||||
handle_info({timeout, Timer, retry_delivery}, State = #state{retry_timer = Timer}) ->
|
||||
noreply(retry_delivery(false, State#state{retry_timer = undefined}));
|
||||
|
||||
handle_info({timeout, _Timer, check_awaiting_rel}, State) ->
|
||||
{noreply, ensure_stats_timer(expire_awaiting_rel(State#state{await_rel_timer = undefined}))};
|
||||
handle_info({timeout, Timer, check_awaiting_rel}, State = #state{await_rel_timer = Timer}) ->
|
||||
noreply(expire_awaiting_rel(State#state{await_rel_timer = undefined}));
|
||||
|
||||
handle_info({timeout, _Timer, expired}, State) ->
|
||||
?LOG(info, "Expired, shutdown now.", [], State),
|
||||
handle_info({timeout, Timer, emit_stats}, State = #state{client_id = ClientId, stats_timer = Timer}) ->
|
||||
true = emqx_sm:set_session_stats(ClientId, stats(State)),
|
||||
{noreply, State#state{stats_timer = undefined}, hibernate};
|
||||
|
||||
handle_info({timeout, Timer, expired}, State = #state{expiry_timer = Timer}) ->
|
||||
?LOG(info, "expired, shutdown now:(", [], State),
|
||||
shutdown(expired, State);
|
||||
|
||||
handle_info({'EXIT', ConnPid, Reason}, State = #state{clean_start = true, conn_pid = ConnPid}) ->
|
||||
{stop, Reason, State};
|
||||
{stop, Reason, State#state{conn_pid = undefined}};
|
||||
|
||||
handle_info({'EXIT', ConnPid, Reason}, State = #state{expiry_interval = 0, conn_pid = ConnPid}) ->
|
||||
{stop, Reason, State};
|
||||
{stop, Reason, State#state{conn_pid = undefined}};
|
||||
|
||||
handle_info({'EXIT', ConnPid, _Reason}, State = #state{clean_start = false, conn_pid = ConnPid}) ->
|
||||
{noreply, ensure_expire_timer(State#state{conn_pid = undefined})};
|
||||
|
||||
handle_info({'EXIT', OldPid, _Reason}, State = #state{old_conn_pid = OldPid}) ->
|
||||
%% ignore
|
||||
{noreply, State#state{old_conn_pid = undefined}, hibernate};
|
||||
{noreply, State#state{old_conn_pid = undefined}};
|
||||
|
||||
handle_info({'EXIT', Pid, Reason}, State = #state{conn_pid = ConnPid}) ->
|
||||
?LOG(error, "unexpected EXIT: conn_pid=~p, exit_pid=~p, reason=~p",
|
||||
?LOG(error, "Unexpected EXIT: conn_pid=~p, exit_pid=~p, reason=~p",
|
||||
[ConnPid, Pid, Reason], State),
|
||||
{noreply, State};
|
||||
|
||||
handle_info(emit_stats, State = #state{client_id = ClientId}) ->
|
||||
emqx_sm:set_session_stats(ClientId, stats(State)),
|
||||
{noreply, State#state{stats_timer = undefined}, hibernate};
|
||||
|
||||
handle_info(Info, State) ->
|
||||
emqx_logger:error("[Session] unexpected info: ~p", [Info]),
|
||||
{noreply, State}.
|
||||
|
||||
terminate(Reason, #state{client_id = ClientId}) ->
|
||||
terminate(Reason, #state{client_id = ClientId, conn_pid = ConnPid}) ->
|
||||
emqx_hooks:run('session.terminated', [#{client_id => ClientId}, Reason]),
|
||||
%%TODO: notify conn_pid to shutdown?
|
||||
%% Ensure to shutdown the connection
|
||||
if
|
||||
ConnPid =/= undefined ->
|
||||
ConnPid ! {shutdown, Reason};
|
||||
true -> ok
|
||||
end,
|
||||
emqx_sm:unregister_session(ClientId).
|
||||
|
||||
code_change(_OldVsn, State, _Extra) ->
|
||||
|
@ -611,7 +623,7 @@ kick(ClientId, OldPid, Pid) ->
|
|||
unlink(OldPid),
|
||||
OldPid ! {shutdown, conflict, {ClientId, Pid}},
|
||||
%% Clean noproc
|
||||
receive {'EXIT', OldPid, _} -> ok after 0 -> ok end.
|
||||
receive {'EXIT', OldPid, _} -> ok after 1 -> ok end.
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Replay or Retry Delivery
|
||||
|
@ -622,30 +634,37 @@ retry_delivery(Force, State = #state{inflight = Inflight}) ->
|
|||
case emqx_inflight:is_empty(Inflight) of
|
||||
true -> State;
|
||||
false ->
|
||||
Msgs = lists:sort(sortfun(inflight), emqx_inflight:values(Inflight)),
|
||||
retry_delivery(Force, Msgs, os:timestamp(), State)
|
||||
InflightMsgs = lists:sort(sortfun(inflight), emqx_inflight:values(Inflight)),
|
||||
retry_delivery(Force, InflightMsgs, os:timestamp(), State)
|
||||
end.
|
||||
|
||||
retry_delivery(_Force, [], _Now, State = #state{retry_interval = Interval}) ->
|
||||
State#state{retry_timer = emqx_misc:start_timer(Interval, retry_delivery)};
|
||||
retry_delivery(_Force, [], _Now, State) ->
|
||||
%% Retry again...
|
||||
ensure_retry_timer(State);
|
||||
|
||||
retry_delivery(Force, [{Type, Msg0, Ts} | Msgs], Now,
|
||||
State = #state{inflight = Inflight, retry_interval = Interval}) ->
|
||||
Diff = timer:now_diff(Now, Ts) div 1000, %% micro -> ms
|
||||
%% Microseconds -> MilliSeconds
|
||||
Diff = timer:now_diff(Now, Ts) div 1000,
|
||||
if
|
||||
Force orelse (Diff >= Interval) ->
|
||||
case {Type, Msg0} of
|
||||
{publish, {PacketId, Msg}} ->
|
||||
redeliver({PacketId, Msg}, State),
|
||||
Inflight1 = emqx_inflight:update(PacketId, {publish, {PacketId, Msg}, Now}, Inflight),
|
||||
retry_delivery(Force, Msgs, Now, State#state{inflight = Inflight1});
|
||||
{pubrel, PacketId} ->
|
||||
redeliver({pubrel, PacketId}, State),
|
||||
Inflight1 = emqx_inflight:update(PacketId, {pubrel, PacketId, Now}, Inflight),
|
||||
retry_delivery(Force, Msgs, Now, State#state{inflight = Inflight1})
|
||||
end;
|
||||
Inflight1 = case {Type, Msg0} of
|
||||
{publish, {PacketId, Msg}} ->
|
||||
case emqx_message:is_expired(Msg) of
|
||||
true ->
|
||||
emqx_metrics:inc('messages/expired'),
|
||||
emqx_inflight:delete(PacketId, Inflight);
|
||||
false ->
|
||||
redeliver({PacketId, Msg}, State),
|
||||
emqx_inflight:update(PacketId, {publish, {PacketId, Msg}, Now}, Inflight)
|
||||
end;
|
||||
{pubrel, PacketId} ->
|
||||
redeliver({pubrel, PacketId}, State),
|
||||
emqx_inflight:update(PacketId, {pubrel, PacketId, Now}, Inflight)
|
||||
end,
|
||||
retry_delivery(Force, Msgs, Now, State#state{inflight = Inflight1});
|
||||
true ->
|
||||
State#state{retry_timer = emqx_misc:start_timer(Interval - Diff, retry_delivery)}
|
||||
ensure_retry_timer(Interval - Diff, State)
|
||||
end.
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
|
@ -662,16 +681,16 @@ expire_awaiting_rel(State = #state{awaiting_rel = AwaitingRel}) ->
|
|||
expire_awaiting_rel([], _Now, State) ->
|
||||
State#state{await_rel_timer = undefined};
|
||||
|
||||
expire_awaiting_rel([{PacketId, Msg = #message{timestamp = TS}} | Msgs],
|
||||
Now, State = #state{awaiting_rel = AwaitingRel,
|
||||
await_rel_timeout = Timeout}) ->
|
||||
expire_awaiting_rel([{PacketId, Msg = #message{timestamp = TS}} | Msgs], Now,
|
||||
State = #state{awaiting_rel = AwaitingRel, await_rel_timeout = Timeout}) ->
|
||||
case (timer:now_diff(Now, TS) div 1000) of
|
||||
Diff when Diff >= Timeout ->
|
||||
?LOG(warning, "Dropped Qos2 Message for await_rel_timeout: ~p", [Msg], State),
|
||||
emqx_metrics:inc('messages/qos2/dropped'),
|
||||
?LOG(warning, "Dropped message for await_rel_timeout: ~p",
|
||||
[emqx_message:format(Msg)], State),
|
||||
expire_awaiting_rel(Msgs, Now, State#state{awaiting_rel = maps:remove(PacketId, AwaitingRel)});
|
||||
Diff ->
|
||||
State#state{await_rel_timer = emqx_misc:start_timer(Timeout - Diff, check_awaiting_rel)}
|
||||
ensure_await_rel_timer(Timeout - Diff, State)
|
||||
end.
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
|
@ -728,12 +747,10 @@ dispatch(Msg = #message{qos = ?QOS0}, State) ->
|
|||
dispatch(Msg = #message{qos = QoS}, State = #state{next_pkt_id = PacketId, inflight = Inflight})
|
||||
when QoS =:= ?QOS1 orelse QoS =:= ?QOS2 ->
|
||||
case emqx_inflight:is_full(Inflight) of
|
||||
true ->
|
||||
enqueue_msg(Msg, State);
|
||||
true -> enqueue_msg(Msg, State);
|
||||
false ->
|
||||
deliver(PacketId, Msg, State),
|
||||
%% TODO inc_stats??
|
||||
await(PacketId, Msg, next_pkt_id(inc_stats(deliver, State)))
|
||||
await(PacketId, Msg, inc_stats(deliver, next_pkt_id(State)))
|
||||
end.
|
||||
|
||||
enqueue_msg(Msg, State = #state{mqueue = Q}) ->
|
||||
|
@ -760,15 +777,10 @@ deliver(PacketId, Msg, #state{conn_pid = ConnPid, binding = remote}) ->
|
|||
%% Awaiting ACK for QoS1/QoS2 Messages
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
await(PacketId, Msg, State = #state{inflight = Inflight,
|
||||
retry_timer = RetryTimer,
|
||||
retry_interval = Interval}) ->
|
||||
%% Start retry timer if the Inflight is still empty
|
||||
State1 = case RetryTimer == undefined of
|
||||
true -> State#state{retry_timer = emqx_misc:start_timer(Interval, retry_delivery)};
|
||||
false -> State
|
||||
end,
|
||||
State1#state{inflight = emqx_inflight:insert(PacketId, {publish, {PacketId, Msg}, os:timestamp()}, Inflight)}.
|
||||
await(PacketId, Msg, State = #state{inflight = Inflight}) ->
|
||||
Inflight1 = emqx_inflight:insert(
|
||||
PacketId, {publish, {PacketId, Msg}, os:timestamp()}, Inflight),
|
||||
ensure_retry_timer(State#state{inflight = Inflight1}).
|
||||
|
||||
acked(puback, PacketId, State = #state{client_id = ClientId, inflight = Inflight}) ->
|
||||
case emqx_inflight:lookup(PacketId, Inflight) of
|
||||
|
@ -776,7 +788,7 @@ acked(puback, PacketId, State = #state{client_id = ClientId, inflight = Infligh
|
|||
emqx_hooks:run('message.acked', [#{client_id =>ClientId}], Msg),
|
||||
State#state{inflight = emqx_inflight:delete(PacketId, Inflight)};
|
||||
none ->
|
||||
?LOG(warning, "Duplicated PUBACK Packet: ~p", [PacketId], State),
|
||||
?LOG(warning, "Duplicated PUBACK PacketId ~w", [PacketId], State),
|
||||
State
|
||||
end;
|
||||
|
||||
|
@ -786,10 +798,10 @@ acked(pubrec, PacketId, State = #state{client_id = ClientId, inflight = Infligh
|
|||
emqx_hooks:run('message.acked', [ClientId], Msg),
|
||||
State#state{inflight = emqx_inflight:update(PacketId, {pubrel, PacketId, os:timestamp()}, Inflight)};
|
||||
{value, {pubrel, PacketId, _Ts}} ->
|
||||
?LOG(warning, "Duplicated PUBREC Packet: ~p", [PacketId], State),
|
||||
?LOG(warning, "Duplicated PUBREC PacketId ~w", [PacketId], State),
|
||||
State;
|
||||
none ->
|
||||
?LOG(warning, "Unexpected PUBREC Packet: ~p", [PacketId], State),
|
||||
?LOG(warning, "Unexpected PUBREC PacketId ~w", [PacketId], State),
|
||||
State
|
||||
end;
|
||||
|
||||
|
@ -819,28 +831,42 @@ dequeue2(State = #state{mqueue = Q}) ->
|
|||
dequeue(dispatch(Msg, State#state{mqueue = Q1}))
|
||||
end.
|
||||
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Ensure timers
|
||||
|
||||
ensure_await_rel_timer(State = #state{await_rel_timer = undefined, await_rel_timeout = Timeout}) ->
|
||||
State#state{await_rel_timer = emqx_misc:start_timer(Timeout, check_awaiting_rel)};
|
||||
ensure_await_rel_timer(Timeout, State);
|
||||
ensure_await_rel_timer(State) ->
|
||||
State.
|
||||
|
||||
ensure_await_rel_timer(Timeout, State = #state{await_rel_timer = undefined}) ->
|
||||
State#state{await_rel_timer = emqx_misc:start_timer(Timeout, check_awaiting_rel)};
|
||||
ensure_await_rel_timer(_Timeout, State) ->
|
||||
State.
|
||||
|
||||
ensure_retry_timer(State = #state{retry_timer = undefined, retry_interval = Interval}) ->
|
||||
ensure_retry_timer(Interval, State);
|
||||
ensure_retry_timer(State) ->
|
||||
State.
|
||||
|
||||
ensure_retry_timer(Interval, State = #state{retry_timer = undefined}) ->
|
||||
State#state{retry_timer = emqx_misc:start_timer(Interval, retry_delivery)};
|
||||
ensure_retry_timer(_Timeout, State) ->
|
||||
State.
|
||||
|
||||
ensure_expire_timer(State = #state{expiry_interval = Interval}) when Interval > 0 ->
|
||||
State#state{expiry_timer = emqx_misc:start_timer(Interval, expired)};
|
||||
ensure_expire_timer(State) ->
|
||||
State.
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Reset Dup
|
||||
|
||||
reset_dup(Msg) ->
|
||||
emqx_message:unset_flag(dup, Msg).
|
||||
ensure_stats_timer(State = #state{enable_stats = true, stats_timer = undefined,
|
||||
idle_timeout = IdleTimeout}) ->
|
||||
State#state{stats_timer = emqx_misc:start_timer(IdleTimeout, emit_stats)};
|
||||
ensure_stats_timer(State) ->
|
||||
State.
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Next Msg Id
|
||||
%% Next Packet Id
|
||||
|
||||
next_pkt_id(State = #state{next_pkt_id = 16#FFFF}) ->
|
||||
State#state{next_pkt_id = 1};
|
||||
|
@ -849,26 +875,28 @@ next_pkt_id(State = #state{next_pkt_id = Id}) ->
|
|||
State#state{next_pkt_id = Id + 1}.
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Ensure stats timer
|
||||
|
||||
ensure_stats_timer(State = #state{enable_stats = true, stats_timer = undefined}) ->
|
||||
State#state{stats_timer = erlang:send_after(30000, self(), emit_stats)};
|
||||
ensure_stats_timer(State) ->
|
||||
State.
|
||||
%% Inc stats
|
||||
|
||||
inc_stats(deliver, State = #state{deliver_stats = I}) ->
|
||||
State#state{deliver_stats = I + 1};
|
||||
inc_stats(enqueue, State = #state{enqueue_stats = I}) ->
|
||||
State#state{enqueue_stats = I + 1}.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Helper functions
|
||||
|
||||
reply({Reply, State}) ->
|
||||
reply(Reply, State).
|
||||
|
||||
reply(Reply, State) ->
|
||||
{reply, Reply, State}.
|
||||
{reply, Reply, ensure_stats_timer(State)}.
|
||||
|
||||
noreply(State) ->
|
||||
{noreply, ensure_stats_timer(State)}.
|
||||
|
||||
shutdown(Reason, State) ->
|
||||
{stop, {shutdown, Reason}, State}.
|
||||
|
||||
%% TODO: maybe_gc(State) -> State.
|
||||
%% TODO: GC Policy and Shutdown Policy
|
||||
%% maybe_gc(State) -> State.
|
||||
|
||||
|
|
Loading…
Reference in New Issue