|
|
@ -85,7 +85,7 @@
|
|
|
|
|
|
|
|
|
|
|
|
-export([expire/2]).
|
|
|
|
-export([expire/2]).
|
|
|
|
|
|
|
|
|
|
|
|
%% Export for ct
|
|
|
|
%% Export for CT
|
|
|
|
-export([set_field/3]).
|
|
|
|
-export([set_field/3]).
|
|
|
|
|
|
|
|
|
|
|
|
-export_type([session/0]).
|
|
|
|
-export_type([session/0]).
|
|
|
@ -99,7 +99,8 @@
|
|
|
|
max_subscriptions :: non_neg_integer(),
|
|
|
|
max_subscriptions :: non_neg_integer(),
|
|
|
|
%% Upgrade QoS?
|
|
|
|
%% Upgrade QoS?
|
|
|
|
upgrade_qos :: boolean(),
|
|
|
|
upgrade_qos :: boolean(),
|
|
|
|
%% Client <- Broker: QoS1/2 messages sent to the client but unacked.
|
|
|
|
%% Client <- Broker: QoS1/2 messages sent to the client but
|
|
|
|
|
|
|
|
%% have not been unacked.
|
|
|
|
inflight :: emqx_inflight:inflight(),
|
|
|
|
inflight :: emqx_inflight:inflight(),
|
|
|
|
%% All QoS1/2 messages published to when client is disconnected,
|
|
|
|
%% All QoS1/2 messages published to when client is disconnected,
|
|
|
|
%% or QoS1/2 messages pending transmission to the Client.
|
|
|
|
%% or QoS1/2 messages pending transmission to the Client.
|
|
|
@ -108,14 +109,14 @@
|
|
|
|
mqueue :: emqx_mqueue:mqueue(),
|
|
|
|
mqueue :: emqx_mqueue:mqueue(),
|
|
|
|
%% Next packet id of the session
|
|
|
|
%% Next packet id of the session
|
|
|
|
next_pkt_id = 1 :: emqx_types:packet_id(),
|
|
|
|
next_pkt_id = 1 :: emqx_types:packet_id(),
|
|
|
|
%% Retry interval for redelivering QoS1/2 messages (Unit: second)
|
|
|
|
%% Retry interval for redelivering QoS1/2 messages (Unit: millsecond)
|
|
|
|
retry_interval :: timeout(),
|
|
|
|
retry_interval :: timeout(),
|
|
|
|
%% Client -> Broker: QoS2 messages received from client and
|
|
|
|
%% Client -> Broker: QoS2 messages received from the client, but
|
|
|
|
%% waiting for pubrel.
|
|
|
|
%% have not been completely acknowledged
|
|
|
|
awaiting_rel :: map(),
|
|
|
|
awaiting_rel :: map(),
|
|
|
|
%% Max Packets Awaiting PUBREL
|
|
|
|
%% Maximum number of awaiting QoS2 messages allowed
|
|
|
|
max_awaiting_rel :: non_neg_integer(),
|
|
|
|
max_awaiting_rel :: non_neg_integer(),
|
|
|
|
%% Awaiting PUBREL Timeout (Unit: second)
|
|
|
|
%% Awaiting PUBREL Timeout (Unit: millsecond)
|
|
|
|
await_rel_timeout :: timeout(),
|
|
|
|
await_rel_timeout :: timeout(),
|
|
|
|
%% Created at
|
|
|
|
%% Created at
|
|
|
|
created_at :: pos_integer()
|
|
|
|
created_at :: pos_integer()
|
|
|
@ -125,6 +126,10 @@
|
|
|
|
|
|
|
|
|
|
|
|
-type(publish() :: {maybe(emqx_types:packet_id()), emqx_types:message()}).
|
|
|
|
-type(publish() :: {maybe(emqx_types:packet_id()), emqx_types:message()}).
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
-type(pubrel() :: {pubrel, emqx_types:packet_id()}).
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
-type(replies() :: list(publish() | pubrel())).
|
|
|
|
|
|
|
|
|
|
|
|
-define(INFO_KEYS, [subscriptions,
|
|
|
|
-define(INFO_KEYS, [subscriptions,
|
|
|
|
upgrade_qos,
|
|
|
|
upgrade_qos,
|
|
|
|
retry_interval,
|
|
|
|
retry_interval,
|
|
|
@ -150,7 +155,6 @@
|
|
|
|
%% Init a Session
|
|
|
|
%% Init a Session
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
|
|
%% @doc Init a session.
|
|
|
|
|
|
|
|
-spec(init(emqx_types:clientinfo(), emqx_types:conninfo()) -> session()).
|
|
|
|
-spec(init(emqx_types:clientinfo(), emqx_types:conninfo()) -> session()).
|
|
|
|
init(#{zone := Zone}, #{receive_maximum := MaxInflight}) ->
|
|
|
|
init(#{zone := Zone}, #{receive_maximum := MaxInflight}) ->
|
|
|
|
#session{max_subscriptions = get_env(Zone, max_subscriptions, 0),
|
|
|
|
#session{max_subscriptions = get_env(Zone, max_subscriptions, 0),
|
|
|
@ -159,10 +163,10 @@ init(#{zone := Zone}, #{receive_maximum := MaxInflight}) ->
|
|
|
|
inflight = emqx_inflight:new(MaxInflight),
|
|
|
|
inflight = emqx_inflight:new(MaxInflight),
|
|
|
|
mqueue = init_mqueue(Zone),
|
|
|
|
mqueue = init_mqueue(Zone),
|
|
|
|
next_pkt_id = 1,
|
|
|
|
next_pkt_id = 1,
|
|
|
|
retry_interval = get_env(Zone, retry_interval, 0),
|
|
|
|
retry_interval = timer:seconds(get_env(Zone, retry_interval, 0)),
|
|
|
|
awaiting_rel = #{},
|
|
|
|
awaiting_rel = #{},
|
|
|
|
max_awaiting_rel = get_env(Zone, max_awaiting_rel, 100),
|
|
|
|
max_awaiting_rel = get_env(Zone, max_awaiting_rel, 100),
|
|
|
|
await_rel_timeout = get_env(Zone, await_rel_timeout, 300),
|
|
|
|
await_rel_timeout = timer:seconds(get_env(Zone, await_rel_timeout, 300)),
|
|
|
|
created_at = erlang:system_time(second)
|
|
|
|
created_at = erlang:system_time(second)
|
|
|
|
}.
|
|
|
|
}.
|
|
|
|
|
|
|
|
|
|
|
@ -174,15 +178,15 @@ init_mqueue(Zone) ->
|
|
|
|
default_priority => get_env(Zone, mqueue_default_priority, lowest)
|
|
|
|
default_priority => get_env(Zone, mqueue_default_priority, lowest)
|
|
|
|
}).
|
|
|
|
}).
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
|
|
%% Info, Stats
|
|
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
|
|
%% @doc Get infos of the session.
|
|
|
|
%% @doc Get infos of the session.
|
|
|
|
-spec(info(session()) -> emqx_types:infos()).
|
|
|
|
-spec(info(session()) -> emqx_types:infos()).
|
|
|
|
info(Session) ->
|
|
|
|
info(Session) ->
|
|
|
|
maps:from_list(info(?INFO_KEYS, Session)).
|
|
|
|
maps:from_list(info(?INFO_KEYS, Session)).
|
|
|
|
|
|
|
|
|
|
|
|
%% @doc Get stats of the session.
|
|
|
|
|
|
|
|
-spec(stats(session()) -> emqx_types:stats()).
|
|
|
|
|
|
|
|
stats(Session) -> info(?STATS_KEYS, Session).
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
info(Keys, Session) when is_list(Keys) ->
|
|
|
|
info(Keys, Session) when is_list(Keys) ->
|
|
|
|
[{Key, info(Key, Session)} || Key <- Keys];
|
|
|
|
[{Key, info(Key, Session)} || Key <- Keys];
|
|
|
|
info(subscriptions, #session{subscriptions = Subs}) ->
|
|
|
|
info(subscriptions, #session{subscriptions = Subs}) ->
|
|
|
@ -200,7 +204,9 @@ info(inflight_cnt, #session{inflight = Inflight}) ->
|
|
|
|
info(inflight_max, #session{inflight = Inflight}) ->
|
|
|
|
info(inflight_max, #session{inflight = Inflight}) ->
|
|
|
|
emqx_inflight:max_size(Inflight);
|
|
|
|
emqx_inflight:max_size(Inflight);
|
|
|
|
info(retry_interval, #session{retry_interval = Interval}) ->
|
|
|
|
info(retry_interval, #session{retry_interval = Interval}) ->
|
|
|
|
Interval;
|
|
|
|
Interval div 1000;
|
|
|
|
|
|
|
|
info(mqueue, #session{mqueue = MQueue}) ->
|
|
|
|
|
|
|
|
MQueue;
|
|
|
|
info(mqueue_len, #session{mqueue = MQueue}) ->
|
|
|
|
info(mqueue_len, #session{mqueue = MQueue}) ->
|
|
|
|
emqx_mqueue:len(MQueue);
|
|
|
|
emqx_mqueue:len(MQueue);
|
|
|
|
info(mqueue_max, #session{mqueue = MQueue}) ->
|
|
|
|
info(mqueue_max, #session{mqueue = MQueue}) ->
|
|
|
@ -216,52 +222,20 @@ info(awaiting_rel_cnt, #session{awaiting_rel = AwaitingRel}) ->
|
|
|
|
info(awaiting_rel_max, #session{max_awaiting_rel = Max}) ->
|
|
|
|
info(awaiting_rel_max, #session{max_awaiting_rel = Max}) ->
|
|
|
|
Max;
|
|
|
|
Max;
|
|
|
|
info(await_rel_timeout, #session{await_rel_timeout = Timeout}) ->
|
|
|
|
info(await_rel_timeout, #session{await_rel_timeout = Timeout}) ->
|
|
|
|
Timeout;
|
|
|
|
Timeout div 1000;
|
|
|
|
info(created_at, #session{created_at = CreatedAt}) ->
|
|
|
|
info(created_at, #session{created_at = CreatedAt}) ->
|
|
|
|
CreatedAt.
|
|
|
|
CreatedAt.
|
|
|
|
|
|
|
|
|
|
|
|
%% For tests
|
|
|
|
%% @doc Get stats of the session.
|
|
|
|
set_field(Name, Val, Channel) ->
|
|
|
|
-spec(stats(session()) -> emqx_types:stats()).
|
|
|
|
Fields = record_info(fields, session),
|
|
|
|
stats(Session) -> info(?STATS_KEYS, Session).
|
|
|
|
Pos = emqx_misc:index_of(Name, Fields),
|
|
|
|
|
|
|
|
setelement(Pos+1, Channel, Val).
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
-spec(takeover(session()) -> ok).
|
|
|
|
|
|
|
|
takeover(#session{subscriptions = Subs}) ->
|
|
|
|
|
|
|
|
lists:foreach(fun({TopicFilter, _SubOpts}) ->
|
|
|
|
|
|
|
|
ok = emqx_broker:unsubscribe(TopicFilter)
|
|
|
|
|
|
|
|
end, maps:to_list(Subs)).
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
-spec(resume(emqx_types:clientid(), session()) -> ok).
|
|
|
|
|
|
|
|
resume(ClientId, #session{subscriptions = Subs}) ->
|
|
|
|
|
|
|
|
%% 1. Subscribe again.
|
|
|
|
|
|
|
|
lists:foreach(fun({TopicFilter, SubOpts}) ->
|
|
|
|
|
|
|
|
ok = emqx_broker:subscribe(TopicFilter, ClientId, SubOpts)
|
|
|
|
|
|
|
|
end, maps:to_list(Subs)).
|
|
|
|
|
|
|
|
%% 2. Run hooks.
|
|
|
|
|
|
|
|
%% ok = emqx_hooks:run('session.resumed', [#{clientid => ClientId}, attrs(Session)]),
|
|
|
|
|
|
|
|
%% TODO: 3. Redeliver: Replay delivery and Dequeue pending messages
|
|
|
|
|
|
|
|
%%Session.
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
redeliver(Session = #session{inflight = Inflight}) ->
|
|
|
|
|
|
|
|
Publishes = lists:map(fun({PacketId, {pubrel, _Ts}}) ->
|
|
|
|
|
|
|
|
{pubrel, PacketId, ?RC_SUCCESS};
|
|
|
|
|
|
|
|
({PacketId, {Msg, _Ts}}) ->
|
|
|
|
|
|
|
|
{publish, PacketId, Msg}
|
|
|
|
|
|
|
|
end, emqx_inflight:to_list(Inflight)),
|
|
|
|
|
|
|
|
case dequeue(Session) of
|
|
|
|
|
|
|
|
{ok, NSession} ->
|
|
|
|
|
|
|
|
{ok, Publishes, NSession};
|
|
|
|
|
|
|
|
{ok, More, NSession} ->
|
|
|
|
|
|
|
|
{ok, lists:append(Publishes, More), NSession}
|
|
|
|
|
|
|
|
end.
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% Client -> Broker: SUBSCRIBE
|
|
|
|
%% Client -> Broker: SUBSCRIBE
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
|
|
-spec(subscribe(emqx_types:clientinfo(), emqx_types:topic(), emqx_types:subopts(),
|
|
|
|
-spec(subscribe(emqx_types:clientinfo(), emqx_types:topic(),
|
|
|
|
session())
|
|
|
|
emqx_types:subopts(), session())
|
|
|
|
-> {ok, session()} | {error, emqx_types:reason_code()}).
|
|
|
|
-> {ok, session()} | {error, emqx_types:reason_code()}).
|
|
|
|
subscribe(ClientInfo = #{clientid := ClientId}, TopicFilter, SubOpts,
|
|
|
|
subscribe(ClientInfo = #{clientid := ClientId}, TopicFilter, SubOpts,
|
|
|
|
Session = #session{subscriptions = Subs}) ->
|
|
|
|
Session = #session{subscriptions = Subs}) ->
|
|
|
@ -337,7 +311,7 @@ is_awaiting_full(#session{awaiting_rel = AwaitingRel,
|
|
|
|
|
|
|
|
|
|
|
|
-spec(puback(emqx_types:packet_id(), session())
|
|
|
|
-spec(puback(emqx_types:packet_id(), session())
|
|
|
|
-> {ok, emqx_types:message(), session()}
|
|
|
|
-> {ok, emqx_types:message(), session()}
|
|
|
|
| {ok, emqx_types:message(), list(publish()), session()}
|
|
|
|
| {ok, emqx_types:message(), replies(), session()}
|
|
|
|
| {error, emqx_types:reason_code()}).
|
|
|
|
| {error, emqx_types:reason_code()}).
|
|
|
|
puback(PacketId, Session = #session{inflight = Inflight}) ->
|
|
|
|
puback(PacketId, Session = #session{inflight = Inflight}) ->
|
|
|
|
case emqx_inflight:lookup(PacketId, Inflight) of
|
|
|
|
case emqx_inflight:lookup(PacketId, Inflight) of
|
|
|
@ -366,7 +340,8 @@ return_with(Msg, {ok, Publishes, Session}) ->
|
|
|
|
pubrec(PacketId, Session = #session{inflight = Inflight}) ->
|
|
|
|
pubrec(PacketId, Session = #session{inflight = Inflight}) ->
|
|
|
|
case emqx_inflight:lookup(PacketId, Inflight) of
|
|
|
|
case emqx_inflight:lookup(PacketId, Inflight) of
|
|
|
|
{value, {Msg, _Ts}} when is_record(Msg, message) ->
|
|
|
|
{value, {Msg, _Ts}} when is_record(Msg, message) ->
|
|
|
|
Inflight1 = emqx_inflight:update(PacketId, {pubrel, os:timestamp()}, Inflight),
|
|
|
|
Inflight1 = emqx_inflight:update(
|
|
|
|
|
|
|
|
PacketId, {pubrel, os:timestamp()}, Inflight),
|
|
|
|
{ok, Msg, Session#session{inflight = Inflight1}};
|
|
|
|
{ok, Msg, Session#session{inflight = Inflight1}};
|
|
|
|
{value, {pubrel, _Ts}} ->
|
|
|
|
{value, {pubrel, _Ts}} ->
|
|
|
|
{error, ?RC_PACKET_IDENTIFIER_IN_USE};
|
|
|
|
{error, ?RC_PACKET_IDENTIFIER_IN_USE};
|
|
|
@ -393,7 +368,7 @@ pubrel(PacketId, Session = #session{awaiting_rel = AwaitingRel}) ->
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
|
|
-spec(pubcomp(emqx_types:packet_id(), session())
|
|
|
|
-spec(pubcomp(emqx_types:packet_id(), session())
|
|
|
|
-> {ok, session()} | {ok, list(publish()), session()}
|
|
|
|
-> {ok, session()} | {ok, replies(), session()}
|
|
|
|
| {error, emqx_types:reason_code()}).
|
|
|
|
| {error, emqx_types:reason_code()}).
|
|
|
|
pubcomp(PacketId, Session = #session{inflight = Inflight}) ->
|
|
|
|
pubcomp(PacketId, Session = #session{inflight = Inflight}) ->
|
|
|
|
case emqx_inflight:lookup(PacketId, Inflight) of
|
|
|
|
case emqx_inflight:lookup(PacketId, Inflight) of
|
|
|
@ -415,29 +390,19 @@ dequeue(Session = #session{inflight = Inflight, mqueue = Q}) ->
|
|
|
|
true -> {ok, Session};
|
|
|
|
true -> {ok, Session};
|
|
|
|
false ->
|
|
|
|
false ->
|
|
|
|
{Msgs, Q1} = dequeue(batch_n(Inflight), [], Q),
|
|
|
|
{Msgs, Q1} = dequeue(batch_n(Inflight), [], Q),
|
|
|
|
deliver(lists:reverse(Msgs), [], Session#session{mqueue = Q1})
|
|
|
|
deliver(Msgs, [], Session#session{mqueue = Q1})
|
|
|
|
end.
|
|
|
|
end.
|
|
|
|
|
|
|
|
|
|
|
|
dequeue(Cnt, Msgs, Q) when Cnt =< 0 ->
|
|
|
|
dequeue(0, Msgs, Q) ->
|
|
|
|
{Msgs, Q};
|
|
|
|
{lists:reverse(Msgs), Q};
|
|
|
|
|
|
|
|
|
|
|
|
dequeue(Cnt, Msgs, Q) ->
|
|
|
|
dequeue(Cnt, Msgs, Q) ->
|
|
|
|
case emqx_mqueue:out(Q) of
|
|
|
|
case emqx_mqueue:out(Q) of
|
|
|
|
{empty, _Q} -> {Msgs, Q};
|
|
|
|
{empty, _Q} -> dequeue(0, Msgs, Q);
|
|
|
|
|
|
|
|
{{value, Msg = #message{qos = ?QOS_0}}, Q1} ->
|
|
|
|
|
|
|
|
dequeue(Cnt, [Msg|Msgs], Q1);
|
|
|
|
{{value, Msg}, Q1} ->
|
|
|
|
{{value, Msg}, Q1} ->
|
|
|
|
case emqx_message:is_expired(Msg) of
|
|
|
|
|
|
|
|
true ->
|
|
|
|
|
|
|
|
ok = emqx_metrics:inc('messages.expired'),
|
|
|
|
|
|
|
|
dequeue(Cnt-1, Msgs, Q1);
|
|
|
|
|
|
|
|
false ->
|
|
|
|
|
|
|
|
dequeue(Cnt-1, [Msg|Msgs], Q1)
|
|
|
|
dequeue(Cnt-1, [Msg|Msgs], Q1)
|
|
|
|
end
|
|
|
|
|
|
|
|
end.
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
batch_n(Inflight) ->
|
|
|
|
|
|
|
|
case emqx_inflight:max_size(Inflight) of
|
|
|
|
|
|
|
|
0 -> ?DEFAULT_BATCH_N;
|
|
|
|
|
|
|
|
Sz -> Sz - emqx_inflight:size(Inflight)
|
|
|
|
|
|
|
|
end.
|
|
|
|
end.
|
|
|
|
|
|
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
@ -445,7 +410,7 @@ batch_n(Inflight) ->
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
|
|
-spec(deliver(list(emqx_types:deliver()), session())
|
|
|
|
-spec(deliver(list(emqx_types:deliver()), session())
|
|
|
|
-> {ok, session()} | {ok, list(publish()), session()}).
|
|
|
|
-> {ok, session()} | {ok, replies(), session()}).
|
|
|
|
deliver(Delivers, Session) ->
|
|
|
|
deliver(Delivers, Session) ->
|
|
|
|
Msgs = lists:map(enrich_fun(Session), Delivers),
|
|
|
|
Msgs = lists:map(enrich_fun(Session), Delivers),
|
|
|
|
deliver(Msgs, [], Session).
|
|
|
|
deliver(Msgs, [], Session).
|
|
|
@ -457,8 +422,8 @@ deliver([Msg = #message{qos = ?QOS_0}|More], Acc, Session) ->
|
|
|
|
Publish = {undefined, maybe_ack(Msg)},
|
|
|
|
Publish = {undefined, maybe_ack(Msg)},
|
|
|
|
deliver(More, [Publish|Acc], Session);
|
|
|
|
deliver(More, [Publish|Acc], Session);
|
|
|
|
|
|
|
|
|
|
|
|
deliver([Msg = #message{qos = QoS}|More], Acc,
|
|
|
|
deliver([Msg = #message{qos = QoS}|More], Acc, Session =
|
|
|
|
Session = #session{next_pkt_id = PacketId, inflight = Inflight})
|
|
|
|
#session{next_pkt_id = PacketId, inflight = Inflight})
|
|
|
|
when QoS =:= ?QOS_1 orelse QoS =:= ?QOS_2 ->
|
|
|
|
when QoS =:= ?QOS_1 orelse QoS =:= ?QOS_2 ->
|
|
|
|
case emqx_inflight:is_full(Inflight) of
|
|
|
|
case emqx_inflight:is_full(Inflight) of
|
|
|
|
true ->
|
|
|
|
true ->
|
|
|
@ -479,7 +444,6 @@ enqueue(Delivers, Session) when is_list(Delivers) ->
|
|
|
|
Msgs = lists:map(enrich_fun(Session), Delivers),
|
|
|
|
Msgs = lists:map(enrich_fun(Session), Delivers),
|
|
|
|
lists:foldl(fun enqueue/2, Session, Msgs);
|
|
|
|
lists:foldl(fun enqueue/2, Session, Msgs);
|
|
|
|
|
|
|
|
|
|
|
|
%%TODO: how to handle the dropped msg?
|
|
|
|
|
|
|
|
enqueue(Msg, Session = #session{mqueue = Q}) when is_record(Msg, message) ->
|
|
|
|
enqueue(Msg, Session = #session{mqueue = Q}) when is_record(Msg, message) ->
|
|
|
|
{Dropped, NewQ} = emqx_mqueue:in(Msg, Q),
|
|
|
|
{Dropped, NewQ} = emqx_mqueue:in(Msg, Q),
|
|
|
|
if is_record(Dropped, message) ->
|
|
|
|
if is_record(Dropped, message) ->
|
|
|
@ -504,14 +468,6 @@ maybe_nack(Msg) ->
|
|
|
|
emqx_shared_sub:is_ack_required(Msg)
|
|
|
|
emqx_shared_sub:is_ack_required(Msg)
|
|
|
|
andalso (ok == emqx_shared_sub:maybe_nack_dropped(Msg)).
|
|
|
|
andalso (ok == emqx_shared_sub:maybe_nack_dropped(Msg)).
|
|
|
|
|
|
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
|
|
%% Awaiting ACK for QoS1/QoS2 Messages
|
|
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
await(PacketId, Msg, Session = #session{inflight = Inflight}) ->
|
|
|
|
|
|
|
|
Inflight1 = emqx_inflight:insert(PacketId, {Msg, os:timestamp()}, Inflight),
|
|
|
|
|
|
|
|
Session#session{inflight = Inflight1}.
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
get_subopts(Topic, SubMap) ->
|
|
|
|
get_subopts(Topic, SubMap) ->
|
|
|
|
case maps:find(Topic, SubMap) of
|
|
|
|
case maps:find(Topic, SubMap) of
|
|
|
|
{ok, #{nl := Nl, qos := QoS, rap := Rap, subid := SubId}} ->
|
|
|
|
{ok, #{nl := Nl, qos := QoS, rap := Rap, subid := SubId}} ->
|
|
|
@ -542,47 +498,48 @@ enrich_subopts([{subid, SubId}|Opts], Msg, Session) ->
|
|
|
|
Msg1 = emqx_message:set_header('Subscription-Identifier', SubId, Msg),
|
|
|
|
Msg1 = emqx_message:set_header('Subscription-Identifier', SubId, Msg),
|
|
|
|
enrich_subopts(Opts, Msg1, Session).
|
|
|
|
enrich_subopts(Opts, Msg1, Session).
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
|
|
%% Awaiting ACK for QoS1/QoS2 Messages
|
|
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
await(PacketId, Msg, Session = #session{inflight = Inflight}) ->
|
|
|
|
|
|
|
|
Inflight1 = emqx_inflight:insert(PacketId, {Msg, os:timestamp()}, Inflight),
|
|
|
|
|
|
|
|
Session#session{inflight = Inflight1}.
|
|
|
|
|
|
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% Retry Delivery
|
|
|
|
%% Retry Delivery
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
|
|
%% Redeliver at once if force is true
|
|
|
|
-spec(retry(session()) -> {ok, session()} | {ok, replies(), timeout(), session()}).
|
|
|
|
retry(Session = #session{inflight = Inflight}) ->
|
|
|
|
retry(Session = #session{inflight = Inflight}) ->
|
|
|
|
case emqx_inflight:is_empty(Inflight) of
|
|
|
|
case emqx_inflight:is_empty(Inflight) of
|
|
|
|
true -> {ok, Session};
|
|
|
|
true -> {ok, Session};
|
|
|
|
false ->
|
|
|
|
false ->
|
|
|
|
SortFun = fun({_, {_, Ts1}}, {_, {_, Ts2}}) -> Ts1 < Ts2 end,
|
|
|
|
retry_delivery(emqx_inflight:to_list(sort_fun(), Inflight),
|
|
|
|
Msgs = lists:sort(SortFun, emqx_inflight:to_list(Inflight)),
|
|
|
|
[], os:timestamp(), Session)
|
|
|
|
retry_delivery(Msgs, os:timestamp(), [], Session)
|
|
|
|
|
|
|
|
end.
|
|
|
|
end.
|
|
|
|
|
|
|
|
|
|
|
|
retry_delivery([], _Now, Acc, Session) ->
|
|
|
|
retry_delivery([], Acc, _Now, Session = #session{retry_interval = Interval}) ->
|
|
|
|
%% Retry again...
|
|
|
|
{ok, lists:reverse(Acc), Interval, Session};
|
|
|
|
{ok, lists:reverse(Acc), Session};
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
retry_delivery([{PacketId, {Val, Ts}}|More], Now, Acc,
|
|
|
|
retry_delivery([{PacketId, {Msg, Ts}}|More], Acc, Now, Session =
|
|
|
|
Session = #session{retry_interval = Interval,
|
|
|
|
#session{retry_interval = Interval, inflight = Inflight}) ->
|
|
|
|
inflight = Inflight}) ->
|
|
|
|
case (Age = age(Now, Ts)) >= Interval of
|
|
|
|
IntervalMs = Interval * 1000,
|
|
|
|
|
|
|
|
%% Microseconds -> MilliSeconds
|
|
|
|
|
|
|
|
Age = timer:now_diff(Now, Ts) div 1000,
|
|
|
|
|
|
|
|
if
|
|
|
|
|
|
|
|
Age >= IntervalMs ->
|
|
|
|
|
|
|
|
{Acc1, Inflight1} = retry_delivery(PacketId, Val, Now, Acc, Inflight),
|
|
|
|
|
|
|
|
retry_delivery(More, Now, Acc1, Session#session{inflight = Inflight1});
|
|
|
|
|
|
|
|
true ->
|
|
|
|
true ->
|
|
|
|
{ok, lists:reverse(Acc), IntervalMs - max(0, Age), Session}
|
|
|
|
{Acc1, Inflight1} = retry_delivery(PacketId, Msg, Now, Acc, Inflight),
|
|
|
|
|
|
|
|
retry_delivery(More, Acc1, Now, Session#session{inflight = Inflight1});
|
|
|
|
|
|
|
|
false ->
|
|
|
|
|
|
|
|
{ok, lists:reverse(Acc), Interval - max(0, Age), Session}
|
|
|
|
end.
|
|
|
|
end.
|
|
|
|
|
|
|
|
|
|
|
|
retry_delivery(PacketId, Msg, Now, Acc, Inflight) when is_record(Msg, message) ->
|
|
|
|
retry_delivery(PacketId, Msg, Now, Acc, Inflight) when is_record(Msg, message) ->
|
|
|
|
case emqx_message:is_expired(Msg) of
|
|
|
|
case emqx_message:is_expired(Msg) of
|
|
|
|
true ->
|
|
|
|
true ->
|
|
|
|
ok = emqx_metrics:inc('messages.expired'),
|
|
|
|
|
|
|
|
{Acc, emqx_inflight:delete(PacketId, Inflight)};
|
|
|
|
{Acc, emqx_inflight:delete(PacketId, Inflight)};
|
|
|
|
false ->
|
|
|
|
false ->
|
|
|
|
Msg1 = emqx_message:set_flag(dup, true, Msg),
|
|
|
|
Msg1 = emqx_message:set_flag(dup, true, Msg),
|
|
|
|
{[{PacketId, Msg1}|Acc],
|
|
|
|
Inflight1 = emqx_inflight:update(PacketId, {Msg1, Now}, Inflight),
|
|
|
|
emqx_inflight:update(PacketId, {Msg1, Now}, Inflight)}
|
|
|
|
{[{PacketId, Msg1}|Acc], Inflight1}
|
|
|
|
end;
|
|
|
|
end;
|
|
|
|
|
|
|
|
|
|
|
|
retry_delivery(PacketId, pubrel, Now, Acc, Inflight) ->
|
|
|
|
retry_delivery(PacketId, pubrel, Now, Acc, Inflight) ->
|
|
|
@ -593,30 +550,60 @@ retry_delivery(PacketId, pubrel, Now, Acc, Inflight) ->
|
|
|
|
%% Expire Awaiting Rel
|
|
|
|
%% Expire Awaiting Rel
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
-spec(expire(awaiting_rel, session()) -> {ok, session()} | {ok, timeout(), session()}).
|
|
|
|
expire(awaiting_rel, Session = #session{awaiting_rel = AwaitingRel}) ->
|
|
|
|
expire(awaiting_rel, Session = #session{awaiting_rel = AwaitingRel}) ->
|
|
|
|
case maps:size(AwaitingRel) of
|
|
|
|
case maps:size(AwaitingRel) of
|
|
|
|
0 -> {ok, Session};
|
|
|
|
0 -> {ok, Session};
|
|
|
|
_ ->
|
|
|
|
_ -> expire_awaiting_rel(os:timestamp(), Session)
|
|
|
|
AwaitingRel1 = lists:keysort(2, maps:to_list(AwaitingRel)),
|
|
|
|
|
|
|
|
expire_awaiting_rel(AwaitingRel1, os:timestamp(), Session)
|
|
|
|
|
|
|
|
end.
|
|
|
|
end.
|
|
|
|
|
|
|
|
|
|
|
|
expire_awaiting_rel([], _Now, Session) ->
|
|
|
|
expire_awaiting_rel(Now, Session = #session{awaiting_rel = AwaitingRel,
|
|
|
|
{ok, Session};
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
expire_awaiting_rel([{PacketId, Ts} | More], Now,
|
|
|
|
|
|
|
|
Session = #session{awaiting_rel = AwaitingRel,
|
|
|
|
|
|
|
|
await_rel_timeout = Timeout}) ->
|
|
|
|
await_rel_timeout = Timeout}) ->
|
|
|
|
case (timer:now_diff(Now, Ts) div 1000) of
|
|
|
|
|
|
|
|
Age when Age >= (Timeout * 1000) ->
|
|
|
|
NotExpired = fun(_PacketId, Ts) -> age(Now, Ts) < Timeout end,
|
|
|
|
ok = emqx_metrics:inc('messages.qos2.expired'),
|
|
|
|
case maps:filter(NotExpired, AwaitingRel) of
|
|
|
|
?LOG(warning, "Dropped qos2 packet ~s due to await_rel timeout", [PacketId]),
|
|
|
|
[] -> {ok, Session};
|
|
|
|
Session1 = Session#session{awaiting_rel = maps:remove(PacketId, AwaitingRel)},
|
|
|
|
AwaitingRel1 ->
|
|
|
|
expire_awaiting_rel(More, Now, Session1);
|
|
|
|
{ok, Timeout, Session#session{awaiting_rel = AwaitingRel1}}
|
|
|
|
Age ->
|
|
|
|
|
|
|
|
{ok, Timeout - max(0, Age), Session}
|
|
|
|
|
|
|
|
end.
|
|
|
|
end.
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
|
|
%% Takeover, Resume and Redeliver
|
|
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
-spec(takeover(session()) -> ok).
|
|
|
|
|
|
|
|
takeover(#session{subscriptions = Subs}) ->
|
|
|
|
|
|
|
|
lists:foreach(fun({TopicFilter, _SubOpts}) ->
|
|
|
|
|
|
|
|
ok = emqx_broker:unsubscribe(TopicFilter)
|
|
|
|
|
|
|
|
end, maps:to_list(Subs)).
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
-spec(resume(emqx_types:clientid(), session()) -> ok).
|
|
|
|
|
|
|
|
resume(ClientId, #session{subscriptions = Subs}) ->
|
|
|
|
|
|
|
|
%% 1. Subscribe again.
|
|
|
|
|
|
|
|
lists:foreach(fun({TopicFilter, SubOpts}) ->
|
|
|
|
|
|
|
|
ok = emqx_broker:subscribe(TopicFilter, ClientId, SubOpts)
|
|
|
|
|
|
|
|
end, maps:to_list(Subs)).
|
|
|
|
|
|
|
|
%% 2. Run hooks.
|
|
|
|
|
|
|
|
%% ok = emqx_hooks:run('session.resumed', [#{clientid => ClientId}, attrs(Session)]),
|
|
|
|
|
|
|
|
%% TODO: 3. Redeliver: Replay delivery and Dequeue pending messages
|
|
|
|
|
|
|
|
%%Session.
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
-spec(redeliver(session()) -> {ok, replies(), session()}).
|
|
|
|
|
|
|
|
redeliver(Session = #session{inflight = Inflight}) ->
|
|
|
|
|
|
|
|
Pubs = replay(Inflight),
|
|
|
|
|
|
|
|
case dequeue(Session) of
|
|
|
|
|
|
|
|
{ok, NSession} -> {ok, Pubs, NSession};
|
|
|
|
|
|
|
|
{ok, More, NSession} ->
|
|
|
|
|
|
|
|
{ok, lists:append(Pubs, More), NSession}
|
|
|
|
|
|
|
|
end.
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
replay(Inflight) ->
|
|
|
|
|
|
|
|
lists:map(fun({PacketId, {pubrel, _Ts}}) ->
|
|
|
|
|
|
|
|
{pubrel, PacketId};
|
|
|
|
|
|
|
|
({PacketId, {Msg, _Ts}}) ->
|
|
|
|
|
|
|
|
{PacketId, emqx_message:set_flag(dup, true, Msg)}
|
|
|
|
|
|
|
|
end, emqx_inflight:to_list(Inflight)).
|
|
|
|
|
|
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% Next Packet Id
|
|
|
|
%% Next Packet Id
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
@ -631,4 +618,25 @@ next_pkt_id(Session = #session{next_pkt_id = Id}) ->
|
|
|
|
%% Helper functions
|
|
|
|
%% Helper functions
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
-compile({inline, [sort_fun/0, batch_n/1, age/2]}).
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
sort_fun() ->
|
|
|
|
|
|
|
|
fun({_, {_, Ts1}}, {_, {_, Ts2}}) -> Ts1 < Ts2 end.
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
batch_n(Inflight) ->
|
|
|
|
|
|
|
|
case emqx_inflight:max_size(Inflight) of
|
|
|
|
|
|
|
|
0 -> ?DEFAULT_BATCH_N;
|
|
|
|
|
|
|
|
Sz -> Sz - emqx_inflight:size(Inflight)
|
|
|
|
|
|
|
|
end.
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
age(Now, Ts) -> timer:now_diff(Now, Ts) div 1000.
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
|
|
%% For CT tests
|
|
|
|
|
|
|
|
%%--------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
set_field(Name, Val, Channel) ->
|
|
|
|
|
|
|
|
Fields = record_info(fields, session),
|
|
|
|
|
|
|
|
Pos = emqx_misc:index_of(Name, Fields),
|
|
|
|
|
|
|
|
setelement(Pos+1, Channel, Val).
|
|
|
|
|
|
|
|
|
|
|
|