Compare commits
6 Commits
master
...
extend-per
Author | SHA1 | Date |
---|---|---|
![]() |
31cb1179af | |
![]() |
1c274b15d2 | |
![]() |
6497dc30b7 | |
![]() |
5d4645b774 | |
![]() |
008eae5a8e | |
![]() |
6e05ba17b0 |
14
Makefile
14
Makefile
|
@ -15,6 +15,8 @@ REL_PROFILES := emqx emqx-edge
|
||||||
PKG_PROFILES := emqx-pkg emqx-edge-pkg
|
PKG_PROFILES := emqx-pkg emqx-edge-pkg
|
||||||
PROFILES := $(REL_PROFILES) $(PKG_PROFILES) default
|
PROFILES := $(REL_PROFILES) $(PKG_PROFILES) default
|
||||||
|
|
||||||
|
CT_NODE_NAME ?= 'test@127.0.0.1'
|
||||||
|
|
||||||
export REBAR_GIT_CLONE_OPTIONS += --depth=1
|
export REBAR_GIT_CLONE_OPTIONS += --depth=1
|
||||||
|
|
||||||
.PHONY: default
|
.PHONY: default
|
||||||
|
@ -44,7 +46,7 @@ proper: $(REBAR)
|
||||||
|
|
||||||
.PHONY: ct
|
.PHONY: ct
|
||||||
ct: $(REBAR)
|
ct: $(REBAR)
|
||||||
@ENABLE_COVER_COMPILE=1 $(REBAR) ct --name 'test@127.0.0.1' -c -v
|
@ENABLE_COVER_COMPILE=1 $(REBAR) ct --name $(CT_NODE_NAME) -c -v
|
||||||
|
|
||||||
APPS=$(shell $(CURDIR)/scripts/find-apps.sh)
|
APPS=$(shell $(CURDIR)/scripts/find-apps.sh)
|
||||||
|
|
||||||
|
@ -52,7 +54,7 @@ APPS=$(shell $(CURDIR)/scripts/find-apps.sh)
|
||||||
.PHONY: $(APPS:%=%-ct)
|
.PHONY: $(APPS:%=%-ct)
|
||||||
define gen-app-ct-target
|
define gen-app-ct-target
|
||||||
$1-ct:
|
$1-ct:
|
||||||
$(REBAR) ct --name 'test@127.0.0.1' -v --suite $(shell $(CURDIR)/scripts/find-suites.sh $1)
|
$(REBAR) ct --name $(CT_NODE_NAME) -v --suite $(shell $(CURDIR)/scripts/find-suites.sh $1)
|
||||||
endef
|
endef
|
||||||
$(foreach app,$(APPS),$(eval $(call gen-app-ct-target,$(app))))
|
$(foreach app,$(APPS),$(eval $(call gen-app-ct-target,$(app))))
|
||||||
|
|
||||||
|
@ -64,6 +66,14 @@ $1-prop:
|
||||||
endef
|
endef
|
||||||
$(foreach app,$(APPS),$(eval $(call gen-app-prop-target,$(app))))
|
$(foreach app,$(APPS),$(eval $(call gen-app-prop-target,$(app))))
|
||||||
|
|
||||||
|
.PHONY: ct-suite
|
||||||
|
ct-suite: $(REBAR)
|
||||||
|
ifneq ($(TESTCASE),)
|
||||||
|
$(REBAR) ct -v --readable=false --name $(CT_NODE_NAME) --suite $(SUITE) --case $(TESTCASE)
|
||||||
|
else
|
||||||
|
$(REBAR) ct -v --readable=false --name $(CT_NODE_NAME) --suite $(SUITE)
|
||||||
|
endif
|
||||||
|
|
||||||
.PHONY: cover
|
.PHONY: cover
|
||||||
cover: $(REBAR)
|
cover: $(REBAR)
|
||||||
@ENABLE_COVER_COMPILE=1 $(REBAR) cover
|
@ENABLE_COVER_COMPILE=1 $(REBAR) cover
|
||||||
|
|
|
@ -26,6 +26,7 @@
|
||||||
-define(COMMON_SHARD, emqx_common_shard).
|
-define(COMMON_SHARD, emqx_common_shard).
|
||||||
-define(SHARED_SUB_SHARD, emqx_shared_sub_shard).
|
-define(SHARED_SUB_SHARD, emqx_shared_sub_shard).
|
||||||
-define(MOD_DELAYED_SHARD, emqx_delayed_shard).
|
-define(MOD_DELAYED_SHARD, emqx_delayed_shard).
|
||||||
|
-define(PERSISTENT_SESSION_SHARD, emqx_persistent_session_shard).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Banner
|
%% Banner
|
||||||
|
@ -89,7 +90,7 @@
|
||||||
|
|
||||||
-record(route, {
|
-record(route, {
|
||||||
topic :: binary(),
|
topic :: binary(),
|
||||||
dest :: node() | {binary(), node()}
|
dest :: node() | {binary(), node()} | binary()
|
||||||
}).
|
}).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
|
@ -38,6 +38,7 @@
|
||||||
, ?SHARED_SUB_SHARD
|
, ?SHARED_SUB_SHARD
|
||||||
, ?RULE_ENGINE_SHARD
|
, ?RULE_ENGINE_SHARD
|
||||||
, ?MOD_DELAYED_SHARD
|
, ?MOD_DELAYED_SHARD
|
||||||
|
, ?PERSISTENT_SESSION_SHARD
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -205,6 +205,7 @@ publish(Msg) when is_record(Msg, message) ->
|
||||||
?LOG(notice, "Stop publishing: ~s", [emqx_message:format(Msg)]),
|
?LOG(notice, "Stop publishing: ~s", [emqx_message:format(Msg)]),
|
||||||
[];
|
[];
|
||||||
Msg1 = #message{topic = Topic} ->
|
Msg1 = #message{topic = Topic} ->
|
||||||
|
emqx_session_router:persist(Msg1),
|
||||||
route(aggre(emqx_router:match_routes(Topic)), delivery(Msg1))
|
route(aggre(emqx_router:match_routes(Topic)), delivery(Msg1))
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
|
@ -183,7 +183,11 @@ set_conn_state(ConnState, Channel) ->
|
||||||
get_session(#channel{session = Session}) ->
|
get_session(#channel{session = Session}) ->
|
||||||
Session.
|
Session.
|
||||||
|
|
||||||
set_session(Session, Channel) ->
|
set_session(Session, Channel = #channel{clientinfo = ClientInfo, conninfo = ConnInfo}) ->
|
||||||
|
%% Assume that this is also an updated session. Allow side effect.
|
||||||
|
ClientID = maps:get(clientid, ClientInfo, undefined),
|
||||||
|
ExpiryInterval = maps:get(expiry_interval, ConnInfo, 0),
|
||||||
|
emqx_session:db_put(ClientID, ExpiryInterval, Session),
|
||||||
Channel#channel{session = Session}.
|
Channel#channel{session = Session}.
|
||||||
|
|
||||||
%% TODO: Add more stats.
|
%% TODO: Add more stats.
|
||||||
|
@ -367,10 +371,10 @@ handle_in(?PUBACK_PACKET(PacketId, _ReasonCode, Properties), Channel
|
||||||
case emqx_session:puback(PacketId, Session) of
|
case emqx_session:puback(PacketId, Session) of
|
||||||
{ok, Msg, NSession} ->
|
{ok, Msg, NSession} ->
|
||||||
ok = after_message_acked(ClientInfo, Msg, Properties),
|
ok = after_message_acked(ClientInfo, Msg, Properties),
|
||||||
{ok, Channel#channel{session = NSession}};
|
{ok, set_session(NSession, Channel)};
|
||||||
{ok, Msg, Publishes, NSession} ->
|
{ok, Msg, Publishes, NSession} ->
|
||||||
ok = after_message_acked(ClientInfo, Msg, Properties),
|
ok = after_message_acked(ClientInfo, Msg, Properties),
|
||||||
handle_out(publish, Publishes, Channel#channel{session = NSession});
|
handle_out(publish, Publishes, set_session(NSession, Channel));
|
||||||
{error, ?RC_PACKET_IDENTIFIER_IN_USE} ->
|
{error, ?RC_PACKET_IDENTIFIER_IN_USE} ->
|
||||||
?LOG(warning, "The PUBACK PacketId ~w is inuse.", [PacketId]),
|
?LOG(warning, "The PUBACK PacketId ~w is inuse.", [PacketId]),
|
||||||
ok = emqx_metrics:inc('packets.puback.inuse'),
|
ok = emqx_metrics:inc('packets.puback.inuse'),
|
||||||
|
@ -386,7 +390,7 @@ handle_in(?PUBREC_PACKET(PacketId, _ReasonCode, Properties), Channel
|
||||||
case emqx_session:pubrec(PacketId, Session) of
|
case emqx_session:pubrec(PacketId, Session) of
|
||||||
{ok, Msg, NSession} ->
|
{ok, Msg, NSession} ->
|
||||||
ok = after_message_acked(ClientInfo, Msg, Properties),
|
ok = after_message_acked(ClientInfo, Msg, Properties),
|
||||||
NChannel = Channel#channel{session = NSession},
|
NChannel = set_session(NSession, Channel),
|
||||||
handle_out(pubrel, {PacketId, ?RC_SUCCESS}, NChannel);
|
handle_out(pubrel, {PacketId, ?RC_SUCCESS}, NChannel);
|
||||||
{error, RC = ?RC_PACKET_IDENTIFIER_IN_USE} ->
|
{error, RC = ?RC_PACKET_IDENTIFIER_IN_USE} ->
|
||||||
?LOG(warning, "The PUBREC PacketId ~w is inuse.", [PacketId]),
|
?LOG(warning, "The PUBREC PacketId ~w is inuse.", [PacketId]),
|
||||||
|
@ -401,7 +405,7 @@ handle_in(?PUBREC_PACKET(PacketId, _ReasonCode, Properties), Channel
|
||||||
handle_in(?PUBREL_PACKET(PacketId, _ReasonCode), Channel = #channel{session = Session}) ->
|
handle_in(?PUBREL_PACKET(PacketId, _ReasonCode), Channel = #channel{session = Session}) ->
|
||||||
case emqx_session:pubrel(PacketId, Session) of
|
case emqx_session:pubrel(PacketId, Session) of
|
||||||
{ok, NSession} ->
|
{ok, NSession} ->
|
||||||
NChannel = Channel#channel{session = NSession},
|
NChannel = set_session(NSession, Channel),
|
||||||
handle_out(pubcomp, {PacketId, ?RC_SUCCESS}, NChannel);
|
handle_out(pubcomp, {PacketId, ?RC_SUCCESS}, NChannel);
|
||||||
{error, RC = ?RC_PACKET_IDENTIFIER_NOT_FOUND} ->
|
{error, RC = ?RC_PACKET_IDENTIFIER_NOT_FOUND} ->
|
||||||
?LOG(warning, "The PUBREL PacketId ~w is not found.", [PacketId]),
|
?LOG(warning, "The PUBREL PacketId ~w is not found.", [PacketId]),
|
||||||
|
@ -412,9 +416,9 @@ handle_in(?PUBREL_PACKET(PacketId, _ReasonCode), Channel = #channel{session = Se
|
||||||
handle_in(?PUBCOMP_PACKET(PacketId, _ReasonCode), Channel = #channel{session = Session}) ->
|
handle_in(?PUBCOMP_PACKET(PacketId, _ReasonCode), Channel = #channel{session = Session}) ->
|
||||||
case emqx_session:pubcomp(PacketId, Session) of
|
case emqx_session:pubcomp(PacketId, Session) of
|
||||||
{ok, NSession} ->
|
{ok, NSession} ->
|
||||||
{ok, Channel#channel{session = NSession}};
|
{ok, set_session(NSession, Channel)};
|
||||||
{ok, Publishes, NSession} ->
|
{ok, Publishes, NSession} ->
|
||||||
handle_out(publish, Publishes, Channel#channel{session = NSession});
|
handle_out(publish, Publishes, set_session(NSession, Channel));
|
||||||
{error, ?RC_PACKET_IDENTIFIER_IN_USE} ->
|
{error, ?RC_PACKET_IDENTIFIER_IN_USE} ->
|
||||||
ok = emqx_metrics:inc('packets.pubcomp.inuse'),
|
ok = emqx_metrics:inc('packets.pubcomp.inuse'),
|
||||||
{ok, Channel};
|
{ok, Channel};
|
||||||
|
@ -614,7 +618,8 @@ do_publish(PacketId, Msg = #message{qos = ?QOS_2},
|
||||||
case emqx_session:publish(PacketId, Msg, Session) of
|
case emqx_session:publish(PacketId, Msg, Session) of
|
||||||
{ok, PubRes, NSession} ->
|
{ok, PubRes, NSession} ->
|
||||||
RC = puback_reason_code(PubRes),
|
RC = puback_reason_code(PubRes),
|
||||||
NChannel1 = ensure_timer(await_timer, Channel#channel{session = NSession}),
|
NChannel0 = set_session(NSession, Channel),
|
||||||
|
NChannel1 = ensure_timer(await_timer, NChannel0),
|
||||||
NChannel2 = ensure_quota(PubRes, NChannel1),
|
NChannel2 = ensure_quota(PubRes, NChannel1),
|
||||||
handle_out(pubrec, {PacketId, RC}, NChannel2);
|
handle_out(pubrec, {PacketId, RC}, NChannel2);
|
||||||
{error, RC = ?RC_PACKET_IDENTIFIER_IN_USE} ->
|
{error, RC = ?RC_PACKET_IDENTIFIER_IN_USE} ->
|
||||||
|
@ -683,7 +688,7 @@ do_subscribe(TopicFilter, SubOpts = #{qos := QoS}, Channel =
|
||||||
NSubOpts = enrich_subopts(maps:merge(?DEFAULT_SUBOPTS, SubOpts), Channel),
|
NSubOpts = enrich_subopts(maps:merge(?DEFAULT_SUBOPTS, SubOpts), Channel),
|
||||||
case emqx_session:subscribe(ClientInfo, NTopicFilter, NSubOpts, Session) of
|
case emqx_session:subscribe(ClientInfo, NTopicFilter, NSubOpts, Session) of
|
||||||
{ok, NSession} ->
|
{ok, NSession} ->
|
||||||
{QoS, Channel#channel{session = NSession}};
|
{QoS, set_session(NSession, Channel)};
|
||||||
{error, RC} ->
|
{error, RC} ->
|
||||||
?LOG(warning, "Cannot subscribe ~s due to ~s.",
|
?LOG(warning, "Cannot subscribe ~s due to ~s.",
|
||||||
[TopicFilter, emqx_reason_codes:text(RC)]),
|
[TopicFilter, emqx_reason_codes:text(RC)]),
|
||||||
|
@ -711,7 +716,7 @@ do_unsubscribe(TopicFilter, SubOpts, Channel =
|
||||||
TopicFilter1 = emqx_mountpoint:mount(MountPoint, TopicFilter),
|
TopicFilter1 = emqx_mountpoint:mount(MountPoint, TopicFilter),
|
||||||
case emqx_session:unsubscribe(ClientInfo, TopicFilter1, SubOpts, Session) of
|
case emqx_session:unsubscribe(ClientInfo, TopicFilter1, SubOpts, Session) of
|
||||||
{ok, NSession} ->
|
{ok, NSession} ->
|
||||||
{?RC_SUCCESS, Channel#channel{session = NSession}};
|
{?RC_SUCCESS, set_session(NSession, Channel)};
|
||||||
{error, RC} -> {RC, Channel}
|
{error, RC} -> {RC, Channel}
|
||||||
end.
|
end.
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -736,7 +741,9 @@ process_disconnect(ReasonCode, Properties, Channel) ->
|
||||||
|
|
||||||
maybe_update_expiry_interval(#{'Session-Expiry-Interval' := Interval},
|
maybe_update_expiry_interval(#{'Session-Expiry-Interval' := Interval},
|
||||||
Channel = #channel{conninfo = ConnInfo}) ->
|
Channel = #channel{conninfo = ConnInfo}) ->
|
||||||
Channel#channel{conninfo = ConnInfo#{expiry_interval => timer:seconds(Interval)}};
|
NChannel = Channel#channel{conninfo = ConnInfo#{expiry_interval => timer:seconds(Interval)}},
|
||||||
|
%% We need to update the expiry interval on the session as well
|
||||||
|
set_session(NChannel#channel.session, NChannel);
|
||||||
maybe_update_expiry_interval(_Properties, Channel) -> Channel.
|
maybe_update_expiry_interval(_Properties, Channel) -> Channel.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -748,8 +755,14 @@ maybe_update_expiry_interval(_Properties, Channel) -> Channel.
|
||||||
handle_deliver(Delivers, Channel = #channel{conn_state = disconnected,
|
handle_deliver(Delivers, Channel = #channel{conn_state = disconnected,
|
||||||
session = Session,
|
session = Session,
|
||||||
clientinfo = #{clientid := ClientId}}) ->
|
clientinfo = #{clientid := ClientId}}) ->
|
||||||
NSession = emqx_session:enqueue(ignore_local(maybe_nack(Delivers), ClientId, Session), Session),
|
Delivers1 = maybe_nack(Delivers),
|
||||||
{ok, Channel#channel{session = NSession}};
|
Delivers2 = ignore_local(Delivers1, ClientId, Session),
|
||||||
|
NSession = emqx_session:enqueue(Delivers2, Session),
|
||||||
|
NChannel = set_session(NSession, Channel),
|
||||||
|
%% We consider queued/dropped messages as delivered since they are now in the session state.
|
||||||
|
MsgIds = [emqx_message:id(Msg) || {deliver, _, Msg} <- Delivers],
|
||||||
|
emqx_session_router:delivered(ClientId, MsgIds),
|
||||||
|
{ok, NChannel};
|
||||||
|
|
||||||
handle_deliver(Delivers, Channel = #channel{takeover = true,
|
handle_deliver(Delivers, Channel = #channel{takeover = true,
|
||||||
pendings = Pendings,
|
pendings = Pendings,
|
||||||
|
@ -759,13 +772,22 @@ handle_deliver(Delivers, Channel = #channel{takeover = true,
|
||||||
{ok, Channel#channel{pendings = NPendings}};
|
{ok, Channel#channel{pendings = NPendings}};
|
||||||
|
|
||||||
handle_deliver(Delivers, Channel = #channel{session = Session,
|
handle_deliver(Delivers, Channel = #channel{session = Session,
|
||||||
clientinfo = #{clientid := ClientId}}) ->
|
clientinfo = #{clientid := ClientId},
|
||||||
|
conninfo = #{expiry_interval := ExpiryInterval}
|
||||||
|
}) ->
|
||||||
case emqx_session:deliver(ignore_local(Delivers, ClientId, Session), Session) of
|
case emqx_session:deliver(ignore_local(Delivers, ClientId, Session), Session) of
|
||||||
{ok, Publishes, NSession} ->
|
{ok, Publishes, NSession} ->
|
||||||
NChannel = Channel#channel{session = NSession},
|
NChannel = set_session(NSession, Channel),
|
||||||
|
case ExpiryInterval > 0 of
|
||||||
|
true ->
|
||||||
|
MsgIds = [emqx_message:id(Msg) || {deliver, _, Msg} <- Delivers],
|
||||||
|
emqx_session_router:delivered(ClientId, MsgIds);
|
||||||
|
false ->
|
||||||
|
ignore
|
||||||
|
end,
|
||||||
handle_out(publish, Publishes, ensure_timer(retry_timer, NChannel));
|
handle_out(publish, Publishes, ensure_timer(retry_timer, NChannel));
|
||||||
{ok, NSession} ->
|
{ok, NSession} ->
|
||||||
{ok, Channel#channel{session = NSession}}
|
{ok, set_session(NSession, Channel)}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
ignore_local(Delivers, Subscriber, Session) ->
|
ignore_local(Delivers, Subscriber, Session) ->
|
||||||
|
@ -881,13 +903,13 @@ return_connack(AckPacket, Channel) ->
|
||||||
case maybe_resume_session(Channel) of
|
case maybe_resume_session(Channel) of
|
||||||
ignore -> {ok, Replies, Channel};
|
ignore -> {ok, Replies, Channel};
|
||||||
{ok, Publishes, NSession} ->
|
{ok, Publishes, NSession} ->
|
||||||
NChannel = Channel#channel{session = NSession,
|
NChannel0 = Channel#channel{resuming = false,
|
||||||
resuming = false,
|
|
||||||
pendings = []
|
pendings = []
|
||||||
},
|
},
|
||||||
{Packets, NChannel1} = do_deliver(Publishes, NChannel),
|
NChannel1 = set_session(NSession, NChannel0),
|
||||||
|
{Packets, NChannel2} = do_deliver(Publishes, NChannel1),
|
||||||
Outgoing = [{outgoing, Packets} || length(Packets) > 0],
|
Outgoing = [{outgoing, Packets} || length(Packets) > 0],
|
||||||
{ok, Replies ++ Outgoing, NChannel1}
|
{ok, Replies ++ Outgoing, NChannel2}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -1047,9 +1069,9 @@ handle_timeout(_TRef, retry_delivery,
|
||||||
Channel = #channel{session = Session}) ->
|
Channel = #channel{session = Session}) ->
|
||||||
case emqx_session:retry(Session) of
|
case emqx_session:retry(Session) of
|
||||||
{ok, NSession} ->
|
{ok, NSession} ->
|
||||||
{ok, clean_timer(retry_timer, Channel#channel{session = NSession})};
|
{ok, clean_timer(retry_timer, set_session(NSession, Channel))};
|
||||||
{ok, Publishes, Timeout, NSession} ->
|
{ok, Publishes, Timeout, NSession} ->
|
||||||
NChannel = Channel#channel{session = NSession},
|
NChannel = set_session(NSession, Channel),
|
||||||
handle_out(publish, Publishes, reset_timer(retry_timer, Timeout, NChannel))
|
handle_out(publish, Publishes, reset_timer(retry_timer, Timeout, NChannel))
|
||||||
end;
|
end;
|
||||||
|
|
||||||
|
@ -1060,9 +1082,9 @@ handle_timeout(_TRef, expire_awaiting_rel,
|
||||||
Channel = #channel{session = Session}) ->
|
Channel = #channel{session = Session}) ->
|
||||||
case emqx_session:expire(awaiting_rel, Session) of
|
case emqx_session:expire(awaiting_rel, Session) of
|
||||||
{ok, NSession} ->
|
{ok, NSession} ->
|
||||||
{ok, clean_timer(await_timer, Channel#channel{session = NSession})};
|
{ok, clean_timer(await_timer, set_session(NSession, Channel))};
|
||||||
{ok, Timeout, NSession} ->
|
{ok, Timeout, NSession} ->
|
||||||
{ok, reset_timer(await_timer, Timeout, Channel#channel{session = NSession})}
|
{ok, reset_timer(await_timer, Timeout, set_session(NSession, Channel))}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
handle_timeout(_TRef, expire_session, Channel) ->
|
handle_timeout(_TRef, expire_session, Channel) ->
|
||||||
|
@ -1598,8 +1620,12 @@ maybe_resume_session(#channel{resuming = false}) ->
|
||||||
ignore;
|
ignore;
|
||||||
maybe_resume_session(#channel{session = Session,
|
maybe_resume_session(#channel{session = Session,
|
||||||
resuming = true,
|
resuming = true,
|
||||||
pendings = Pendings}) ->
|
pendings = Pendings,
|
||||||
|
clientinfo = #{clientid := ClientId}}) ->
|
||||||
{ok, Publishes, Session1} = emqx_session:replay(Session),
|
{ok, Publishes, Session1} = emqx_session:replay(Session),
|
||||||
|
%% We consider queued/dropped messages as delivered since they are now in the session state.
|
||||||
|
MsgIds = [emqx_message:id(Msg) || {deliver, _, Msg} <- Pendings],
|
||||||
|
emqx_session_router:delivered(ClientId, MsgIds),
|
||||||
case emqx_session:deliver(Pendings, Session1) of
|
case emqx_session:deliver(Pendings, Session1) of
|
||||||
{ok, Session2} ->
|
{ok, Session2} ->
|
||||||
{ok, Publishes, Session2};
|
{ok, Publishes, Session2};
|
||||||
|
|
|
@ -211,21 +211,34 @@ set_chan_stats(ClientId, ChanPid, Stats) ->
|
||||||
pendings => list()}}
|
pendings => list()}}
|
||||||
| {error, Reason :: term()}).
|
| {error, Reason :: term()}).
|
||||||
open_session(true, ClientInfo = #{clientid := ClientId}, ConnInfo) ->
|
open_session(true, ClientInfo = #{clientid := ClientId}, ConnInfo) ->
|
||||||
|
EI = maps:get(expiry_interval, ConnInfo, 0),
|
||||||
Self = self(),
|
Self = self(),
|
||||||
CleanStart = fun(_) ->
|
CleanStart = fun(_) ->
|
||||||
ok = discard_session(ClientId),
|
ok = discard_session(ClientId),
|
||||||
Session = create_session(ClientInfo, ConnInfo),
|
Session = create_session(ClientInfo, ConnInfo),
|
||||||
|
emqx_session:db_put(ClientId, EI, Session),
|
||||||
register_channel(ClientId, Self, ConnInfo),
|
register_channel(ClientId, Self, ConnInfo),
|
||||||
{ok, #{session => Session, present => false}}
|
{ok, #{session => Session, present => false}}
|
||||||
end,
|
end,
|
||||||
emqx_cm_locker:trans(ClientId, CleanStart);
|
emqx_cm_locker:trans(ClientId, CleanStart);
|
||||||
|
|
||||||
open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) ->
|
open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) ->
|
||||||
|
EI = maps:get(expiry_interval, ConnInfo, 0),
|
||||||
Self = self(),
|
Self = self(),
|
||||||
ResumeStart = fun(_) ->
|
ResumeStart = fun(_) ->
|
||||||
case takeover_session(ClientId) of
|
case takeover_session(ClientId) of
|
||||||
|
{ok, Session} ->
|
||||||
|
ok = emqx_session:resume(ClientInfo, Session),
|
||||||
|
emqx_session:db_put(ClientId, EI, Session),
|
||||||
|
Pendings = [{deliver, emqx_message:topic(Msg), Msg}
|
||||||
|
|| Msg <- emqx_session_router:pending(ClientId)],
|
||||||
|
register_channel(ClientId, Self, ConnInfo),
|
||||||
|
{ok, #{session => Session,
|
||||||
|
present => true,
|
||||||
|
pendings => Pendings}};
|
||||||
{ok, ConnMod, ChanPid, Session} ->
|
{ok, ConnMod, ChanPid, Session} ->
|
||||||
ok = emqx_session:resume(ClientInfo, Session),
|
ok = emqx_session:resume(ClientInfo, Session),
|
||||||
|
emqx_session:db_put(ClientId, EI, Session),
|
||||||
Pendings = ConnMod:call(ChanPid, {takeover, 'end'}, ?T_TAKEOVER),
|
Pendings = ConnMod:call(ChanPid, {takeover, 'end'}, ?T_TAKEOVER),
|
||||||
register_channel(ClientId, Self, ConnInfo),
|
register_channel(ClientId, Self, ConnInfo),
|
||||||
{ok, #{session => Session,
|
{ok, #{session => Session,
|
||||||
|
@ -233,6 +246,7 @@ open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) ->
|
||||||
pendings => Pendings}};
|
pendings => Pendings}};
|
||||||
{error, not_found} ->
|
{error, not_found} ->
|
||||||
Session = create_session(ClientInfo, ConnInfo),
|
Session = create_session(ClientInfo, ConnInfo),
|
||||||
|
emqx_session:db_put(ClientId, EI, Session),
|
||||||
register_channel(ClientId, Self, ConnInfo),
|
register_channel(ClientId, Self, ConnInfo),
|
||||||
{ok, #{session => Session, present => false}}
|
{ok, #{session => Session, present => false}}
|
||||||
end
|
end
|
||||||
|
@ -271,7 +285,11 @@ get_mqtt_conf(Zone, Key) ->
|
||||||
| {ok, atom(), pid(), emqx_session:session()}).
|
| {ok, atom(), pid(), emqx_session:session()}).
|
||||||
takeover_session(ClientId) ->
|
takeover_session(ClientId) ->
|
||||||
case lookup_channels(ClientId) of
|
case lookup_channels(ClientId) of
|
||||||
[] -> {error, not_found};
|
[] ->
|
||||||
|
case emqx_session:db_get(ClientId) of
|
||||||
|
[] -> {error, not_found};
|
||||||
|
[Session] -> {ok, Session}
|
||||||
|
end;
|
||||||
[ChanPid] ->
|
[ChanPid] ->
|
||||||
takeover_session(ClientId, ChanPid);
|
takeover_session(ClientId, ChanPid);
|
||||||
ChanPids ->
|
ChanPids ->
|
||||||
|
@ -286,7 +304,10 @@ takeover_session(ClientId) ->
|
||||||
takeover_session(ClientId, ChanPid) when node(ChanPid) == node() ->
|
takeover_session(ClientId, ChanPid) when node(ChanPid) == node() ->
|
||||||
case get_chann_conn_mod(ClientId, ChanPid) of
|
case get_chann_conn_mod(ClientId, ChanPid) of
|
||||||
undefined ->
|
undefined ->
|
||||||
{error, not_found};
|
case emqx_session:db_get(ClientId) of
|
||||||
|
[] -> {error, not_found};
|
||||||
|
[Session] -> {ok, Session}
|
||||||
|
end;
|
||||||
ConnMod when is_atom(ConnMod) ->
|
ConnMod when is_atom(ConnMod) ->
|
||||||
Session = ConnMod:call(ChanPid, {takeover, 'begin'}, ?T_TAKEOVER),
|
Session = ConnMod:call(ChanPid, {takeover, 'begin'}, ?T_TAKEOVER),
|
||||||
{ok, ConnMod, ChanPid, Session}
|
{ok, ConnMod, ChanPid, Session}
|
||||||
|
|
|
@ -119,8 +119,10 @@ do_add_route(Topic, Dest) when is_binary(Topic) ->
|
||||||
ok = emqx_router_helper:monitor(Dest),
|
ok = emqx_router_helper:monitor(Dest),
|
||||||
case emqx_topic:wildcard(Topic) of
|
case emqx_topic:wildcard(Topic) of
|
||||||
true ->
|
true ->
|
||||||
maybe_trans(fun insert_trie_route/1, [Route]);
|
Fun = fun emqx_router_utils:insert_trie_route/2,
|
||||||
false -> insert_direct_route(Route)
|
emqx_router_utils:maybe_trans(Fun, [?ROUTE_TAB, Route], ?ROUTE_SHARD);
|
||||||
|
false ->
|
||||||
|
emqx_router_utils:insert_direct_route(?ROUTE_TAB, Route)
|
||||||
end
|
end
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
@ -165,8 +167,10 @@ do_delete_route(Topic, Dest) ->
|
||||||
Route = #route{topic = Topic, dest = Dest},
|
Route = #route{topic = Topic, dest = Dest},
|
||||||
case emqx_topic:wildcard(Topic) of
|
case emqx_topic:wildcard(Topic) of
|
||||||
true ->
|
true ->
|
||||||
maybe_trans(fun delete_trie_route/1, [Route]);
|
Fun = fun emqx_router_utils:delete_trie_route/2,
|
||||||
false -> delete_direct_route(Route)
|
emqx_router_utils:maybe_trans(Fun, [?ROUTE_TAB, Route], ?ROUTE_SHARD);
|
||||||
|
false ->
|
||||||
|
emqx_router_utils:delete_direct_route(?ROUTE_TAB, Route)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec(topics() -> list(emqx_topic:topic())).
|
-spec(topics() -> list(emqx_topic:topic())).
|
||||||
|
@ -219,100 +223,3 @@ terminate(_Reason, #{pool := Pool, id := Id}) ->
|
||||||
|
|
||||||
code_change(_OldVsn, State, _Extra) ->
|
code_change(_OldVsn, State, _Extra) ->
|
||||||
{ok, State}.
|
{ok, State}.
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Internal functions
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
insert_direct_route(Route) ->
|
|
||||||
ekka_mnesia:dirty_write(?ROUTE_TAB, Route).
|
|
||||||
|
|
||||||
insert_trie_route(Route = #route{topic = Topic}) ->
|
|
||||||
case mnesia:wread({?ROUTE_TAB, Topic}) of
|
|
||||||
[] -> emqx_trie:insert(Topic);
|
|
||||||
_ -> ok
|
|
||||||
end,
|
|
||||||
mnesia:write(?ROUTE_TAB, Route, sticky_write).
|
|
||||||
|
|
||||||
delete_direct_route(Route) ->
|
|
||||||
ekka_mnesia:dirty_delete_object(?ROUTE_TAB, Route).
|
|
||||||
|
|
||||||
delete_trie_route(Route = #route{topic = Topic}) ->
|
|
||||||
case mnesia:wread({?ROUTE_TAB, Topic}) of
|
|
||||||
[Route] -> %% Remove route and trie
|
|
||||||
ok = mnesia:delete_object(?ROUTE_TAB, Route, sticky_write),
|
|
||||||
emqx_trie:delete(Topic);
|
|
||||||
[_|_] -> %% Remove route only
|
|
||||||
mnesia:delete_object(?ROUTE_TAB, Route, sticky_write);
|
|
||||||
[] -> ok
|
|
||||||
end.
|
|
||||||
|
|
||||||
%% @private
|
|
||||||
-spec(maybe_trans(function(), list(any())) -> ok | {error, term()}).
|
|
||||||
maybe_trans(Fun, Args) ->
|
|
||||||
case emqx_config:get([broker, perf, route_lock_type]) of
|
|
||||||
key ->
|
|
||||||
trans(Fun, Args);
|
|
||||||
global ->
|
|
||||||
%% Assert:
|
|
||||||
mnesia = ekka_rlog:backend(), %% TODO: do something smarter than just crash
|
|
||||||
lock_router(),
|
|
||||||
try mnesia:sync_dirty(Fun, Args)
|
|
||||||
after
|
|
||||||
unlock_router()
|
|
||||||
end;
|
|
||||||
tab ->
|
|
||||||
trans(fun() ->
|
|
||||||
emqx_trie:lock_tables(),
|
|
||||||
apply(Fun, Args)
|
|
||||||
end, [])
|
|
||||||
end.
|
|
||||||
|
|
||||||
%% The created fun only terminates with explicit exception
|
|
||||||
-dialyzer({nowarn_function, [trans/2]}).
|
|
||||||
|
|
||||||
-spec(trans(function(), list(any())) -> ok | {error, term()}).
|
|
||||||
trans(Fun, Args) ->
|
|
||||||
{WPid, RefMon} =
|
|
||||||
spawn_monitor(
|
|
||||||
%% NOTE: this is under the assumption that crashes in Fun
|
|
||||||
%% are caught by mnesia:transaction/2.
|
|
||||||
%% Future changes should keep in mind that this process
|
|
||||||
%% always exit with database write result.
|
|
||||||
fun() ->
|
|
||||||
Res = case ekka_mnesia:transaction(?ROUTE_SHARD, Fun, Args) of
|
|
||||||
{atomic, Ok} -> Ok;
|
|
||||||
{aborted, Reason} -> {error, Reason}
|
|
||||||
end,
|
|
||||||
exit({shutdown, Res})
|
|
||||||
end),
|
|
||||||
%% Receive a 'shutdown' exit to pass result from the short-lived process.
|
|
||||||
%% so the receive below can be receive-mark optimized by the compiler.
|
|
||||||
%%
|
|
||||||
%% If the result is sent as a regular message, we'll have to
|
|
||||||
%% either demonitor (with flush which is essentially a 'receive' since
|
|
||||||
%% the process is no longer alive after the result has been received),
|
|
||||||
%% or use a plain 'receive' to drain the normal 'DOWN' message.
|
|
||||||
%% However the compiler does not optimize this second 'receive'.
|
|
||||||
receive
|
|
||||||
{'DOWN', RefMon, process, WPid, Info} ->
|
|
||||||
case Info of
|
|
||||||
{shutdown, Result} -> Result;
|
|
||||||
_ -> {error, {trans_crash, Info}}
|
|
||||||
end
|
|
||||||
end.
|
|
||||||
|
|
||||||
lock_router() ->
|
|
||||||
%% if Retry is not 0, global:set_lock could sleep a random time up to 8s.
|
|
||||||
%% Considering we have a limited number of brokers, it is safe to use sleep 1 ms.
|
|
||||||
case global:set_lock({?MODULE, self()}, [node() | nodes()], 0) of
|
|
||||||
false ->
|
|
||||||
%% Force to sleep 1ms instead.
|
|
||||||
timer:sleep(1),
|
|
||||||
lock_router();
|
|
||||||
true ->
|
|
||||||
ok
|
|
||||||
end.
|
|
||||||
|
|
||||||
unlock_router() ->
|
|
||||||
global:del_lock({?MODULE, self()}).
|
|
||||||
|
|
|
@ -34,7 +34,13 @@ init([]) ->
|
||||||
type => worker,
|
type => worker,
|
||||||
modules => [emqx_router_helper]},
|
modules => [emqx_router_helper]},
|
||||||
%% Router pool
|
%% Router pool
|
||||||
RouterPool = emqx_pool_sup:spec([router_pool, hash,
|
RouterPool = emqx_pool_sup:spec(router_pool,
|
||||||
|
[router_pool, hash,
|
||||||
{emqx_router, start_link, []}]),
|
{emqx_router, start_link, []}]),
|
||||||
{ok, {{one_for_all, 0, 1}, [Helper, RouterPool]}}.
|
|
||||||
|
%% TODO: Should this be optional?
|
||||||
|
SessionRouterPool = emqx_pool_sup:spec(session_router_pool,
|
||||||
|
[session_router_pool, hash,
|
||||||
|
{emqx_session_router, start_link, []}]),
|
||||||
|
{ok, {{one_for_all, 0, 1}, [Helper, RouterPool, SessionRouterPool]}}.
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,126 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2017-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-module(emqx_router_utils).
|
||||||
|
|
||||||
|
-include("emqx.hrl").
|
||||||
|
|
||||||
|
-export([ delete_direct_route/2
|
||||||
|
, delete_trie_route/2
|
||||||
|
, insert_direct_route/2
|
||||||
|
, insert_trie_route/2
|
||||||
|
, maybe_trans/3
|
||||||
|
]).
|
||||||
|
|
||||||
|
insert_direct_route(Tab, Route) ->
|
||||||
|
ekka_mnesia:dirty_write(Tab, Route).
|
||||||
|
|
||||||
|
insert_trie_route(RouteTab, Route = #route{topic = Topic}) ->
|
||||||
|
case mnesia:wread({RouteTab, Topic}) of
|
||||||
|
[] when RouteTab =:= emqx_route -> emqx_trie:insert(Topic);
|
||||||
|
[] when RouteTab =:= emqx_session_route -> emqx_trie:insert_session(Topic);
|
||||||
|
_ -> ok
|
||||||
|
end,
|
||||||
|
mnesia:write(RouteTab, Route, sticky_write).
|
||||||
|
|
||||||
|
delete_direct_route(RouteTab, Route) ->
|
||||||
|
ekka_mnesia:dirty_delete_object(RouteTab, Route).
|
||||||
|
|
||||||
|
delete_trie_route(RouteTab, Route = #route{topic = Topic}) ->
|
||||||
|
case mnesia:wread({RouteTab, Topic}) of
|
||||||
|
[R] when R =:= Route ->
|
||||||
|
%% Remove route and trie
|
||||||
|
ok = mnesia:delete_object(RouteTab, Route, sticky_write),
|
||||||
|
case RouteTab of
|
||||||
|
emqx_route -> emqx_trie:delete(Topic);
|
||||||
|
emqx_session_route -> emqx_trie:delete_session(Topic)
|
||||||
|
end;
|
||||||
|
[_|_] ->
|
||||||
|
%% Remove route only
|
||||||
|
mnesia:delete_object(RouteTab, Route, sticky_write);
|
||||||
|
[] ->
|
||||||
|
ok
|
||||||
|
end.
|
||||||
|
|
||||||
|
%% @private
|
||||||
|
-spec(maybe_trans(function(), list(any()), Shard :: atom()) -> ok | {error, term()}).
|
||||||
|
maybe_trans(Fun, Args, Shard) ->
|
||||||
|
case emqx_config:get([broker, perf, route_lock_type]) of
|
||||||
|
key ->
|
||||||
|
trans(Fun, Args, Shard);
|
||||||
|
global ->
|
||||||
|
%% Assert:
|
||||||
|
mnesia = ekka_rlog:backend(), %% TODO: do something smarter than just crash
|
||||||
|
lock_router(Shard),
|
||||||
|
try mnesia:sync_dirty(Fun, Args)
|
||||||
|
after
|
||||||
|
unlock_router(Shard)
|
||||||
|
end;
|
||||||
|
tab ->
|
||||||
|
trans(fun() ->
|
||||||
|
emqx_trie:lock_tables(),
|
||||||
|
apply(Fun, Args)
|
||||||
|
end, [], Shard)
|
||||||
|
end.
|
||||||
|
|
||||||
|
%% The created fun only terminates with explicit exception
|
||||||
|
-dialyzer({nowarn_function, [trans/3]}).
|
||||||
|
|
||||||
|
-spec(trans(function(), list(any()), atom()) -> ok | {error, term()}).
|
||||||
|
trans(Fun, Args, Shard) ->
|
||||||
|
{WPid, RefMon} =
|
||||||
|
spawn_monitor(
|
||||||
|
%% NOTE: this is under the assumption that crashes in Fun
|
||||||
|
%% are caught by mnesia:transaction/2.
|
||||||
|
%% Future changes should keep in mind that this process
|
||||||
|
%% always exit with database write result.
|
||||||
|
fun() ->
|
||||||
|
Res = case ekka_mnesia:transaction(Shard, Fun, Args) of
|
||||||
|
{atomic, Ok} -> Ok;
|
||||||
|
{aborted, Reason} -> {error, Reason}
|
||||||
|
end,
|
||||||
|
exit({shutdown, Res})
|
||||||
|
end),
|
||||||
|
%% Receive a 'shutdown' exit to pass result from the short-lived process.
|
||||||
|
%% so the receive below can be receive-mark optimized by the compiler.
|
||||||
|
%%
|
||||||
|
%% If the result is sent as a regular message, we'll have to
|
||||||
|
%% either demonitor (with flush which is essentially a 'receive' since
|
||||||
|
%% the process is no longer alive after the result has been received),
|
||||||
|
%% or use a plain 'receive' to drain the normal 'DOWN' message.
|
||||||
|
%% However the compiler does not optimize this second 'receive'.
|
||||||
|
receive
|
||||||
|
{'DOWN', RefMon, process, WPid, Info} ->
|
||||||
|
case Info of
|
||||||
|
{shutdown, Result} -> Result;
|
||||||
|
_ -> {error, {trans_crash, Info}}
|
||||||
|
end
|
||||||
|
end.
|
||||||
|
|
||||||
|
lock_router(Shard) ->
|
||||||
|
%% if Retry is not 0, global:set_lock could sleep a random time up to 8s.
|
||||||
|
%% Considering we have a limited number of brokers, it is safe to use sleep 1 ms.
|
||||||
|
case global:set_lock({{?MODULE, Shard}, self()}, [node() | nodes()], 0) of
|
||||||
|
false ->
|
||||||
|
%% Force to sleep 1ms instead.
|
||||||
|
timer:sleep(1),
|
||||||
|
lock_router(Shard);
|
||||||
|
true ->
|
||||||
|
ok
|
||||||
|
end.
|
||||||
|
|
||||||
|
unlock_router(Shard) ->
|
||||||
|
global:del_lock({{?MODULE, Shard}, self()}).
|
|
@ -54,6 +54,15 @@
|
||||||
-compile(nowarn_export_all).
|
-compile(nowarn_export_all).
|
||||||
-endif.
|
-endif.
|
||||||
|
|
||||||
|
%% DB API
|
||||||
|
-export([ mnesia/1
|
||||||
|
, db_get/1
|
||||||
|
, db_put/3
|
||||||
|
]).
|
||||||
|
|
||||||
|
-boot_mnesia({mnesia, [boot]}).
|
||||||
|
-copy_mnesia({mnesia, [copy]}).
|
||||||
|
|
||||||
-export([init/1]).
|
-export([init/1]).
|
||||||
|
|
||||||
-export([ info/1
|
-export([ info/1
|
||||||
|
@ -159,6 +168,27 @@
|
||||||
, mqueue => emqx_mqueue:options()
|
, mqueue => emqx_mqueue:options()
|
||||||
}.
|
}.
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Mnesia bootstrap
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-define(SESSION_STORE, emqx_session_store).
|
||||||
|
-record(session_store, { id :: binary()
|
||||||
|
, expiry_interval :: non_neg_integer()
|
||||||
|
, ts :: non_neg_integer()
|
||||||
|
, session :: #session{}}).
|
||||||
|
|
||||||
|
mnesia(boot) ->
|
||||||
|
ok = ekka_mnesia:create_table(?SESSION_STORE, [
|
||||||
|
{type, set},
|
||||||
|
{ram_copies, [node()]},
|
||||||
|
{record_name, session_store},
|
||||||
|
{attributes, record_info(fields, session_store)},
|
||||||
|
{storage_properties, [{ets, [{read_concurrency, true}]}]}]);
|
||||||
|
|
||||||
|
mnesia(copy) ->
|
||||||
|
ok = ekka_mnesia:copy_table(?SESSION_STORE, ram_copies).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Init a Session
|
%% Init a Session
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -184,6 +214,51 @@ init(Opts) ->
|
||||||
created_at = erlang:system_time(millisecond)
|
created_at = erlang:system_time(millisecond)
|
||||||
}.
|
}.
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% DB API
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
db_put(undefined,_ExpiryInterval, #session{}) ->
|
||||||
|
ok;
|
||||||
|
db_put(SessionID, ExpiryInterval, #session{} = Session) when is_binary(SessionID),
|
||||||
|
is_integer(ExpiryInterval) ->
|
||||||
|
SS = #session_store{ id = SessionID
|
||||||
|
, expiry_interval = ExpiryInterval
|
||||||
|
, ts = erlang:system_time(millisecond)
|
||||||
|
, session = Session},
|
||||||
|
case use_db_session(SS) of
|
||||||
|
false -> clean_up_session(SessionID, Session);
|
||||||
|
true ->
|
||||||
|
%% TODO: Should check for changes in the subscriptions.
|
||||||
|
maps:foreach(fun(Topic, _) ->
|
||||||
|
emqx_session_router:do_add_route(Topic, SessionID)
|
||||||
|
end, Session#session.subscriptions),
|
||||||
|
ekka_mnesia:dirty_write(?SESSION_STORE, SS)
|
||||||
|
end.
|
||||||
|
|
||||||
|
db_get(SessionID) when is_binary(SessionID) ->
|
||||||
|
case mnesia:dirty_read(?SESSION_STORE, SessionID) of
|
||||||
|
[] -> [];
|
||||||
|
[#session_store{session = S} = SS] ->
|
||||||
|
case use_db_session(SS) of
|
||||||
|
true -> [S];
|
||||||
|
false -> []
|
||||||
|
end
|
||||||
|
end.
|
||||||
|
|
||||||
|
%% @private [MQTT-3.1.2-23]
|
||||||
|
use_db_session(#session_store{expiry_interval = 0}) ->
|
||||||
|
false;
|
||||||
|
use_db_session(#session_store{expiry_interval = 16#FFFFFFFF}) ->
|
||||||
|
true;
|
||||||
|
use_db_session(#session_store{expiry_interval = E, ts = TS}) ->
|
||||||
|
E*1000 + TS > erlang:system_time(millisecond).
|
||||||
|
|
||||||
|
clean_up_session(SessionID, Session) ->
|
||||||
|
Fun = fun(Topic, _) -> emqx_session_router:do_delete_route(Topic, SessionID) end,
|
||||||
|
maps:foreach(Fun, Session#session.subscriptions),
|
||||||
|
ekka_mnesia:dirty_delete(?SESSION_STORE, SessionID).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Info, Stats
|
%% Info, Stats
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -272,6 +347,8 @@ unsubscribe(ClientInfo, TopicFilter, UnSubOpts, Session = #session{subscriptions
|
||||||
case maps:find(TopicFilter, Subs) of
|
case maps:find(TopicFilter, Subs) of
|
||||||
{ok, SubOpts} ->
|
{ok, SubOpts} ->
|
||||||
ok = emqx_broker:unsubscribe(TopicFilter),
|
ok = emqx_broker:unsubscribe(TopicFilter),
|
||||||
|
ClientID = maps:get(clientid, ClientInfo, undefined),
|
||||||
|
emqx_session_router:do_delete_route(ClientID, TopicFilter),
|
||||||
ok = emqx_hooks:run('session.unsubscribed', [ClientInfo, TopicFilter, maps:merge(SubOpts, UnSubOpts)]),
|
ok = emqx_hooks:run('session.unsubscribed', [ClientInfo, TopicFilter, maps:merge(SubOpts, UnSubOpts)]),
|
||||||
{ok, Session#session{subscriptions = maps:remove(TopicFilter, Subs)}};
|
{ok, Session#session{subscriptions = maps:remove(TopicFilter, Subs)}};
|
||||||
error ->
|
error ->
|
||||||
|
@ -637,7 +714,8 @@ terminate(ClientInfo, discarded, Session) ->
|
||||||
run_hook('session.discarded', [ClientInfo, info(Session)]);
|
run_hook('session.discarded', [ClientInfo, info(Session)]);
|
||||||
terminate(ClientInfo, takeovered, Session) ->
|
terminate(ClientInfo, takeovered, Session) ->
|
||||||
run_hook('session.takeovered', [ClientInfo, info(Session)]);
|
run_hook('session.takeovered', [ClientInfo, info(Session)]);
|
||||||
terminate(ClientInfo, Reason, Session) ->
|
terminate(#{clientid := ClientId} = ClientInfo, Reason, Session) ->
|
||||||
|
clean_up_session(ClientId, Session),
|
||||||
run_hook('session.terminated', [ClientInfo, Reason, info(Session)]).
|
run_hook('session.terminated', [ClientInfo, Reason, info(Session)]).
|
||||||
|
|
||||||
-compile({inline, [run_hook/2]}).
|
-compile({inline, [run_hook/2]}).
|
||||||
|
|
|
@ -0,0 +1,289 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2017-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-module(emqx_session_router).
|
||||||
|
|
||||||
|
-behaviour(gen_server).
|
||||||
|
|
||||||
|
-include("emqx.hrl").
|
||||||
|
-include("logger.hrl").
|
||||||
|
-include("types.hrl").
|
||||||
|
-include_lib("ekka/include/ekka.hrl").
|
||||||
|
|
||||||
|
-logger_header("[Router]").
|
||||||
|
|
||||||
|
%% Mnesia bootstrap
|
||||||
|
-export([mnesia/1]).
|
||||||
|
|
||||||
|
-boot_mnesia({mnesia, [boot]}).
|
||||||
|
-copy_mnesia({mnesia, [copy]}).
|
||||||
|
|
||||||
|
-export([start_link/2]).
|
||||||
|
|
||||||
|
%% Route APIs
|
||||||
|
-export([ do_add_route/2
|
||||||
|
, do_delete_route/2
|
||||||
|
, match_routes/1
|
||||||
|
]).
|
||||||
|
|
||||||
|
-export([ persist/1
|
||||||
|
, delivered/2
|
||||||
|
, pending/1
|
||||||
|
]).
|
||||||
|
|
||||||
|
-export([print_routes/1]).
|
||||||
|
|
||||||
|
%% gen_server callbacks
|
||||||
|
-export([ init/1
|
||||||
|
, handle_call/3
|
||||||
|
, handle_cast/2
|
||||||
|
, handle_info/2
|
||||||
|
, terminate/2
|
||||||
|
, code_change/3
|
||||||
|
]).
|
||||||
|
|
||||||
|
-type(group() :: binary()).
|
||||||
|
|
||||||
|
-type(dest() :: node() | {group(), node()}).
|
||||||
|
|
||||||
|
-define(ROUTE_TAB, emqx_session_route).
|
||||||
|
-define(SESS_MSG_TAB, emqx_session_msg).
|
||||||
|
-define(MSG_TAB, emqx_persistent_msg).
|
||||||
|
|
||||||
|
%% NOTE: It is important that ?DELIVERED > ?UNDELIVERED because of traversal order
|
||||||
|
-define(DELIVERED, 1).
|
||||||
|
-define(UNDELIVERED, 0).
|
||||||
|
-type pending_tag() :: ?DELIVERED | ?UNDELIVERED.
|
||||||
|
-record(session_msg, {key :: {binary(), emqx_guid:guid(), pending_tag()},
|
||||||
|
val = [] :: []}).
|
||||||
|
|
||||||
|
-rlog_shard({?PERSISTENT_SESSION_SHARD, ?ROUTE_TAB}).
|
||||||
|
-rlog_shard({?PERSISTENT_SESSION_SHARD, ?SESS_MSG_TAB}).
|
||||||
|
-rlog_shard({?PERSISTENT_SESSION_SHARD, ?MSG_TAB}).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Mnesia bootstrap
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
mnesia(boot) ->
|
||||||
|
ok = ekka_mnesia:create_table(?ROUTE_TAB, [
|
||||||
|
{type, bag},
|
||||||
|
{ram_copies, [node()]},
|
||||||
|
{record_name, route},
|
||||||
|
{attributes, record_info(fields, route)},
|
||||||
|
{storage_properties, [{ets, [{read_concurrency, true},
|
||||||
|
{write_concurrency, true}]}]}]),
|
||||||
|
ok = ekka_mnesia:create_table(?SESS_MSG_TAB, [
|
||||||
|
{type, ordered_set},
|
||||||
|
{ram_copies, [node()]},
|
||||||
|
{record_name, session_msg},
|
||||||
|
{attributes, record_info(fields, session_msg)},
|
||||||
|
{storage_properties, [{ets, [{read_concurrency, true},
|
||||||
|
{write_concurrency, true}]}]}]),
|
||||||
|
%% TODO: This should be external
|
||||||
|
ok = ekka_mnesia:create_table(?MSG_TAB, [
|
||||||
|
{type, set},
|
||||||
|
{ram_copies, [node()]},
|
||||||
|
{record_name, message},
|
||||||
|
{attributes, record_info(fields, message)},
|
||||||
|
{storage_properties, [{ets, [{read_concurrency, true},
|
||||||
|
{write_concurrency, true}]}]}]);
|
||||||
|
mnesia(copy) ->
|
||||||
|
ok = ekka_mnesia:copy_table(?ROUTE_TAB, ram_copies),
|
||||||
|
ok = ekka_mnesia:copy_table(?SESS_MSG_TAB, ram_copies),
|
||||||
|
%% TODO: This should be external
|
||||||
|
ok = ekka_mnesia:copy_table(?MSG_TAB, ram_copies).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Start a router
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-spec(start_link(atom(), pos_integer()) -> startlink_ret()).
|
||||||
|
start_link(Pool, Id) ->
|
||||||
|
gen_server:start_link({local, emqx_misc:proc_name(?MODULE, Id)},
|
||||||
|
?MODULE, [Pool, Id], [{hibernate_after, 1000}]).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Route APIs
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-spec(do_add_route(emqx_topic:topic(), dest()) -> ok | {error, term()}).
|
||||||
|
do_add_route(Topic, SessionID) when is_binary(Topic) ->
|
||||||
|
Route = #route{topic = Topic, dest = SessionID},
|
||||||
|
case lists:member(Route, lookup_routes(Topic)) of
|
||||||
|
true -> ok;
|
||||||
|
false ->
|
||||||
|
case emqx_topic:wildcard(Topic) of
|
||||||
|
true ->
|
||||||
|
Fun = fun emqx_router_utils:insert_trie_route/2,
|
||||||
|
emqx_router_utils:maybe_trans(Fun, [?ROUTE_TAB, Route],
|
||||||
|
?PERSISTENT_SESSION_SHARD);
|
||||||
|
false ->
|
||||||
|
emqx_router_utils:insert_direct_route(?ROUTE_TAB, Route)
|
||||||
|
end
|
||||||
|
end.
|
||||||
|
|
||||||
|
%% @doc Match routes
|
||||||
|
-spec(match_routes(emqx_topic:topic()) -> [emqx_types:route()]).
|
||||||
|
match_routes(Topic) when is_binary(Topic) ->
|
||||||
|
case match_trie(Topic) of
|
||||||
|
[] -> lookup_routes(Topic);
|
||||||
|
Matched ->
|
||||||
|
lists:append([lookup_routes(To) || To <- [Topic | Matched]])
|
||||||
|
end.
|
||||||
|
|
||||||
|
%% Optimize: routing table will be replicated to all router nodes.
|
||||||
|
match_trie(Topic) ->
|
||||||
|
case emqx_trie:empty_session() of
|
||||||
|
true -> [];
|
||||||
|
false -> emqx_trie:match_session(Topic)
|
||||||
|
end.
|
||||||
|
|
||||||
|
-spec(do_delete_route(emqx_topic:topic(), dest()) -> ok | {error, term()}).
|
||||||
|
do_delete_route(Topic, SessionID) ->
|
||||||
|
Route = #route{topic = Topic, dest = SessionID},
|
||||||
|
case emqx_topic:wildcard(Topic) of
|
||||||
|
true ->
|
||||||
|
Fun = fun emqx_router_utils:delete_trie_route/2,
|
||||||
|
emqx_router_utils:maybe_trans(Fun, [?ROUTE_TAB, Route], ?PERSISTENT_SESSION_SHARD);
|
||||||
|
false ->
|
||||||
|
emqx_router_utils:delete_direct_route(?ROUTE_TAB, Route)
|
||||||
|
end.
|
||||||
|
|
||||||
|
%% @doc Print routes to a topic
|
||||||
|
-spec(print_routes(emqx_topic:topic()) -> ok).
|
||||||
|
print_routes(Topic) ->
|
||||||
|
lists:foreach(fun(#route{topic = To, dest = SessionID}) ->
|
||||||
|
io:format("~s -> ~s~n", [To, SessionID])
|
||||||
|
end, match_routes(Topic)).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Message APIs
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
persist(Msg) ->
|
||||||
|
case emqx_message:get_flag(dup, Msg) orelse emqx_message:is_sys(Msg) of
|
||||||
|
true -> ok;
|
||||||
|
false ->
|
||||||
|
case match_routes(emqx_message:topic(Msg)) of
|
||||||
|
[] -> ok;
|
||||||
|
Routes ->
|
||||||
|
%% TODO: This should store in external backend
|
||||||
|
ekka_mnesia:dirty_write(?MSG_TAB, Msg),
|
||||||
|
MsgId = emqx_message:id(Msg),
|
||||||
|
Fun = fun(#route{dest = SessionID}) ->
|
||||||
|
Key = {SessionID, MsgId, ?UNDELIVERED},
|
||||||
|
ekka_mnesia:dirty_write(?SESS_MSG_TAB, #session_msg{ key = Key })
|
||||||
|
end,
|
||||||
|
lists:foreach(Fun, Routes)
|
||||||
|
end
|
||||||
|
end.
|
||||||
|
|
||||||
|
delivered(SessionID, MsgIDs) ->
|
||||||
|
Fun = fun(MsgID) ->
|
||||||
|
Key = {SessionID, MsgID, ?DELIVERED},
|
||||||
|
ekka_mnesia:dirty_write(?SESS_MSG_TAB, #session_msg{ key = Key })
|
||||||
|
end,
|
||||||
|
lists:foreach(Fun, MsgIDs),
|
||||||
|
|
||||||
|
pending(SessionID) ->
|
||||||
|
call(pick(SessionID), {pending, SessionID}).
|
||||||
|
|
||||||
|
call(Router, Msg) ->
|
||||||
|
gen_server:call(Router, Msg, infinity).
|
||||||
|
|
||||||
|
cast(Router, Msg) ->
|
||||||
|
gen_server:cast(Router, Msg).
|
||||||
|
|
||||||
|
pick(#route{dest = SessionID}) ->
|
||||||
|
gproc_pool:pick_worker(session_router_pool, SessionID);
|
||||||
|
pick(SessionID) when is_binary(SessionID) ->
|
||||||
|
gproc_pool:pick_worker(session_router_pool, SessionID).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% gen_server callbacks
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
init([Pool, Id]) ->
|
||||||
|
true = gproc_pool:connect_worker(Pool, {Pool, Id}),
|
||||||
|
{ok, #{pool => Pool, id => Id}}.
|
||||||
|
|
||||||
|
handle_call({pending, SessionID}, _From, State) ->
|
||||||
|
{reply, pending_messages(SessionID), State};
|
||||||
|
handle_call(Req, _From, State) ->
|
||||||
|
?LOG(error, "Unexpected call: ~p", [Req]),
|
||||||
|
{reply, ignored, State}.
|
||||||
|
|
||||||
|
handle_cast({delivered, SessionID, MsgIDs}, State) ->
|
||||||
|
Fun = fun(MsgID) ->
|
||||||
|
Key = {SessionID, MsgID, ?DELIVERED},
|
||||||
|
ekka_mnesia:dirty_write(?SESS_MSG_TAB, #session_msg{ key = Key })
|
||||||
|
end,
|
||||||
|
lists:foreach(Fun, MsgIDs),
|
||||||
|
{noreply, State};
|
||||||
|
handle_cast(Msg, State) ->
|
||||||
|
?LOG(error, "Unexpected cast: ~p", [Msg]),
|
||||||
|
{noreply, State}.
|
||||||
|
|
||||||
|
handle_info(Info, State) ->
|
||||||
|
?LOG(error, "Unexpected info: ~p", [Info]),
|
||||||
|
{noreply, State}.
|
||||||
|
|
||||||
|
terminate(_Reason, #{pool := Pool, id := Id}) ->
|
||||||
|
gproc_pool:disconnect_worker(Pool, {Pool, Id}).
|
||||||
|
|
||||||
|
code_change(_OldVsn, State, _Extra) ->
|
||||||
|
{ok, State}.
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Internal functions
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
lookup_routes(Topic) ->
|
||||||
|
ets:lookup(?ROUTE_TAB, Topic).
|
||||||
|
|
||||||
|
pending_messages(SessionID) ->
|
||||||
|
%% TODO: The reading of messages should be from external DB
|
||||||
|
Fun = fun() -> [hd(mnesia:read(?MSG_TAB, MsgId))
|
||||||
|
|| MsgId <- pending_messages(SessionID, <<>>, ?DELIVERED, [])]
|
||||||
|
end,
|
||||||
|
{atomic, Msgs} = ekka_mnesia:ro_transaction(?PERSISTENT_SESSION_SHARD, Fun),
|
||||||
|
Msgs.
|
||||||
|
|
||||||
|
%% The keys are ordered by
|
||||||
|
%% {sessionID(), emqx_guid:guid(), ?DELIVERED | ?UNDELIVERED}
|
||||||
|
%% where
|
||||||
|
%% emqx_guid:guid() is ordered in ts() and by node()
|
||||||
|
%% ?UNDELIVERED < ?DELIVERED
|
||||||
|
%%
|
||||||
|
%% We traverse the table until we reach another session.
|
||||||
|
%% TODO: Garbage collect the delivered messages.
|
||||||
|
pending_messages(SessionID, PrevMsgId, PrevTag, Acc) ->
|
||||||
|
case mnesia:dirty_next(?SESS_MSG_TAB, {SessionID, PrevMsgId, PrevTag}) of
|
||||||
|
{S, MsgId, Tag} = Key when S =:= SessionID, MsgId =:= PrevMsgId ->
|
||||||
|
Tag =:= ?UNDELIVERED andalso error({assert_fail}, Key),
|
||||||
|
pending_messages(SessionID, MsgId, Tag, Acc);
|
||||||
|
{S, MsgId, Tag} when S =:= SessionID ->
|
||||||
|
case PrevTag of
|
||||||
|
?DELIVERED -> pending_messages(SessionID, MsgId, Tag, Acc);
|
||||||
|
?UNDELIVERED -> pending_messages(SessionID, MsgId, Tag, [PrevMsgId|Acc])
|
||||||
|
end;
|
||||||
|
_ -> %% Next sessionID or '$end_of_table'
|
||||||
|
case PrevTag of
|
||||||
|
?DELIVERED -> lists:reverse(Acc);
|
||||||
|
?UNDELIVERED -> lists:reverse([PrevMsgId|Acc])
|
||||||
|
end
|
||||||
|
end.
|
|
@ -26,12 +26,17 @@
|
||||||
|
|
||||||
%% Trie APIs
|
%% Trie APIs
|
||||||
-export([ insert/1
|
-export([ insert/1
|
||||||
|
, insert_session/1
|
||||||
, match/1
|
, match/1
|
||||||
|
, match_session/1
|
||||||
, delete/1
|
, delete/1
|
||||||
|
, delete_session/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([ empty/0
|
-export([ empty/0
|
||||||
|
, empty_session/0
|
||||||
, lock_tables/0
|
, lock_tables/0
|
||||||
|
, lock_session_tables/0
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-export([is_compact/0, set_compact/1]).
|
-export([is_compact/0, set_compact/1]).
|
||||||
|
@ -42,6 +47,7 @@
|
||||||
-endif.
|
-endif.
|
||||||
|
|
||||||
-define(TRIE, emqx_trie).
|
-define(TRIE, emqx_trie).
|
||||||
|
-define(SESSION_TRIE, emqx_session_trie).
|
||||||
-define(PREFIX(Prefix), {Prefix, 0}).
|
-define(PREFIX(Prefix), {Prefix, 0}).
|
||||||
-define(TOPIC(Topic), {Topic, 1}).
|
-define(TOPIC(Topic), {Topic, 1}).
|
||||||
|
|
||||||
|
@ -51,6 +57,7 @@
|
||||||
}).
|
}).
|
||||||
|
|
||||||
-rlog_shard({?ROUTE_SHARD, ?TRIE}).
|
-rlog_shard({?ROUTE_SHARD, ?TRIE}).
|
||||||
|
-rlog_shard({?PERSISTENT_SESSION_SHARD, ?SESSION_TRIE}).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Mnesia bootstrap
|
%% Mnesia bootstrap
|
||||||
|
@ -64,6 +71,12 @@ mnesia(boot) ->
|
||||||
{write_concurrency, true}
|
{write_concurrency, true}
|
||||||
]}],
|
]}],
|
||||||
ok = ekka_mnesia:create_table(?TRIE, [
|
ok = ekka_mnesia:create_table(?TRIE, [
|
||||||
|
{ram_copies, [node()]},
|
||||||
|
{record_name, ?TRIE},
|
||||||
|
{attributes, record_info(fields, ?TRIE)},
|
||||||
|
{type, ordered_set},
|
||||||
|
{storage_properties, StoreProps}]),
|
||||||
|
ok = ekka_mnesia:create_table(?SESSION_TRIE, [
|
||||||
{ram_copies, [node()]},
|
{ram_copies, [node()]},
|
||||||
{record_name, ?TRIE},
|
{record_name, ?TRIE},
|
||||||
{attributes, record_info(fields, ?TRIE)},
|
{attributes, record_info(fields, ?TRIE)},
|
||||||
|
@ -71,6 +84,7 @@ mnesia(boot) ->
|
||||||
{storage_properties, StoreProps}]);
|
{storage_properties, StoreProps}]);
|
||||||
mnesia(copy) ->
|
mnesia(copy) ->
|
||||||
%% Copy topics table
|
%% Copy topics table
|
||||||
|
ok = ekka_mnesia:copy_table(?SESSION_TRIE, ram_copies),
|
||||||
ok = ekka_mnesia:copy_table(?TRIE, ram_copies).
|
ok = ekka_mnesia:copy_table(?TRIE, ram_copies).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -80,24 +94,46 @@ mnesia(copy) ->
|
||||||
%% @doc Insert a topic filter into the trie.
|
%% @doc Insert a topic filter into the trie.
|
||||||
-spec(insert(emqx_topic:topic()) -> ok).
|
-spec(insert(emqx_topic:topic()) -> ok).
|
||||||
insert(Topic) when is_binary(Topic) ->
|
insert(Topic) when is_binary(Topic) ->
|
||||||
|
insert(Topic, ?TRIE).
|
||||||
|
|
||||||
|
-spec(insert_session(emqx_topic:topic()) -> ok).
|
||||||
|
insert_session(Topic) when is_binary(Topic) ->
|
||||||
|
insert(Topic, ?SESSION_TRIE).
|
||||||
|
|
||||||
|
insert(Topic, Trie) when is_binary(Topic) ->
|
||||||
{TopicKey, PrefixKeys} = make_keys(Topic),
|
{TopicKey, PrefixKeys} = make_keys(Topic),
|
||||||
case mnesia:wread({?TRIE, TopicKey}) of
|
case mnesia:wread({Trie, TopicKey}) of
|
||||||
[_] -> ok; %% already inserted
|
[_] -> ok; %% already inserted
|
||||||
[] -> lists:foreach(fun insert_key/1, [TopicKey | PrefixKeys])
|
[] -> lists:foreach(fun(Key) -> insert_key(Key, Trie) end, [TopicKey | PrefixKeys])
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%% @doc Delete a topic filter from the trie.
|
%% @doc Delete a topic filter from the trie.
|
||||||
-spec(delete(emqx_topic:topic()) -> ok).
|
-spec(delete(emqx_topic:topic()) -> ok).
|
||||||
delete(Topic) when is_binary(Topic) ->
|
delete(Topic) when is_binary(Topic) ->
|
||||||
|
delete(Topic, ?TRIE).
|
||||||
|
|
||||||
|
%% @doc Delete a topic filter from the trie.
|
||||||
|
-spec(delete_session(emqx_topic:topic()) -> ok).
|
||||||
|
delete_session(Topic) when is_binary(Topic) ->
|
||||||
|
delete(Topic, ?SESSION_TRIE).
|
||||||
|
|
||||||
|
delete(Topic, Trie) when is_binary(Topic) ->
|
||||||
{TopicKey, PrefixKeys} = make_keys(Topic),
|
{TopicKey, PrefixKeys} = make_keys(Topic),
|
||||||
case [] =/= mnesia:wread({?TRIE, TopicKey}) of
|
case [] =/= mnesia:wread({Trie, TopicKey}) of
|
||||||
true -> lists:foreach(fun delete_key/1, [TopicKey | PrefixKeys]);
|
true -> lists:foreach(fun(Key) -> delete_key(Key, Trie) end, [TopicKey | PrefixKeys]);
|
||||||
false -> ok
|
false -> ok
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%% @doc Find trie nodes that matchs the topic name.
|
%% @doc Find trie nodes that matchs the topic name.
|
||||||
-spec(match(emqx_topic:topic()) -> list(emqx_topic:topic())).
|
-spec(match(emqx_topic:topic()) -> list(emqx_topic:topic())).
|
||||||
match(Topic) when is_binary(Topic) ->
|
match(Topic) when is_binary(Topic) ->
|
||||||
|
match(Topic, ?TRIE).
|
||||||
|
|
||||||
|
-spec(match_session(emqx_topic:topic()) -> list(emqx_topic:topic())).
|
||||||
|
match_session(Topic) when is_binary(Topic) ->
|
||||||
|
match(Topic, ?SESSION_TRIE).
|
||||||
|
|
||||||
|
match(Topic, Trie) when is_binary(Topic) ->
|
||||||
Words = emqx_topic:words(Topic),
|
Words = emqx_topic:words(Topic),
|
||||||
case emqx_topic:wildcard(Words) of
|
case emqx_topic:wildcard(Words) of
|
||||||
true ->
|
true ->
|
||||||
|
@ -110,17 +146,26 @@ match(Topic) when is_binary(Topic) ->
|
||||||
%% Such clients will get disconnected.
|
%% Such clients will get disconnected.
|
||||||
[];
|
[];
|
||||||
false ->
|
false ->
|
||||||
do_match(Words)
|
do_match(Words, Trie)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%% @doc Is the trie empty?
|
%% @doc Is the trie empty?
|
||||||
-spec(empty() -> boolean()).
|
-spec(empty() -> boolean()).
|
||||||
empty() -> ets:first(?TRIE) =:= '$end_of_table'.
|
empty() -> empty(?TRIE).
|
||||||
|
|
||||||
|
empty_session() ->
|
||||||
|
empty(?SESSION_TRIE).
|
||||||
|
|
||||||
|
empty(Trie) -> ets:first(Trie) =:= '$end_of_table'.
|
||||||
|
|
||||||
-spec lock_tables() -> ok.
|
-spec lock_tables() -> ok.
|
||||||
lock_tables() ->
|
lock_tables() ->
|
||||||
mnesia:write_lock_table(?TRIE).
|
mnesia:write_lock_table(?TRIE).
|
||||||
|
|
||||||
|
-spec lock_session_tables() -> ok.
|
||||||
|
lock_session_tables() ->
|
||||||
|
mnesia:write_lock_table(?SESSION_TRIE).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Internal functions
|
%% Internal functions
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
@ -168,70 +213,70 @@ make_prefixes([H | T], Prefix0, Acc0) ->
|
||||||
Acc = [Prefix | Acc0],
|
Acc = [Prefix | Acc0],
|
||||||
make_prefixes(T, Prefix, Acc).
|
make_prefixes(T, Prefix, Acc).
|
||||||
|
|
||||||
insert_key(Key) ->
|
insert_key(Key, Trie) ->
|
||||||
T = case mnesia:wread({?TRIE, Key}) of
|
T = case mnesia:wread({Trie, Key}) of
|
||||||
[#?TRIE{count = C} = T1] ->
|
[#?TRIE{count = C} = T1] ->
|
||||||
T1#?TRIE{count = C + 1};
|
T1#?TRIE{count = C + 1};
|
||||||
[] ->
|
[] ->
|
||||||
#?TRIE{key = Key, count = 1}
|
#?TRIE{key = Key, count = 1}
|
||||||
end,
|
end,
|
||||||
ok = mnesia:write(T).
|
ok = mnesia:write(Trie, T, write).
|
||||||
|
|
||||||
delete_key(Key) ->
|
delete_key(Key, Trie) ->
|
||||||
case mnesia:wread({?TRIE, Key}) of
|
case mnesia:wread({Trie, Key}) of
|
||||||
[#?TRIE{count = C} = T] when C > 1 ->
|
[#?TRIE{count = C} = T] when C > 1 ->
|
||||||
ok = mnesia:write(T#?TRIE{count = C - 1});
|
ok = mnesia:write(Trie, T#?TRIE{count = C - 1}, write);
|
||||||
[_] ->
|
[_] ->
|
||||||
ok = mnesia:delete(?TRIE, Key, write);
|
ok = mnesia:delete(Trie, Key, write);
|
||||||
[] ->
|
[] ->
|
||||||
ok
|
ok
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%% micro-optimization: no need to lookup when topic is not wildcard
|
%% micro-optimization: no need to lookup when topic is not wildcard
|
||||||
%% because we only insert wildcards to emqx_trie
|
%% because we only insert wildcards to emqx_trie
|
||||||
lookup_topic(_Topic, false) -> [];
|
lookup_topic(_Topic,_Trie, false) -> [];
|
||||||
lookup_topic(Topic, true) -> lookup_topic(Topic).
|
lookup_topic(Topic, Trie, true) -> lookup_topic(Topic, Trie).
|
||||||
|
|
||||||
lookup_topic(Topic) when is_binary(Topic) ->
|
lookup_topic(Topic, Trie) when is_binary(Topic) ->
|
||||||
case ets:lookup(?TRIE, ?TOPIC(Topic)) of
|
case ets:lookup(Trie, ?TOPIC(Topic)) of
|
||||||
[#?TRIE{count = C}] -> [Topic || C > 0];
|
[#?TRIE{count = C}] -> [Topic || C > 0];
|
||||||
[] -> []
|
[] -> []
|
||||||
end.
|
end.
|
||||||
|
|
||||||
has_prefix(empty) -> true; %% this is the virtual tree root
|
has_prefix(empty, _Trie) -> true; %% this is the virtual tree root
|
||||||
has_prefix(Prefix) ->
|
has_prefix(Prefix, Trie) ->
|
||||||
case ets:lookup(?TRIE, ?PREFIX(Prefix)) of
|
case ets:lookup(Trie, ?PREFIX(Prefix)) of
|
||||||
[#?TRIE{count = C}] -> C > 0;
|
[#?TRIE{count = C}] -> C > 0;
|
||||||
[] -> false
|
[] -> false
|
||||||
end.
|
end.
|
||||||
|
|
||||||
do_match([<<"$", _/binary>> = Prefix | Words]) ->
|
do_match([<<"$", _/binary>> = Prefix | Words], Trie) ->
|
||||||
%% For topics having dollar sign prefix,
|
%% For topics having dollar sign prefix,
|
||||||
%% we do not match root level + or #,
|
%% we do not match root level + or #,
|
||||||
%% fast forward to the next level.
|
%% fast forward to the next level.
|
||||||
case Words =:= [] of
|
case Words =:= [] of
|
||||||
true -> lookup_topic(Prefix);
|
true -> lookup_topic(Prefix, Trie);
|
||||||
false -> []
|
false -> []
|
||||||
end ++ do_match(Words, Prefix);
|
end ++ do_match(Words, Prefix, Trie);
|
||||||
do_match(Words) ->
|
do_match(Words, Trie) ->
|
||||||
do_match(Words, empty).
|
do_match(Words, empty, Trie).
|
||||||
|
|
||||||
do_match(Words, Prefix) ->
|
do_match(Words, Prefix, Trie) ->
|
||||||
case is_compact() of
|
case is_compact() of
|
||||||
true -> match_compact(Words, Prefix, false, []);
|
true -> match_compact(Words, Prefix, Trie, false, []);
|
||||||
false -> match_no_compact(Words, Prefix, false, [])
|
false -> match_no_compact(Words, Prefix, Trie, false, [])
|
||||||
end.
|
end.
|
||||||
|
|
||||||
match_no_compact([], Topic, IsWildcard, Acc) ->
|
match_no_compact([], Topic, Trie, IsWildcard, Acc) ->
|
||||||
'match_#'(Topic) ++ %% try match foo/+/# or foo/bar/#
|
'match_#'(Topic, Trie) ++ %% try match foo/+/# or foo/bar/#
|
||||||
lookup_topic(Topic, IsWildcard) ++ %% e.g. foo/+
|
lookup_topic(Topic, Trie, IsWildcard) ++ %% e.g. foo/+
|
||||||
Acc;
|
Acc;
|
||||||
match_no_compact([Word | Words], Prefix, IsWildcard, Acc0) ->
|
match_no_compact([Word | Words], Prefix, Trie, IsWildcard, Acc0) ->
|
||||||
case has_prefix(Prefix) of
|
case has_prefix(Prefix, Trie) of
|
||||||
true ->
|
true ->
|
||||||
Acc1 = 'match_#'(Prefix) ++ Acc0,
|
Acc1 = 'match_#'(Prefix, Trie) ++ Acc0,
|
||||||
Acc = match_no_compact(Words, join(Prefix, '+'), true, Acc1),
|
Acc = match_no_compact(Words, join(Prefix, '+'), Trie, true, Acc1),
|
||||||
match_no_compact(Words, join(Prefix, Word), IsWildcard, Acc);
|
match_no_compact(Words, join(Prefix, Word), Trie, IsWildcard, Acc);
|
||||||
false ->
|
false ->
|
||||||
%% non-compact paths in database
|
%% non-compact paths in database
|
||||||
%% if there is no prefix matches the current topic prefix
|
%% if there is no prefix matches the current topic prefix
|
||||||
|
@ -248,26 +293,26 @@ match_no_compact([Word | Words], Prefix, IsWildcard, Acc0) ->
|
||||||
Acc0
|
Acc0
|
||||||
end.
|
end.
|
||||||
|
|
||||||
match_compact([], Topic, IsWildcard, Acc) ->
|
match_compact([], Topic, Trie, IsWildcard, Acc) ->
|
||||||
'match_#'(Topic) ++ %% try match foo/bar/#
|
'match_#'(Topic, Trie) ++ %% try match foo/bar/#
|
||||||
lookup_topic(Topic, IsWildcard) ++ %% try match foo/bar
|
lookup_topic(Topic, Trie, IsWildcard) ++ %% try match foo/bar
|
||||||
Acc;
|
Acc;
|
||||||
match_compact([Word | Words], Prefix, IsWildcard, Acc0) ->
|
match_compact([Word | Words], Prefix, Trie, IsWildcard, Acc0) ->
|
||||||
Acc1 = 'match_#'(Prefix) ++ Acc0,
|
Acc1 = 'match_#'(Prefix, Trie) ++ Acc0,
|
||||||
Acc = match_compact(Words, join(Prefix, Word), IsWildcard, Acc1),
|
Acc = match_compact(Words, join(Prefix, Word), Trie, IsWildcard, Acc1),
|
||||||
WildcardPrefix = join(Prefix, '+'),
|
WildcardPrefix = join(Prefix, '+'),
|
||||||
%% go deeper to match current_prefix/+ only when:
|
%% go deeper to match current_prefix/+ only when:
|
||||||
%% 1. current word is the last
|
%% 1. current word is the last
|
||||||
%% OR
|
%% OR
|
||||||
%% 2. there is a prefix = 'current_prefix/+'
|
%% 2. there is a prefix = 'current_prefix/+'
|
||||||
case Words =:= [] orelse has_prefix(WildcardPrefix) of
|
case Words =:= [] orelse has_prefix(WildcardPrefix, Trie) of
|
||||||
true -> match_compact(Words, WildcardPrefix, true, Acc);
|
true -> match_compact(Words, WildcardPrefix, Trie, true, Acc);
|
||||||
false -> Acc
|
false -> Acc
|
||||||
end.
|
end.
|
||||||
|
|
||||||
'match_#'(Prefix) ->
|
'match_#'(Prefix, Trie) ->
|
||||||
MlTopic = join(Prefix, '#'),
|
MlTopic = join(Prefix, '#'),
|
||||||
lookup_topic(MlTopic).
|
lookup_topic(MlTopic, Trie).
|
||||||
|
|
||||||
is_compact() ->
|
is_compact() ->
|
||||||
emqx_config:get([broker, perf, trie_compaction], true).
|
emqx_config:get([broker, perf, trie_compaction], true).
|
||||||
|
|
|
@ -191,6 +191,7 @@ init_per_suite(Config) ->
|
||||||
ok = meck:expect(emqx_hooks, run_fold, fun(_Hook, _Args, Acc) -> Acc end),
|
ok = meck:expect(emqx_hooks, run_fold, fun(_Hook, _Args, Acc) -> Acc end),
|
||||||
%% Session Meck
|
%% Session Meck
|
||||||
ok = meck:new(emqx_session, [passthrough, no_history, no_link]),
|
ok = meck:new(emqx_session, [passthrough, no_history, no_link]),
|
||||||
|
meck:expect(emqx_session, db_put, fun(_, _, _) -> ok end),
|
||||||
%% Metrics
|
%% Metrics
|
||||||
ok = meck:new(emqx_metrics, [passthrough, no_history, no_link]),
|
ok = meck:new(emqx_metrics, [passthrough, no_history, no_link]),
|
||||||
ok = meck:expect(emqx_metrics, inc, fun(_) -> ok end),
|
ok = meck:expect(emqx_metrics, inc, fun(_) -> ok end),
|
||||||
|
|
|
@ -333,37 +333,6 @@ t_connect_keepalive_timeout(Config) ->
|
||||||
error("keepalive timeout")
|
error("keepalive timeout")
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%% [MQTT-3.1.2-23]
|
|
||||||
t_connect_session_expiry_interval(Config) ->
|
|
||||||
ConnFun = ?config(conn_fun, Config),
|
|
||||||
Topic = nth(1, ?TOPICS),
|
|
||||||
Payload = "test message",
|
|
||||||
|
|
||||||
{ok, Client1} = emqtt:start_link([ {clientid, <<"t_connect_session_expiry_interval">>},
|
|
||||||
{proto_ver, v5},
|
|
||||||
{properties, #{'Session-Expiry-Interval' => 7200}}
|
|
||||||
| Config
|
|
||||||
]),
|
|
||||||
{ok, _} = emqtt:ConnFun(Client1),
|
|
||||||
{ok, _, [2]} = emqtt:subscribe(Client1, Topic, qos2),
|
|
||||||
ok = emqtt:disconnect(Client1),
|
|
||||||
|
|
||||||
{ok, Client2} = emqtt:start_link([{proto_ver, v5} | Config]),
|
|
||||||
{ok, _} = emqtt:ConnFun(Client2),
|
|
||||||
{ok, 2} = emqtt:publish(Client2, Topic, Payload, 2),
|
|
||||||
ok = emqtt:disconnect(Client2),
|
|
||||||
|
|
||||||
{ok, Client3} = emqtt:start_link([ {clientid, <<"t_connect_session_expiry_interval">>},
|
|
||||||
{proto_ver, v5},
|
|
||||||
{clean_start, false} | Config
|
|
||||||
]),
|
|
||||||
{ok, _} = emqtt:ConnFun(Client3),
|
|
||||||
[Msg | _ ] = receive_messages(1),
|
|
||||||
?assertEqual({ok, iolist_to_binary(Topic)}, maps:find(topic, Msg)),
|
|
||||||
?assertEqual({ok, iolist_to_binary(Payload)}, maps:find(payload, Msg)),
|
|
||||||
?assertEqual({ok, 2}, maps:find(qos, Msg)),
|
|
||||||
ok = emqtt:disconnect(Client3).
|
|
||||||
|
|
||||||
%% [MQTT-3.1.3-9]
|
%% [MQTT-3.1.3-9]
|
||||||
%% !!!REFACTOR NEED:
|
%% !!!REFACTOR NEED:
|
||||||
%t_connect_will_delay_interval(Config) ->
|
%t_connect_will_delay_interval(Config) ->
|
||||||
|
|
|
@ -0,0 +1,307 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%%
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-module(emqx_persistent_session_SUITE).
|
||||||
|
|
||||||
|
-include_lib("stdlib/include/assert.hrl").
|
||||||
|
|
||||||
|
-compile(export_all).
|
||||||
|
-compile(nowarn_export_all).
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% SUITE boilerplate
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
all() ->
|
||||||
|
[ {group, tcp}
|
||||||
|
, {group, quic}
|
||||||
|
].
|
||||||
|
|
||||||
|
groups() ->
|
||||||
|
TCs = emqx_ct:all(?MODULE),
|
||||||
|
[ {tcp, [], TCs}
|
||||||
|
, {quic, [], TCs}
|
||||||
|
].
|
||||||
|
|
||||||
|
init_per_group(tcp, Config) ->
|
||||||
|
emqx_ct_helpers:start_apps([]),
|
||||||
|
[ {port, 1883}, {conn_fun, connect} | Config];
|
||||||
|
init_per_group(quic, Config) ->
|
||||||
|
emqx_ct_helpers:start_apps([]),
|
||||||
|
[ {port, 14567}, {conn_fun, quic_connect} | Config];
|
||||||
|
init_per_group(_, Config) ->
|
||||||
|
emqx_ct_helpers:stop_apps([]),
|
||||||
|
Config.
|
||||||
|
|
||||||
|
init_per_suite(Config) ->
|
||||||
|
%% Start Apps
|
||||||
|
emqx_ct_helpers:boot_modules(all),
|
||||||
|
emqx_ct_helpers:start_apps([emqx], fun set_special_confs/1),
|
||||||
|
Config.
|
||||||
|
|
||||||
|
set_special_confs(emqx) ->
|
||||||
|
application:set_env(emqx, plugins_loaded_file,
|
||||||
|
emqx_ct_helpers:deps_path(emqx, "test/emqx_SUITE_data/loaded_plugins"));
|
||||||
|
set_special_confs(_) ->
|
||||||
|
ok.
|
||||||
|
|
||||||
|
end_per_suite(_Config) ->
|
||||||
|
emqx_ct_helpers:stop_apps([]).
|
||||||
|
|
||||||
|
init_per_testcase(TestCase, Config) ->
|
||||||
|
case erlang:function_exported(?MODULE, TestCase, 2) of
|
||||||
|
true -> ?MODULE:TestCase(init, Config);
|
||||||
|
_ -> Config
|
||||||
|
end.
|
||||||
|
|
||||||
|
end_per_testcase(TestCase, Config) ->
|
||||||
|
case erlang:function_exported(?MODULE, TestCase, 2) of
|
||||||
|
true -> ?MODULE:TestCase('end', Config);
|
||||||
|
false -> ok
|
||||||
|
end,
|
||||||
|
Config.
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Helpers
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
client_info(Key, Client) ->
|
||||||
|
maps:get(Key, maps:from_list(emqtt:info(Client)), undefined).
|
||||||
|
|
||||||
|
receive_messages(Count) ->
|
||||||
|
receive_messages(Count, []).
|
||||||
|
|
||||||
|
receive_messages(0, Msgs) ->
|
||||||
|
Msgs;
|
||||||
|
receive_messages(Count, Msgs) ->
|
||||||
|
receive
|
||||||
|
{publish, Msg} ->
|
||||||
|
receive_messages(Count-1, [Msg|Msgs]);
|
||||||
|
_Other ->
|
||||||
|
receive_messages(Count, Msgs)
|
||||||
|
after 1000 ->
|
||||||
|
Msgs
|
||||||
|
end.
|
||||||
|
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Test Cases
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
%% [MQTT-3.1.2-23]
|
||||||
|
t_connect_session_expiry_interval(_) ->
|
||||||
|
ConnFun = ?config(conn_fun, Config),
|
||||||
|
Topic = <<"t_connect_session_expiry_interval/foo">>,
|
||||||
|
Payload = "test message",
|
||||||
|
|
||||||
|
{ok, Client1} = emqtt:start_link([ {clientid, <<"t_connect_session_expiry_interval">>},
|
||||||
|
{proto_ver, v5},
|
||||||
|
{properties, #{'Session-Expiry-Interval' => 7200}}
|
||||||
|
]),
|
||||||
|
{ok, _} = emqtt:ConnFun(Client1),
|
||||||
|
{ok, _, [2]} = emqtt:subscribe(Client1, Topic, qos2),
|
||||||
|
ok = emqtt:disconnect(Client1),
|
||||||
|
|
||||||
|
{ok, Client2} = emqtt:start_link([{proto_ver, v5}]),
|
||||||
|
{ok, _} = emqtt:ConnFun(Client2),
|
||||||
|
{ok, 2} = emqtt:publish(Client2, Topic, Payload, 2),
|
||||||
|
ok = emqtt:disconnect(Client2),
|
||||||
|
|
||||||
|
{ok, Client3} = emqtt:start_link([ {clientid, <<"t_connect_session_expiry_interval">>},
|
||||||
|
{proto_ver, v5},
|
||||||
|
{properties, #{'Session-Expiry-Interval' => 7200}},
|
||||||
|
{clean_start, false}
|
||||||
|
]),
|
||||||
|
{ok, _} = emqtt:ConnFun(Client3),
|
||||||
|
[Msg | _ ] = receive_messages(1),
|
||||||
|
?assertEqual({ok, iolist_to_binary(Topic)}, maps:find(topic, Msg)),
|
||||||
|
?assertEqual({ok, iolist_to_binary(Payload)}, maps:find(payload, Msg)),
|
||||||
|
?assertEqual({ok, 2}, maps:find(qos, Msg)),
|
||||||
|
ok = emqtt:disconnect(Client3).
|
||||||
|
|
||||||
|
t_without_client_id(_) ->
|
||||||
|
process_flag(trap_exit, true), %% Emqtt client dies
|
||||||
|
{ok, Client0} = emqtt:start_link([ {proto_ver, v5},
|
||||||
|
{properties, #{'Session-Expiry-Interval' => 7200}},
|
||||||
|
{clean_start, false}
|
||||||
|
]),
|
||||||
|
{error, {client_identifier_not_valid, _}} = emqtt:connect(Client0),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_assigned_clientid_persistent_session(_) ->
|
||||||
|
ConnFun = ?config(conn_fun, Config),
|
||||||
|
{ok, Client1} = emqtt:start_link([ {proto_ver, v5},
|
||||||
|
{properties, #{'Session-Expiry-Interval' => 7200}},
|
||||||
|
{clean_start, true}
|
||||||
|
]),
|
||||||
|
{ok, _} = emqtt:ConnFun(Client1),
|
||||||
|
|
||||||
|
AssignedClientId = client_info(clientid, Client1),
|
||||||
|
ok = emqtt:disconnect(Client1),
|
||||||
|
|
||||||
|
{ok, Client2} = emqtt:start_link([ {clientid, AssignedClientId},
|
||||||
|
{proto_ver, v5},
|
||||||
|
{clean_start, false}
|
||||||
|
]),
|
||||||
|
{ok, _} = emqtt:ConnFun(Client2),
|
||||||
|
?assertEqual(1, client_info(session_present, Client2)),
|
||||||
|
ok = emqtt:disconnect(Client2).
|
||||||
|
|
||||||
|
t_cancel_on_disconnect(_) ->
|
||||||
|
ConnFun = ?config(conn_fun, Config),
|
||||||
|
ClientId = <<"t_cancel_on_disconnect">>,
|
||||||
|
{ok, Client1} = emqtt:start_link([ {proto_ver, v5},
|
||||||
|
{clientid, ClientId},
|
||||||
|
{properties, #{'Session-Expiry-Interval' => 16#FFFFFFFF}},
|
||||||
|
{clean_start, true}
|
||||||
|
]),
|
||||||
|
{ok, _} = emqtt:ConnFun(Client1),
|
||||||
|
ok = emqtt:disconnect(Client1, 0, #{'Session-Expiry-Interval' => 0}),
|
||||||
|
|
||||||
|
{ok, Client2} = emqtt:start_link([ {clientid, ClientId},
|
||||||
|
{proto_ver, v5},
|
||||||
|
{clean_start, false}
|
||||||
|
]),
|
||||||
|
{ok, _} = emqtt:ConnFun(Client2),
|
||||||
|
?assertEqual(0, client_info(session_present, Client2)),
|
||||||
|
ok = emqtt:disconnect(Client2).
|
||||||
|
|
||||||
|
t_process_dies(_) ->
|
||||||
|
%% Emulate an error in the connect process,
|
||||||
|
%% or that the node of the process goes down.
|
||||||
|
%% A persistent session should survive anyway.
|
||||||
|
ConnFun = ?config(conn_fun, Config),
|
||||||
|
ClientId = <<"t_process_dies">>,
|
||||||
|
{ok, Client1} = emqtt:start_link([ {proto_ver, v5},
|
||||||
|
{clientid, ClientId},
|
||||||
|
{properties, #{'Session-Expiry-Interval' => 16#FFFFFFFF}},
|
||||||
|
{clean_start, true}
|
||||||
|
]),
|
||||||
|
{ok, _} = emqtt:ConnFun(Client1),
|
||||||
|
ok = emqtt:disconnect(Client1),
|
||||||
|
|
||||||
|
[ChannelPid] = emqx_cm:lookup_channels(ClientId),
|
||||||
|
?assert(is_pid(ChannelPid)),
|
||||||
|
exit(ChannelPid, kill),
|
||||||
|
|
||||||
|
{ok, Client2} = emqtt:start_link([ {proto_ver, v5},
|
||||||
|
{clientid, ClientId},
|
||||||
|
{clean_start, false}
|
||||||
|
]),
|
||||||
|
{ok, _} = emqtt:ConnFun(Client2),
|
||||||
|
?assertEqual(1, client_info(session_present, Client2)),
|
||||||
|
emqtt:disconnect(Client2).
|
||||||
|
|
||||||
|
t_process_dies_session_expires(_) ->
|
||||||
|
%% Emulate an error in the connect process,
|
||||||
|
%% or that the node of the process goes down.
|
||||||
|
%% A persistent session should eventually expire.
|
||||||
|
ConnFun = ?config(conn_fun, Config),
|
||||||
|
ClientId = <<"t_process_dies_session_expires">>,
|
||||||
|
{ok, Client1} = emqtt:start_link([ {proto_ver, v5},
|
||||||
|
{clientid, ClientId},
|
||||||
|
{properties, #{'Session-Expiry-Interval' => 1}},
|
||||||
|
{clean_start, true}
|
||||||
|
]),
|
||||||
|
{ok, _} = emqtt:ConnFun(Client1),
|
||||||
|
ok = emqtt:disconnect(Client1),
|
||||||
|
|
||||||
|
[ChannelPid] = emqx_cm:lookup_channels(ClientId),
|
||||||
|
?assert(is_pid(ChannelPid)),
|
||||||
|
exit(ChannelPid, kill),
|
||||||
|
|
||||||
|
timer:sleep(1000),
|
||||||
|
|
||||||
|
{ok, Client2} = emqtt:start_link([ {proto_ver, v5},
|
||||||
|
{clientid, ClientId},
|
||||||
|
{clean_start, false}
|
||||||
|
]),
|
||||||
|
{ok, _} = emqtt:ConnFun(Client2),
|
||||||
|
?assertEqual(0, client_info(session_present, Client2)),
|
||||||
|
emqtt:disconnect(Client2).
|
||||||
|
|
||||||
|
t_clean_start_drops_subscriptions(_) ->
|
||||||
|
ConnFun = ?config(conn_fun, Config),
|
||||||
|
Topic = <<"t_clean_start_drops_subscriptions/bar">>,
|
||||||
|
STopic = <<"t_clean_start_drops_subscriptions/+">>,
|
||||||
|
Payload = <<"hello">>,
|
||||||
|
ClientId = <<"t_clean_start_drops_subscriptions">>,
|
||||||
|
{ok, Client1} = emqtt:start_link([ {proto_ver, v5},
|
||||||
|
{clientid, ClientId},
|
||||||
|
{properties, #{'Session-Expiry-Interval' => 30}},
|
||||||
|
{clean_start, true}
|
||||||
|
]),
|
||||||
|
{ok, _} = emqtt:ConnFun(Client1),
|
||||||
|
{ok, _, [0]} = emqtt:subscribe(Client1, STopic, 0),
|
||||||
|
ok = emqtt:disconnect(Client1),
|
||||||
|
|
||||||
|
{ok, Client2} = emqtt:start_link([ {proto_ver, v5},
|
||||||
|
{clientid, ClientId},
|
||||||
|
{clean_start, true}
|
||||||
|
]),
|
||||||
|
{ok, _} = emqtt:ConnFun(Client2),
|
||||||
|
?assertEqual(0, client_info(session_present, Client2)),
|
||||||
|
{ok, _, [0]} = emqtt:subscribe(Client2, STopic, 0),
|
||||||
|
|
||||||
|
{ok, Client3} = emqtt:start_link([{proto_ver, v5}]),
|
||||||
|
{ok, _} = emqtt:ConnFun(Client3),
|
||||||
|
{ok, 2} = emqtt:publish(Client3, Topic, Payload, 2),
|
||||||
|
ok = emqtt:disconnect(Client3),
|
||||||
|
|
||||||
|
[_Msg1] = receive_messages(1),
|
||||||
|
|
||||||
|
ok = emqtt:disconnect(Client2).
|
||||||
|
|
||||||
|
t_subscription_while_process_is_gone(_) ->
|
||||||
|
ConnFun = ?config(conn_fun, Config),
|
||||||
|
Topic = <<"t_clean_start_drops_subscriptions/bar">>,
|
||||||
|
STopic = <<"t_clean_start_drops_subscriptions/+">>,
|
||||||
|
Payload1 = <<"hello1">>,
|
||||||
|
Payload2 = <<"hello2">>,
|
||||||
|
ClientId = <<"t_clean_start_drops_subscriptions">>,
|
||||||
|
{ok, Client1} = emqtt:start_link([ {proto_ver, v5},
|
||||||
|
{clientid, ClientId},
|
||||||
|
{properties, #{'Session-Expiry-Interval' => 30}},
|
||||||
|
{clean_start, true}
|
||||||
|
]),
|
||||||
|
{ok, _} = emqtt:ConnFun(Client1),
|
||||||
|
{ok, _, [2]} = emqtt:subscribe(Client1, STopic, 2),
|
||||||
|
ok = emqtt:disconnect(Client1),
|
||||||
|
|
||||||
|
[ChannelPid] = emqx_cm:lookup_channels(ClientId),
|
||||||
|
?assert(is_pid(ChannelPid)),
|
||||||
|
exit(ChannelPid, kill),
|
||||||
|
|
||||||
|
{ok, Client2} = emqtt:start_link([]),
|
||||||
|
{ok, _} = emqtt:ConnFun(Client2),
|
||||||
|
{ok, 2} = emqtt:publish(Client2, Topic, Payload1, 2),
|
||||||
|
{ok, 3} = emqtt:publish(Client2, Topic, Payload2, 2),
|
||||||
|
|
||||||
|
{ok, Client3} = emqtt:start_link([ {proto_ver, v5},
|
||||||
|
{clientid, ClientId},
|
||||||
|
{properties, #{'Session-Expiry-Interval' => 30}},
|
||||||
|
{clean_start, false}
|
||||||
|
]),
|
||||||
|
{ok, _} = emqtt:ConnFun(Client3),
|
||||||
|
[Msg1] = receive_messages(1),
|
||||||
|
[Msg2] = receive_messages(1),
|
||||||
|
?assertEqual({ok, iolist_to_binary(Payload1)}, maps:find(payload, Msg1)),
|
||||||
|
?assertEqual({ok, 2}, maps:find(qos, Msg1)),
|
||||||
|
?assertEqual({ok, iolist_to_binary(Payload2)}, maps:find(payload, Msg2)),
|
||||||
|
?assertEqual({ok, 2}, maps:find(qos, Msg2)),
|
||||||
|
|
||||||
|
ok = emqtt:disconnect(Client2),
|
||||||
|
ok = emqtt:disconnect(Client3).
|
||||||
|
|
|
@ -186,7 +186,7 @@ t_delete3(_) ->
|
||||||
?TRIE:delete(<<"sensor/+/unknown">>)
|
?TRIE:delete(<<"sensor/+/unknown">>)
|
||||||
end),
|
end),
|
||||||
?assertEqual([], ?TRIE:match(<<"sensor">>)),
|
?assertEqual([], ?TRIE:match(<<"sensor">>)),
|
||||||
?assertEqual([], ?TRIE:lookup_topic(<<"sensor/+">>)).
|
?assertEqual([], ?TRIE:lookup_topic(<<"sensor/+">>, ?TRIE)).
|
||||||
|
|
||||||
clear_tables() -> emqx_trie:clear_tables().
|
clear_tables() -> emqx_trie:clear_tables().
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue