Merge pull request #1769 from emqtt/emqx30-feng
Update the protocol, session and mqueue modules
This commit is contained in:
commit
9a27ce0fd9
|
@ -500,6 +500,17 @@ mqtt.wildcard_subscription = true
|
||||||
## Value: boolean
|
## Value: boolean
|
||||||
mqtt.shared_subscription = true
|
mqtt.shared_subscription = true
|
||||||
|
|
||||||
|
## Message queue type.
|
||||||
|
##
|
||||||
|
## Value: simple | priority
|
||||||
|
mqtt.mqueue_type = simple
|
||||||
|
|
||||||
|
## Topic priorities. Default is 0.
|
||||||
|
##
|
||||||
|
## Priority: Number [0-255]
|
||||||
|
##
|
||||||
|
## mqtt.mqueue_priorities = topic/1=10,topic/2=8
|
||||||
|
|
||||||
##--------------------------------------------------------------------
|
##--------------------------------------------------------------------
|
||||||
## Zones
|
## Zones
|
||||||
##--------------------------------------------------------------------
|
##--------------------------------------------------------------------
|
||||||
|
@ -616,12 +627,23 @@ zone.external.await_rel_timeout = 60s
|
||||||
## Default: 2h, 2 hours
|
## Default: 2h, 2 hours
|
||||||
zone.external.session_expiry_interval = 2h
|
zone.external.session_expiry_interval = 2h
|
||||||
|
|
||||||
|
## Message queue type.
|
||||||
|
##
|
||||||
|
## Value: simple | priority
|
||||||
|
zone.external.mqueue_type = simple
|
||||||
|
|
||||||
## Maximum queue length. Enqueued messages when persistent client disconnected,
|
## Maximum queue length. Enqueued messages when persistent client disconnected,
|
||||||
## or inflight window is full. 0 means no limit.
|
## or inflight window is full. 0 means no limit.
|
||||||
##
|
##
|
||||||
## Value: Number >= 0
|
## Value: Number >= 0
|
||||||
zone.external.max_mqueue_len = 1000
|
zone.external.max_mqueue_len = 1000
|
||||||
|
|
||||||
|
## Topic priorities. Default is 0.
|
||||||
|
##
|
||||||
|
## Priority: Number [0-255]
|
||||||
|
##
|
||||||
|
## zone.external.mqueue_priorities = topic/1=10,topic/2=8
|
||||||
|
|
||||||
## Whether to enqueue Qos0 messages.
|
## Whether to enqueue Qos0 messages.
|
||||||
##
|
##
|
||||||
## Value: false | true
|
## Value: false | true
|
||||||
|
|
|
@ -646,6 +646,18 @@ end}.
|
||||||
{datatype, {enum, [true, false]}}
|
{datatype, {enum, [true, false]}}
|
||||||
]}.
|
]}.
|
||||||
|
|
||||||
|
%% @doc Type: simple | priority
|
||||||
|
{mapping, "mqtt.mqueue_type", "emqx.mqueue_type", [
|
||||||
|
{default, simple},
|
||||||
|
{datatype, {enum, [simple, priority]}}
|
||||||
|
]}.
|
||||||
|
|
||||||
|
%% @doc Topic Priorities: 0~255, Default is 0
|
||||||
|
{mapping, "mqtt.mqueue_priorities", "emqx.mqueue_priorities", [
|
||||||
|
{default, ""},
|
||||||
|
{datatype, string}
|
||||||
|
]}.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Zones
|
%% Zones
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -777,6 +789,12 @@ end}.
|
||||||
{datatype, {duration, ms}}
|
{datatype, {duration, ms}}
|
||||||
]}.
|
]}.
|
||||||
|
|
||||||
|
%% @doc Type: simple | priority
|
||||||
|
{mapping, "zone.$name.mqueue_type", "emqx.zones", [
|
||||||
|
{default, simple},
|
||||||
|
{datatype, {enum, [simple, priority]}}
|
||||||
|
]}.
|
||||||
|
|
||||||
%% @doc Max queue length. Enqueued messages when persistent client
|
%% @doc Max queue length. Enqueued messages when persistent client
|
||||||
%% disconnected, or inflight window is full. 0 means no limit.
|
%% disconnected, or inflight window is full. 0 means no limit.
|
||||||
{mapping, "zone.$name.max_mqueue_len", "emqx.zones", [
|
{mapping, "zone.$name.max_mqueue_len", "emqx.zones", [
|
||||||
|
@ -784,6 +802,11 @@ end}.
|
||||||
{datatype, integer}
|
{datatype, integer}
|
||||||
]}.
|
]}.
|
||||||
|
|
||||||
|
%% @doc Topic Priorities: 0~255, Default is 0
|
||||||
|
{mapping, "zone.$name.mqueue_priorities", "emqx.zones", [
|
||||||
|
{datatype, string}
|
||||||
|
]}.
|
||||||
|
|
||||||
%% @doc Queue Qos0 messages?
|
%% @doc Queue Qos0 messages?
|
||||||
{mapping, "zone.$name.mqueue_store_qos0", "emqx.zones", [
|
{mapping, "zone.$name.mqueue_store_qos0", "emqx.zones", [
|
||||||
{default, true},
|
{default, true},
|
||||||
|
|
|
@ -22,7 +22,7 @@
|
||||||
-export([get_flag/2, get_flag/3, set_flag/2, set_flag/3, unset_flag/2]).
|
-export([get_flag/2, get_flag/3, set_flag/2, set_flag/3, unset_flag/2]).
|
||||||
-export([set_headers/2]).
|
-export([set_headers/2]).
|
||||||
-export([get_header/2, get_header/3, set_header/3]).
|
-export([get_header/2, get_header/3, set_header/3]).
|
||||||
-export([is_expired/1, check_expiry/1, check_expiry/2]).
|
-export([is_expired/1, check_expiry/1, check_expiry/2, update_expiry/1]).
|
||||||
-export([format/1]).
|
-export([format/1]).
|
||||||
|
|
||||||
-type(flag() :: atom()).
|
-type(flag() :: atom()).
|
||||||
|
@ -96,7 +96,7 @@ set_header(Hdr, Val, Msg = #message{headers = Headers}) ->
|
||||||
|
|
||||||
-spec(is_expired(emqx_types:message()) -> boolean()).
|
-spec(is_expired(emqx_types:message()) -> boolean()).
|
||||||
is_expired(#message{headers = #{'Message-Expiry-Interval' := Interval}, timestamp = CreatedAt}) ->
|
is_expired(#message{headers = #{'Message-Expiry-Interval' := Interval}, timestamp = CreatedAt}) ->
|
||||||
elapsed(CreatedAt) > Interval;
|
elapsed(CreatedAt) > timer:seconds(Interval);
|
||||||
is_expired(_Msg) ->
|
is_expired(_Msg) ->
|
||||||
false.
|
false.
|
||||||
|
|
||||||
|
@ -108,16 +108,23 @@ check_expiry(_Msg) ->
|
||||||
|
|
||||||
-spec(check_expiry(emqx_types:message(), erlang:timestamp()) -> {ok, pos_integer()} | expired | false).
|
-spec(check_expiry(emqx_types:message(), erlang:timestamp()) -> {ok, pos_integer()} | expired | false).
|
||||||
check_expiry(#message{headers = #{'Message-Expiry-Interval' := Interval}}, Since) ->
|
check_expiry(#message{headers = #{'Message-Expiry-Interval' := Interval}}, Since) ->
|
||||||
case Interval - elapsed(Since) of
|
case Interval - (elapsed(Since) div 1000) of
|
||||||
I when I > 0 -> {ok, I};
|
Timeout when Timeout > 0 -> {ok, Timeout};
|
||||||
_ -> expired
|
_ -> expired
|
||||||
end;
|
end;
|
||||||
check_expiry(_Msg, _Since) ->
|
check_expiry(_Msg, _Since) ->
|
||||||
false.
|
false.
|
||||||
|
|
||||||
|
update_expiry(Msg = #message{headers = #{'Message-Expiry-Interval' := Interval}, timestamp = CreatedAt}) ->
|
||||||
|
case elapsed(CreatedAt) of
|
||||||
|
Elapsed when Elapsed > 0 ->
|
||||||
|
set_header('Message-Expiry-Interval', max(1, Interval - (Elapsed div 1000)), Msg);
|
||||||
|
_ -> Msg
|
||||||
|
end.
|
||||||
|
|
||||||
|
%% MilliSeconds
|
||||||
elapsed(Since) ->
|
elapsed(Since) ->
|
||||||
Secs = timer:now_diff(os:timestamp(), Since) div 1000,
|
max(0, timer:now_diff(os:timestamp(), Since) div 1000).
|
||||||
if Secs < 0 -> 0; true -> Secs end.
|
|
||||||
|
|
||||||
format(#message{id = Id, qos = QoS, topic = Topic, from = From, flags = Flags, headers = Headers}) ->
|
format(#message{id = Id, qos = QoS, topic = Topic, from = From, flags = Flags, headers = Headers}) ->
|
||||||
io_lib:format("Message(Id=~s, QoS=~w, Topic=~s, From=~s, Flags=~s, Headers=~s)",
|
io_lib:format("Message(Id=~s, QoS=~w, Topic=~s, From=~s, Flags=~s, Headers=~s)",
|
||||||
|
|
|
@ -77,6 +77,7 @@
|
||||||
{counter, 'messages/qos2/dropped'}, % QoS2 Messages dropped
|
{counter, 'messages/qos2/dropped'}, % QoS2 Messages dropped
|
||||||
{gauge, 'messages/retained'}, % Messagea retained
|
{gauge, 'messages/retained'}, % Messagea retained
|
||||||
{counter, 'messages/dropped'}, % Messages dropped
|
{counter, 'messages/dropped'}, % Messages dropped
|
||||||
|
{counter, 'messages/expired'}, % Messages expired
|
||||||
{counter, 'messages/forward'} % Messages forward
|
{counter, 'messages/forward'} % Messages forward
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
|
|
@ -11,7 +11,6 @@
|
||||||
%% See the License for the specific language governing permissions and
|
%% See the License for the specific language governing permissions and
|
||||||
%% limitations under the License.
|
%% limitations under the License.
|
||||||
|
|
||||||
%% TODO: should be a bound queue.
|
|
||||||
%% @doc A Simple in-memory message queue.
|
%% @doc A Simple in-memory message queue.
|
||||||
%%
|
%%
|
||||||
%% Notice that MQTT is not an enterprise messaging queue. MQTT assume that client
|
%% Notice that MQTT is not an enterprise messaging queue. MQTT assume that client
|
||||||
|
@ -39,70 +38,67 @@
|
||||||
%%
|
%%
|
||||||
%% @end
|
%% @end
|
||||||
|
|
||||||
%% TODO: ...
|
|
||||||
-module(emqx_mqueue).
|
-module(emqx_mqueue).
|
||||||
|
|
||||||
-include("emqx.hrl").
|
-include("emqx.hrl").
|
||||||
-include("emqx_mqtt.hrl").
|
-include("emqx_mqtt.hrl").
|
||||||
|
|
||||||
-import(proplists, [get_value/3]).
|
-export([init/1, type/1]).
|
||||||
|
-export([is_empty/1]).
|
||||||
-export([new/2, type/1, name/1, is_empty/1, len/1, max_len/1, in/2, out/1]).
|
-export([len/1, max_len/1]).
|
||||||
-export([dropped/1, stats/1]).
|
-export([in/2, out/1]).
|
||||||
|
-export([stats/1, dropped/1]).
|
||||||
|
|
||||||
-define(PQUEUE, emqx_pqueue).
|
-define(PQUEUE, emqx_pqueue).
|
||||||
|
|
||||||
-type(priority() :: {iolist(), pos_integer()}).
|
-type(priority() :: {iolist(), pos_integer()}).
|
||||||
|
|
||||||
-type(options() :: #{type => simple | priority,
|
-type(options() :: #{type := simple | priority,
|
||||||
max_len => non_neg_integer(),
|
max_len := non_neg_integer(),
|
||||||
priority => list(priority()),
|
priorities => list(priority()),
|
||||||
store_qos0 => boolean()}).
|
store_qos0 => boolean()}).
|
||||||
|
|
||||||
-type(stat() :: {max_len, non_neg_integer()}
|
-type(stat() :: {len, non_neg_integer()}
|
||||||
| {len, non_neg_integer()}
|
| {max_len, non_neg_integer()}
|
||||||
| {dropped, non_neg_integer()}).
|
| {dropped, non_neg_integer()}).
|
||||||
|
|
||||||
-record(mqueue, {type :: simple | priority,
|
-record(mqueue, {
|
||||||
name, q :: queue:queue() | ?PQUEUE:q(),
|
type :: simple | priority,
|
||||||
|
q :: queue:queue() | ?PQUEUE:q(),
|
||||||
%% priority table
|
%% priority table
|
||||||
pseq = 0, priorities = [],
|
priorities = [],
|
||||||
%% len of simple queue
|
pseq = 0,
|
||||||
len = 0, max_len = 0,
|
len = 0,
|
||||||
qos0 = false, dropped = 0}).
|
max_len = 0,
|
||||||
|
qos0 = false,
|
||||||
|
dropped = 0
|
||||||
|
}).
|
||||||
|
|
||||||
-type(mqueue() :: #mqueue{}).
|
-type(mqueue() :: #mqueue{}).
|
||||||
|
|
||||||
-export_type([mqueue/0, priority/0, options/0]).
|
-export_type([mqueue/0, priority/0, options/0]).
|
||||||
|
|
||||||
-spec(new(iolist(), options()) -> mqueue()).
|
-spec(init(options()) -> mqueue()).
|
||||||
new(Name, #{type := Type, max_len := MaxLen, store_qos0 := StoreQos0}) ->
|
init(Opts = #{type := Type, max_len := MaxLen, store_qos0 := QoS0}) ->
|
||||||
init_q(#mqueue{type = Type, name = iolist_to_binary(Name),
|
init_q(#mqueue{type = Type, len = 0, max_len = MaxLen, qos0 = QoS0}, Opts).
|
||||||
len = 0, max_len = MaxLen, qos0 = StoreQos0}).
|
|
||||||
|
|
||||||
init_q(MQ = #mqueue{type = simple}) ->
|
init_q(MQ = #mqueue{type = simple}, _Opts) ->
|
||||||
MQ#mqueue{q = queue:new()};
|
MQ#mqueue{q = queue:new()};
|
||||||
init_q(MQ = #mqueue{type = priority}) ->
|
init_q(MQ = #mqueue{type = priority}, #{priorities := Priorities}) ->
|
||||||
%%Priorities = get_value(priority, Opts, []),
|
init_pq(Priorities, MQ#mqueue{q = ?PQUEUE:new()}).
|
||||||
init_p([], MQ#mqueue{q = ?PQUEUE:new()}).
|
|
||||||
|
|
||||||
init_p([], MQ) ->
|
init_pq([], MQ) ->
|
||||||
MQ;
|
MQ;
|
||||||
init_p([{Topic, P} | L], MQ) ->
|
init_pq([{Topic, P} | L], MQ) ->
|
||||||
{_, MQ1} = insert_p(iolist_to_binary(Topic), P, MQ),
|
{_, MQ1} = insert_p(iolist_to_binary(Topic), P, MQ),
|
||||||
init_p(L, MQ1).
|
init_pq(L, MQ1).
|
||||||
|
|
||||||
insert_p(Topic, P, MQ = #mqueue{priorities = Tab, pseq = Seq}) ->
|
insert_p(Topic, P, MQ = #mqueue{priorities = L, pseq = Seq}) ->
|
||||||
<<PInt:48>> = <<P:8, (erlang:phash2(Topic)):32, Seq:8>>,
|
<<PInt:48>> = <<P:8, (erlang:phash2(Topic)):32, Seq:8>>,
|
||||||
{PInt, MQ#mqueue{priorities = [{Topic, PInt} | Tab], pseq = Seq + 1}}.
|
{PInt, MQ#mqueue{priorities = [{Topic, PInt} | L], pseq = Seq + 1}}.
|
||||||
|
|
||||||
-spec(name(mqueue()) -> iolist()).
|
-spec(type(mqueue()) -> simple | priority).
|
||||||
name(#mqueue{name = Name}) ->
|
type(#mqueue{type = Type}) -> Type.
|
||||||
Name.
|
|
||||||
|
|
||||||
-spec(type(mqueue()) -> atom()).
|
|
||||||
type(#mqueue{type = Type}) ->
|
|
||||||
Type.
|
|
||||||
|
|
||||||
is_empty(#mqueue{type = simple, len = Len}) -> Len =:= 0;
|
is_empty(#mqueue{type = simple, len = Len}) -> Len =:= 0;
|
||||||
is_empty(#mqueue{type = priority, q = Q}) -> ?PQUEUE:is_empty(Q).
|
is_empty(#mqueue{type = priority, q = Q}) -> ?PQUEUE:is_empty(Q).
|
||||||
|
|
|
@ -455,8 +455,9 @@ deliver({connack, ReasonCode, SP}, PState) ->
|
||||||
|
|
||||||
deliver({publish, PacketId, Msg}, PState = #pstate{is_bridge = IsBridge, mountpoint = MountPoint}) ->
|
deliver({publish, PacketId, Msg}, PState = #pstate{is_bridge = IsBridge, mountpoint = MountPoint}) ->
|
||||||
_ = emqx_hooks:run('message.delivered', [credentials(PState)], Msg),
|
_ = emqx_hooks:run('message.delivered', [credentials(PState)], Msg),
|
||||||
Msg1 = emqx_mountpoint:unmount(MountPoint, clean_retain(IsBridge, Msg)),
|
Msg1 = emqx_message:update_expiry(Msg),
|
||||||
send(emqx_packet:from_message(PacketId, Msg1), PState);
|
Msg2 = emqx_mountpoint:unmount(MountPoint, clean_retain(IsBridge, Msg1)),
|
||||||
|
send(emqx_packet:from_message(PacketId, Msg2), PState);
|
||||||
|
|
||||||
deliver({puback, PacketId, ReasonCode}, PState) ->
|
deliver({puback, PacketId, ReasonCode}, PState) ->
|
||||||
send(?PUBACK_PACKET(PacketId, ReasonCode), PState);
|
send(?PUBACK_PACKET(PacketId, ReasonCode), PState);
|
||||||
|
|
|
@ -62,6 +62,9 @@
|
||||||
-import(emqx_zone, [get_env/2, get_env/3]).
|
-import(emqx_zone, [get_env/2, get_env/3]).
|
||||||
|
|
||||||
-record(state, {
|
-record(state, {
|
||||||
|
%% Idle timeout
|
||||||
|
idle_timeout :: pos_integer(),
|
||||||
|
|
||||||
%% Clean Start Flag
|
%% Clean Start Flag
|
||||||
clean_start = false :: boolean(),
|
clean_start = false :: boolean(),
|
||||||
|
|
||||||
|
@ -134,10 +137,10 @@
|
||||||
%% Stats timer
|
%% Stats timer
|
||||||
stats_timer :: reference() | undefined,
|
stats_timer :: reference() | undefined,
|
||||||
|
|
||||||
%% TODO:
|
%% Deliver stats
|
||||||
deliver_stats = 0,
|
deliver_stats = 0,
|
||||||
|
|
||||||
%% TODO:
|
%% Enqueue stats
|
||||||
enqueue_stats = 0,
|
enqueue_stats = 0,
|
||||||
|
|
||||||
%% Created at
|
%% Created at
|
||||||
|
@ -150,11 +153,10 @@
|
||||||
emqx_logger:Level([{client, State#state.client_id}],
|
emqx_logger:Level([{client, State#state.client_id}],
|
||||||
"Session(~s): " ++ Format, [State#state.client_id | Args])).
|
"Session(~s): " ++ Format, [State#state.client_id | Args])).
|
||||||
|
|
||||||
%% @doc Start a session
|
%% @doc Start a session proc.
|
||||||
-spec(start_link(SessAttrs :: map()) -> {ok, pid()} | {error, term()}).
|
-spec(start_link(SessAttrs :: map()) -> {ok, pid()}).
|
||||||
start_link(SessAttrs) ->
|
start_link(SessAttrs) ->
|
||||||
IdleTimeout = maps:get(idle_timeout, SessAttrs, 30000),
|
proc_lib:start_link(?MODULE, init, [[self(), SessAttrs]]).
|
||||||
gen_server:start_link(?MODULE, SessAttrs, [{hibernate_after, IdleTimeout}]).
|
|
||||||
|
|
||||||
%% @doc Get session info
|
%% @doc Get session info
|
||||||
-spec(info(pid() | #state{}) -> list({atom(), term()})).
|
-spec(info(pid() | #state{}) -> list({atom(), term()})).
|
||||||
|
@ -314,16 +316,18 @@ close(SPid) ->
|
||||||
%% gen_server callbacks
|
%% gen_server callbacks
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
init(#{zone := Zone,
|
init([Parent, #{zone := Zone,
|
||||||
client_id := ClientId,
|
client_id := ClientId,
|
||||||
username := Username,
|
username := Username,
|
||||||
conn_pid := ConnPid,
|
conn_pid := ConnPid,
|
||||||
clean_start := CleanStart,
|
clean_start := CleanStart,
|
||||||
conn_props := ConnProps}) ->
|
conn_props := ConnProps}]) ->
|
||||||
process_flag(trap_exit, true),
|
process_flag(trap_exit, true),
|
||||||
true = link(ConnPid),
|
true = link(ConnPid),
|
||||||
MaxInflight = get_env(Zone, max_inflight),
|
MaxInflight = get_env(Zone, max_inflight),
|
||||||
State = #state{clean_start = CleanStart,
|
IdleTimout = get_env(Zone, idle_timeout, 30000),
|
||||||
|
State = #state{idle_timeout = IdleTimout,
|
||||||
|
clean_start = CleanStart,
|
||||||
binding = binding(ConnPid),
|
binding = binding(ConnPid),
|
||||||
client_id = ClientId,
|
client_id = ClientId,
|
||||||
username = Username,
|
username = Username,
|
||||||
|
@ -332,7 +336,7 @@ init(#{zone := Zone,
|
||||||
max_subscriptions = get_env(Zone, max_subscriptions, 0),
|
max_subscriptions = get_env(Zone, max_subscriptions, 0),
|
||||||
upgrade_qos = get_env(Zone, upgrade_qos, false),
|
upgrade_qos = get_env(Zone, upgrade_qos, false),
|
||||||
inflight = emqx_inflight:new(MaxInflight),
|
inflight = emqx_inflight:new(MaxInflight),
|
||||||
mqueue = init_mqueue(Zone, ClientId),
|
mqueue = init_mqueue(Zone),
|
||||||
retry_interval = get_env(Zone, retry_interval, 0),
|
retry_interval = get_env(Zone, retry_interval, 0),
|
||||||
awaiting_rel = #{},
|
awaiting_rel = #{},
|
||||||
await_rel_timeout = get_env(Zone, await_rel_timeout),
|
await_rel_timeout = get_env(Zone, await_rel_timeout),
|
||||||
|
@ -342,20 +346,23 @@ init(#{zone := Zone,
|
||||||
deliver_stats = 0,
|
deliver_stats = 0,
|
||||||
enqueue_stats = 0,
|
enqueue_stats = 0,
|
||||||
created_at = os:timestamp()},
|
created_at = os:timestamp()},
|
||||||
emqx_sm:register_session(ClientId, [{zone, Zone} | attrs(State)]),
|
emqx_sm:register_session(ClientId, attrs(State)),
|
||||||
emqx_sm:set_session_stats(ClientId, stats(State)),
|
emqx_sm:set_session_stats(ClientId, stats(State)),
|
||||||
emqx_hooks:run('session.created', [#{client_id => ClientId}, info(State)]),
|
emqx_hooks:run('session.created', [#{client_id => ClientId}, info(State)]),
|
||||||
{ok, State}.
|
ok = proc_lib:init_ack(Parent, {ok, self()}),
|
||||||
|
gen_server:enter_loop(?MODULE, [{hibernate_after, IdleTimout}], State).
|
||||||
|
|
||||||
expire_interval(_Zone, #{'Session-Expiry-Interval' := I}) ->
|
expire_interval(_Zone, #{'Session-Expiry-Interval' := I}) ->
|
||||||
I * 1000;
|
I * 1000;
|
||||||
expire_interval(Zone, _ConnProps) -> %% Maybe v3.1.1
|
expire_interval(Zone, _ConnProps) -> %% Maybe v3.1.1
|
||||||
get_env(Zone, session_expiry_interval, 0).
|
get_env(Zone, session_expiry_interval, 0).
|
||||||
|
|
||||||
init_mqueue(Zone, ClientId) ->
|
init_mqueue(Zone) ->
|
||||||
emqx_mqueue:new(ClientId, #{type => simple,
|
emqx_mqueue:init(#{type => get_env(Zone, mqueue_type, simple),
|
||||||
max_len => get_env(Zone, max_mqueue_len),
|
max_len => get_env(Zone, max_mqueue_len, 1000),
|
||||||
store_qos0 => get_env(Zone, mqueue_store_qos0)}).
|
priorities => get_env(Zone, mqueue_priorities, ""),
|
||||||
|
store_qos0 => get_env(Zone, mqueue_store_qos0, true)
|
||||||
|
}).
|
||||||
|
|
||||||
binding(ConnPid) ->
|
binding(ConnPid) ->
|
||||||
case node(ConnPid) =:= node() of true -> local; false -> remote end.
|
case node(ConnPid) =:= node() of true -> local; false -> remote end.
|
||||||
|
@ -371,43 +378,43 @@ handle_call({discard, ConnPid}, _From, State = #state{conn_pid = OldConnPid}) ->
|
||||||
%% PUBLISH:
|
%% PUBLISH:
|
||||||
handle_call({publish, PacketId, Msg = #message{qos = ?QOS_2}}, _From,
|
handle_call({publish, PacketId, Msg = #message{qos = ?QOS_2}}, _From,
|
||||||
State = #state{awaiting_rel = AwaitingRel}) ->
|
State = #state{awaiting_rel = AwaitingRel}) ->
|
||||||
case is_awaiting_full(State) of
|
reply(case is_awaiting_full(State) of
|
||||||
false ->
|
false ->
|
||||||
case maps:is_key(PacketId, AwaitingRel) of
|
case maps:is_key(PacketId, AwaitingRel) of
|
||||||
true ->
|
true ->
|
||||||
reply({error, ?RC_PACKET_IDENTIFIER_IN_USE}, State);
|
{{error, ?RC_PACKET_IDENTIFIER_IN_USE}, State};
|
||||||
false ->
|
false ->
|
||||||
State1 = State#state{awaiting_rel = maps:put(PacketId, Msg, AwaitingRel)},
|
State1 = State#state{awaiting_rel = maps:put(PacketId, Msg, AwaitingRel)},
|
||||||
reply(emqx_broker:publish(Msg), ensure_await_rel_timer(State1))
|
{emqx_broker:publish(Msg), ensure_await_rel_timer(State1)}
|
||||||
end;
|
end;
|
||||||
true ->
|
true ->
|
||||||
?LOG(warning, "Dropped QoS2 Message for too many awaiting_rel: ~p", [Msg], State),
|
|
||||||
emqx_metrics:inc('messages/qos2/dropped'),
|
emqx_metrics:inc('messages/qos2/dropped'),
|
||||||
reply({error, ?RC_RECEIVE_MAXIMUM_EXCEEDED}, State)
|
?LOG(warning, "Dropped message for too many awaiting_rel: ~p",
|
||||||
end;
|
[emqx_message:format(Msg)], State),
|
||||||
|
{{error, ?RC_RECEIVE_MAXIMUM_EXCEEDED}, State}
|
||||||
|
end);
|
||||||
|
|
||||||
%% PUBREC:
|
%% PUBREC:
|
||||||
handle_call({pubrec, PacketId, _ReasonCode}, _From, State = #state{inflight = Inflight}) ->
|
handle_call({pubrec, PacketId, _ReasonCode}, _From, State = #state{inflight = Inflight}) ->
|
||||||
case emqx_inflight:contain(PacketId, Inflight) of
|
reply(case emqx_inflight:contain(PacketId, Inflight) of
|
||||||
true ->
|
true ->
|
||||||
reply(ok, acked(pubrec, PacketId, State));
|
{ok, acked(pubrec, PacketId, State)};
|
||||||
false ->
|
false ->
|
||||||
?LOG(warning, "The PUBREC PacketId is not found: ~w", [PacketId], State),
|
|
||||||
emqx_metrics:inc('packets/pubrec/missed'),
|
emqx_metrics:inc('packets/pubrec/missed'),
|
||||||
reply({error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}, State)
|
?LOG(warning, "The PUBREC PacketId ~w is not found.", [PacketId], State),
|
||||||
end;
|
{{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}, State}
|
||||||
|
end);
|
||||||
|
|
||||||
%% PUBREL:
|
%% PUBREL:
|
||||||
handle_call({pubrel, PacketId, _ReasonCode}, _From,
|
handle_call({pubrel, PacketId, _ReasonCode}, _From, State = #state{awaiting_rel = AwaitingRel}) ->
|
||||||
State = #state{awaiting_rel = AwaitingRel}) ->
|
reply(case maps:take(PacketId, AwaitingRel) of
|
||||||
case maps:take(PacketId, AwaitingRel) of
|
|
||||||
{_, AwaitingRel1} ->
|
{_, AwaitingRel1} ->
|
||||||
reply(ok, State#state{awaiting_rel = AwaitingRel1});
|
{ok, State#state{awaiting_rel = AwaitingRel1}};
|
||||||
error ->
|
error ->
|
||||||
?LOG(warning, "Cannot find PUBREL: ~p", [PacketId], State),
|
|
||||||
emqx_metrics:inc('packets/pubrel/missed'),
|
emqx_metrics:inc('packets/pubrel/missed'),
|
||||||
reply({error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}, State)
|
?LOG(warning, "Cannot find PUBREL: ~w", [PacketId], State),
|
||||||
end;
|
{{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}, State}
|
||||||
|
end);
|
||||||
|
|
||||||
handle_call(info, _From, State) ->
|
handle_call(info, _From, State) ->
|
||||||
reply(info(State), State);
|
reply(info(State), State);
|
||||||
|
@ -444,7 +451,7 @@ handle_cast({subscribe, FromPid, {PacketId, _Properties, TopicFilters}},
|
||||||
end}
|
end}
|
||||||
end, {[], Subscriptions}, TopicFilters),
|
end, {[], Subscriptions}, TopicFilters),
|
||||||
suback(FromPid, PacketId, ReasonCodes),
|
suback(FromPid, PacketId, ReasonCodes),
|
||||||
{noreply, State#state{subscriptions = Subscriptions1}};
|
noreply(State#state{subscriptions = Subscriptions1});
|
||||||
|
|
||||||
%% UNSUBSCRIBE:
|
%% UNSUBSCRIBE:
|
||||||
handle_cast({unsubscribe, From, {PacketId, _Properties, TopicFilters}},
|
handle_cast({unsubscribe, From, {PacketId, _Properties, TopicFilters}},
|
||||||
|
@ -461,15 +468,15 @@ handle_cast({unsubscribe, From, {PacketId, _Properties, TopicFilters}},
|
||||||
end
|
end
|
||||||
end, {[], Subscriptions}, TopicFilters),
|
end, {[], Subscriptions}, TopicFilters),
|
||||||
unsuback(From, PacketId, ReasonCodes),
|
unsuback(From, PacketId, ReasonCodes),
|
||||||
{noreply, State#state{subscriptions = Subscriptions1}};
|
noreply(State#state{subscriptions = Subscriptions1});
|
||||||
|
|
||||||
%% PUBACK:
|
%% PUBACK:
|
||||||
handle_cast({puback, PacketId, _ReasonCode}, State = #state{inflight = Inflight}) ->
|
handle_cast({puback, PacketId, _ReasonCode}, State = #state{inflight = Inflight}) ->
|
||||||
case emqx_inflight:contain(PacketId, Inflight) of
|
case emqx_inflight:contain(PacketId, Inflight) of
|
||||||
true ->
|
true ->
|
||||||
{noreply, dequeue(acked(puback, PacketId, State))};
|
noreply(dequeue(acked(puback, PacketId, State)));
|
||||||
false ->
|
false ->
|
||||||
?LOG(warning, "The PUBACK PacketId is not found: ~w", [PacketId], State),
|
?LOG(warning, "The PUBACK PacketId ~w is not found", [PacketId], State),
|
||||||
emqx_metrics:inc('packets/puback/missed'),
|
emqx_metrics:inc('packets/puback/missed'),
|
||||||
{noreply, State}
|
{noreply, State}
|
||||||
end;
|
end;
|
||||||
|
@ -478,9 +485,9 @@ handle_cast({puback, PacketId, _ReasonCode}, State = #state{inflight = Inflight}
|
||||||
handle_cast({pubcomp, PacketId, _ReasonCode}, State = #state{inflight = Inflight}) ->
|
handle_cast({pubcomp, PacketId, _ReasonCode}, State = #state{inflight = Inflight}) ->
|
||||||
case emqx_inflight:contain(PacketId, Inflight) of
|
case emqx_inflight:contain(PacketId, Inflight) of
|
||||||
true ->
|
true ->
|
||||||
{noreply, dequeue(acked(pubcomp, PacketId, State))};
|
noreply(dequeue(acked(pubcomp, PacketId, State)));
|
||||||
false ->
|
false ->
|
||||||
?LOG(warning, "The PUBCOMP PacketId is not found: ~w", [PacketId], State),
|
?LOG(warning, "The PUBCOMP PacketId ~w is not found", [PacketId], State),
|
||||||
emqx_metrics:inc('packets/pubcomp/missed'),
|
emqx_metrics:inc('packets/pubcomp/missed'),
|
||||||
{noreply, State}
|
{noreply, State}
|
||||||
end;
|
end;
|
||||||
|
@ -499,7 +506,7 @@ handle_cast({resume, ConnPid}, State = #state{client_id = ClientId,
|
||||||
lists:foreach(fun emqx_misc:cancel_timer/1, [RetryTimer, AwaitTimer, ExpireTimer]),
|
lists:foreach(fun emqx_misc:cancel_timer/1, [RetryTimer, AwaitTimer, ExpireTimer]),
|
||||||
|
|
||||||
case kick(ClientId, OldConnPid, ConnPid) of
|
case kick(ClientId, OldConnPid, ConnPid) of
|
||||||
ok -> ?LOG(warning, "connection ~p kickout ~p", [ConnPid, OldConnPid], State);
|
ok -> ?LOG(warning, "Connection ~p kickout ~p", [ConnPid, OldConnPid], State);
|
||||||
ignore -> ok
|
ignore -> ok
|
||||||
end,
|
end,
|
||||||
|
|
||||||
|
@ -514,13 +521,13 @@ handle_cast({resume, ConnPid}, State = #state{client_id = ClientId,
|
||||||
await_rel_timer = undefined,
|
await_rel_timer = undefined,
|
||||||
expiry_timer = undefined},
|
expiry_timer = undefined},
|
||||||
|
|
||||||
%% Clean Session: true -> false?
|
%% Clean Session: true -> false???
|
||||||
CleanStart andalso emqx_sm:set_session_attrs(ClientId, info(State1)),
|
CleanStart andalso emqx_sm:set_session_attrs(ClientId, attrs(State1)),
|
||||||
|
|
||||||
emqx_hooks:run('session.resumed', [#{client_id => ClientId}, attrs(State)]),
|
emqx_hooks:run('session.resumed', [#{client_id => ClientId}, attrs(State)]),
|
||||||
|
|
||||||
%% Replay delivery and Dequeue pending messages
|
%% Replay delivery and Dequeue pending messages
|
||||||
{noreply, ensure_stats_timer(dequeue(retry_delivery(true, State1)))};
|
noreply(dequeue(retry_delivery(true, State1)));
|
||||||
|
|
||||||
handle_cast(Msg, State) ->
|
handle_cast(Msg, State) ->
|
||||||
emqx_logger:error("[Session] unexpected cast: ~p", [Msg]),
|
emqx_logger:error("[Session] unexpected cast: ~p", [Msg]),
|
||||||
|
@ -534,58 +541,63 @@ handle_info({dispatch, Topic, Msgs}, State) when is_list(Msgs) ->
|
||||||
|
|
||||||
%% Dispatch message
|
%% Dispatch message
|
||||||
handle_info({dispatch, Topic, Msg}, State = #state{subscriptions = SubMap}) when is_record(Msg, message) ->
|
handle_info({dispatch, Topic, Msg}, State = #state{subscriptions = SubMap}) when is_record(Msg, message) ->
|
||||||
{noreply, case maps:find(Topic, SubMap) of
|
noreply(case maps:find(Topic, SubMap) of
|
||||||
{ok, #{nl := Nl, qos := QoS, subid := SubId}} ->
|
{ok, #{nl := Nl, qos := QoS, subid := SubId}} ->
|
||||||
run_dispatch_steps([{nl, Nl},{qos, QoS}, {subid, SubId}], Msg, State);
|
run_dispatch_steps([{nl, Nl},{qos, QoS}, {subid, SubId}], Msg, State);
|
||||||
{ok, #{nl := Nl, qos := QoS}} ->
|
{ok, #{nl := Nl, qos := QoS}} ->
|
||||||
run_dispatch_steps([{nl, Nl},{qos, QoS}], Msg, State);
|
run_dispatch_steps([{nl, Nl},{qos, QoS}], Msg, State);
|
||||||
error ->
|
error ->
|
||||||
dispatch(reset_dup(Msg), State)
|
dispatch(emqx_message:unset_flag(dup, Msg), State)
|
||||||
end};
|
end);
|
||||||
|
|
||||||
%% Do nothing if the client has been disconnected.
|
%% Do nothing if the client has been disconnected.
|
||||||
handle_info({timeout, _Timer, retry_delivery}, State = #state{conn_pid = undefined}) ->
|
handle_info({timeout, Timer, retry_delivery}, State = #state{conn_pid = undefined, retry_timer = Timer}) ->
|
||||||
{noreply, ensure_stats_timer(State#state{retry_timer = undefined})};
|
noreply(State#state{retry_timer = undefined});
|
||||||
|
|
||||||
handle_info({timeout, _Timer, retry_delivery}, State) ->
|
handle_info({timeout, Timer, retry_delivery}, State = #state{retry_timer = Timer}) ->
|
||||||
{noreply, ensure_stats_timer(retry_delivery(false, State#state{retry_timer = undefined}))};
|
noreply(retry_delivery(false, State#state{retry_timer = undefined}));
|
||||||
|
|
||||||
handle_info({timeout, _Timer, check_awaiting_rel}, State) ->
|
handle_info({timeout, Timer, check_awaiting_rel}, State = #state{await_rel_timer = Timer}) ->
|
||||||
{noreply, ensure_stats_timer(expire_awaiting_rel(State#state{await_rel_timer = undefined}))};
|
noreply(expire_awaiting_rel(State#state{await_rel_timer = undefined}));
|
||||||
|
|
||||||
handle_info({timeout, _Timer, expired}, State) ->
|
handle_info({timeout, Timer, emit_stats}, State = #state{client_id = ClientId, stats_timer = Timer}) ->
|
||||||
?LOG(info, "Expired, shutdown now.", [], State),
|
true = emqx_sm:set_session_stats(ClientId, stats(State)),
|
||||||
|
{noreply, State#state{stats_timer = undefined}, hibernate};
|
||||||
|
|
||||||
|
handle_info({timeout, Timer, expired}, State = #state{expiry_timer = Timer}) ->
|
||||||
|
?LOG(info, "expired, shutdown now:(", [], State),
|
||||||
shutdown(expired, State);
|
shutdown(expired, State);
|
||||||
|
|
||||||
handle_info({'EXIT', ConnPid, Reason}, State = #state{clean_start = true, conn_pid = ConnPid}) ->
|
handle_info({'EXIT', ConnPid, Reason}, State = #state{clean_start = true, conn_pid = ConnPid}) ->
|
||||||
{stop, Reason, State};
|
{stop, Reason, State#state{conn_pid = undefined}};
|
||||||
|
|
||||||
handle_info({'EXIT', ConnPid, Reason}, State = #state{expiry_interval = 0, conn_pid = ConnPid}) ->
|
handle_info({'EXIT', ConnPid, Reason}, State = #state{expiry_interval = 0, conn_pid = ConnPid}) ->
|
||||||
{stop, Reason, State};
|
{stop, Reason, State#state{conn_pid = undefined}};
|
||||||
|
|
||||||
handle_info({'EXIT', ConnPid, _Reason}, State = #state{clean_start = false, conn_pid = ConnPid}) ->
|
handle_info({'EXIT', ConnPid, _Reason}, State = #state{clean_start = false, conn_pid = ConnPid}) ->
|
||||||
{noreply, ensure_expire_timer(State#state{conn_pid = undefined})};
|
{noreply, ensure_expire_timer(State#state{conn_pid = undefined})};
|
||||||
|
|
||||||
handle_info({'EXIT', OldPid, _Reason}, State = #state{old_conn_pid = OldPid}) ->
|
handle_info({'EXIT', OldPid, _Reason}, State = #state{old_conn_pid = OldPid}) ->
|
||||||
%% ignore
|
%% ignore
|
||||||
{noreply, State#state{old_conn_pid = undefined}, hibernate};
|
{noreply, State#state{old_conn_pid = undefined}};
|
||||||
|
|
||||||
handle_info({'EXIT', Pid, Reason}, State = #state{conn_pid = ConnPid}) ->
|
handle_info({'EXIT', Pid, Reason}, State = #state{conn_pid = ConnPid}) ->
|
||||||
?LOG(error, "unexpected EXIT: conn_pid=~p, exit_pid=~p, reason=~p",
|
?LOG(error, "Unexpected EXIT: conn_pid=~p, exit_pid=~p, reason=~p",
|
||||||
[ConnPid, Pid, Reason], State),
|
[ConnPid, Pid, Reason], State),
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
|
|
||||||
handle_info(emit_stats, State = #state{client_id = ClientId}) ->
|
|
||||||
emqx_sm:set_session_stats(ClientId, stats(State)),
|
|
||||||
{noreply, State#state{stats_timer = undefined}, hibernate};
|
|
||||||
|
|
||||||
handle_info(Info, State) ->
|
handle_info(Info, State) ->
|
||||||
emqx_logger:error("[Session] unexpected info: ~p", [Info]),
|
emqx_logger:error("[Session] unexpected info: ~p", [Info]),
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
terminate(Reason, #state{client_id = ClientId}) ->
|
terminate(Reason, #state{client_id = ClientId, conn_pid = ConnPid}) ->
|
||||||
emqx_hooks:run('session.terminated', [#{client_id => ClientId}, Reason]),
|
emqx_hooks:run('session.terminated', [#{client_id => ClientId}, Reason]),
|
||||||
%%TODO: notify conn_pid to shutdown?
|
%% Ensure to shutdown the connection
|
||||||
|
if
|
||||||
|
ConnPid =/= undefined ->
|
||||||
|
ConnPid ! {shutdown, Reason};
|
||||||
|
true -> ok
|
||||||
|
end,
|
||||||
emqx_sm:unregister_session(ClientId).
|
emqx_sm:unregister_session(ClientId).
|
||||||
|
|
||||||
code_change(_OldVsn, State, _Extra) ->
|
code_change(_OldVsn, State, _Extra) ->
|
||||||
|
@ -616,7 +628,7 @@ kick(ClientId, OldPid, Pid) ->
|
||||||
unlink(OldPid),
|
unlink(OldPid),
|
||||||
OldPid ! {shutdown, conflict, {ClientId, Pid}},
|
OldPid ! {shutdown, conflict, {ClientId, Pid}},
|
||||||
%% Clean noproc
|
%% Clean noproc
|
||||||
receive {'EXIT', OldPid, _} -> ok after 0 -> ok end.
|
receive {'EXIT', OldPid, _} -> ok after 1 -> ok end.
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% Replay or Retry Delivery
|
%% Replay or Retry Delivery
|
||||||
|
@ -627,30 +639,37 @@ retry_delivery(Force, State = #state{inflight = Inflight}) ->
|
||||||
case emqx_inflight:is_empty(Inflight) of
|
case emqx_inflight:is_empty(Inflight) of
|
||||||
true -> State;
|
true -> State;
|
||||||
false ->
|
false ->
|
||||||
Msgs = lists:sort(sortfun(inflight), emqx_inflight:values(Inflight)),
|
InflightMsgs = lists:sort(sortfun(inflight), emqx_inflight:values(Inflight)),
|
||||||
retry_delivery(Force, Msgs, os:timestamp(), State)
|
retry_delivery(Force, InflightMsgs, os:timestamp(), State)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
retry_delivery(_Force, [], _Now, State = #state{retry_interval = Interval}) ->
|
retry_delivery(_Force, [], _Now, State) ->
|
||||||
State#state{retry_timer = emqx_misc:start_timer(Interval, retry_delivery)};
|
%% Retry again...
|
||||||
|
ensure_retry_timer(State);
|
||||||
|
|
||||||
retry_delivery(Force, [{Type, Msg0, Ts} | Msgs], Now,
|
retry_delivery(Force, [{Type, Msg0, Ts} | Msgs], Now,
|
||||||
State = #state{inflight = Inflight, retry_interval = Interval}) ->
|
State = #state{inflight = Inflight, retry_interval = Interval}) ->
|
||||||
Diff = timer:now_diff(Now, Ts) div 1000, %% micro -> ms
|
%% Microseconds -> MilliSeconds
|
||||||
|
Diff = timer:now_diff(Now, Ts) div 1000,
|
||||||
if
|
if
|
||||||
Force orelse (Diff >= Interval) ->
|
Force orelse (Diff >= Interval) ->
|
||||||
case {Type, Msg0} of
|
Inflight1 = case {Type, Msg0} of
|
||||||
{publish, {PacketId, Msg}} ->
|
{publish, {PacketId, Msg}} ->
|
||||||
|
case emqx_message:is_expired(Msg) of
|
||||||
|
true ->
|
||||||
|
emqx_metrics:inc('messages/expired'),
|
||||||
|
emqx_inflight:delete(PacketId, Inflight);
|
||||||
|
false ->
|
||||||
redeliver({PacketId, Msg}, State),
|
redeliver({PacketId, Msg}, State),
|
||||||
Inflight1 = emqx_inflight:update(PacketId, {publish, {PacketId, Msg}, Now}, Inflight),
|
emqx_inflight:update(PacketId, {publish, {PacketId, Msg}, Now}, Inflight)
|
||||||
retry_delivery(Force, Msgs, Now, State#state{inflight = Inflight1});
|
end;
|
||||||
{pubrel, PacketId} ->
|
{pubrel, PacketId} ->
|
||||||
redeliver({pubrel, PacketId}, State),
|
redeliver({pubrel, PacketId}, State),
|
||||||
Inflight1 = emqx_inflight:update(PacketId, {pubrel, PacketId, Now}, Inflight),
|
emqx_inflight:update(PacketId, {pubrel, PacketId, Now}, Inflight)
|
||||||
retry_delivery(Force, Msgs, Now, State#state{inflight = Inflight1})
|
end,
|
||||||
end;
|
retry_delivery(Force, Msgs, Now, State#state{inflight = Inflight1});
|
||||||
true ->
|
true ->
|
||||||
State#state{retry_timer = emqx_misc:start_timer(Interval - Diff, retry_delivery)}
|
ensure_retry_timer(Interval - Diff, State)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
@ -667,16 +686,16 @@ expire_awaiting_rel(State = #state{awaiting_rel = AwaitingRel}) ->
|
||||||
expire_awaiting_rel([], _Now, State) ->
|
expire_awaiting_rel([], _Now, State) ->
|
||||||
State#state{await_rel_timer = undefined};
|
State#state{await_rel_timer = undefined};
|
||||||
|
|
||||||
expire_awaiting_rel([{PacketId, Msg = #message{timestamp = TS}} | Msgs],
|
expire_awaiting_rel([{PacketId, Msg = #message{timestamp = TS}} | Msgs], Now,
|
||||||
Now, State = #state{awaiting_rel = AwaitingRel,
|
State = #state{awaiting_rel = AwaitingRel, await_rel_timeout = Timeout}) ->
|
||||||
await_rel_timeout = Timeout}) ->
|
|
||||||
case (timer:now_diff(Now, TS) div 1000) of
|
case (timer:now_diff(Now, TS) div 1000) of
|
||||||
Diff when Diff >= Timeout ->
|
Diff when Diff >= Timeout ->
|
||||||
?LOG(warning, "Dropped Qos2 Message for await_rel_timeout: ~p", [Msg], State),
|
|
||||||
emqx_metrics:inc('messages/qos2/dropped'),
|
emqx_metrics:inc('messages/qos2/dropped'),
|
||||||
|
?LOG(warning, "Dropped message for await_rel_timeout: ~p",
|
||||||
|
[emqx_message:format(Msg)], State),
|
||||||
expire_awaiting_rel(Msgs, Now, State#state{awaiting_rel = maps:remove(PacketId, AwaitingRel)});
|
expire_awaiting_rel(Msgs, Now, State#state{awaiting_rel = maps:remove(PacketId, AwaitingRel)});
|
||||||
Diff ->
|
Diff ->
|
||||||
State#state{await_rel_timer = emqx_misc:start_timer(Timeout - Diff, check_awaiting_rel)}
|
ensure_await_rel_timer(Timeout - Diff, State)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
@ -733,12 +752,10 @@ dispatch(Msg = #message{qos = ?QOS0}, State) ->
|
||||||
dispatch(Msg = #message{qos = QoS}, State = #state{next_pkt_id = PacketId, inflight = Inflight})
|
dispatch(Msg = #message{qos = QoS}, State = #state{next_pkt_id = PacketId, inflight = Inflight})
|
||||||
when QoS =:= ?QOS1 orelse QoS =:= ?QOS2 ->
|
when QoS =:= ?QOS1 orelse QoS =:= ?QOS2 ->
|
||||||
case emqx_inflight:is_full(Inflight) of
|
case emqx_inflight:is_full(Inflight) of
|
||||||
true ->
|
true -> enqueue_msg(Msg, State);
|
||||||
enqueue_msg(Msg, State);
|
|
||||||
false ->
|
false ->
|
||||||
deliver(PacketId, Msg, State),
|
deliver(PacketId, Msg, State),
|
||||||
%% TODO inc_stats??
|
await(PacketId, Msg, inc_stats(deliver, next_pkt_id(State)))
|
||||||
await(PacketId, Msg, next_pkt_id(inc_stats(deliver, State)))
|
|
||||||
end.
|
end.
|
||||||
|
|
||||||
enqueue_msg(Msg, State = #state{mqueue = Q}) ->
|
enqueue_msg(Msg, State = #state{mqueue = Q}) ->
|
||||||
|
@ -765,15 +782,10 @@ deliver(PacketId, Msg, #state{conn_pid = ConnPid, binding = remote}) ->
|
||||||
%% Awaiting ACK for QoS1/QoS2 Messages
|
%% Awaiting ACK for QoS1/QoS2 Messages
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
await(PacketId, Msg, State = #state{inflight = Inflight,
|
await(PacketId, Msg, State = #state{inflight = Inflight}) ->
|
||||||
retry_timer = RetryTimer,
|
Inflight1 = emqx_inflight:insert(
|
||||||
retry_interval = Interval}) ->
|
PacketId, {publish, {PacketId, Msg}, os:timestamp()}, Inflight),
|
||||||
%% Start retry timer if the Inflight is still empty
|
ensure_retry_timer(State#state{inflight = Inflight1}).
|
||||||
State1 = case RetryTimer == undefined of
|
|
||||||
true -> State#state{retry_timer = emqx_misc:start_timer(Interval, retry_delivery)};
|
|
||||||
false -> State
|
|
||||||
end,
|
|
||||||
State1#state{inflight = emqx_inflight:insert(PacketId, {publish, {PacketId, Msg}, os:timestamp()}, Inflight)}.
|
|
||||||
|
|
||||||
acked(puback, PacketId, State = #state{client_id = ClientId, inflight = Inflight}) ->
|
acked(puback, PacketId, State = #state{client_id = ClientId, inflight = Inflight}) ->
|
||||||
case emqx_inflight:lookup(PacketId, Inflight) of
|
case emqx_inflight:lookup(PacketId, Inflight) of
|
||||||
|
@ -781,7 +793,7 @@ acked(puback, PacketId, State = #state{client_id = ClientId, inflight = Infligh
|
||||||
emqx_hooks:run('message.acked', [#{client_id =>ClientId}], Msg),
|
emqx_hooks:run('message.acked', [#{client_id =>ClientId}], Msg),
|
||||||
State#state{inflight = emqx_inflight:delete(PacketId, Inflight)};
|
State#state{inflight = emqx_inflight:delete(PacketId, Inflight)};
|
||||||
none ->
|
none ->
|
||||||
?LOG(warning, "Duplicated PUBACK Packet: ~p", [PacketId], State),
|
?LOG(warning, "Duplicated PUBACK PacketId ~w", [PacketId], State),
|
||||||
State
|
State
|
||||||
end;
|
end;
|
||||||
|
|
||||||
|
@ -791,10 +803,10 @@ acked(pubrec, PacketId, State = #state{client_id = ClientId, inflight = Infligh
|
||||||
emqx_hooks:run('message.acked', [ClientId], Msg),
|
emqx_hooks:run('message.acked', [ClientId], Msg),
|
||||||
State#state{inflight = emqx_inflight:update(PacketId, {pubrel, PacketId, os:timestamp()}, Inflight)};
|
State#state{inflight = emqx_inflight:update(PacketId, {pubrel, PacketId, os:timestamp()}, Inflight)};
|
||||||
{value, {pubrel, PacketId, _Ts}} ->
|
{value, {pubrel, PacketId, _Ts}} ->
|
||||||
?LOG(warning, "Duplicated PUBREC Packet: ~p", [PacketId], State),
|
?LOG(warning, "Duplicated PUBREC PacketId ~w", [PacketId], State),
|
||||||
State;
|
State;
|
||||||
none ->
|
none ->
|
||||||
?LOG(warning, "Unexpected PUBREC Packet: ~p", [PacketId], State),
|
?LOG(warning, "Unexpected PUBREC PacketId ~w", [PacketId], State),
|
||||||
State
|
State
|
||||||
end;
|
end;
|
||||||
|
|
||||||
|
@ -824,28 +836,43 @@ dequeue2(State = #state{mqueue = Q}) ->
|
||||||
dequeue(dispatch(Msg, State#state{mqueue = Q1}))
|
dequeue(dispatch(Msg, State#state{mqueue = Q1}))
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% Ensure timers
|
%% Ensure timers
|
||||||
|
|
||||||
ensure_await_rel_timer(State = #state{await_rel_timer = undefined, await_rel_timeout = Timeout}) ->
|
ensure_await_rel_timer(State = #state{await_rel_timer = undefined, await_rel_timeout = Timeout}) ->
|
||||||
State#state{await_rel_timer = emqx_misc:start_timer(Timeout, check_awaiting_rel)};
|
ensure_await_rel_timer(Timeout, State);
|
||||||
|
|
||||||
ensure_await_rel_timer(State) ->
|
ensure_await_rel_timer(State) ->
|
||||||
State.
|
State.
|
||||||
|
|
||||||
|
ensure_await_rel_timer(Timeout, State = #state{await_rel_timer = undefined}) ->
|
||||||
|
State#state{await_rel_timer = emqx_misc:start_timer(Timeout, check_awaiting_rel)};
|
||||||
|
ensure_await_rel_timer(_Timeout, State) ->
|
||||||
|
State.
|
||||||
|
|
||||||
|
ensure_retry_timer(State = #state{retry_timer = undefined, retry_interval = Interval}) ->
|
||||||
|
ensure_retry_timer(Interval, State);
|
||||||
|
ensure_retry_timer(State) ->
|
||||||
|
State.
|
||||||
|
|
||||||
|
ensure_retry_timer(Interval, State = #state{retry_timer = undefined}) ->
|
||||||
|
State#state{retry_timer = emqx_misc:start_timer(Interval, retry_delivery)};
|
||||||
|
ensure_retry_timer(_Timeout, State) ->
|
||||||
|
State.
|
||||||
|
|
||||||
ensure_expire_timer(State = #state{expiry_interval = Interval}) when Interval > 0 ->
|
ensure_expire_timer(State = #state{expiry_interval = Interval}) when Interval > 0 ->
|
||||||
State#state{expiry_timer = emqx_misc:start_timer(Interval, expired)};
|
State#state{expiry_timer = emqx_misc:start_timer(Interval, expired)};
|
||||||
ensure_expire_timer(State) ->
|
ensure_expire_timer(State) ->
|
||||||
State.
|
State.
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
ensure_stats_timer(State = #state{enable_stats = true, stats_timer = undefined,
|
||||||
%% Reset Dup
|
idle_timeout = IdleTimeout}) ->
|
||||||
|
State#state{stats_timer = emqx_misc:start_timer(IdleTimeout, emit_stats)};
|
||||||
reset_dup(Msg) ->
|
ensure_stats_timer(State) ->
|
||||||
emqx_message:unset_flag(dup, Msg).
|
State.
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% Next Msg Id
|
%% Next Packet Id
|
||||||
|
|
||||||
next_pkt_id(State = #state{next_pkt_id = 16#FFFF}) ->
|
next_pkt_id(State = #state{next_pkt_id = 16#FFFF}) ->
|
||||||
State#state{next_pkt_id = 1};
|
State#state{next_pkt_id = 1};
|
||||||
|
@ -854,26 +881,28 @@ next_pkt_id(State = #state{next_pkt_id = Id}) ->
|
||||||
State#state{next_pkt_id = Id + 1}.
|
State#state{next_pkt_id = Id + 1}.
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% Ensure stats timer
|
%% Inc stats
|
||||||
|
|
||||||
ensure_stats_timer(State = #state{enable_stats = true, stats_timer = undefined}) ->
|
|
||||||
State#state{stats_timer = erlang:send_after(30000, self(), emit_stats)};
|
|
||||||
ensure_stats_timer(State) ->
|
|
||||||
State.
|
|
||||||
|
|
||||||
inc_stats(deliver, State = #state{deliver_stats = I}) ->
|
inc_stats(deliver, State = #state{deliver_stats = I}) ->
|
||||||
State#state{deliver_stats = I + 1};
|
State#state{deliver_stats = I + 1};
|
||||||
inc_stats(enqueue, State = #state{enqueue_stats = I}) ->
|
inc_stats(enqueue, State = #state{enqueue_stats = I}) ->
|
||||||
State#state{enqueue_stats = I + 1}.
|
State#state{enqueue_stats = I + 1}.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% Helper functions
|
%% Helper functions
|
||||||
|
|
||||||
|
reply({Reply, State}) ->
|
||||||
|
reply(Reply, State).
|
||||||
|
|
||||||
reply(Reply, State) ->
|
reply(Reply, State) ->
|
||||||
{reply, Reply, State}.
|
{reply, Reply, ensure_stats_timer(State)}.
|
||||||
|
|
||||||
|
noreply(State) ->
|
||||||
|
{noreply, ensure_stats_timer(State)}.
|
||||||
|
|
||||||
shutdown(Reason, State) ->
|
shutdown(Reason, State) ->
|
||||||
{stop, {shutdown, Reason}, State}.
|
{stop, {shutdown, Reason}, State}.
|
||||||
|
|
||||||
%% TODO: maybe_gc(State) -> State.
|
%% TODO: GC Policy and Shutdown Policy
|
||||||
|
%% maybe_gc(State) -> State.
|
||||||
|
|
||||||
|
|
|
@ -29,7 +29,7 @@ all() -> [t_in, t_in_qos0, t_out, t_simple_mqueue, t_infinity_simple_mqueue,
|
||||||
|
|
||||||
t_in(_) ->
|
t_in(_) ->
|
||||||
Opts = #{type => simple, max_len => 5, store_qos0 => true},
|
Opts = #{type => simple, max_len => 5, store_qos0 => true},
|
||||||
Q = ?Q:new(<<"testQ">>, Opts),
|
Q = ?Q:init(Opts),
|
||||||
?assert(?Q:is_empty(Q)),
|
?assert(?Q:is_empty(Q)),
|
||||||
Q1 = ?Q:in(#message{}, Q),
|
Q1 = ?Q:in(#message{}, Q),
|
||||||
?assertEqual(1, ?Q:len(Q1)),
|
?assertEqual(1, ?Q:len(Q1)),
|
||||||
|
@ -42,7 +42,7 @@ t_in(_) ->
|
||||||
|
|
||||||
t_in_qos0(_) ->
|
t_in_qos0(_) ->
|
||||||
Opts = #{type => simple, max_len => 5, store_qos0 => false},
|
Opts = #{type => simple, max_len => 5, store_qos0 => false},
|
||||||
Q = ?Q:new(<<"testQ">>, Opts),
|
Q = ?Q:init(Opts),
|
||||||
Q1 = ?Q:in(#message{qos = 0}, Q),
|
Q1 = ?Q:in(#message{qos = 0}, Q),
|
||||||
?assert(?Q:is_empty(Q1)),
|
?assert(?Q:is_empty(Q1)),
|
||||||
Q2 = ?Q:in(#message{qos = 0}, Q1),
|
Q2 = ?Q:in(#message{qos = 0}, Q1),
|
||||||
|
@ -50,7 +50,7 @@ t_in_qos0(_) ->
|
||||||
|
|
||||||
t_out(_) ->
|
t_out(_) ->
|
||||||
Opts = #{type => simple, max_len => 5, store_qos0 => true},
|
Opts = #{type => simple, max_len => 5, store_qos0 => true},
|
||||||
Q = ?Q:new(<<"testQ">>, Opts),
|
Q = ?Q:init(Opts),
|
||||||
{empty, Q} = ?Q:out(Q),
|
{empty, Q} = ?Q:out(Q),
|
||||||
Q1 = ?Q:in(#message{}, Q),
|
Q1 = ?Q:in(#message{}, Q),
|
||||||
{Value, Q2} = ?Q:out(Q1),
|
{Value, Q2} = ?Q:out(Q1),
|
||||||
|
@ -59,10 +59,9 @@ t_out(_) ->
|
||||||
|
|
||||||
t_simple_mqueue(_) ->
|
t_simple_mqueue(_) ->
|
||||||
Opts = #{type => simple, max_len => 3, store_qos0 => false},
|
Opts = #{type => simple, max_len => 3, store_qos0 => false},
|
||||||
Q = ?Q:new("simple_queue", Opts),
|
Q = ?Q:init(Opts),
|
||||||
?assertEqual(simple, ?Q:type(Q)),
|
?assertEqual(simple, ?Q:type(Q)),
|
||||||
?assertEqual(3, ?Q:max_len(Q)),
|
?assertEqual(3, ?Q:max_len(Q)),
|
||||||
?assertEqual(<<"simple_queue">>, ?Q:name(Q)),
|
|
||||||
?assert(?Q:is_empty(Q)),
|
?assert(?Q:is_empty(Q)),
|
||||||
Q1 = ?Q:in(#message{qos = 1, payload = <<"1">>}, Q),
|
Q1 = ?Q:in(#message{qos = 1, payload = <<"1">>}, Q),
|
||||||
Q2 = ?Q:in(#message{qos = 1, payload = <<"2">>}, Q1),
|
Q2 = ?Q:in(#message{qos = 1, payload = <<"2">>}, Q1),
|
||||||
|
@ -75,7 +74,7 @@ t_simple_mqueue(_) ->
|
||||||
|
|
||||||
t_infinity_simple_mqueue(_) ->
|
t_infinity_simple_mqueue(_) ->
|
||||||
Opts = #{type => simple, max_len => 0, store_qos0 => false},
|
Opts = #{type => simple, max_len => 0, store_qos0 => false},
|
||||||
Q = ?Q:new("infinity_simple_queue", Opts),
|
Q = ?Q:init(Opts),
|
||||||
?assert(?Q:is_empty(Q)),
|
?assert(?Q:is_empty(Q)),
|
||||||
?assertEqual(0, ?Q:max_len(Q)),
|
?assertEqual(0, ?Q:max_len(Q)),
|
||||||
Qx = lists:foldl(fun(I, AccQ) ->
|
Qx = lists:foldl(fun(I, AccQ) ->
|
||||||
|
@ -88,10 +87,9 @@ t_infinity_simple_mqueue(_) ->
|
||||||
|
|
||||||
t_priority_mqueue(_) ->
|
t_priority_mqueue(_) ->
|
||||||
Opts = #{type => priority, max_len => 3, store_qos0 => false},
|
Opts = #{type => priority, max_len => 3, store_qos0 => false},
|
||||||
Q = ?Q:new("priority_queue", Opts),
|
Q = ?Q:init(Opts),
|
||||||
?assertEqual(priority, ?Q:type(Q)),
|
?assertEqual(priority, ?Q:type(Q)),
|
||||||
?assertEqual(3, ?Q:max_len(Q)),
|
?assertEqual(3, ?Q:max_len(Q)),
|
||||||
?assertEqual(<<"priority_queue">>, ?Q:name(Q)),
|
|
||||||
?assert(?Q:is_empty(Q)),
|
?assert(?Q:is_empty(Q)),
|
||||||
Q1 = ?Q:in(#message{qos = 1, topic = <<"t2">>}, Q),
|
Q1 = ?Q:in(#message{qos = 1, topic = <<"t2">>}, Q),
|
||||||
Q2 = ?Q:in(#message{qos = 1, topic = <<"t1">>}, Q1),
|
Q2 = ?Q:in(#message{qos = 1, topic = <<"t1">>}, Q1),
|
||||||
|
@ -109,7 +107,7 @@ t_priority_mqueue(_) ->
|
||||||
|
|
||||||
t_infinity_priority_mqueue(_) ->
|
t_infinity_priority_mqueue(_) ->
|
||||||
Opts = #{type => priority, max_len => 0, store_qos0 => false},
|
Opts = #{type => priority, max_len => 0, store_qos0 => false},
|
||||||
Q = ?Q:new("infinity_priority_queue", Opts),
|
Q = ?Q:init(Opts),
|
||||||
?assertEqual(0, ?Q:max_len(Q)),
|
?assertEqual(0, ?Q:max_len(Q)),
|
||||||
Qx = lists:foldl(fun(I, AccQ) ->
|
Qx = lists:foldl(fun(I, AccQ) ->
|
||||||
AccQ1 =
|
AccQ1 =
|
||||||
|
@ -118,3 +116,16 @@ t_infinity_priority_mqueue(_) ->
|
||||||
end, Q, lists:seq(1, 255)),
|
end, Q, lists:seq(1, 255)),
|
||||||
?assertEqual(510, ?Q:len(Qx)),
|
?assertEqual(510, ?Q:len(Qx)),
|
||||||
?assertEqual([{len, 510}, {max_len, 0}, {dropped, 0}], ?Q:stats(Qx)).
|
?assertEqual([{len, 510}, {max_len, 0}, {dropped, 0}], ?Q:stats(Qx)).
|
||||||
|
|
||||||
|
t_priority_mqueue2(_) ->
|
||||||
|
Opts = #{type => priority, max_length => 2, store_qos0 => false},
|
||||||
|
Q = ?Q:init("priority_queue2_test", Opts),
|
||||||
|
2 = ?Q:max_len(Q),
|
||||||
|
Q1 = ?Q:in(#message{topic = <<"x">>, qos = 1, payload = <<1>>}, Q),
|
||||||
|
Q2 = ?Q:in(#message{topic = <<"x">>, qos = 1, payload = <<2>>}, Q1),
|
||||||
|
Q3 = ?Q:in(#message{topic = <<"y">>, qos = 1, payload = <<3>>}, Q2),
|
||||||
|
Q4 = ?Q:in(#message{topic = <<"y">>, qos = 1, payload = <<4>>}, Q3),
|
||||||
|
4 = ?Q:len(Q4),
|
||||||
|
{{value, _Val}, Q5} = ?Q:out(Q4),
|
||||||
|
3 = ?Q:len(Q5).
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue