diff --git a/Makefile b/Makefile index 002273850..b699bbe82 100644 --- a/Makefile +++ b/Makefile @@ -15,6 +15,8 @@ REL_PROFILES := emqx emqx-edge PKG_PROFILES := emqx-pkg emqx-edge-pkg PROFILES := $(REL_PROFILES) $(PKG_PROFILES) default +CT_NODE_NAME ?= 'test@127.0.0.1' + export REBAR_GIT_CLONE_OPTIONS += --depth=1 .PHONY: default @@ -44,7 +46,7 @@ proper: $(REBAR) .PHONY: ct ct: $(REBAR) conf-segs - @ENABLE_COVER_COMPILE=1 $(REBAR) ct --name 'test@127.0.0.1' -c -v + @ENABLE_COVER_COMPILE=1 $(REBAR) ct --name $(CT_NODE_NAME) -c -v APPS=$(shell $(CURDIR)/scripts/find-apps.sh) @@ -52,7 +54,7 @@ APPS=$(shell $(CURDIR)/scripts/find-apps.sh) .PHONY: $(APPS:%=%-ct) define gen-app-ct-target $1-ct: - $(REBAR) ct --name 'test@127.0.0.1' -v --suite $(shell $(CURDIR)/scripts/find-suites.sh $1) + $(REBAR) ct --name $(CT_NODE_NAME) -v --suite $(shell $(CURDIR)/scripts/find-suites.sh $1) endef $(foreach app,$(APPS),$(eval $(call gen-app-ct-target,$(app)))) @@ -64,6 +66,16 @@ $1-prop: endef $(foreach app,$(APPS),$(eval $(call gen-app-prop-target,$(app)))) +.PHONY: ct-suite +ct-suite: $(REBAR) +ifneq ($(TESTCASE),) + $(REBAR) ct -v --readable=false --name $(CT_NODE_NAME) --suite $(SUITE) --case $(TESTCASE) +else ifneq ($(GROUP),) + $(REBAR) ct -v --readable=false --name $(CT_NODE_NAME) --suite $(SUITE) --group $(GROUP) +else + $(REBAR) ct -v --readable=false --name $(CT_NODE_NAME) --suite $(SUITE) +endif + .PHONY: cover cover: $(REBAR) @ENABLE_COVER_COMPILE=1 $(REBAR) cover diff --git a/apps/emqx/etc/emqx.conf b/apps/emqx/etc/emqx.conf index 2b70d6dda..786c3d1d6 100644 --- a/apps/emqx/etc/emqx.conf +++ b/apps/emqx/etc/emqx.conf @@ -1638,3 +1638,33 @@ example_common_websocket_options { client_max_window_bits = 15 } } + +persistent_session_store { + ## Enable/disable internal persistent session store. + ## + ## @doc persistent_session_store.enabled + ## ValueType: Boolean + ## Default: false + enabled = false + + ## How long are undelivered messages retained in the store + ## + ## @doc persistent_session_store.max_retain_undelivered + ## ValueType: Duration + ## Default: 1h + max_retain_undelivered = 1h + + ## The time interval in which to try to run garbage collection of persistent session messages + ## + ## @doc persistent_session_store.message_gc_interval + ## ValueType: Duration + ## Default: 1h + message_gc_interval = 1h + + ## The time interval in which to try to run garbage collection of persistent session transient data + ## + ## @doc persistent_session_store.session_message_gc_interval + ## ValueType: Duration + ## Default: 1m + session_message_gc_interval = 1m +} diff --git a/apps/emqx/include/emqx.hrl b/apps/emqx/include/emqx.hrl index 486143912..cf419edc5 100644 --- a/apps/emqx/include/emqx.hrl +++ b/apps/emqx/include/emqx.hrl @@ -23,10 +23,12 @@ -define(SHARED_SUB_SHARD, emqx_shared_sub_shard). -define(CM_SHARD, emqx_cm_shard). -define(ROUTE_SHARD, route_shard). +-define(PERSISTENT_SESSION_SHARD, emqx_persistent_session_shard). -define(BOOT_SHARDS, [ ?ROUTE_SHARD , ?COMMON_SHARD , ?SHARED_SUB_SHARD + , ?PERSISTENT_SESSION_SHARD ]). %% Banner @@ -87,7 +89,7 @@ -record(route, { topic :: binary(), - dest :: node() | {binary(), node()} + dest :: node() | {binary(), node()} | emqx_session:sessionID() }). %%-------------------------------------------------------------------- diff --git a/apps/emqx/src/emqx_app.erl b/apps/emqx/src/emqx_app.erl index 9091304fc..d1090803d 100644 --- a/apps/emqx/src/emqx_app.erl +++ b/apps/emqx/src/emqx_app.erl @@ -41,6 +41,7 @@ start(_Type, _Args) -> ok = maybe_load_config(), + ok = emqx_persistent_session:init_db_backend(), ok = maybe_start_quicer(), wait_boot_shards(), {ok, Sup} = emqx_sup:start_link(), diff --git a/apps/emqx/src/emqx_broker.erl b/apps/emqx/src/emqx_broker.erl index e556361c7..c74fa22e7 100644 --- a/apps/emqx/src/emqx_broker.erl +++ b/apps/emqx/src/emqx_broker.erl @@ -206,6 +206,7 @@ publish(Msg) when is_record(Msg, message) -> payload => emqx_message:to_log_map(Msg)}), []; Msg1 = #message{topic = Topic} -> + emqx_persistent_session:persist_message(Msg1), route(aggre(emqx_router:match_routes(Topic)), delivery(Msg1)) end. diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index 51e1ed162..7df7cd42d 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -33,8 +33,6 @@ , get_mqtt_conf/2 , get_mqtt_conf/3 , set_conn_state/2 - , get_session/1 - , set_session/2 , stats/1 , caps/1 ]). @@ -180,11 +178,10 @@ info(timers, #channel{timers = Timers}) -> Timers. set_conn_state(ConnState, Channel) -> Channel#channel{conn_state = ConnState}. -get_session(#channel{session = Session}) -> - Session. - -set_session(Session, Channel) -> - Channel#channel{session = Session}. +set_session(Session, Channel = #channel{conninfo = ConnInfo, clientinfo = ClientInfo}) -> + %% Assume that this is also an updated session. Allow side effect. + Session1 = emqx_persistent_session:persist(ClientInfo, ConnInfo, Session), + Channel#channel{session = Session1}. %% TODO: Add more stats. -spec(stats(channel()) -> emqx_types:stats()). @@ -369,10 +366,10 @@ handle_in(?PUBACK_PACKET(PacketId, _ReasonCode, Properties), Channel case emqx_session:puback(PacketId, Session) of {ok, Msg, NSession} -> ok = after_message_acked(ClientInfo, Msg, Properties), - {ok, Channel#channel{session = NSession}}; + {ok, set_session(NSession, Channel)}; {ok, Msg, Publishes, NSession} -> ok = after_message_acked(ClientInfo, Msg, Properties), - handle_out(publish, Publishes, Channel#channel{session = NSession}); + handle_out(publish, Publishes, set_session(NSession, Channel)); {error, ?RC_PACKET_IDENTIFIER_IN_USE} -> ?SLOG(warning, #{msg => "puback_packetId_inuse", packetId => PacketId}), ok = emqx_metrics:inc('packets.puback.inuse'), @@ -388,7 +385,7 @@ handle_in(?PUBREC_PACKET(PacketId, _ReasonCode, Properties), Channel case emqx_session:pubrec(PacketId, Session) of {ok, Msg, NSession} -> ok = after_message_acked(ClientInfo, Msg, Properties), - NChannel = Channel#channel{session = NSession}, + NChannel = set_session(NSession, Channel), handle_out(pubrel, {PacketId, ?RC_SUCCESS}, NChannel); {error, RC = ?RC_PACKET_IDENTIFIER_IN_USE} -> ?SLOG(warning, #{msg => "pubrec_packetId_inuse", packetId => PacketId}), @@ -403,7 +400,7 @@ handle_in(?PUBREC_PACKET(PacketId, _ReasonCode, Properties), Channel handle_in(?PUBREL_PACKET(PacketId, _ReasonCode), Channel = #channel{session = Session}) -> case emqx_session:pubrel(PacketId, Session) of {ok, NSession} -> - NChannel = Channel#channel{session = NSession}, + NChannel = set_session(NSession, Channel), handle_out(pubcomp, {PacketId, ?RC_SUCCESS}, NChannel); {error, RC = ?RC_PACKET_IDENTIFIER_NOT_FOUND} -> ?SLOG(warning, #{msg => "pubrec_packetId_not_found", packetId => PacketId}), @@ -414,9 +411,9 @@ handle_in(?PUBREL_PACKET(PacketId, _ReasonCode), Channel = #channel{session = Se handle_in(?PUBCOMP_PACKET(PacketId, _ReasonCode), Channel = #channel{session = Session}) -> case emqx_session:pubcomp(PacketId, Session) of {ok, NSession} -> - {ok, Channel#channel{session = NSession}}; + {ok, set_session(NSession, Channel)}; {ok, Publishes, NSession} -> - handle_out(publish, Publishes, Channel#channel{session = NSession}); + handle_out(publish, Publishes, set_session(NSession, Channel)); {error, ?RC_PACKET_IDENTIFIER_IN_USE} -> ok = emqx_metrics:inc('packets.pubcomp.inuse'), {ok, Channel}; @@ -624,7 +621,8 @@ do_publish(PacketId, Msg = #message{qos = ?QOS_2}, case emqx_session:publish(PacketId, Msg, Session) of {ok, PubRes, NSession} -> RC = puback_reason_code(PubRes), - NChannel1 = ensure_timer(await_timer, Channel#channel{session = NSession}), + NChannel0 = set_session(NSession, Channel), + NChannel1 = ensure_timer(await_timer, NChannel0), NChannel2 = ensure_quota(PubRes, NChannel1), handle_out(pubrec, {PacketId, RC}, NChannel2); {error, RC = ?RC_PACKET_IDENTIFIER_IN_USE} -> @@ -698,7 +696,7 @@ do_subscribe(TopicFilter, SubOpts = #{qos := QoS}, Channel = NSubOpts = enrich_subopts(maps:merge(?DEFAULT_SUBOPTS, SubOpts), Channel), case emqx_session:subscribe(ClientInfo, NTopicFilter, NSubOpts, Session) of {ok, NSession} -> - {QoS, Channel#channel{session = NSession}}; + {QoS, set_session(NSession, Channel)}; {error, RC} -> ?SLOG(warning, #{ msg => "cannot_subscribe_topic_filter", @@ -728,7 +726,7 @@ do_unsubscribe(TopicFilter, SubOpts, Channel = TopicFilter1 = emqx_mountpoint:mount(MountPoint, TopicFilter), case emqx_session:unsubscribe(ClientInfo, TopicFilter1, SubOpts, Session) of {ok, NSession} -> - {?RC_SUCCESS, Channel#channel{session = NSession}}; + {?RC_SUCCESS, set_session(NSession, Channel)}; {error, RC} -> {RC, Channel} end. %%-------------------------------------------------------------------- @@ -752,8 +750,23 @@ process_disconnect(ReasonCode, Properties, Channel) -> {ok, {close, disconnect_reason(ReasonCode)}, NChannel}. maybe_update_expiry_interval(#{'Session-Expiry-Interval' := Interval}, - Channel = #channel{conninfo = ConnInfo}) -> - Channel#channel{conninfo = ConnInfo#{expiry_interval => timer:seconds(Interval)}}; + Channel = #channel{conninfo = ConnInfo, clientinfo = ClientInfo}) -> + EI = timer:seconds(Interval), + OldEI = maps:get(expiry_interval, ConnInfo, 0), + case OldEI =:= EI of + true -> Channel; + false -> + NChannel = Channel#channel{conninfo = ConnInfo#{expiry_interval => EI}}, + ClientID = maps:get(clientid, ClientInfo, undefined), + %% Check if the client turns off persistence (turning it on is disallowed) + case EI =:= 0 andalso OldEI > 0 of + true -> + S = emqx_persistent_session:discard(ClientID, NChannel#channel.session), + set_session(S, NChannel); + false -> + NChannel + end + end; maybe_update_expiry_interval(_Properties, Channel) -> Channel. %%-------------------------------------------------------------------- @@ -765,39 +778,33 @@ maybe_update_expiry_interval(_Properties, Channel) -> Channel. handle_deliver(Delivers, Channel = #channel{conn_state = disconnected, session = Session, clientinfo = #{clientid := ClientId}}) -> - NSession = emqx_session:enqueue(ignore_local(maybe_nack(Delivers), ClientId, Session), Session), - {ok, Channel#channel{session = NSession}}; + Delivers1 = maybe_nack(Delivers), + Delivers2 = emqx_session:ignore_local(Delivers1, ClientId, Session), + NSession = emqx_session:enqueue(Delivers2, Session), + NChannel = set_session(NSession, Channel), + %% We consider queued/dropped messages as delivered since they are now in the session state. + maybe_mark_as_delivered(Session, Delivers), + {ok, NChannel}; handle_deliver(Delivers, Channel = #channel{takeover = true, pendings = Pendings, session = Session, clientinfo = #{clientid := ClientId}}) -> - NPendings = lists:append(Pendings, ignore_local(maybe_nack(Delivers), ClientId, Session)), + NPendings = lists:append(Pendings, emqx_session:ignore_local(maybe_nack(Delivers), ClientId, Session)), {ok, Channel#channel{pendings = NPendings}}; handle_deliver(Delivers, Channel = #channel{session = Session, - clientinfo = #{clientid := ClientId}}) -> - case emqx_session:deliver(ignore_local(Delivers, ClientId, Session), Session) of + clientinfo = #{clientid := ClientId} + }) -> + case emqx_session:deliver(emqx_session:ignore_local(Delivers, ClientId, Session), Session) of {ok, Publishes, NSession} -> - NChannel = Channel#channel{session = NSession}, + NChannel = set_session(NSession, Channel), + maybe_mark_as_delivered(NSession, Delivers), handle_out(publish, Publishes, ensure_timer(retry_timer, NChannel)); {ok, NSession} -> - {ok, Channel#channel{session = NSession}} + {ok, set_session(NSession, Channel)} end. -ignore_local(Delivers, Subscriber, Session) -> - Subs = emqx_session:info(subscriptions, Session), - lists:dropwhile(fun({deliver, Topic, #message{from = Publisher}}) -> - case maps:find(Topic, Subs) of - {ok, #{nl := 1}} when Subscriber =:= Publisher -> - ok = emqx_metrics:inc('delivery.dropped'), - ok = emqx_metrics:inc('delivery.dropped.no_local'), - true; - _ -> - false - end - end, Delivers). - %% Nack delivers from shared subscription maybe_nack(Delivers) -> lists:filter(fun not_nacked/1, Delivers). @@ -806,6 +813,14 @@ not_nacked({deliver, _Topic, Msg}) -> not (emqx_shared_sub:is_ack_required(Msg) andalso (ok == emqx_shared_sub:nack_no_connection(Msg))). +maybe_mark_as_delivered(Session, Delivers) -> + case emqx_session:info(is_persistent, Session) of + false -> skip; + true -> + SessionID = emqx_session:info(id, Session), + emqx_persistent_session:mark_as_delivered(SessionID, Delivers) + end. + %%-------------------------------------------------------------------- %% Handle outgoing packet %%-------------------------------------------------------------------- @@ -898,13 +913,13 @@ return_connack(AckPacket, Channel) -> case maybe_resume_session(Channel) of ignore -> {ok, Replies, Channel}; {ok, Publishes, NSession} -> - NChannel = Channel#channel{session = NSession, - resuming = false, + NChannel0 = Channel#channel{resuming = false, pendings = [] }, - {Packets, NChannel1} = do_deliver(Publishes, NChannel), + NChannel1 = set_session(NSession, NChannel0), + {Packets, NChannel2} = do_deliver(Publishes, NChannel1), Outgoing = [{outgoing, Packets} || length(Packets) > 0], - {ok, Replies ++ Outgoing, NChannel1} + {ok, Replies ++ Outgoing, NChannel2} end. %%-------------------------------------------------------------------- @@ -1028,10 +1043,28 @@ handle_info(clean_authz_cache, Channel) -> ok = emqx_authz_cache:empty_authz_cache(), {ok, Channel}; +handle_info(die_if_test = Info, Channel) -> + die_if_test_compiled(), + ?LOG(error, "Unexpected info: ~p", [Info]), + {ok, Channel}; + handle_info(Info, Channel) -> ?SLOG(error, #{msg => "unexpected_info", info => Info}), {ok, Channel}. +-ifdef(TEST). + +-spec die_if_test_compiled() -> no_return(). +die_if_test_compiled() -> + exit(normal). + +-else. + +die_if_test_compiled() -> + ok. + +-endif. + %%-------------------------------------------------------------------- %% Handle timeout %%-------------------------------------------------------------------- @@ -1063,9 +1096,9 @@ handle_timeout(_TRef, retry_delivery, Channel = #channel{session = Session}) -> case emqx_session:retry(Session) of {ok, NSession} -> - {ok, clean_timer(retry_timer, Channel#channel{session = NSession})}; + {ok, clean_timer(retry_timer, set_session(NSession, Channel))}; {ok, Publishes, Timeout, NSession} -> - NChannel = Channel#channel{session = NSession}, + NChannel = set_session(NSession, Channel), handle_out(publish, Publishes, reset_timer(retry_timer, Timeout, NChannel)) end; @@ -1076,9 +1109,9 @@ handle_timeout(_TRef, expire_awaiting_rel, Channel = #channel{session = Session}) -> case emqx_session:expire(awaiting_rel, Session) of {ok, NSession} -> - {ok, clean_timer(await_timer, Channel#channel{session = NSession})}; + {ok, clean_timer(await_timer, set_session(NSession, Channel))}; {ok, Timeout, NSession} -> - {ok, reset_timer(await_timer, Timeout, Channel#channel{session = NSession})} + {ok, reset_timer(await_timer, Timeout, set_session(NSession, Channel))} end; handle_timeout(_TRef, expire_session, Channel) -> @@ -1145,11 +1178,19 @@ interval(will_timer, #channel{will_msg = WillMsg}) -> terminate(_, #channel{conn_state = idle}) -> ok; terminate(normal, Channel) -> run_terminate_hook(normal, Channel); -terminate({shutdown, Reason}, Channel) - when Reason =:= kicked; Reason =:= discarded; Reason =:= takeovered -> +terminate({shutdown, kicked}, Channel) -> + _ = emqx_persistent_session:persist(Channel#channel.clientinfo, + Channel#channel.conninfo, + Channel#channel.session), + run_terminate_hook(kicked, Channel); +terminate({shutdown, Reason}, Channel) when Reason =:= discarded; + Reason =:= takeovered -> run_terminate_hook(Reason, Channel); terminate(Reason, Channel = #channel{will_msg = WillMsg}) -> (WillMsg =/= undefined) andalso publish_will_msg(WillMsg), + _ = emqx_persistent_session:persist(Channel#channel.clientinfo, + Channel#channel.conninfo, + Channel#channel.session), run_terminate_hook(Reason, Channel). run_terminate_hook(_Reason, #channel{session = undefined}) -> ok; @@ -1613,8 +1654,11 @@ maybe_resume_session(#channel{resuming = false}) -> ignore; maybe_resume_session(#channel{session = Session, resuming = true, - pendings = Pendings}) -> + pendings = Pendings, + clientinfo = #{clientid := ClientId}}) -> {ok, Publishes, Session1} = emqx_session:replay(Session), + %% We consider queued/dropped messages as delivered since they are now in the session state. + emqx_persistent_session:mark_as_delivered(ClientId, Pendings), case emqx_session:deliver(Pendings, Session1) of {ok, Session2} -> {ok, Publishes, Session2}; diff --git a/apps/emqx/src/emqx_cm.erl b/apps/emqx/src/emqx_cm.erl index c76c8d396..83f77050a 100644 --- a/apps/emqx/src/emqx_cm.erl +++ b/apps/emqx/src/emqx_cm.erl @@ -19,7 +19,6 @@ -behaviour(gen_server). --include("emqx.hrl"). -include("logger.hrl"). -include("types.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). @@ -214,9 +213,11 @@ open_session(true, ClientInfo = #{clientid := ClientId}, ConnInfo) -> Self = self(), CleanStart = fun(_) -> ok = discard_session(ClientId), + ok = emqx_persistent_session:discard_if_present(ClientId), Session = create_session(ClientInfo, ConnInfo), + Session1 = emqx_persistent_session:persist(ClientInfo, ConnInfo, Session), register_channel(ClientId, Self, ConnInfo), - {ok, #{session => Session, present => false}} + {ok, #{session => Session1, present => false}} end, emqx_cm_locker:trans(ClientId, CleanStart); @@ -224,17 +225,34 @@ open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) -> Self = self(), ResumeStart = fun(_) -> case takeover_session(ClientId) of - {ok, ConnMod, ChanPid, Session} -> - ok = emqx_session:resume(ClientInfo, Session), - Pendings = ConnMod:call(ChanPid, {takeover, 'end'}, ?T_TAKEOVER), + {persistent, Session} -> + %% This is a persistent session without a managing process. + {Session1, Pendings} = + emqx_persistent_session:resume(ClientInfo, ConnInfo, Session), register_channel(ClientId, Self, ConnInfo), - {ok, #{session => Session, + + {ok, #{session => Session1, present => true, pendings => Pendings}}; - {error, not_found} -> - Session = create_session(ClientInfo, ConnInfo), + {living, ConnMod, ChanPid, Session} -> + ok = emqx_session:resume(ClientInfo, Session), + Session1 = emqx_persistent_session:persist(ClientInfo, ConnInfo, Session), + Pendings = ConnMod:call(ChanPid, {takeover, 'end'}, ?T_TAKEOVER), register_channel(ClientId, Self, ConnInfo), - {ok, #{session => Session, present => false}} + {ok, #{session => Session1, + present => true, + pendings => Pendings}}; + {expired, OldSession} -> + _ = emqx_persistent_session:discard(ClientId, OldSession), + Session = create_session(ClientInfo, ConnInfo), + Session1 = emqx_persistent_session:persist(ClientInfo, ConnInfo, Session), + register_channel(ClientId, Self, ConnInfo), + {ok, #{session => Session1, present => false}}; + none -> + Session = create_session(ClientInfo, ConnInfo), + Session1 = emqx_persistent_session:persist(ClientInfo, ConnInfo, Session), + register_channel(ClientId, Self, ConnInfo), + {ok, #{session => Session1, present => false}} end end, emqx_cm_locker:trans(ClientId, ResumeStart). @@ -246,13 +264,17 @@ create_session(ClientInfo, ConnInfo) -> ok = emqx_hooks:run('session.created', [ClientInfo, emqx_session:info(Session)]), Session. -get_session_confs(#{zone := Zone}, #{receive_maximum := MaxInflight}) -> +get_session_confs(#{zone := Zone}, #{receive_maximum := MaxInflight, expiry_interval := EI}) -> #{max_subscriptions => get_mqtt_conf(Zone, max_subscriptions), upgrade_qos => get_mqtt_conf(Zone, upgrade_qos), max_inflight => MaxInflight, retry_interval => get_mqtt_conf(Zone, retry_interval), await_rel_timeout => get_mqtt_conf(Zone, await_rel_timeout), - mqueue => mqueue_confs(Zone) + mqueue => mqueue_confs(Zone), + %% TODO: Add conf for allowing/disallowing persistent sessions. + %% Note that the connection info is already enriched to have + %% default config values for session expiry. + is_persistent => EI > 0 }. mqueue_confs(Zone) -> @@ -266,11 +288,15 @@ get_mqtt_conf(Zone, Key) -> emqx_config:get_zone_conf(Zone, [mqtt, Key]). %% @doc Try to takeover a session. --spec(takeover_session(emqx_types:clientid()) -> - {error, term()} | {ok, atom(), pid(), emqx_session:session()}). +-spec takeover_session(emqx_types:clientid()) -> + none + | {living, atom(), pid(), emqx_session:session()} + | {persistent, emqx_session:session()} + | {expired, emqx_session:session()}. takeover_session(ClientId) -> case lookup_channels(ClientId) of - [] -> {error, not_found}; + [] -> + emqx_persistent_session:lookup(ClientId); [ChanPid] -> takeover_session(ClientId, ChanPid); ChanPids -> @@ -285,10 +311,10 @@ takeover_session(ClientId) -> takeover_session(ClientId, ChanPid) when node(ChanPid) == node() -> case get_chann_conn_mod(ClientId, ChanPid) of undefined -> - {error, not_found}; + emqx_persistent_session:lookup(ClientId); ConnMod when is_atom(ConnMod) -> Session = ConnMod:call(ChanPid, {takeover, 'begin'}, ?T_TAKEOVER), - {ok, ConnMod, ChanPid, Session} + {living, ConnMod, ChanPid, Session} end; takeover_session(ClientId, ChanPid) -> diff --git a/apps/emqx/src/emqx_guid.erl b/apps/emqx/src/emqx_guid.erl index 3b66f6e92..b2ab2bf44 100644 --- a/apps/emqx/src/emqx_guid.erl +++ b/apps/emqx/src/emqx_guid.erl @@ -39,6 +39,9 @@ , from_base62/1 ]). +-export_type([ guid/0 + ]). + -define(TAG_VERSION, 131). -define(PID_EXT, 103). -define(NEW_PID_EXT, 88). diff --git a/apps/emqx/src/emqx_persistent_session.erl b/apps/emqx/src/emqx_persistent_session.erl new file mode 100644 index 000000000..71dac02c3 --- /dev/null +++ b/apps/emqx/src/emqx_persistent_session.erl @@ -0,0 +1,503 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_persistent_session). + +-export([ is_store_enabled/0 + , init_db_backend/0 + ]). + +-export([ discard/2 + , discard_if_present/1 + , lookup/1 + , persist/3 + , persist_message/1 + , pending/1 + , pending/2 + , resume/3 + ]). + +-export([ add_subscription/3 + , remove_subscription/3 + ]). + +-export([ mark_as_delivered/2 + , mark_resume_begin/1 + ]). + +-export([ pending_messages_in_db/2 + , delete_session_message/1 + , gc_session_messages/1 + , session_message_info/2 + ]). + +-export([ delete_message/1 + , first_message_id/0 + , next_message_id/1 + ]). + +-export_type([ sess_msg_key/0 + ]). + +-include("emqx.hrl"). +-include("emqx_persistent_session.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + +-compile({inline, [is_store_enabled/0]}). + +-define(MAX_EXPIRY_INTERVAL, 4294967295000). %% 16#FFFFFFFF * 1000 + +%% NOTE: Order is significant because of traversal order of the table. +-define(MARKER, 3). +-define(DELIVERED, 2). +-define(UNDELIVERED, 1). +-define(ABANDONED, 0). + + +-type bin_timestamp() :: <<_:64>>. +-opaque sess_msg_key() :: + {emqx_guid:guid(), emqx_guid:guid(), emqx_types:topic(), ?UNDELIVERED | ?DELIVERED} + | {emqx_guid:guid(), emqx_guid:guid(), <<>> , ?MARKER} + | {emqx_guid:guid(), <<>> , bin_timestamp() , ?ABANDONED}. + +-type gc_traverse_fun() :: fun(('delete' | 'marker' | 'abandoned', sess_msg_key()) -> 'ok'). + +%%-------------------------------------------------------------------- +%% Init +%%-------------------------------------------------------------------- + +init_db_backend() -> + case is_store_enabled() of + true -> + ok = emqx_trie:create_session_trie(), + emqx_persistent_session_mnesia_backend:create_tables(), + persistent_term:put(?db_backend_key, emqx_persistent_session_mnesia_backend), + ok; + false -> + persistent_term:put(?db_backend_key, emqx_persistent_session_dummy_backend), + ok + end. + +is_store_enabled() -> + emqx_config:get(?is_enabled_key). + +%%-------------------------------------------------------------------- +%% Session message ADT API +%%-------------------------------------------------------------------- + +-spec session_message_info('timestamp' | 'sessionID', sess_msg_key()) -> term(). +session_message_info(timestamp, {_, <<>>, <>, ?ABANDONED}) -> TS; +session_message_info(timestamp, {_, GUID, _ , _ }) -> emqx_guid:timestamp(GUID); +session_message_info(sessionID, {SessionID, _, _, _}) -> SessionID. + +%%-------------------------------------------------------------------- +%% DB API +%%-------------------------------------------------------------------- + +first_message_id() -> + ?db_backend:first_message_id(). + +next_message_id(Key) -> + ?db_backend:next_message_id(Key). + +delete_message(Key) -> + ?db_backend:delete_message(Key). + +first_session_message() -> + ?db_backend:first_session_message(). + +next_session_message(Key) -> + ?db_backend:next_session_message(Key). + +delete_session_message(Key) -> + ?db_backend:delete_session_message(Key). + +put_session_store(#session_store{} = SS) -> + ?db_backend:put_session_store(SS). + +delete_session_store(ClientID) -> + ?db_backend:delete_session_store(ClientID). + +lookup_session_store(ClientID) -> + ?db_backend:lookup_session_store(ClientID). + +put_session_message({_, _, _, _} = Key) -> + ?db_backend:put_session_message(#session_msg{ key = Key }). + +put_message(Msg) -> + ?db_backend:put_message(Msg). + +get_message(MsgId) -> + ?db_backend:get_message(MsgId). + +pending_messages_in_db(SessionID, MarkerIds) -> + ?db_backend:ro_transaction(pending_messages_fun(SessionID, MarkerIds)). + +%%-------------------------------------------------------------------- +%% Session API +%%-------------------------------------------------------------------- + +%% The timestamp (TS) is the last time a client interacted with the session, +%% or when the client disconnected. +-spec persist(emqx_types:clientinfo(), + emqx_types:conninfo(), + emqx_session:session()) -> emqx_session:session(). + +persist(#{ clientid := ClientID }, ConnInfo, Session) -> + case ClientID == undefined orelse not emqx_session:info(is_persistent, Session) of + true -> Session; + false -> + SS = #session_store{ client_id = ClientID + , expiry_interval = maps:get(expiry_interval, ConnInfo) + , ts = timestamp_from_conninfo(ConnInfo) + , session = Session}, + case persistent_session_status(SS) of + not_persistent -> Session; + expired -> discard(ClientID, Session); + persistent -> put_session_store(SS), + Session + end + end. + +timestamp_from_conninfo(ConnInfo) -> + case maps:get(disconnected_at, ConnInfo, undefined) of + undefined -> erlang:system_time(millisecond); + Disconnect -> Disconnect + end. + +lookup(ClientID) when is_binary(ClientID) -> + case lookup_session_store(ClientID) of + none -> none; + {value, #session_store{session = S} = SS} -> + case persistent_session_status(SS) of + expired -> {expired, S}; + persistent -> {persistent, S} + end + end. + +-spec discard_if_present(binary()) -> 'ok'. +discard_if_present(ClientID) -> + case lookup(ClientID) of + none -> ok; + {Tag, Session} when Tag =:= persistent; Tag =:= expired -> + _ = discard(ClientID, Session), + ok + end. + +-spec discard(binary(), emgx_session:session()) -> emgx_session:session(). +discard(ClientID, Session) -> + discard_opt(is_store_enabled(), ClientID, Session). + +discard_opt(false,_ClientID, Session) -> + emqx_session:set_field(is_persistent, false, Session); +discard_opt(true, ClientID, Session) -> + delete_session_store(ClientID), + SessionID = emqx_session:info(id, Session), + put_session_message({SessionID, <<>>, << (erlang:system_time(microsecond)) : 64>>, ?ABANDONED}), + Subscriptions = emqx_session:info(subscriptions, Session), + emqx_session_router:delete_routes(SessionID, Subscriptions), + emqx_session:set_field(is_persistent, false, Session). + +-spec mark_resume_begin(emqx_session:sessionID()) -> emqx_guid:guid(). +mark_resume_begin(SessionID) -> + MarkerID = emqx_guid:gen(), + put_session_message({SessionID, MarkerID, <<>>, ?MARKER}), + MarkerID. + +add_subscription(TopicFilter, SessionID, true = _IsPersistent) -> + case is_store_enabled() of + true -> emqx_session_router:do_add_route(TopicFilter, SessionID); + false -> ok + end; +add_subscription(_TopicFilter, _SessionID, false = _IsPersistent) -> + ok. + +remove_subscription(TopicFilter, SessionID, true = _IsPersistent) -> + case is_store_enabled() of + true -> emqx_session_router:do_delete_route(TopicFilter, SessionID); + false -> ok + end; +remove_subscription(_TopicFilter, _SessionID, false = _IsPersistent) -> + ok. + +%%-------------------------------------------------------------------- +%% Resuming from DB state +%%-------------------------------------------------------------------- + +%% Must be called inside a emqx_cm_locker transaction. +-spec resume(emqx_types:clientinfo(), emqx_types:conninfo(), emqx_session:session() + ) -> {emqx_session:session(), [emqx_types:deliver()]}. +resume(ClientInfo = #{clientid := ClientID}, ConnInfo, Session) -> + SessionID = emqx_session:info(id, Session), + ?tp(ps_resuming, #{from => db, sid => SessionID}), + + %% NOTE: Order is important! + + %% 1. Get pending messages from DB. + ?tp(ps_initial_pendings, #{sid => SessionID}), + Pendings1 = pending(SessionID), + Pendings2 = emqx_session:ignore_local(Pendings1, ClientID, Session), + ?tp(ps_got_initial_pendings, #{ sid => SessionID + , msgs => Pendings1}), + + %% 2. Enqueue messages to mimic that the process was alive + %% when the messages were delivered. + ?tp(ps_persist_pendings, #{sid => SessionID}), + Session1 = emqx_session:enqueue(Pendings2, Session), + Session2 = persist(ClientInfo, ConnInfo, Session1), + mark_as_delivered(SessionID, Pendings2), + ?tp(ps_persist_pendings_msgs, #{ msgs => Pendings2 + , sid => SessionID}), + + %% 3. Notify writers that we are resuming. + %% They will buffer new messages. + ?tp(ps_notify_writers, #{sid => SessionID}), + Nodes = mria_mnesia:running_nodes(), + NodeMarkers = resume_begin(Nodes, SessionID), + ?tp(ps_node_markers, #{sid => SessionID, markers => NodeMarkers}), + + %% 4. Subscribe to topics. + ?tp(ps_resume_session, #{sid => SessionID}), + ok = emqx_session:resume(ClientInfo, Session2), + + %% 5. Get pending messages from DB until we find all markers. + ?tp(ps_marker_pendings, #{sid => SessionID}), + MarkerIDs = [Marker || {_, Marker} <- NodeMarkers], + Pendings3 = pending(SessionID, MarkerIDs), + Pendings4 = emqx_session:ignore_local(Pendings3, ClientID, Session), + ?tp(ps_marker_pendings_msgs, #{ sid => SessionID + , msgs => Pendings4}), + + %% 6. Get pending messages from writers. + ?tp(ps_resume_end, #{sid => SessionID}), + WriterPendings = resume_end(Nodes, SessionID), + ?tp(ps_writer_pendings, #{ msgs => WriterPendings + , sid => SessionID}), + + %% 7. Drain the inbox and usort the messages + %% with the pending messages. (Should be done by caller.) + {Session2, Pendings4 ++ WriterPendings}. + +resume_begin(Nodes, SessionID) -> + Res = erpc:multicall(Nodes, emqx_session_router, resume_begin, [self(), SessionID]), + [{Node, Marker} || {{ok, {ok, Marker}}, Node} <- lists:zip(Res, Nodes)]. + +resume_end(Nodes, SessionID) -> + Res = erpc:multicall(Nodes, emqx_session_router, resume_end, [self(), SessionID]), + ?tp(ps_erpc_multical_result, #{ res => Res, sid => SessionID }), + %% TODO: Should handle the errors + [ {deliver, STopic, M} + || {ok, {ok, Messages}} <- Res, + {{M, STopic}} <- Messages + ]. + + +%%-------------------------------------------------------------------- +%% Messages API +%%-------------------------------------------------------------------- + +persist_message(Msg) -> + case is_store_enabled() of + true -> do_persist_message(Msg); + false -> ok + end. + +do_persist_message(Msg) -> + case emqx_message:get_flag(dup, Msg) orelse emqx_message:is_sys(Msg) of + true -> ok; + false -> + case emqx_session_router:match_routes(emqx_message:topic(Msg)) of + [] -> ok; + Routes -> + put_message(Msg), + MsgId = emqx_message:id(Msg), + persist_message_routes(Routes, MsgId, Msg) + end + end. + +persist_message_routes([#route{dest = SessionID, topic = STopic}|Left], MsgId, Msg) -> + ?tp(ps_persist_msg, #{sid => SessionID, payload => emqx_message:payload(Msg)}), + put_session_message({SessionID, MsgId, STopic, ?UNDELIVERED}), + emqx_session_router:buffer(SessionID, STopic, Msg), + persist_message_routes(Left, MsgId, Msg); +persist_message_routes([], _MsgId, _Msg) -> + ok. + +mark_as_delivered(SessionID, List) -> + case is_store_enabled() of + true -> do_mark_as_delivered(SessionID, List); + false -> ok + end. + +do_mark_as_delivered(SessionID, [{deliver, STopic, Msg}|Left]) -> + MsgID = emqx_message:id(Msg), + case next_session_message({SessionID, MsgID, STopic, ?ABANDONED}) of + {SessionID, MsgID, STopic, ?UNDELIVERED} = Key -> + %% We can safely delete this entry + %% instead of marking it as delivered. + delete_session_message(Key); + _ -> + put_session_message({SessionID, MsgID, STopic, ?DELIVERED}) + end, + do_mark_as_delivered(SessionID, Left); +do_mark_as_delivered(_SessionID, []) -> + ok. + +-spec pending(emqx_session:sessionID()) -> + [{emqx_types:message(), STopic :: binary()}]. +pending(SessionID) -> + pending_messages_in_db(SessionID, []). + +-spec pending(emqx_session:sessionID(), MarkerIDs :: [emqx_guid:guid()]) -> + [{emqx_types:message(), STopic :: binary()}]. +pending(SessionID, MarkerIds) -> + %% TODO: Handle lost MarkerIDs + case emqx_session_router:pending(SessionID, MarkerIds) of + incomplete -> + timer:sleep(10), + pending(SessionID, MarkerIds); + Delivers -> + Delivers + end. + +%%-------------------------------------------------------------------- +%% Session internal functions +%%-------------------------------------------------------------------- + +%% @private [MQTT-3.1.2-23] +persistent_session_status(#session_store{expiry_interval = 0}) -> + not_persistent; +persistent_session_status(#session_store{expiry_interval = ?MAX_EXPIRY_INTERVAL}) -> + persistent; +persistent_session_status(#session_store{expiry_interval = E, ts = TS}) -> + case E + TS > erlang:system_time(millisecond) of + true -> persistent; + false -> expired + end. + +%%-------------------------------------------------------------------- +%% Pending messages internal functions +%%-------------------------------------------------------------------- + +pending_messages_fun(SessionID, MarkerIds) -> + fun() -> + case pending_messages({SessionID, <<>>, <<>>, ?DELIVERED}, [], MarkerIds) of + {Pending, []} -> read_pending_msgs(Pending, []); + {_Pending, [_|_]} -> incomplete + end + end. + +read_pending_msgs([{MsgId, STopic}|Left], Acc) -> + Acc1 = try [{deliver, STopic, get_message(MsgId)}|Acc] + catch error:{msg_not_found, _} -> + HighwaterMark = erlang:system_time(microsecond) + - emqx_config:get(?msg_retain) * 1000, + case emqx_guid:timestamp(MsgId) < HighwaterMark of + true -> Acc; %% Probably cleaned by GC + false -> error({msg_not_found, MsgId}) + end + end, + read_pending_msgs(Left, Acc1); +read_pending_msgs([], Acc) -> + lists:reverse(Acc). + +%% The keys are ordered by +%% {sessionID(), <<>>, bin_timestamp(), ?ABANDONED} For abandoned sessions (clean started or expired). +%% {sessionID(), emqx_guid:guid(), STopic :: binary(), ?DELIVERED | ?UNDELIVERED | ?MARKER} +%% where +%% <<>> < emqx_guid:guid() +%% <<>> < bin_timestamp() +%% emqx_guid:guid() is ordered in ts() and by node() +%% ?ABANDONED < ?UNDELIVERED < ?DELIVERED < ?MARKER +%% +%% We traverse the table until we reach another session. +%% TODO: Garbage collect the delivered messages. +pending_messages({SessionID, PrevMsgId, PrevSTopic, PrevTag} = PrevKey, Acc, MarkerIds) -> + case next_session_message(PrevKey) of + {S, <<>>, _TS, ?ABANDONED} when S =:= SessionID -> + {[], []}; + {S, MsgId, <<>>, ?MARKER} = Key when S =:= SessionID -> + MarkerIds1 = MarkerIds -- [MsgId], + case PrevTag =:= ?UNDELIVERED of + false -> pending_messages(Key, Acc, MarkerIds1); + true -> pending_messages(Key, [{PrevMsgId, PrevSTopic}|Acc], MarkerIds1) + end; + {S, MsgId, STopic, ?DELIVERED} = Key when S =:= SessionID, + MsgId =:= PrevMsgId, + STopic =:= PrevSTopic -> + pending_messages(Key, Acc, MarkerIds); + {S, _MsgId, _STopic, _Tag} = Key when S =:= SessionID -> + case PrevTag =:= ?UNDELIVERED of + false -> pending_messages(Key, Acc, MarkerIds); + true -> pending_messages(Key, [{PrevMsgId, PrevSTopic}|Acc], MarkerIds) + end; + _What -> %% Next sessionID or '$end_of_table' + case PrevTag =:= ?UNDELIVERED of + false -> {lists:reverse(Acc), MarkerIds}; + true -> {lists:reverse([{PrevMsgId, PrevSTopic}|Acc]), MarkerIds} + end + end. + +%%-------------------------------------------------------------------- +%% Garbage collection +%%-------------------------------------------------------------------- + +-spec gc_session_messages(gc_traverse_fun()) -> 'ok'. +gc_session_messages(Fun) -> + gc_traverse(first_session_message(), <<>>, false, Fun). + +gc_traverse('$end_of_table', _SessionID, _Abandoned, _Fun) -> + ok; +gc_traverse({S, <<>>, _TS, ?ABANDONED} = Key, _SessionID, _Abandoned, Fun) -> + %% Only report the abandoned session if it has no messages. + %% We want to keep the abandoned marker to last to make the GC reentrant. + case next_session_message(Key) of + '$end_of_table' = NextKey -> + ok = Fun(abandoned, Key), + gc_traverse(NextKey, S, true, Fun); + {S2, _, _, _} = NextKey when S =:= S2 -> + gc_traverse(NextKey, S, true, Fun); + {_, _, _, _} = NextKey -> + ok = Fun(abandoned, Key), + gc_traverse(NextKey, S, true, Fun) + end; +gc_traverse({S, _MsgID, <<>>, ?MARKER} = Key, SessionID, Abandoned, Fun) -> + ok = Fun(marker, Key), + NewAbandoned = S =:= SessionID andalso Abandoned, + gc_traverse(next_session_message(Key), S, NewAbandoned, Fun); +gc_traverse({S, _MsgID, _STopic, _Tag} = Key, SessionID, Abandoned, Fun) when Abandoned andalso + S =:= SessionID -> + %% Delete all messages from an abandoned session. + ok = Fun(delete, Key), + gc_traverse(next_session_message(Key), S, Abandoned, Fun); +gc_traverse({S, MsgID, STopic, ?UNDELIVERED} = Key, SessionID, Abandoned, Fun) -> + case next_session_message(Key) of + {S1, M, ST, ?DELIVERED} = NextKey when S1 =:= S andalso + MsgID =:= M andalso + STopic =:= ST -> + %% We have both markers for the same message/topic so it is safe to delete both. + ok = Fun(delete, Key), + ok = Fun(delete, NextKey), + gc_traverse(next_session_message(NextKey), S, Abandoned, Fun); + NextKey -> + %% Something else is here, so let's just loop. + NewAbandoned = S =:= SessionID andalso Abandoned, + gc_traverse(NextKey, SessionID, NewAbandoned, Fun) + end; +gc_traverse({S, _MsgID, _STopic, ?DELIVERED} = Key, SessionID, Abandoned, Fun) -> + %% We have a message that is marked as ?DELIVERED, but the ?UNDELIVERED is missing. + NewAbandoned = S =:= SessionID andalso Abandoned, + gc_traverse(next_session_message(Key), S, NewAbandoned, Fun). diff --git a/apps/emqx/src/emqx_persistent_session.hrl b/apps/emqx/src/emqx_persistent_session.hrl new file mode 100644 index 000000000..4cb51160a --- /dev/null +++ b/apps/emqx/src/emqx_persistent_session.hrl @@ -0,0 +1,33 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-define(SESSION_STORE, emqx_session_store). +-define(SESS_MSG_TAB, emqx_session_msg). +-define(MSG_TAB, emqx_persistent_msg). + +-record(session_store, { client_id :: binary() + , expiry_interval :: non_neg_integer() + , ts :: non_neg_integer() + , session :: emqx_session:session()}). + +-record(session_msg, {key :: emqx_persistent_session:sess_msg_key(), + val = [] :: []}). + +-define(db_backend_key, [persistent_session_store, db_backend]). +-define(is_enabled_key, [persistent_session_store, enabled]). +-define(msg_retain, [persistent_session_store, max_retain_undelivered]). + +-define(db_backend, (persistent_term:get(?db_backend_key))). diff --git a/apps/emqx/src/emqx_persistent_session_dummy_backend.erl b/apps/emqx/src/emqx_persistent_session_dummy_backend.erl new file mode 100644 index 000000000..d190a3066 --- /dev/null +++ b/apps/emqx/src/emqx_persistent_session_dummy_backend.erl @@ -0,0 +1,76 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_persistent_session_dummy_backend). + +-include("emqx_persistent_session.hrl"). + +-export([ first_message_id/0 + , next_message_id/1 + , delete_message/1 + , first_session_message/0 + , next_session_message/1 + , delete_session_message/1 + , put_session_store/1 + , delete_session_store/1 + , lookup_session_store/1 + , put_session_message/1 + , put_message/1 + , get_message/1 + , ro_transaction/1 + ]). + +first_message_id() -> + '$end_of_table'. + +next_message_id(_) -> + '$end_of_table'. + +-spec delete_message(binary()) -> no_return(). +delete_message(_Key) -> + error(should_not_be_called). + +first_session_message() -> + '$end_of_table'. + +next_session_message(_Key) -> + '$end_of_table'. + +delete_session_message(_Key) -> + ok. + +put_session_store(#session_store{}) -> + ok. + +delete_session_store(_ClientID) -> + ok. + +lookup_session_store(_ClientID) -> + none. + +put_session_message({_, _, _, _}) -> + ok. + +put_message(_Msg) -> + ok. + +-spec get_message(binary()) -> no_return(). +get_message(_MsgId) -> + error(should_not_be_called). + +ro_transaction(Fun) -> + Fun(). + diff --git a/apps/emqx/src/emqx_persistent_session_gc.erl b/apps/emqx/src/emqx_persistent_session_gc.erl new file mode 100644 index 000000000..2fc95035c --- /dev/null +++ b/apps/emqx/src/emqx_persistent_session_gc.erl @@ -0,0 +1,152 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(emqx_persistent_session_gc). + +-behaviour(gen_server). + +-include("emqx_persistent_session.hrl"). + +%% API +-export([start_link/0]). + +%% gen_server callbacks +-export([ init/1 + , handle_call/3 + , handle_cast/2 + , handle_info/2 + , terminate/2 + ]). + +-ifdef(TEST). +-export([ session_gc_worker/2 + , message_gc_worker/0 + ]). +-endif. + +-define(SERVER, ?MODULE). +%% TODO: Maybe these should be configurable? +-define(MARKER_GRACE_PERIOD, 60000000). +-define(ABANDONED_GRACE_PERIOD, 300000000). + +%%-------------------------------------------------------------------- +%% API +%%-------------------------------------------------------------------- + +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + +%%-------------------------------------------------------------------- +%% gen_server callbacks +%%-------------------------------------------------------------------- + +init([]) -> + process_flag(trap_exit, true), + {ok, start_message_gc_timer(start_session_gc_timer(#{}))}. + +handle_call(_Request, _From, State) -> + Reply = ok, + {reply, Reply, State}. + +handle_cast(_Request, State) -> + {noreply, State}. + +handle_info({timeout, Ref, session_gc_timeout}, State) -> + State1 = session_gc_timeout(Ref, State), + {noreply, State1}; +handle_info({timeout, Ref, message_gc_timeout}, State) -> + State1 = message_gc_timeout(Ref, State), + {noreply, State1}; +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- + +%%-------------------------------------------------------------------- +%% Session messages GC +%%-------------------------------------------------------------------- + +start_session_gc_timer(State) -> + Interval = emqx_config:get([persistent_session_store, session_message_gc_interval]), + State#{ session_gc_timer => erlang:start_timer(Interval, self(), session_gc_timeout)}. + +session_gc_timeout(Ref, #{ session_gc_timer := R } = State) when R =:= Ref -> + %% Prevent overlapping processes. + GCPid = maps:get(session_gc_pid, State, undefined), + case GCPid =/= undefined andalso erlang:is_process_alive(GCPid) of + true -> start_session_gc_timer(State); + false -> start_session_gc_timer(State#{ session_gc_pid => proc_lib:spawn_link(fun session_gc_worker/0)}) + end; +session_gc_timeout(_Ref, State) -> + State. + +session_gc_worker() -> + ok = emqx_persistent_session:gc_session_messages(fun session_gc_worker/2). + +session_gc_worker(delete, Key) -> + emqx_persistent_session:delete_session_message(Key); +session_gc_worker(marker, Key) -> + TS = emqx_persistent_session:session_message_info(timestamp, Key), + case TS + ?MARKER_GRACE_PERIOD < erlang:system_time(microsecond) of + true -> emqx_persistent_session:delete_session_message(Key); + false -> ok + end; +session_gc_worker(abandoned, Key) -> + TS = emqx_persistent_session:session_message_info(timestamp, Key), + case TS + ?ABANDONED_GRACE_PERIOD < erlang:system_time(microsecond) of + true -> emqx_persistent_session:delete_session_message(Key); + false -> ok + end. + +%%-------------------------------------------------------------------- +%% Message GC +%% -------------------------------------------------------------------- +%% The message GC simply removes all messages older than the retain +%% period. A more exact GC would either involve treating the session +%% message table as root set, or some kind of reference counting. +%% We sacrifice space for simplicity at this point. +start_message_gc_timer(State) -> + Interval = emqx_config:get([persistent_session_store, session_message_gc_interval]), + State#{ message_gc_timer => erlang:start_timer(Interval, self(), message_gc_timeout)}. + +message_gc_timeout(Ref, #{ message_gc_timer := R } = State) when R =:= Ref -> + %% Prevent overlapping processes. + GCPid = maps:get(message_gc_pid, State, undefined), + case GCPid =/= undefined andalso erlang:is_process_alive(GCPid) of + true -> start_message_gc_timer(State); + false -> start_message_gc_timer(State#{ message_gc_pid => proc_lib:spawn_link(fun message_gc_worker/0)}) + end; +message_gc_timeout(_Ref, State) -> + State. + +message_gc_worker() -> + HighWaterMark = erlang:system_time(microsecond) - emqx_config:get(?msg_retain) * 1000, + message_gc_worker(emqx_persistent_session:first_message_id(), HighWaterMark). + +message_gc_worker('$end_of_table', _HighWaterMark) -> + ok; +message_gc_worker(MsgId, HighWaterMark) -> + case emqx_guid:timestamp(MsgId) < HighWaterMark of + true -> + emqx_persistent_session:delete_message(MsgId), + message_gc_worker(emqx_persistent_session:next_message_id(MsgId), HighWaterMark); + false -> + ok + end. diff --git a/apps/emqx/src/emqx_persistent_session_mnesia_backend.erl b/apps/emqx/src/emqx_persistent_session_mnesia_backend.erl new file mode 100644 index 000000000..512984845 --- /dev/null +++ b/apps/emqx/src/emqx_persistent_session_mnesia_backend.erl @@ -0,0 +1,110 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_persistent_session_mnesia_backend). + +-include("emqx.hrl"). +-include("emqx_persistent_session.hrl"). + +-export([ create_tables/0 + , first_message_id/0 + , next_message_id/1 + , delete_message/1 + , first_session_message/0 + , next_session_message/1 + , delete_session_message/1 + , put_session_store/1 + , delete_session_store/1 + , lookup_session_store/1 + , put_session_message/1 + , put_message/1 + , get_message/1 + , ro_transaction/1 + ]). + +create_tables() -> + ok = mria:create_table(?SESSION_STORE, [ + {type, set}, + {rlog_shard, ?PERSISTENT_SESSION_SHARD}, + {storage, disc_copies}, + {record_name, session_store}, + {attributes, record_info(fields, session_store)}, + {storage_properties, [{ets, [{read_concurrency, true}]}]}]), + + ok = mria:create_table(?SESS_MSG_TAB, [ + {type, ordered_set}, + {rlog_shard, ?PERSISTENT_SESSION_SHARD}, + {storage, disc_copies}, + {record_name, session_msg}, + {attributes, record_info(fields, session_msg)}, + {storage_properties, [{ets, [{read_concurrency, true}, + {write_concurrency, true}]}]}]), + + ok = mria:create_table(?MSG_TAB, [ + {type, ordered_set}, + {rlog_shard, ?PERSISTENT_SESSION_SHARD}, + {storage, disc_copies}, + {record_name, message}, + {attributes, record_info(fields, message)}, + {storage_properties, [{ets, [{read_concurrency, true}, + {write_concurrency, true}]}]}]). + +first_session_message() -> + mnesia:dirty_first(?SESS_MSG_TAB). + +next_session_message(Key) -> + mnesia:dirty_next(?SESS_MSG_TAB, Key). + +first_message_id() -> + mnesia:dirty_first(?MSG_TAB). + +next_message_id(Key) -> + mnesia:dirty_next(?MSG_TAB, Key). + +delete_message(Key) -> + mria:dirty_delete(?MSG_TAB, Key). + +delete_session_message(Key) -> + mria:dirty_delete(?SESS_MSG_TAB, Key). + +put_session_store(SS) -> + mria:dirty_write(?SESSION_STORE, SS). + +delete_session_store(ClientID) -> + mria:dirty_delete(?SESSION_STORE, ClientID). + +lookup_session_store(ClientID) -> + case mnesia:dirty_read(?SESSION_STORE, ClientID) of + [] -> none; + [SS] -> {value, SS} + end. + +put_session_message(SessMsg) -> + mria:dirty_write(?SESS_MSG_TAB, SessMsg). + +put_message(Msg) -> + mria:dirty_write(?MSG_TAB, Msg). + +get_message(MsgId) -> + case mnesia:read(?MSG_TAB, MsgId) of + [] -> error({msg_not_found, MsgId}); + [Msg] -> Msg + end. + +ro_transaction(Fun) -> + {atomic, Res} = mria:ro_transaction(?PERSISTENT_SESSION_SHARD, Fun), + Res. + diff --git a/apps/emqx/src/emqx_persistent_session_sup.erl b/apps/emqx/src/emqx_persistent_session_sup.erl new file mode 100644 index 000000000..1a1c5df5f --- /dev/null +++ b/apps/emqx/src/emqx_persistent_session_sup.erl @@ -0,0 +1,60 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_persistent_session_sup). + +-behaviour(supervisor). + +-export([start_link/0]). + +-export([init/1]). + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +init([]) -> + %% We want this supervisor to own the table for restarts + SessionTab = emqx_session_router:create_init_tab(), + + %% Resume worker sup + ResumeSup = #{id => router_worker_sup, + start => {emqx_session_router_worker_sup, start_link, [SessionTab]}, + restart => permanent, + shutdown => 2000, + type => supervisor, + modules => [emqx_session_router_worker_sup]}, + + SessionRouterPool = emqx_pool_sup:spec(session_router_pool, + [session_router_pool, hash, + {emqx_session_router, start_link, []}]), + + GCWorker = child_spec(emqx_persistent_session_gc, worker), + + Spec = #{ strategy => one_for_all + , intensity => 0 + , period => 1 + }, + + {ok, {Spec, [ResumeSup, SessionRouterPool, GCWorker]}}. + +child_spec(Mod, worker) -> + #{id => Mod, + start => {Mod, start_link, []}, + restart => permanent, + shutdown => 15000, + type => worker, + modules => [Mod] + }. diff --git a/apps/emqx/src/emqx_router.erl b/apps/emqx/src/emqx_router.erl index 3337b57c8..81d3786e3 100644 --- a/apps/emqx/src/emqx_router.erl +++ b/apps/emqx/src/emqx_router.erl @@ -116,8 +116,10 @@ do_add_route(Topic, Dest) when is_binary(Topic) -> ok = emqx_router_helper:monitor(Dest), case emqx_topic:wildcard(Topic) of true -> - maybe_trans(fun insert_trie_route/1, [Route]); - false -> insert_direct_route(Route) + Fun = fun emqx_router_utils:insert_trie_route/2, + emqx_router_utils:maybe_trans(Fun, [?ROUTE_TAB, Route], ?ROUTE_SHARD); + false -> + emqx_router_utils:insert_direct_route(?ROUTE_TAB, Route) end end. @@ -162,8 +164,10 @@ do_delete_route(Topic, Dest) -> Route = #route{topic = Topic, dest = Dest}, case emqx_topic:wildcard(Topic) of true -> - maybe_trans(fun delete_trie_route/1, [Route]); - false -> delete_direct_route(Route) + Fun = fun emqx_router_utils:delete_trie_route/2, + emqx_router_utils:maybe_trans(Fun, [?ROUTE_TAB, Route], ?ROUTE_SHARD); + false -> + emqx_router_utils:delete_direct_route(?ROUTE_TAB, Route) end. -spec(topics() -> list(emqx_types:topic())). @@ -216,100 +220,3 @@ terminate(_Reason, #{pool := Pool, id := Id}) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. - -%%-------------------------------------------------------------------- -%% Internal functions -%%-------------------------------------------------------------------- - -insert_direct_route(Route) -> - mria:dirty_write(?ROUTE_TAB, Route). - -insert_trie_route(Route = #route{topic = Topic}) -> - case mnesia:wread({?ROUTE_TAB, Topic}) of - [] -> emqx_trie:insert(Topic); - _ -> ok - end, - mnesia:write(?ROUTE_TAB, Route, sticky_write). - -delete_direct_route(Route) -> - mria:dirty_delete_object(?ROUTE_TAB, Route). - -delete_trie_route(Route = #route{topic = Topic}) -> - case mnesia:wread({?ROUTE_TAB, Topic}) of - [Route] -> %% Remove route and trie - ok = mnesia:delete_object(?ROUTE_TAB, Route, sticky_write), - emqx_trie:delete(Topic); - [_|_] -> %% Remove route only - mnesia:delete_object(?ROUTE_TAB, Route, sticky_write); - [] -> ok - end. - -%% @private --spec(maybe_trans(function(), list(any())) -> ok | {error, term()}). -maybe_trans(Fun, Args) -> - case emqx:get_config([broker, perf, route_lock_type]) of - key -> - trans(Fun, Args); - global -> - %% Assert: - mnesia = mria_rlog:backend(), %% TODO: do something smarter than just crash - lock_router(), - try mnesia:sync_dirty(Fun, Args) - after - unlock_router() - end; - tab -> - trans(fun() -> - emqx_trie:lock_tables(), - apply(Fun, Args) - end, []) - end. - -%% The created fun only terminates with explicit exception --dialyzer({nowarn_function, [trans/2]}). - --spec(trans(function(), list(any())) -> ok | {error, term()}). -trans(Fun, Args) -> - {WPid, RefMon} = - spawn_monitor( - %% NOTE: this is under the assumption that crashes in Fun - %% are caught by mria:transaction/2. - %% Future changes should keep in mind that this process - %% always exit with database write result. - fun() -> - Res = case mria:transaction(?ROUTE_SHARD, Fun, Args) of - {atomic, Ok} -> Ok; - {aborted, Reason} -> {error, Reason} - end, - exit({shutdown, Res}) - end), - %% Receive a 'shutdown' exit to pass result from the short-lived process. - %% so the receive below can be receive-mark optimized by the compiler. - %% - %% If the result is sent as a regular message, we'll have to - %% either demonitor (with flush which is essentially a 'receive' since - %% the process is no longer alive after the result has been received), - %% or use a plain 'receive' to drain the normal 'DOWN' message. - %% However the compiler does not optimize this second 'receive'. - receive - {'DOWN', RefMon, process, WPid, Info} -> - case Info of - {shutdown, Result} -> Result; - _ -> {error, {trans_crash, Info}} - end - end. - -lock_router() -> - %% if Retry is not 0, global:set_lock could sleep a random time up to 8s. - %% Considering we have a limited number of brokers, it is safe to use sleep 1 ms. - case global:set_lock({?MODULE, self()}, [node() | nodes()], 0) of - false -> - %% Force to sleep 1ms instead. - timer:sleep(1), - lock_router(); - true -> - ok - end. - -unlock_router() -> - global:del_lock({?MODULE, self()}). diff --git a/apps/emqx/src/emqx_router_utils.erl b/apps/emqx/src/emqx_router_utils.erl new file mode 100644 index 000000000..c47f2e37b --- /dev/null +++ b/apps/emqx/src/emqx_router_utils.erl @@ -0,0 +1,126 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_router_utils). + +-include("emqx.hrl"). + +-export([ delete_direct_route/2 + , delete_trie_route/2 + , insert_direct_route/2 + , insert_trie_route/2 + , maybe_trans/3 + ]). + +insert_direct_route(Tab, Route) -> + mria:dirty_write(Tab, Route). + +insert_trie_route(RouteTab, Route = #route{topic = Topic}) -> + case mnesia:wread({RouteTab, Topic}) of + [] when RouteTab =:= emqx_route -> emqx_trie:insert(Topic); + [] when RouteTab =:= emqx_session_route -> emqx_trie:insert_session(Topic); + _ -> ok + end, + mnesia:write(RouteTab, Route, sticky_write). + +delete_direct_route(RouteTab, Route) -> + mria:dirty_delete_object(RouteTab, Route). + +delete_trie_route(RouteTab, Route = #route{topic = Topic}) -> + case mnesia:wread({RouteTab, Topic}) of + [R] when R =:= Route -> + %% Remove route and trie + ok = mnesia:delete_object(RouteTab, Route, sticky_write), + case RouteTab of + emqx_route -> emqx_trie:delete(Topic); + emqx_session_route -> emqx_trie:delete_session(Topic) + end; + [_|_] -> + %% Remove route only + mnesia:delete_object(RouteTab, Route, sticky_write); + [] -> + ok + end. + +%% @private +-spec(maybe_trans(function(), list(any()), Shard :: atom()) -> ok | {error, term()}). +maybe_trans(Fun, Args, Shard) -> + case emqx:get_config([broker, perf, route_lock_type]) of + key -> + trans(Fun, Args, Shard); + global -> + %% Assert: + mnesia = mria_rlog:backend(), %% TODO: do something smarter than just crash + lock_router(Shard), + try mnesia:sync_dirty(Fun, Args) + after + unlock_router(Shard) + end; + tab -> + trans(fun() -> + emqx_trie:lock_tables(), + apply(Fun, Args) + end, [], Shard) + end. + +%% The created fun only terminates with explicit exception +-dialyzer({nowarn_function, [trans/3]}). + +-spec(trans(function(), list(any()), atom()) -> ok | {error, term()}). +trans(Fun, Args, Shard) -> + {WPid, RefMon} = + spawn_monitor( + %% NOTE: this is under the assumption that crashes in Fun + %% are caught by mnesia:transaction/2. + %% Future changes should keep in mind that this process + %% always exit with database write result. + fun() -> + Res = case mria:transaction(Shard, Fun, Args) of + {atomic, Ok} -> Ok; + {aborted, Reason} -> {error, Reason} + end, + exit({shutdown, Res}) + end), + %% Receive a 'shutdown' exit to pass result from the short-lived process. + %% so the receive below can be receive-mark optimized by the compiler. + %% + %% If the result is sent as a regular message, we'll have to + %% either demonitor (with flush which is essentially a 'receive' since + %% the process is no longer alive after the result has been received), + %% or use a plain 'receive' to drain the normal 'DOWN' message. + %% However the compiler does not optimize this second 'receive'. + receive + {'DOWN', RefMon, process, WPid, Info} -> + case Info of + {shutdown, Result} -> Result; + _ -> {error, {trans_crash, Info}} + end + end. + +lock_router(Shard) -> + %% if Retry is not 0, global:set_lock could sleep a random time up to 8s. + %% Considering we have a limited number of brokers, it is safe to use sleep 1 ms. + case global:set_lock({{?MODULE, Shard}, self()}, [node() | nodes()], 0) of + false -> + %% Force to sleep 1ms instead. + timer:sleep(1), + lock_router(Shard); + true -> + ok + end. + +unlock_router(Shard) -> + global:del_lock({{?MODULE, Shard}, self()}). diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 2d71a1d69..99a7ed59b 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -150,8 +150,30 @@ roots(low) -> , {"flapping_detect", sc(ref("flapping_detect"), #{})} + , {"persistent_session_store", + sc(ref("persistent_session_store"), + #{})} ]. +fields("persistent_session_store") -> + [ {"enabled", + sc(boolean(), + #{ default => "false" + })}, + {"max_retain_undelivered", + sc(duration(), + #{ default => "1h" + })}, + {"message_gc_interval", + sc(duration(), + #{ default => "1h" + })}, + {"session_message_gc_interval", + sc(duration(), + #{ default => "1m" + })} + ]; + fields("stats") -> [ {"enable", sc(boolean(), diff --git a/apps/emqx/src/emqx_session.erl b/apps/emqx/src/emqx_session.erl index 408435006..03414fc60 100644 --- a/apps/emqx/src/emqx_session.erl +++ b/apps/emqx/src/emqx_session.erl @@ -75,6 +75,7 @@ -export([ deliver/2 , enqueue/2 , dequeue/1 + , ignore_local/3 , retry/1 , terminate/3 ]). @@ -89,9 +90,17 @@ %% Export for CT -export([set_field/3]). --export_type([session/0]). +-type sessionID() :: emqx_guid:guid(). + +-export_type([ session/0 + , sessionID/0 + ]). -record(session, { + %% sessionID, fresh for all new sessions unless it is a resumed persistent session + id :: sessionID(), + %% Is this session a persistent session i.e. was it started with Session-Expiry > 0 + is_persistent :: boolean(), %% Client’s Subscriptions. subscriptions :: map(), %% Max subscriptions allowed @@ -129,7 +138,9 @@ -type(replies() :: list(publish() | pubrel())). --define(INFO_KEYS, [subscriptions, +-define(INFO_KEYS, [id, + is_persistent, + subscriptions, upgrade_qos, retry_interval, await_rel_timeout, @@ -157,6 +168,7 @@ , await_rel_timeout => timeout() , max_inflight => integer() , mqueue => emqx_mqueue:options() + , is_persistent => boolean() }. %%-------------------------------------------------------------------- @@ -171,6 +183,8 @@ init(Opts) -> store_qos0 => true }, maps:get(mqueue, Opts, #{})), #session{ + id = emqx_guid:gen(), + is_persistent = maps:get(is_persistent, Opts, false), max_subscriptions = maps:get(max_subscriptions, Opts, infinity), subscriptions = #{}, upgrade_qos = maps:get(upgrade_qos, Opts, false), @@ -195,6 +209,10 @@ info(Session) -> info(Keys, Session) when is_list(Keys) -> [{Key, info(Key, Session)} || Key <- Keys]; +info(id, #session{id = Id}) -> + Id; +info(is_persistent, #session{is_persistent = Bool}) -> + Bool; info(subscriptions, #session{subscriptions = Subs}) -> Subs; info(subscriptions_cnt, #session{subscriptions = Subs}) -> @@ -236,6 +254,23 @@ info(created_at, #session{created_at = CreatedAt}) -> -spec(stats(session()) -> emqx_types:stats()). stats(Session) -> info(?STATS_KEYS, Session). +%%-------------------------------------------------------------------- +%% Ignore local messages +%%-------------------------------------------------------------------- + +ignore_local(Delivers, Subscriber, Session) -> + Subs = info(subscriptions, Session), + lists:dropwhile(fun({deliver, Topic, #message{from = Publisher}}) -> + case maps:find(Topic, Subs) of + {ok, #{nl := 1}} when Subscriber =:= Publisher -> + ok = emqx_metrics:inc('delivery.dropped'), + ok = emqx_metrics:inc('delivery.dropped.no_local'), + true; + _ -> + false + end + end, Delivers). + %%-------------------------------------------------------------------- %% Client -> Broker: SUBSCRIBE %%-------------------------------------------------------------------- @@ -244,11 +279,12 @@ stats(Session) -> info(?STATS_KEYS, Session). emqx_types:subopts(), session()) -> {ok, session()} | {error, emqx_types:reason_code()}). subscribe(ClientInfo = #{clientid := ClientId}, TopicFilter, SubOpts, - Session = #session{subscriptions = Subs}) -> + Session = #session{id = SessionID, is_persistent = IsPS, subscriptions = Subs}) -> IsNew = not maps:is_key(TopicFilter, Subs), case IsNew andalso is_subscriptions_full(Session) of false -> ok = emqx_broker:subscribe(TopicFilter, ClientId, SubOpts), + ok = emqx_persistent_session:add_subscription(TopicFilter, SessionID, IsPS), ok = emqx_hooks:run('session.subscribed', [ClientInfo, TopicFilter, SubOpts#{is_new => IsNew}]), {ok, Session#session{subscriptions = maps:put(TopicFilter, SubOpts, Subs)}}; @@ -268,10 +304,12 @@ is_subscriptions_full(#session{subscriptions = Subs, -spec(unsubscribe(emqx_types:clientinfo(), emqx_types:topic(), emqx_types:subopts(), session()) -> {ok, session()} | {error, emqx_types:reason_code()}). -unsubscribe(ClientInfo, TopicFilter, UnSubOpts, Session = #session{subscriptions = Subs}) -> +unsubscribe(ClientInfo, TopicFilter, UnSubOpts, + Session = #session{id = SessionID, subscriptions = Subs, is_persistent = IsPS}) -> case maps:find(TopicFilter, Subs) of {ok, SubOpts} -> ok = emqx_broker:unsubscribe(TopicFilter), + ok = emqx_persistent_session:remove_subscription(TopicFilter, SessionID, IsPS), ok = emqx_hooks:run('session.unsubscribed', [ClientInfo, TopicFilter, maps:merge(SubOpts, UnSubOpts)]), {ok, Session#session{subscriptions = maps:remove(TopicFilter, Subs)}}; error -> diff --git a/apps/emqx/src/emqx_session_router.erl b/apps/emqx/src/emqx_session_router.erl new file mode 100644 index 000000000..45c7ecd85 --- /dev/null +++ b/apps/emqx/src/emqx_session_router.erl @@ -0,0 +1,276 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_session_router). + +-behaviour(gen_server). + +-include("emqx.hrl"). +-include("logger.hrl"). +-include("types.hrl"). + +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + +%% Mnesia bootstrap +-export([mnesia/1]). + +-boot_mnesia({mnesia, [boot]}). + +-export([ create_init_tab/0 + , start_link/2]). + +%% Route APIs +-export([ delete_routes/2 + , do_add_route/2 + , do_delete_route/2 + , match_routes/1 + ]). + +-export([ buffer/3 + , pending/2 + , resume_begin/2 + , resume_end/2 + ]). + +-export([print_routes/1]). + +%% gen_server callbacks +-export([ init/1 + , handle_call/3 + , handle_cast/2 + , handle_info/2 + , terminate/2 + , code_change/3 + ]). + +-type(group() :: binary()). + +-type(dest() :: node() | {group(), node()}). + +-define(ROUTE_TAB, emqx_session_route). + +-define(SESSION_INIT_TAB, session_init_tab). + +%%-------------------------------------------------------------------- +%% Mnesia bootstrap +%%-------------------------------------------------------------------- + +mnesia(boot) -> + ok = mria:create_table(?ROUTE_TAB, [ + {type, bag}, + {rlog_shard, ?ROUTE_SHARD}, + {storage, disc_copies}, + {record_name, route}, + {attributes, record_info(fields, route)}, + {storage_properties, [{ets, [{read_concurrency, true}, + {write_concurrency, true}]}]}]). + +%%-------------------------------------------------------------------- +%% Start a router +%%-------------------------------------------------------------------- + +create_init_tab() -> + emqx_tables:new(?SESSION_INIT_TAB, [public, {read_concurrency, true}, + {write_concurrency, true}]). + +-spec(start_link(atom(), pos_integer()) -> startlink_ret()). +start_link(Pool, Id) -> + gen_server:start_link({local, emqx_misc:proc_name(?MODULE, Id)}, + ?MODULE, [Pool, Id], [{hibernate_after, 1000}]). + +%%-------------------------------------------------------------------- +%% Route APIs +%%-------------------------------------------------------------------- + +-spec(do_add_route(emqx_topic:topic(), dest()) -> ok | {error, term()}). +do_add_route(Topic, SessionID) when is_binary(Topic) -> + Route = #route{topic = Topic, dest = SessionID}, + case lists:member(Route, lookup_routes(Topic)) of + true -> ok; + false -> + case emqx_topic:wildcard(Topic) of + true -> + Fun = fun emqx_router_utils:insert_trie_route/2, + emqx_router_utils:maybe_trans(Fun, [?ROUTE_TAB, Route], + ?PERSISTENT_SESSION_SHARD); + false -> + emqx_router_utils:insert_direct_route(?ROUTE_TAB, Route) + end + end. + +%% @doc Match routes +-spec(match_routes(emqx_topic:topic()) -> [emqx_types:route()]). +match_routes(Topic) when is_binary(Topic) -> + case match_trie(Topic) of + [] -> lookup_routes(Topic); + Matched -> + lists:append([lookup_routes(To) || To <- [Topic | Matched]]) + end. + +%% Optimize: routing table will be replicated to all router nodes. +match_trie(Topic) -> + case emqx_trie:empty_session() of + true -> []; + false -> emqx_trie:match_session(Topic) + end. + +%% Async +delete_routes(SessionID, Subscriptions) -> + cast(pick(SessionID), {delete_routes, SessionID, Subscriptions}). + +-spec(do_delete_route(emqx_topic:topic(), dest()) -> ok | {error, term()}). +do_delete_route(Topic, SessionID) -> + Route = #route{topic = Topic, dest = SessionID}, + case emqx_topic:wildcard(Topic) of + true -> + Fun = fun emqx_router_utils:delete_trie_route/2, + emqx_router_utils:maybe_trans(Fun, [?ROUTE_TAB, Route], ?PERSISTENT_SESSION_SHARD); + false -> + emqx_router_utils:delete_direct_route(?ROUTE_TAB, Route) + end. + +%% @doc Print routes to a topic +-spec(print_routes(emqx_topic:topic()) -> ok). +print_routes(Topic) -> + lists:foreach(fun(#route{topic = To, dest = SessionID}) -> + io:format("~s -> ~p~n", [To, SessionID]) + end, match_routes(Topic)). + +%%-------------------------------------------------------------------- +%% Session APIs +%%-------------------------------------------------------------------- + +pending(SessionID, MarkerIDs) -> + call(pick(SessionID), {pending, SessionID, MarkerIDs}). + +buffer(SessionID, STopic, Msg) -> + case emqx_tables:lookup_value(?SESSION_INIT_TAB, SessionID) of + undefined -> ok; + Worker -> emqx_session_router_worker:buffer(Worker, STopic, Msg) + end. + +-spec resume_begin(pid(), binary()) -> [{node(), emqx_guid:guid()}]. +resume_begin(From, SessionID) when is_pid(From), is_binary(SessionID) -> + call(pick(SessionID), {resume_begin, From, SessionID}). + +-spec resume_end(pid(), binary()) -> + {'ok', [emqx_types:message()]} | {'error', term()}. +resume_end(From, SessionID) when is_pid(From), is_binary(SessionID) -> + case emqx_tables:lookup_value(?SESSION_INIT_TAB, SessionID) of + undefined -> + ?tp(ps_session_not_found, #{ sid => SessionID }), + {error, not_found}; + Pid -> + Res = emqx_session_router_worker:resume_end(From, Pid, SessionID), + cast(pick(SessionID), {resume_end, SessionID, Pid}), + Res + end. + +%%-------------------------------------------------------------------- +%% Worker internals +%%-------------------------------------------------------------------- + +call(Router, Msg) -> + gen_server:call(Router, Msg, infinity). + +cast(Router, Msg) -> + gen_server:cast(Router, Msg). + +pick(#route{dest = SessionID}) -> + gproc_pool:pick_worker(session_router_pool, SessionID); +pick(SessionID) when is_binary(SessionID) -> + gproc_pool:pick_worker(session_router_pool, SessionID). + +%%-------------------------------------------------------------------- +%% gen_server callbacks +%%-------------------------------------------------------------------- + +init([Pool, Id]) -> + true = gproc_pool:connect_worker(Pool, {Pool, Id}), + {ok, #{pool => Pool, id => Id, pmon => emqx_pmon:new()}}. + +handle_call({resume_begin, RemotePid, SessionID}, _From, State) -> + case init_resume_worker(RemotePid, SessionID, State) of + error -> + {reply, error, State}; + {ok, Pid, State1} -> + ets:insert(?SESSION_INIT_TAB, {SessionID, Pid}), + MarkerID = emqx_persistent_session:mark_resume_begin(SessionID), + {reply, {ok, MarkerID}, State1} + end; +handle_call({pending, SessionID, MarkerIDs}, _From, State) -> + Res = emqx_persistent_session:pending_messages_in_db(SessionID, MarkerIDs), + {reply, Res, State}; +handle_call(Req, _From, State) -> + ?LOG(error, "Unexpected call: ~p", [Req]), + {reply, ignored, State}. + +handle_cast({delete_routes, SessionID, Subscriptions}, State) -> + %% TODO: Make a batch for deleting all routes. + Fun = fun({Topic, _}) -> do_delete_route(Topic, SessionID) end, + ok = lists:foreach(Fun, maps:to_list(Subscriptions)), + {noreply, State}; +handle_cast({resume_end, SessionID, Pid}, State) -> + case emqx_tables:lookup_value(?SESSION_INIT_TAB, SessionID) of + undefined -> skip; + P when P =:= Pid -> ets:delete(?SESSION_INIT_TAB, SessionID); + P when is_pid(P) -> skip + end, + Pmon = emqx_pmon:demonitor(Pid, maps:get(pmon, State)), + _ = emqx_session_router_worker_sup:abort_worker(Pid), + {noreply, State#{ pmon => Pmon }}; +handle_cast(Msg, State) -> + ?LOG(error, "Unexpected cast: ~p", [Msg]), + {noreply, State}. + +handle_info(Info, State) -> + ?LOG(error, "Unexpected info: ~p", [Info]), + {noreply, State}. + +terminate(_Reason, #{pool := Pool, id := Id}) -> + gproc_pool:disconnect_worker(Pool, {Pool, Id}). + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%-------------------------------------------------------------------- +%% Resume worker. A process that buffers the persisted messages during +%% initialisation of a resuming session. +%%-------------------------------------------------------------------- + +init_resume_worker(RemotePid, SessionID, #{ pmon := Pmon } = State) -> + case emqx_session_router_worker_sup:start_worker(SessionID, RemotePid) of + {error, What} -> + ?SLOG(error, #{msg => "Could not start resume worker", reason => What}), + error; + {ok, Pid} -> + Pmon1 = emqx_pmon:monitor(Pid, Pmon), + case emqx_tables:lookup_value(?SESSION_INIT_TAB, SessionID) of + undefined -> + {ok, Pid, State#{ pmon => Pmon1 }}; + {_, OldPid} -> + Pmon2 = emqx_pmon:demonitor(OldPid, Pmon1), + _ = emqx_session_router_worker_sup:abort_worker(OldPid), + {ok, Pid, State#{ pmon => Pmon2 }} + end + end. + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- + +lookup_routes(Topic) -> + ets:lookup(?ROUTE_TAB, Topic). diff --git a/apps/emqx/src/emqx_session_router_worker.erl b/apps/emqx/src/emqx_session_router_worker.erl new file mode 100644 index 000000000..322f110c0 --- /dev/null +++ b/apps/emqx/src/emqx_session_router_worker.erl @@ -0,0 +1,148 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +%% @doc The session router worker is responsible for buffering +%% messages for a persistent session while it is initializing. If a +%% connection process exists for a persistent session, this process is +%% used for bridging the gap while the new connection process takes +%% over the persistent session, but if there is no such process this +%% worker takes it place. +%% +%% The workers are started on all nodes, and buffers all messages that +%% are persisted to the session message table. In the final stage of +%% the initialization, the messages are delivered and the worker is +%% terminated. + + +-module(emqx_session_router_worker). + +-behaviour(gen_server). + +%% API +-export([ buffer/3 + , pendings/1 + , resume_end/3 + , start_link/2 + ]). + +%% gen_server callbacks +-export([ init/1 + , handle_call/3 + , handle_cast/2 + , handle_info/2 + , terminate/2 + ]). + +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + +-record(state, { remote_pid :: pid() + , session_id :: binary() + , session_tab :: ets:table() + , messages :: ets:table() + , buffering :: boolean() + }). + +%%%=================================================================== +%%% API +%%%=================================================================== + +start_link(SessionTab, #{} = Opts) -> + gen_server:start_link(?MODULE, Opts#{ session_tab => SessionTab}, []). + +pendings(Pid) -> + gen_server:call(Pid, pendings). + +resume_end(RemotePid, Pid, _SessionID) -> + case gen_server:call(Pid, {resume_end, RemotePid}) of + {ok, EtsHandle} -> + ?tp(ps_worker_call_ok, #{ pid => Pid + , remote_pid => RemotePid + , sid => _SessionID}), + {ok, ets:tab2list(EtsHandle)}; + {error, _} = Err -> + ?tp(ps_worker_call_failed, #{ pid => Pid + , remote_pid => RemotePid + , sid => _SessionID + , reason => Err}), + Err + end. + +buffer(Worker, STopic, Msg) -> + Worker ! {buffer, STopic, Msg}, + ok. + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +init(#{ remote_pid := RemotePid + , session_id := SessionID + , session_tab := SessionTab}) -> + process_flag(trap_exit, true), + erlang:monitor(process, RemotePid), + ?tp(ps_worker_started, #{ remote_pid => RemotePid + , sid => SessionID }), + {ok, #state{ remote_pid = RemotePid + , session_id = SessionID + , session_tab = SessionTab + , messages = ets:new(?MODULE, [protected, ordered_set]) + , buffering = true + }}. + +handle_call(pendings, _From, State) -> + %% Debug API + {reply, {State#state.messages, State#state.remote_pid}, State}; +handle_call({resume_end, RemotePid}, _From, #state{remote_pid = RemotePid} = State) -> + ?tp(ps_worker_resume_end, #{sid => State#state.session_id}), + {reply, {ok, State#state.messages}, State#state{ buffering = false }}; +handle_call({resume_end, _RemotePid}, _From, State) -> + ?tp(ps_worker_resume_end_error, #{sid => State#state.session_id}), + {reply, {error, wrong_remote_pid}, State}; +handle_call(_Request, _From, State) -> + Reply = ok, + {reply, Reply, State}. + +handle_cast(_Request, State) -> + {noreply, State}. + +handle_info({buffer, _STopic, _Msg}, State) when not State#state.buffering -> + ?tp(ps_worker_drop_deliver, #{ sid => State#state.session_id + , msg_id => emqx_message:id(_Msg) + }), + {noreply, State}; +handle_info({buffer, STopic, Msg}, State) when State#state.buffering -> + ?tp(ps_worker_deliver, #{ sid => State#state.session_id + , msg_id => emqx_message:id(Msg) + }), + ets:insert(State#state.messages, {{Msg, STopic}}), + {noreply, State}; +handle_info({'DOWN', _, process, RemotePid, _Reason}, #state{remote_pid = RemotePid} = State) -> + ?tp(warning, ps_worker, #{ event => worker_remote_died + , sid => State#state.session_id + , msg => "Remote pid died. Exiting." }), + {stop, normal, State}; +handle_info(_Info, State) -> + {noreply, State}. + +terminate(shutdown, _State) -> + ?tp(ps_worker_shutdown, #{ sid => _State#state.session_id }), + ok; +terminate(_, _State) -> + ok. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== diff --git a/apps/emqx/src/emqx_session_router_worker_sup.erl b/apps/emqx/src/emqx_session_router_worker_sup.erl new file mode 100644 index 000000000..08e9028e0 --- /dev/null +++ b/apps/emqx/src/emqx_session_router_worker_sup.erl @@ -0,0 +1,57 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_session_router_worker_sup). + +-behaviour(supervisor). + +-export([ start_link/1 + ]). + +-export([ abort_worker/1 + , start_worker/2 + ]). + +-export([ init/1 + ]). + +start_link(SessionTab) -> + supervisor:start_link({local, ?MODULE}, ?MODULE, SessionTab). + +start_worker(SessionID, RemotePid) -> + supervisor:start_child(?MODULE, [#{ session_id => SessionID + , remote_pid => RemotePid}]). + +abort_worker(Pid) -> + supervisor:terminate_child(?MODULE, Pid). + +%%-------------------------------------------------------------------- +%% Supervisor callbacks +%%-------------------------------------------------------------------- + +init(SessionTab) -> + %% Resume worker + Worker = #{id => session_router_worker, + start => {emqx_session_router_worker, start_link, [SessionTab]}, + restart => transient, + shutdown => 2000, + type => worker, + modules => [emqx_session_router_worker]}, + Spec = #{ strategy => simple_one_for_one + , intensity => 1 + , period => 5}, + + {ok, {Spec, [Worker]}}. diff --git a/apps/emqx/src/emqx_sup.erl b/apps/emqx/src/emqx_sup.erl index 9338b7390..9c8af33d6 100644 --- a/apps/emqx/src/emqx_sup.erl +++ b/apps/emqx/src/emqx_sup.erl @@ -65,9 +65,11 @@ init([]) -> KernelSup = child_spec(emqx_kernel_sup, supervisor), RouterSup = child_spec(emqx_router_sup, supervisor), BrokerSup = child_spec(emqx_broker_sup, supervisor), + SessionSup = child_spec(emqx_persistent_session_sup, supervisor), CMSup = child_spec(emqx_cm_sup, supervisor), SysSup = child_spec(emqx_sys_sup, supervisor), Children = [KernelSup] ++ + [SessionSup || emqx_persistent_session:is_store_enabled()] ++ [RouterSup || emqx_boot:is_enabled(router)] ++ [BrokerSup || emqx_boot:is_enabled(broker)] ++ [CMSup || emqx_boot:is_enabled(broker)] ++ diff --git a/apps/emqx/src/emqx_trie.erl b/apps/emqx/src/emqx_trie.erl index 4354a2bab..1881ecb6b 100644 --- a/apps/emqx/src/emqx_trie.erl +++ b/apps/emqx/src/emqx_trie.erl @@ -19,18 +19,25 @@ -include("emqx.hrl"). %% Mnesia bootstrap --export([mnesia/1]). +-export([ mnesia/1 + , create_session_trie/0 + ]). -boot_mnesia({mnesia, [boot]}). %% Trie APIs -export([ insert/1 + , insert_session/1 , match/1 + , match_session/1 , delete/1 + , delete_session/1 ]). -export([ empty/0 + , empty_session/0 , lock_tables/0 + , lock_session_tables/0 ]). -export([is_compact/0, set_compact/1]). @@ -41,6 +48,7 @@ -endif. -define(TRIE, emqx_trie). +-define(SESSION_TRIE, emqx_session_trie). -define(PREFIX(Prefix), {Prefix, 0}). -define(TOPIC(Topic), {Topic, 1}). @@ -62,12 +70,23 @@ mnesia(boot) -> ]}], ok = mria:create_table(?TRIE, [ {rlog_shard, ?ROUTE_SHARD}, - {storage, ram_copies}, {record_name, ?TRIE}, {attributes, record_info(fields, ?TRIE)}, {type, ordered_set}, {storage_properties, StoreProps}]). +create_session_trie() -> + StoreProps = [{ets, [{read_concurrency, true}, + {write_concurrency, true} + ]}], + ok = mria:create_table(?SESSION_TRIE, + [{rlog_shard, ?ROUTE_SHARD}, + {storage, disc_copies}, + {record_name, ?TRIE}, + {attributes, record_info(fields, ?TRIE)}, + {type, ordered_set}, + {storage_properties, StoreProps}]). + %%-------------------------------------------------------------------- %% Topics APIs %%-------------------------------------------------------------------- @@ -75,24 +94,46 @@ mnesia(boot) -> %% @doc Insert a topic filter into the trie. -spec(insert(emqx_types:topic()) -> ok). insert(Topic) when is_binary(Topic) -> + insert(Topic, ?TRIE). + +-spec(insert_session(emqx_topic:topic()) -> ok). +insert_session(Topic) when is_binary(Topic) -> + insert(Topic, ?SESSION_TRIE). + +insert(Topic, Trie) when is_binary(Topic) -> {TopicKey, PrefixKeys} = make_keys(Topic), - case mnesia:wread({?TRIE, TopicKey}) of + case mnesia:wread({Trie, TopicKey}) of [_] -> ok; %% already inserted - [] -> lists:foreach(fun insert_key/1, [TopicKey | PrefixKeys]) + [] -> lists:foreach(fun(Key) -> insert_key(Key, Trie) end, [TopicKey | PrefixKeys]) end. %% @doc Delete a topic filter from the trie. -spec(delete(emqx_types:topic()) -> ok). delete(Topic) when is_binary(Topic) -> + delete(Topic, ?TRIE). + +%% @doc Delete a topic filter from the trie. +-spec(delete_session(emqx_topic:topic()) -> ok). +delete_session(Topic) when is_binary(Topic) -> + delete(Topic, ?SESSION_TRIE). + +delete(Topic, Trie) when is_binary(Topic) -> {TopicKey, PrefixKeys} = make_keys(Topic), - case [] =/= mnesia:wread({?TRIE, TopicKey}) of - true -> lists:foreach(fun delete_key/1, [TopicKey | PrefixKeys]); + case [] =/= mnesia:wread({Trie, TopicKey}) of + true -> lists:foreach(fun(Key) -> delete_key(Key, Trie) end, [TopicKey | PrefixKeys]); false -> ok end. %% @doc Find trie nodes that matchs the topic name. -spec(match(emqx_types:topic()) -> list(emqx_types:topic())). match(Topic) when is_binary(Topic) -> + match(Topic, ?TRIE). + +-spec(match_session(emqx_topic:topic()) -> list(emqx_topic:topic())). +match_session(Topic) when is_binary(Topic) -> + match(Topic, ?SESSION_TRIE). + +match(Topic, Trie) when is_binary(Topic) -> Words = emqx_topic:words(Topic), case emqx_topic:wildcard(Words) of true -> @@ -105,17 +146,26 @@ match(Topic) when is_binary(Topic) -> %% Such clients will get disconnected. []; false -> - do_match(Words) + do_match(Words, Trie) end. %% @doc Is the trie empty? -spec(empty() -> boolean()). -empty() -> ets:first(?TRIE) =:= '$end_of_table'. +empty() -> empty(?TRIE). + +empty_session() -> + empty(?SESSION_TRIE). + +empty(Trie) -> ets:first(Trie) =:= '$end_of_table'. -spec lock_tables() -> ok. lock_tables() -> mnesia:write_lock_table(?TRIE). +-spec lock_session_tables() -> ok. +lock_session_tables() -> + mnesia:write_lock_table(?SESSION_TRIE). + %%-------------------------------------------------------------------- %% Internal functions %%-------------------------------------------------------------------- @@ -163,70 +213,70 @@ make_prefixes([H | T], Prefix0, Acc0) -> Acc = [Prefix | Acc0], make_prefixes(T, Prefix, Acc). -insert_key(Key) -> - T = case mnesia:wread({?TRIE, Key}) of +insert_key(Key, Trie) -> + T = case mnesia:wread({Trie, Key}) of [#?TRIE{count = C} = T1] -> T1#?TRIE{count = C + 1}; [] -> #?TRIE{key = Key, count = 1} end, - ok = mnesia:write(T). + ok = mnesia:write(Trie, T, write). -delete_key(Key) -> - case mnesia:wread({?TRIE, Key}) of +delete_key(Key, Trie) -> + case mnesia:wread({Trie, Key}) of [#?TRIE{count = C} = T] when C > 1 -> - ok = mnesia:write(T#?TRIE{count = C - 1}); + ok = mnesia:write(Trie, T#?TRIE{count = C - 1}, write); [_] -> - ok = mnesia:delete(?TRIE, Key, write); + ok = mnesia:delete(Trie, Key, write); [] -> ok end. %% micro-optimization: no need to lookup when topic is not wildcard %% because we only insert wildcards to emqx_trie -lookup_topic(_Topic, false) -> []; -lookup_topic(Topic, true) -> lookup_topic(Topic). +lookup_topic(_Topic,_Trie, false) -> []; +lookup_topic(Topic, Trie, true) -> lookup_topic(Topic, Trie). -lookup_topic(Topic) when is_binary(Topic) -> - case ets:lookup(?TRIE, ?TOPIC(Topic)) of +lookup_topic(Topic, Trie) when is_binary(Topic) -> + case ets:lookup(Trie, ?TOPIC(Topic)) of [#?TRIE{count = C}] -> [Topic || C > 0]; [] -> [] end. -has_prefix(empty) -> true; %% this is the virtual tree root -has_prefix(Prefix) -> - case ets:lookup(?TRIE, ?PREFIX(Prefix)) of +has_prefix(empty, _Trie) -> true; %% this is the virtual tree root +has_prefix(Prefix, Trie) -> + case ets:lookup(Trie, ?PREFIX(Prefix)) of [#?TRIE{count = C}] -> C > 0; [] -> false end. -do_match([<<"$", _/binary>> = Prefix | Words]) -> +do_match([<<"$", _/binary>> = Prefix | Words], Trie) -> %% For topics having dollar sign prefix, %% we do not match root level + or #, %% fast forward to the next level. case Words =:= [] of - true -> lookup_topic(Prefix); + true -> lookup_topic(Prefix, Trie); false -> [] - end ++ do_match(Words, Prefix); -do_match(Words) -> - do_match(Words, empty). + end ++ do_match(Words, Prefix, Trie); +do_match(Words, Trie) -> + do_match(Words, empty, Trie). -do_match(Words, Prefix) -> +do_match(Words, Prefix, Trie) -> case is_compact() of - true -> match_compact(Words, Prefix, false, []); - false -> match_no_compact(Words, Prefix, false, []) + true -> match_compact(Words, Prefix, Trie, false, []); + false -> match_no_compact(Words, Prefix, Trie, false, []) end. -match_no_compact([], Topic, IsWildcard, Acc) -> - 'match_#'(Topic) ++ %% try match foo/+/# or foo/bar/# - lookup_topic(Topic, IsWildcard) ++ %% e.g. foo/+ +match_no_compact([], Topic, Trie, IsWildcard, Acc) -> + 'match_#'(Topic, Trie) ++ %% try match foo/+/# or foo/bar/# + lookup_topic(Topic, Trie, IsWildcard) ++ %% e.g. foo/+ Acc; -match_no_compact([Word | Words], Prefix, IsWildcard, Acc0) -> - case has_prefix(Prefix) of +match_no_compact([Word | Words], Prefix, Trie, IsWildcard, Acc0) -> + case has_prefix(Prefix, Trie) of true -> - Acc1 = 'match_#'(Prefix) ++ Acc0, - Acc = match_no_compact(Words, join(Prefix, '+'), true, Acc1), - match_no_compact(Words, join(Prefix, Word), IsWildcard, Acc); + Acc1 = 'match_#'(Prefix, Trie) ++ Acc0, + Acc = match_no_compact(Words, join(Prefix, '+'), Trie, true, Acc1), + match_no_compact(Words, join(Prefix, Word), Trie, IsWildcard, Acc); false -> %% non-compact paths in database %% if there is no prefix matches the current topic prefix @@ -243,26 +293,26 @@ match_no_compact([Word | Words], Prefix, IsWildcard, Acc0) -> Acc0 end. -match_compact([], Topic, IsWildcard, Acc) -> - 'match_#'(Topic) ++ %% try match foo/bar/# - lookup_topic(Topic, IsWildcard) ++ %% try match foo/bar +match_compact([], Topic, Trie, IsWildcard, Acc) -> + 'match_#'(Topic, Trie) ++ %% try match foo/bar/# + lookup_topic(Topic, Trie, IsWildcard) ++ %% try match foo/bar Acc; -match_compact([Word | Words], Prefix, IsWildcard, Acc0) -> - Acc1 = 'match_#'(Prefix) ++ Acc0, - Acc = match_compact(Words, join(Prefix, Word), IsWildcard, Acc1), +match_compact([Word | Words], Prefix, Trie, IsWildcard, Acc0) -> + Acc1 = 'match_#'(Prefix, Trie) ++ Acc0, + Acc = match_compact(Words, join(Prefix, Word), Trie, IsWildcard, Acc1), WildcardPrefix = join(Prefix, '+'), %% go deeper to match current_prefix/+ only when: %% 1. current word is the last %% OR %% 2. there is a prefix = 'current_prefix/+' - case Words =:= [] orelse has_prefix(WildcardPrefix) of - true -> match_compact(Words, WildcardPrefix, true, Acc); + case Words =:= [] orelse has_prefix(WildcardPrefix, Trie) of + true -> match_compact(Words, WildcardPrefix, Trie, true, Acc); false -> Acc end. -'match_#'(Prefix) -> +'match_#'(Prefix, Trie) -> MlTopic = join(Prefix, '#'), - lookup_topic(MlTopic). + lookup_topic(MlTopic, Trie). is_compact() -> emqx:get_config([broker, perf, trie_compaction], true). diff --git a/apps/emqx/test/emqx_cm_SUITE.erl b/apps/emqx/test/emqx_cm_SUITE.erl index de143ef46..6ca6fef02 100644 --- a/apps/emqx/test/emqx_cm_SUITE.erl +++ b/apps/emqx/test/emqx_cm_SUITE.erl @@ -98,6 +98,7 @@ t_open_session(_) -> sockname => {{127,0,0,1}, 1883}, peercert => nossl, conn_mod => emqx_connection, + expiry_interval => 0, receive_maximum => 100}, {ok, #{session := Session1, present := false}} = emqx_cm:open_session(true, ClientInfo, ConnInfo), @@ -123,6 +124,7 @@ t_open_session_race_condition(_) -> sockname => {{127,0,0,1}, 1883}, peercert => nossl, conn_mod => emqx_connection, + expiry_interval => 0, receive_maximum => 100}, Parent = self(), @@ -219,7 +221,7 @@ t_discard_session_race(_) -> t_takeover_session(_) -> #{conninfo := ConnInfo} = ?ChanInfo, - {error, not_found} = emqx_cm:takeover_session(<<"clientid">>), + none = emqx_cm:takeover_session(<<"clientid">>), erlang:spawn_link(fun() -> ok = emqx_cm:register_channel(<<"clientid">>, self(), ConnInfo), receive @@ -228,7 +230,7 @@ t_takeover_session(_) -> end end), timer:sleep(100), - {ok, emqx_connection, _, test} = emqx_cm:takeover_session(<<"clientid">>), + {living, emqx_connection, _, test} = emqx_cm:takeover_session(<<"clientid">>), emqx_cm:unregister_channel(<<"clientid">>). t_kick_session(_) -> diff --git a/apps/emqx/test/emqx_mqtt_protocol_v5_SUITE.erl b/apps/emqx/test/emqx_mqtt_protocol_v5_SUITE.erl index caa1e7216..65d18af50 100644 --- a/apps/emqx/test/emqx_mqtt_protocol_v5_SUITE.erl +++ b/apps/emqx/test/emqx_mqtt_protocol_v5_SUITE.erl @@ -131,7 +131,6 @@ clean_retained(Topic, Config) -> t_basic_test(Config) -> ConnFun = ?config(conn_fun, Config), Topic = nth(1, ?TOPICS), - ct:print("Basic test starting"), {ok, C} = emqtt:start_link([{proto_ver, v5} | Config]), {ok, _} = emqtt:ConnFun(C), {ok, _, [1]} = emqtt:subscribe(C, Topic, qos1), @@ -333,37 +332,6 @@ t_connect_keepalive_timeout(Config) -> error("keepalive timeout") end. -%% [MQTT-3.1.2-23] -t_connect_session_expiry_interval(Config) -> - ConnFun = ?config(conn_fun, Config), - Topic = nth(1, ?TOPICS), - Payload = "test message", - - {ok, Client1} = emqtt:start_link([ {clientid, <<"t_connect_session_expiry_interval">>}, - {proto_ver, v5}, - {properties, #{'Session-Expiry-Interval' => 7200}} - | Config - ]), - {ok, _} = emqtt:ConnFun(Client1), - {ok, _, [2]} = emqtt:subscribe(Client1, Topic, qos2), - ok = emqtt:disconnect(Client1), - - {ok, Client2} = emqtt:start_link([{proto_ver, v5} | Config]), - {ok, _} = emqtt:ConnFun(Client2), - {ok, 2} = emqtt:publish(Client2, Topic, Payload, 2), - ok = emqtt:disconnect(Client2), - - {ok, Client3} = emqtt:start_link([ {clientid, <<"t_connect_session_expiry_interval">>}, - {proto_ver, v5}, - {clean_start, false} | Config - ]), - {ok, _} = emqtt:ConnFun(Client3), - [Msg | _ ] = receive_messages(1), - ?assertEqual({ok, iolist_to_binary(Topic)}, maps:find(topic, Msg)), - ?assertEqual({ok, iolist_to_binary(Payload)}, maps:find(payload, Msg)), - ?assertEqual({ok, 2}, maps:find(qos, Msg)), - ok = emqtt:disconnect(Client3). - %% [MQTT-3.1.3-9] %% !!!REFACTOR NEED: %t_connect_will_delay_interval(Config) -> diff --git a/apps/emqx/test/emqx_persistent_session_SUITE.erl b/apps/emqx/test/emqx_persistent_session_SUITE.erl new file mode 100644 index 000000000..ac51636f0 --- /dev/null +++ b/apps/emqx/test/emqx_persistent_session_SUITE.erl @@ -0,0 +1,1061 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_persistent_session_SUITE). + +-include_lib("stdlib/include/assert.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-include_lib("../include/emqx.hrl"). +-include("../src/emqx_persistent_session.hrl"). + +-compile(export_all). +-compile(nowarn_export_all). + +%%-------------------------------------------------------------------- +%% SUITE boilerplate +%%-------------------------------------------------------------------- + +all() -> + [ {group, persistent_store_enabled} + , {group, persistent_store_disabled} + ]. + +%% A persistent session can be resumed in two ways: +%% 1. The old connection process is still alive, and the session is taken +%% over by the new connection. +%% 2. The old session process has died (e.g., because of node down). +%% The new process resumes the session from the stored state, and finds +%% any subscribed messages from the persistent message store. +%% +%% We want to test both ways, both with the db backend enabled and disabled. +%% +%% In addition, we test both tcp and quic connections. + +groups() -> + TCs = emqx_common_test_helpers:all(?MODULE), + SnabbkaffeTCs = [TC || TC <- TCs, is_snabbkaffe_tc(TC)], + GCTests = [TC || TC <- TCs, is_gc_tc(TC)], + OtherTCs = (TCs -- SnabbkaffeTCs) -- GCTests, + [ {persistent_store_enabled, [ {group, no_kill_connection_process} + , {group, kill_connection_process} + , {group, snabbkaffe} + , {group, gc_tests} + ]} + , {persistent_store_disabled, [ {group, no_kill_connection_process} + ]} + , {no_kill_connection_process, [], [{group, tcp}, {group, quic}, {group, ws}]} + , { kill_connection_process, [], [{group, tcp}, {group, quic}, {group, ws}]} + , {snabbkaffe, [], [{group, tcp_snabbkaffe}, {group, quic_snabbkaffe}, {group, ws_snabbkaffe}]} + , {tcp, [], OtherTCs} + , {quic, [], OtherTCs} + , {ws, [], OtherTCs} + , {tcp_snabbkaffe, [], SnabbkaffeTCs} + , {quic_snabbkaffe, [], SnabbkaffeTCs} + , {ws_snabbkaffe, [], SnabbkaffeTCs} + , {gc_tests, [], GCTests} + ]. + +is_snabbkaffe_tc(TC) -> + re:run(atom_to_list(TC), "^t_snabbkaffe_") /= nomatch. + +is_gc_tc(TC) -> + re:run(atom_to_list(TC), "^t_gc_") /= nomatch. + +init_per_group(persistent_store_enabled, Config) -> + %% Start Apps + emqx_common_test_helpers:boot_modules(all), + meck:new(emqx_config, [non_strict, passthrough, no_history, no_link]), + meck:expect(emqx_config, get, fun(?is_enabled_key) -> true; + (Other) -> meck:passthrough([Other]) + end), + emqx_common_test_helpers:start_apps([], fun set_special_confs/1), + ?assertEqual(true, emqx_persistent_session:is_store_enabled()), + [{persistent_store_enabled, true}|Config]; +init_per_group(persistent_store_disabled, Config) -> + %% Start Apps + emqx_common_test_helpers:boot_modules(all), + meck:new(emqx_config, [non_strict, passthrough, no_history, no_link]), + meck:expect(emqx_config, get, fun(?is_enabled_key) -> false; + (Other) -> meck:passthrough([Other]) + end), + emqx_common_test_helpers:start_apps([], fun set_special_confs/1), + ?assertEqual(false, emqx_persistent_session:is_store_enabled()), + [{persistent_store_enabled, false}|Config]; +init_per_group(Group, Config) when Group == ws; Group == ws_snabbkaffe -> + [{ssl,false}, + {host,"localhost"}, + {enable_websocket,true}, + {port, 8083}, + {conn_fun, ws_connect}| Config]; +init_per_group(Group, Config) when Group == tcp; Group == tcp_snabbkaffe -> + [ {port, 1883}, {conn_fun, connect}| Config]; +init_per_group(Group, Config) when Group == quic; Group == quic_snabbkaffe -> + [ {port, 14567}, {conn_fun, quic_connect} | Config]; +init_per_group(no_kill_connection_process, Config) -> + [ {kill_connection_process, false} | Config]; +init_per_group(kill_connection_process, Config) -> + [ {kill_connection_process, true} | Config]; +init_per_group(snabbkaffe, Config) -> + [ {kill_connection_process, true} | Config]; +init_per_group(gc_tests, Config) -> + %% We need to make sure the system does not interfere with this test group. + emqx_common_test_helpers:stop_apps([]), + SessionMsgEts = gc_tests_session_store, + MsgEts = gc_tests_msg_store, + Pid = spawn(fun() -> + ets:new(SessionMsgEts, [named_table, public, ordered_set]), + ets:new(MsgEts, [named_table, public, ordered_set, {keypos, 2}]), + receive stop -> ok end + end), + meck:new(mnesia, [non_strict, passthrough, no_history, no_link]), + meck:expect(mnesia, dirty_first, fun(?SESS_MSG_TAB) -> ets:first(SessionMsgEts); + (?MSG_TAB) -> ets:first(MsgEts); + (X) -> meck:passthrough(X) + end), + meck:expect(mnesia, dirty_next, fun(?SESS_MSG_TAB, X) -> ets:next(SessionMsgEts, X); + (?MSG_TAB, X) -> ets:next(MsgEts, X); + (Tab, X) -> meck:passthrough([Tab, X]) + end), + meck:expect(mnesia, dirty_delete, fun(?MSG_TAB, X) -> ets:delete(MsgEts, X); + (Tab, X) -> meck:passthrough([Tab, X]) + end), + [{store_owner, Pid}, {session_msg_store, SessionMsgEts}, {msg_store, MsgEts} | Config]. + +init_per_suite(Config) -> + Config. + +set_special_confs(emqx) -> + Path = emqx_common_test_helpers:deps_path(emqx, "test/emqx_SUITE_data/loaded_plugins"), + application:set_env(emqx, plugins_loaded_file, Path); +set_special_confs(_) -> + ok. + +end_per_suite(_Config) -> + ok. + +end_per_group(gc_tests, Config) -> + meck:unload(mnesia), + ?config(store_owner, Config) ! stop, + ok; +end_per_group(persistent_store_enabled, _Config) -> + meck:unload(emqx_config), + emqx_common_test_helpers:stop_apps([]); +end_per_group(persistent_store_disabled, _Config) -> + meck:unload(emqx_config), + emqx_common_test_helpers:stop_apps([]); +end_per_group(_Group, _Config) -> + ok. + +init_per_testcase(TestCase, Config) -> + Config1 = preconfig_per_testcase(TestCase, Config), + case is_gc_tc(TestCase) of + true -> + ets:delete_all_objects(?config(msg_store, Config)), + ets:delete_all_objects(?config(session_msg_store, Config)); + false -> + skip + end, + case erlang:function_exported(?MODULE, TestCase, 2) of + true -> ?MODULE:TestCase(init, Config1); + _ -> Config1 + end. + +end_per_testcase(TestCase, Config) -> + case is_snabbkaffe_tc(TestCase) of + true -> snabbkaffe:stop(); + false -> skip + end, + case erlang:function_exported(?MODULE, TestCase, 2) of + true -> ?MODULE:TestCase('end', Config); + false -> ok + end, + Config. + +preconfig_per_testcase(TestCase, Config) -> + {BaseName, Config1} = + case ?config(tc_group_properties, Config) of + [] -> + %% We are running a single testcase + {atom_to_binary(TestCase), + init_per_group(tcp, init_per_group(kill_connection_process, Config))}; + [_|_] = Props-> + Path = lists:reverse(?config(tc_group_path, Config) ++ Props), + Pre0 = [atom_to_list(N) || {name, N} <- lists:flatten(Path)], + Pre1 = lists:join("_", Pre0 ++ [atom_to_binary(TestCase)]), + {iolist_to_binary(Pre1), + Config} + end, + [ {topic, iolist_to_binary([BaseName, "/foo"])} + , {stopic, iolist_to_binary([BaseName, "/+"])} + , {stopic_alt, iolist_to_binary([BaseName, "/foo"])} + , {client_id, BaseName} + | Config1]. + +%%-------------------------------------------------------------------- +%% Helpers +%%-------------------------------------------------------------------- + +client_info(Key, Client) -> + maps:get(Key, maps:from_list(emqtt:info(Client)), undefined). + +receive_messages(Count) -> + receive_messages(Count, []). + +receive_messages(0, Msgs) -> + Msgs; +receive_messages(Count, Msgs) -> + receive + {publish, Msg} -> + receive_messages(Count-1, [Msg|Msgs]); + _Other -> + receive_messages(Count, Msgs) + after 1000 -> + Msgs + end. + +maybe_kill_connection_process(ClientId, Config) -> + case ?config(kill_connection_process, Config) of + true -> + [ConnectionPid] = emqx_cm:lookup_channels(ClientId), + ?assert(is_pid(ConnectionPid)), + Ref = monitor(process, ConnectionPid), + ConnectionPid ! die_if_test, + receive {'DOWN', Ref, process, ConnectionPid, normal} -> ok + after 3000 -> error(process_did_not_die) + end; + false -> + ok + end. + +snabbkaffe_sync_publish(Topic, Payloads, Config) -> + Fun = fun(Client, Payload) -> + ?wait_async_action( {ok, _} = emqtt:publish(Client, Topic, Payload, 2) + , #{?snk_kind := ps_persist_msg, payload := Payload} + ) + end, + do_publish(Payloads, Fun, Config). + +publish(Topic, Payloads, Config) -> + Fun = fun(Client, Payload) -> + {ok, _} = emqtt:publish(Client, Topic, Payload, 2) + end, + do_publish(Payloads, Fun, Config). + +do_publish(Payloads = [_|_], PublishFun, Config) -> + %% Publish from another process to avoid connection confusion. + {Pid, Ref} = + spawn_monitor( + fun() -> + %% For convenience, always publish using tcp. + %% The publish path is not what we are testing. + {ok, Client} = emqtt:start_link([ {proto_ver, v5} + , {port, 1883} ]), + {ok, _} = emqtt:connect(Client), + lists:foreach(fun(Payload) -> PublishFun(Client, Payload) end, Payloads), + ok = emqtt:disconnect(Client) + end), + receive + {'DOWN', Ref, process, Pid, normal} -> ok; + {'DOWN', Ref, process, Pid, What} -> error({failed_publish, What}) + end; +do_publish(Payload, PublishFun, Config) -> + do_publish([Payload], PublishFun, Config). + +%%-------------------------------------------------------------------- +%% Test Cases +%%-------------------------------------------------------------------- + +%% [MQTT-3.1.2-23] +t_connect_session_expiry_interval(Config) -> + ConnFun = ?config(conn_fun, Config), + Topic = ?config(topic, Config), + STopic = ?config(stopic, Config), + Payload = <<"test message">>, + ClientId = ?config(client_id, Config), + + {ok, Client1} = emqtt:start_link([ {clientid, ClientId}, + {proto_ver, v5}, + {properties, #{'Session-Expiry-Interval' => 30}} + | Config]), + {ok, _} = emqtt:ConnFun(Client1), + {ok, _, [2]} = emqtt:subscribe(Client1, STopic, qos2), + ok = emqtt:disconnect(Client1), + + maybe_kill_connection_process(ClientId, Config), + + publish(Topic, Payload, Config), + + {ok, Client2} = emqtt:start_link([ {clientid, ClientId}, + {proto_ver, v5}, + {properties, #{'Session-Expiry-Interval' => 30}}, + {clean_start, false} + | Config]), + {ok, _} = emqtt:ConnFun(Client2), + [Msg | _ ] = receive_messages(1), + ?assertEqual({ok, iolist_to_binary(Topic)}, maps:find(topic, Msg)), + ?assertEqual({ok, iolist_to_binary(Payload)}, maps:find(payload, Msg)), + ?assertEqual({ok, 2}, maps:find(qos, Msg)), + ok = emqtt:disconnect(Client2). + +t_without_client_id(Config) -> + process_flag(trap_exit, true), %% Emqtt client dies + ConnFun = ?config(conn_fun, Config), + {ok, Client0} = emqtt:start_link([ {proto_ver, v5}, + {properties, #{'Session-Expiry-Interval' => 30}}, + {clean_start, false} + | Config]), + {error, {client_identifier_not_valid, _}} = emqtt:ConnFun(Client0), + ok. + +t_assigned_clientid_persistent_session(Config) -> + ConnFun = ?config(conn_fun, Config), + {ok, Client1} = emqtt:start_link([ {proto_ver, v5}, + {properties, #{'Session-Expiry-Interval' => 30}}, + {clean_start, true} + | Config]), + {ok, _} = emqtt:ConnFun(Client1), + + AssignedClientId = client_info(clientid, Client1), + ok = emqtt:disconnect(Client1), + + maybe_kill_connection_process(AssignedClientId, Config), + + {ok, Client2} = emqtt:start_link([ {clientid, AssignedClientId}, + {proto_ver, v5}, + {clean_start, false} + | Config]), + {ok, _} = emqtt:ConnFun(Client2), + ?assertEqual(1, client_info(session_present, Client2)), + ok = emqtt:disconnect(Client2). + +t_cancel_on_disconnect(Config) -> + %% Open a persistent session, but cancel the persistence when + %% shutting down the connection. + ConnFun = ?config(conn_fun, Config), + ClientId = ?config(client_id, Config), + + {ok, Client1} = emqtt:start_link([ {proto_ver, v5}, + {clientid, ClientId}, + {properties, #{'Session-Expiry-Interval' => 30}}, + {clean_start, true} + | Config]), + {ok, _} = emqtt:ConnFun(Client1), + ok = emqtt:disconnect(Client1, 0, #{'Session-Expiry-Interval' => 0}), + + {ok, Client2} = emqtt:start_link([ {clientid, ClientId}, + {proto_ver, v5}, + {clean_start, false}, + {properties, #{'Session-Expiry-Interval' => 30}} + | Config]), + {ok, _} = emqtt:ConnFun(Client2), + ?assertEqual(0, client_info(session_present, Client2)), + ok = emqtt:disconnect(Client2). + +t_persist_on_disconnect(Config) -> + %% Open a non-persistent session, but add the persistence when + %% shutting down the connection. This is a protocol error, and + %% should not convert the session into a persistent session. + ConnFun = ?config(conn_fun, Config), + ClientId = ?config(client_id, Config), + + {ok, Client1} = emqtt:start_link([ {proto_ver, v5}, + {clientid, ClientId}, + {properties, #{'Session-Expiry-Interval' => 0}}, + {clean_start, true} + | Config]), + {ok, _} = emqtt:ConnFun(Client1), + + %% Strangely enough, the disconnect is reported as successful by emqtt. + ok = emqtt:disconnect(Client1, 0, #{'Session-Expiry-Interval' => 30}), + + {ok, Client2} = emqtt:start_link([ {clientid, ClientId}, + {proto_ver, v5}, + {clean_start, false}, + {properties, #{'Session-Expiry-Interval' => 30}} + | Config]), + {ok, _} = emqtt:ConnFun(Client2), + %% The session should not be known, since it wasn't persisted because of the + %% changed expiry interval in the disconnect call. + ?assertEqual(0, client_info(session_present, Client2)), + ok = emqtt:disconnect(Client2). + +wait_for_pending(SId) -> + wait_for_pending(SId, 100). + +wait_for_pending(_SId, 0) -> + error(exhausted_wait_for_pending); +wait_for_pending(SId, N) -> + case emqx_persistent_session:pending(SId) of + [] -> timer:sleep(1), wait_for_pending(SId, N - 1); + [_|_] = Pending -> Pending + end. + +t_process_dies_session_expires(Config) -> + %% Emulate an error in the connect process, + %% or that the node of the process goes down. + %% A persistent session should eventually expire. + ConnFun = ?config(conn_fun, Config), + ClientId = ?config(client_id, Config), + Topic = ?config(topic, Config), + STopic = ?config(stopic, Config), + Payload = <<"test">>, + {ok, Client1} = emqtt:start_link([ {proto_ver, v5}, + {clientid, ClientId}, + {properties, #{'Session-Expiry-Interval' => 1}}, + {clean_start, true} + | Config]), + {ok, _} = emqtt:ConnFun(Client1), + {ok, _, [2]} = emqtt:subscribe(Client1, STopic, qos2), + ok = emqtt:disconnect(Client1), + + maybe_kill_connection_process(ClientId, Config), + + ok = publish(Topic, [Payload], Config), + + SessionId = + case ?config(persistent_store_enabled, Config) of + false -> undefined; + true -> + %% The session should not be marked as expired. + {Tag, Session} = emqx_persistent_session:lookup(ClientId), + ?assertEqual(persistent, Tag), + SId = emqx_session:info(id, Session), + case ?config(kill_connection_process, Config) of + true -> + %% The session should have a pending message + ?assertMatch([_], wait_for_pending(SId)); + false -> + skip + end, + SId + end, + + timer:sleep(1100), + + %% The session should now be marked as expired. + case (?config(kill_connection_process, Config) andalso + ?config(persistent_store_enabled, Config)) of + true -> ?assertMatch({expired, _}, emqx_persistent_session:lookup(ClientId)); + false -> skip + end, + + {ok, Client2} = emqtt:start_link([ {proto_ver, v5}, + {clientid, ClientId}, + {properties, #{'Session-Expiry-Interval' => 30}}, + {clean_start, false} + | Config]), + {ok, _} = emqtt:ConnFun(Client2), + ?assertEqual(0, client_info(session_present, Client2)), + + case (?config(kill_connection_process, Config) andalso + ?config(persistent_store_enabled, Config)) of + true -> + %% The session should be a fresh one + {persistent, NewSession} = emqx_persistent_session:lookup(ClientId), + ?assertNotEqual(SessionId, emqx_session:info(id, NewSession)), + %% The old session should now either be marked as abandoned or already be garbage collected. + ?assertMatch([], emqx_persistent_session:pending(SessionId)); + false -> + skip + end, + + %% We should not receive the pending message + ?assertEqual([], receive_messages(1)), + + emqtt:disconnect(Client2). + +t_publish_while_client_is_gone(Config) -> + %% A persistent session should receive messages in its + %% subscription even if the process owning the session dies. + ConnFun = ?config(conn_fun, Config), + Topic = ?config(topic, Config), + STopic = ?config(stopic, Config), + Payload1 = <<"hello1">>, + Payload2 = <<"hello2">>, + ClientId = ?config(client_id, Config), + {ok, Client1} = emqtt:start_link([ {proto_ver, v5}, + {clientid, ClientId}, + {properties, #{'Session-Expiry-Interval' => 30}}, + {clean_start, true} + | Config]), + {ok, _} = emqtt:ConnFun(Client1), + {ok, _, [2]} = emqtt:subscribe(Client1, STopic, qos2), + + ok = emqtt:disconnect(Client1), + maybe_kill_connection_process(ClientId, Config), + + ok = publish(Topic, [Payload1, Payload2], Config), + + {ok, Client2} = emqtt:start_link([ {proto_ver, v5}, + {clientid, ClientId}, + {properties, #{'Session-Expiry-Interval' => 30}}, + {clean_start, false} + | Config]), + {ok, _} = emqtt:ConnFun(Client2), + [Msg1] = receive_messages(1), + [Msg2] = receive_messages(1), + ?assertEqual({ok, iolist_to_binary(Payload1)}, maps:find(payload, Msg1)), + ?assertEqual({ok, 2}, maps:find(qos, Msg1)), + ?assertEqual({ok, iolist_to_binary(Payload2)}, maps:find(payload, Msg2)), + ?assertEqual({ok, 2}, maps:find(qos, Msg2)), + + ok = emqtt:disconnect(Client2). + +t_clean_start_drops_subscriptions(Config) -> + %% 1. A persistent session is started and disconnected. + %% 2. While disconnected, a message is published and persisted. + %% 3. When connecting again, the clean start flag is set, the subscription is renewed, + %% then we disconnect again. + %% 4. Finally, a new connection is made with clean start set to false. + %% The original message should not be delivered. + + ConnFun = ?config(conn_fun, Config), + Topic = ?config(topic, Config), + STopic = ?config(stopic, Config), + Payload1 = <<"hello1">>, + Payload2 = <<"hello2">>, + Payload3 = <<"hello3">>, + ClientId = ?config(client_id, Config), + + %% 1. + {ok, Client1} = emqtt:start_link([ {proto_ver, v5}, + {clientid, ClientId}, + {properties, #{'Session-Expiry-Interval' => 30}}, + {clean_start, true} + | Config]), + {ok, _} = emqtt:ConnFun(Client1), + {ok, _, [2]} = emqtt:subscribe(Client1, STopic, qos2), + + ok = emqtt:disconnect(Client1), + maybe_kill_connection_process(ClientId, Config), + + %% 2. + ok = publish(Topic, Payload1, Config), + + %% 3. + {ok, Client2} = emqtt:start_link([ {proto_ver, v5}, + {clientid, ClientId}, + {properties, #{'Session-Expiry-Interval' => 30}}, + {clean_start, true} + | Config]), + {ok, _} = emqtt:ConnFun(Client2), + ?assertEqual(0, client_info(session_present, Client2)), + {ok, _, [2]} = emqtt:subscribe(Client2, STopic, qos2), + + ok = publish(Topic, Payload2, Config), + [Msg1] = receive_messages(1), + ?assertEqual({ok, iolist_to_binary(Payload2)}, maps:find(payload, Msg1)), + + ok = emqtt:disconnect(Client2), + maybe_kill_connection_process(ClientId, Config), + + %% 4. + {ok, Client3} = emqtt:start_link([ {proto_ver, v5}, + {clientid, ClientId}, + {properties, #{'Session-Expiry-Interval' => 30}}, + {clean_start, false} + | Config]), + {ok, _} = emqtt:ConnFun(Client3), + + ok = publish(Topic, Payload3, Config), + [Msg2] = receive_messages(1), + ?assertEqual({ok, iolist_to_binary(Payload3)}, maps:find(payload, Msg2)), + + ok = emqtt:disconnect(Client3). + +t_unsubscribe(Config) -> + ConnFun = ?config(conn_fun, Config), + Topic = ?config(topic, Config), + STopic = ?config(stopic, Config), + ClientId = ?config(client_id, Config), + {ok, Client} = emqtt:start_link([ {clientid, ClientId}, + {proto_ver, v5}, + {properties, #{'Session-Expiry-Interval' => 30}} + | Config]), + {ok, _} = emqtt:ConnFun(Client), + {ok, _, [2]} = emqtt:subscribe(Client, STopic, qos2), + case emqx_persistent_session:is_store_enabled() of + true -> + {persistent, Session} = emqx_persistent_session:lookup(ClientId), + SessionID = emqx_session:info(id, Session), + SessionIDs = [SId || #route{dest = SId} <- emqx_session_router:match_routes(Topic)], + ?assert(lists:member(SessionID, SessionIDs)), + ?assertMatch([_], [Sub || {ST, _} = Sub <- emqtt:subscriptions(Client), ST =:= STopic]), + {ok, _, _} = emqtt:unsubscribe(Client, STopic), + ?assertMatch([], [Sub || {ST, _} = Sub <- emqtt:subscriptions(Client), ST =:= STopic]), + SessionIDs2 = [SId || #route{dest = SId} <- emqx_session_router:match_routes(Topic)], + ?assert(not lists:member(SessionID, SessionIDs2)); + false -> + ?assertMatch([_], [Sub || {ST, _} = Sub <- emqtt:subscriptions(Client), ST =:= STopic]), + {ok, _, _} = emqtt:unsubscribe(Client, STopic), + ?assertMatch([], [Sub || {ST, _} = Sub <- emqtt:subscriptions(Client), ST =:= STopic]) + end, + ok = emqtt:disconnect(Client). + +t_multiple_subscription_matches(Config) -> + ConnFun = ?config(conn_fun, Config), + Topic = ?config(topic, Config), + STopic1 = ?config(stopic, Config), + STopic2 = ?config(stopic_alt, Config), + Payload = <<"test message">>, + ClientId = ?config(client_id, Config), + + {ok, Client1} = emqtt:start_link([ {clientid, ClientId}, + {proto_ver, v5}, + {properties, #{'Session-Expiry-Interval' => 30}} + | Config]), + {ok, _} = emqtt:ConnFun(Client1), + {ok, _, [2]} = emqtt:subscribe(Client1, STopic1, qos2), + {ok, _, [2]} = emqtt:subscribe(Client1, STopic2, qos2), + ok = emqtt:disconnect(Client1), + + maybe_kill_connection_process(ClientId, Config), + + publish(Topic, Payload, Config), + + {ok, Client2} = emqtt:start_link([ {clientid, ClientId}, + {proto_ver, v5}, + {properties, #{'Session-Expiry-Interval' => 30}}, + {clean_start, false} + | Config]), + {ok, _} = emqtt:ConnFun(Client2), + + %% We will receive the same message twice because it matches two subscriptions. + [Msg1, Msg2] = receive_messages(2), + ?assertEqual({ok, iolist_to_binary(Topic)}, maps:find(topic, Msg1)), + ?assertEqual({ok, iolist_to_binary(Payload)}, maps:find(payload, Msg1)), + ?assertEqual({ok, iolist_to_binary(Topic)}, maps:find(topic, Msg2)), + ?assertEqual({ok, iolist_to_binary(Payload)}, maps:find(payload, Msg2)), + ?assertEqual({ok, 2}, maps:find(qos, Msg1)), + ?assertEqual({ok, 2}, maps:find(qos, Msg2)), + ok = emqtt:disconnect(Client2). + +t_lost_messages_because_of_gc(init, Config) -> + case (emqx_persistent_session:is_store_enabled() + andalso ?config(kill_connection_process, Config)) of + true -> + Retain = 1000, + OldRetain = emqx_config:get(?msg_retain, Retain), + emqx_config:put(?msg_retain, Retain), + [{retain, Retain}, {old_retain, OldRetain}|Config]; + false -> {skip, only_relevant_with_store_and_kill_process} + end; +t_lost_messages_because_of_gc('end', Config) -> + OldRetain = ?config(old_retain, Config), + emqx_config:put(?msg_retain, OldRetain), + ok. + +t_lost_messages_because_of_gc(Config) -> + ConnFun = ?config(conn_fun, Config), + Topic = ?config(topic, Config), + STopic = ?config(stopic, Config), + ClientId = ?config(client_id, Config), + Retain = ?config(retain, Config), + Payload1 = <<"hello1">>, + Payload2 = <<"hello2">>, + {ok, Client1} = emqtt:start_link([ {clientid, ClientId}, + {proto_ver, v5}, + {properties, #{'Session-Expiry-Interval' => 30}} + | Config]), + {ok, _} = emqtt:ConnFun(Client1), + {ok, _, [2]} = emqtt:subscribe(Client1, STopic, qos2), + emqtt:disconnect(Client1), + maybe_kill_connection_process(ClientId, Config), + publish(Topic, Payload1, Config), + timer:sleep(2 * Retain), + publish(Topic, Payload2, Config), + emqx_persistent_session_gc:message_gc_worker(), + {ok, Client2} = emqtt:start_link([ {clientid, ClientId}, + {clean_start, false}, + {proto_ver, v5}, + {properties, #{'Session-Expiry-Interval' => 30}} + | Config]), + {ok, _} = emqtt:ConnFun(Client2), + Msgs = receive_messages(2), + ?assertMatch([_], Msgs), + ?assertEqual({ok, iolist_to_binary(Payload2)}, maps:find(payload, hd(Msgs))), + emqtt:disconnect(Client2), + ok. + + +%%-------------------------------------------------------------------- +%% Snabbkaffe helpers +%%-------------------------------------------------------------------- + +check_snabbkaffe_vanilla(Trace) -> + ResumeTrace = [T || #{?snk_kind := K} = T <- Trace, + re:run(atom_to_list(K), "^ps_") /= nomatch], + ?assertMatch([_|_], ResumeTrace), + [_Sid] = lists:usort(?projection(sid, ResumeTrace)), + %% Check internal flow of the emqx_cm resuming + ?assert(?strict_causality(#{ ?snk_kind := ps_resuming }, + #{ ?snk_kind := ps_initial_pendings }, + ResumeTrace)), + ?assert(?strict_causality(#{ ?snk_kind := ps_initial_pendings }, + #{ ?snk_kind := ps_persist_pendings }, + ResumeTrace)), + ?assert(?strict_causality(#{ ?snk_kind := ps_persist_pendings }, + #{ ?snk_kind := ps_notify_writers }, + ResumeTrace)), + ?assert(?strict_causality(#{ ?snk_kind := ps_notify_writers }, + #{ ?snk_kind := ps_node_markers }, + ResumeTrace)), + ?assert(?strict_causality(#{ ?snk_kind := ps_node_markers }, + #{ ?snk_kind := ps_resume_session }, + ResumeTrace)), + ?assert(?strict_causality(#{ ?snk_kind := ps_resume_session }, + #{ ?snk_kind := ps_marker_pendings }, + ResumeTrace)), + ?assert(?strict_causality(#{ ?snk_kind := ps_marker_pendings }, + #{ ?snk_kind := ps_marker_pendings_msgs }, + ResumeTrace)), + ?assert(?strict_causality(#{ ?snk_kind := ps_marker_pendings_msgs }, + #{ ?snk_kind := ps_resume_end }, + ResumeTrace)), + + %% Check flow between worker and emqx_cm + ?assert(?strict_causality(#{ ?snk_kind := ps_notify_writers }, + #{ ?snk_kind := ps_worker_started }, + ResumeTrace)), + ?assert(?strict_causality(#{ ?snk_kind := ps_marker_pendings }, + #{ ?snk_kind := ps_worker_resume_end }, + ResumeTrace)), + ?assert(?strict_causality(#{ ?snk_kind := ps_worker_resume_end }, + #{ ?snk_kind := ps_worker_shutdown }, + ResumeTrace)), + + [Markers] = ?projection(markers, ?of_kind(ps_node_markers, Trace)), + ?assertMatch([_], Markers). + +%%-------------------------------------------------------------------- +%% Snabbkaffe tests +%%-------------------------------------------------------------------- + +t_snabbkaffe_vanilla_stages(Config) -> + %% Test that all stages of session resume works ok in the simplest case + process_flag(trap_exit, true), + ConnFun = ?config(conn_fun, Config), + ClientId = ?config(client_id, Config), + EmqttOpts = [ {proto_ver, v5}, + {clientid, ClientId}, + {properties, #{'Session-Expiry-Interval' => 30}} + | Config], + {ok, Client1} = emqtt:start_link([{clean_start, true}|EmqttOpts]), + {ok, _} = emqtt:ConnFun(Client1), + ok = emqtt:disconnect(Client1), + maybe_kill_connection_process(ClientId, Config), + + ?check_trace( + begin + {ok, Client2} = emqtt:start_link([{clean_start, false}|EmqttOpts]), + {ok, _} = emqtt:ConnFun(Client2), + ok = emqtt:disconnect(Client2) + end, + fun(ok, Trace) -> + check_snabbkaffe_vanilla(Trace) + end), + ok. + +t_snabbkaffe_pending_messages(Config) -> + %% Make sure there are pending messages are fetched during the init stage. + process_flag(trap_exit, true), + ConnFun = ?config(conn_fun, Config), + ClientId = ?config(client_id, Config), + Topic = ?config(topic, Config), + STopic = ?config(stopic, Config), + Payloads = [<<"test", (integer_to_binary(X))/binary>> || X <- [1,2,3,4,5]], + EmqttOpts = [ {proto_ver, v5}, + {clientid, ClientId}, + {properties, #{'Session-Expiry-Interval' => 30}} + | Config], + {ok, Client1} = emqtt:start_link([{clean_start, true} | EmqttOpts]), + {ok, _} = emqtt:ConnFun(Client1), + {ok, _, [2]} = emqtt:subscribe(Client1, STopic, qos2), + ok = emqtt:disconnect(Client1), + maybe_kill_connection_process(ClientId, Config), + + ?check_trace( + begin + snabbkaffe_sync_publish(Topic, Payloads, Config), + {ok, Client2} = emqtt:start_link([{clean_start, false} | EmqttOpts]), + {ok, _} = emqtt:ConnFun(Client2), + Msgs = receive_messages(length(Payloads)), + ReceivedPayloads = [P || #{ payload := P } <- Msgs], + ?assertEqual(lists:sort(ReceivedPayloads), lists:sort(Payloads)), + ok = emqtt:disconnect(Client2) + end, + fun(ok, Trace) -> + check_snabbkaffe_vanilla(Trace), + %% Check that all messages was delivered from the DB + [Delivers1] = ?projection(msgs, ?of_kind(ps_persist_pendings_msgs, Trace)), + [Delivers2] = ?projection(msgs, ?of_kind(ps_marker_pendings_msgs, Trace)), + Delivers = Delivers1 ++ Delivers2, + ?assertEqual(length(Payloads), length(Delivers)), + %% Check for no duplicates + ?assertEqual(lists:usort(Delivers), lists:sort(Delivers)) + end), + ok. + +t_snabbkaffe_buffered_messages(Config) -> + %% Make sure to buffer messages during startup. + process_flag(trap_exit, true), + ConnFun = ?config(conn_fun, Config), + ClientId = ?config(client_id, Config), + Topic = ?config(topic, Config), + STopic = ?config(stopic, Config), + Payloads1 = [<<"test", (integer_to_binary(X))/binary>> || X <- [1, 2, 3]], + Payloads2 = [<<"test", (integer_to_binary(X))/binary>> || X <- [4, 5, 6]], + EmqttOpts = [ {proto_ver, v5}, + {clientid, ClientId}, + {properties, #{'Session-Expiry-Interval' => 30}} + | Config], + {ok, Client1} = emqtt:start_link([{clean_start, true} | EmqttOpts]), + {ok, _} = emqtt:ConnFun(Client1), + {ok, _, [2]} = emqtt:subscribe(Client1, STopic, qos2), + ok = emqtt:disconnect(Client1), + maybe_kill_connection_process(ClientId, Config), + + publish(Topic, Payloads1, Config), + + ?check_trace( + begin + %% Make the resume init phase wait until the first message is delivered. + ?force_ordering( #{ ?snk_kind := ps_worker_deliver }, + #{ ?snk_kind := ps_resume_end }), + spawn_link(fun() -> + ?block_until(#{ ?snk_kind := ps_marker_pendings_msgs }, infinity, 5000), + publish(Topic, Payloads2, Config) + end), + {ok, Client2} = emqtt:start_link([{clean_start, false} | EmqttOpts]), + {ok, _} = emqtt:ConnFun(Client2), + Msgs = receive_messages(length(Payloads1) + length(Payloads2) + 1), + ReceivedPayloads = [P || #{ payload := P } <- Msgs], + ?assertEqual(lists:sort(Payloads1 ++ Payloads2), + lists:sort(ReceivedPayloads)), + ok = emqtt:disconnect(Client2) + end, + fun(ok, Trace) -> + check_snabbkaffe_vanilla(Trace), + %% Check that some messages was buffered in the writer process + [Msgs] = ?projection(msgs, ?of_kind(ps_writer_pendings, Trace)), + ?assertMatch(X when 0 < X andalso X =< length(Payloads2), + length(Msgs)) + end), + ok. + +%%-------------------------------------------------------------------- +%% GC tests +%%-------------------------------------------------------------------- + +-define(MARKER, 3). +-define(DELIVERED, 2). +-define(UNDELIVERED, 1). +-define(ABANDONED, 0). + +msg_id() -> + emqx_guid:gen(). + +delivered_msg(MsgId, SessionID, STopic) -> + {SessionID, MsgId, STopic, ?DELIVERED}. + +undelivered_msg(MsgId, SessionID, STopic) -> + {SessionID, MsgId, STopic, ?UNDELIVERED}. + +marker_msg(MarkerID, SessionID) -> + {SessionID, MarkerID, <<>>, ?MARKER}. + +guid(MicrosecondsAgo) -> + %% Make a fake GUID and set a timestamp. + << TS:64, Tail:64 >> = emqx_guid:gen(), + << (TS - MicrosecondsAgo) : 64, Tail:64 >>. + +abandoned_session_msg(SessionID) -> + abandoned_session_msg(SessionID, 0). + +abandoned_session_msg(SessionID, MicrosecondsAgo) -> + TS = erlang:system_time(microsecond), + {SessionID, <<>>, <<(TS - MicrosecondsAgo) : 64>>, ?ABANDONED}. + +fresh_gc_delete_fun() -> + Ets = ets:new(gc_collect, [ordered_set]), + fun(delete, Key) -> ets:insert(Ets, {Key}), ok; + (collect, <<>>) -> List = ets:match(Ets, {'$1'}), ets:delete(Ets), lists:append(List); + (_, _Key) -> ok + end. + +fresh_gc_callbacks_fun() -> + Ets = ets:new(gc_collect, [ordered_set]), + fun(collect, <<>>) -> List = ets:match(Ets, {'$1'}), ets:delete(Ets), lists:append(List); + (Tag, Key) -> ets:insert(Ets, {{Key, Tag}}), ok + end. + +get_gc_delete_messages() -> + Fun = fresh_gc_delete_fun(), + emqx_persistent_session:gc_session_messages(Fun), + Fun(collect, <<>>). + +get_gc_callbacks() -> + Fun = fresh_gc_callbacks_fun(), + emqx_persistent_session:gc_session_messages(Fun), + Fun(collect, <<>>). + + +t_gc_all_delivered(Config) -> + Store = ?config(session_msg_store, Config), + STopic = ?config(stopic, Config), + SessionId = emqx_guid:gen(), + MsgIds = [msg_id() || _ <- lists:seq(1, 5)], + Delivered = [delivered_msg(X, SessionId, STopic) || X <- MsgIds], + Undelivered = [undelivered_msg(X, SessionId, STopic) || X <- MsgIds], + SortedContent = lists:usort(Delivered ++ Undelivered), + ets:insert(Store, [{X, <<>>} || X <- SortedContent]), + GCMessages = get_gc_delete_messages(), + ?assertEqual(SortedContent, GCMessages), + ok. + +t_gc_some_undelivered(Config) -> + Store = ?config(session_msg_store, Config), + STopic = ?config(stopic, Config), + SessionId = emqx_guid:gen(), + MsgIds = [msg_id() || _ <- lists:seq(1, 10)], + Delivered = [delivered_msg(X, SessionId, STopic) || X <- MsgIds], + {Delivered1,_Delivered2} = split(Delivered), + Undelivered = [undelivered_msg(X, SessionId, STopic) || X <- MsgIds], + {Undelivered1, Undelivered2} = split(Undelivered), + Content = Delivered1 ++ Undelivered1 ++ Undelivered2, + ets:insert(Store, [{X, <<>>} || X <- Content]), + Expected = lists:usort(Delivered1 ++ Undelivered1), + GCMessages = get_gc_delete_messages(), + ?assertEqual(Expected, GCMessages), + ok. + +t_gc_with_markers(Config) -> + Store = ?config(session_msg_store, Config), + STopic = ?config(stopic, Config), + SessionId = emqx_guid:gen(), + MsgIds1 = [msg_id() || _ <- lists:seq(1, 10)], + MarkerId = msg_id(), + MsgIds = [msg_id() || _ <- lists:seq(1, 4)] ++ MsgIds1, + Delivered = [delivered_msg(X, SessionId, STopic) || X <- MsgIds], + {Delivered1,_Delivered2} = split(Delivered), + Undelivered = [undelivered_msg(X, SessionId, STopic) || X <- MsgIds], + {Undelivered1, Undelivered2} = split(Undelivered), + Markers = [marker_msg(MarkerId, SessionId)], + Content = Delivered1 ++ Undelivered1 ++ Undelivered2 ++ Markers, + ets:insert(Store, [{X, <<>>} || X <- Content]), + Expected = lists:usort(Delivered1 ++ Undelivered1), + GCMessages = get_gc_delete_messages(), + ?assertEqual(Expected, GCMessages), + ok. + +t_gc_abandoned_some_undelivered(Config) -> + Store = ?config(session_msg_store, Config), + STopic = ?config(stopic, Config), + SessionId = emqx_guid:gen(), + MsgIds = [msg_id() || _ <- lists:seq(1, 10)], + Delivered = [delivered_msg(X, SessionId, STopic) || X <- MsgIds], + {Delivered1,_Delivered2} = split(Delivered), + Undelivered = [undelivered_msg(X, SessionId, STopic) || X <- MsgIds], + {Undelivered1, Undelivered2} = split(Undelivered), + Abandoned = abandoned_session_msg(SessionId), + Content = Delivered1 ++ Undelivered1 ++ Undelivered2 ++ [Abandoned], + ets:insert(Store, [{X, <<>>} || X <- Content]), + Expected = lists:usort(Delivered1 ++ Undelivered1 ++ Undelivered2), + GCMessages = get_gc_delete_messages(), + ?assertEqual(Expected, GCMessages), + ok. + +t_gc_abandoned_only_called_on_empty_session(Config) -> + Store = ?config(session_msg_store, Config), + STopic = ?config(stopic, Config), + SessionId = emqx_guid:gen(), + MsgIds = [msg_id() || _ <- lists:seq(1, 10)], + Delivered = [delivered_msg(X, SessionId, STopic) || X <- MsgIds], + Undelivered = [undelivered_msg(X, SessionId, STopic) || X <- MsgIds], + Abandoned = abandoned_session_msg(SessionId), + Content = Delivered ++ Undelivered ++ [Abandoned], + ets:insert(Store, [{X, <<>>} || X <- Content]), + GCMessages = get_gc_callbacks(), + + %% Since we had messages to delete, we don't expect to get the + %% callback on the abandoned session + ?assertEqual([], [ X || {X, abandoned} <- GCMessages]), + + %% But if we have only the abandoned session marker for this + %% session, it should be called. + ets:delete_all_objects(Store), + UndeliveredOtherSession = undelivered_msg(msg_id(), emqx_guid:gen(), <<"topic">>), + ets:insert(Store, [{X, <<>>} || X <- [Abandoned, UndeliveredOtherSession]]), + GCMessages2 = get_gc_callbacks(), + ?assertEqual([Abandoned], [ X || {X, abandoned} <- GCMessages2]), + ok. + +t_gc_session_gc_worker(init, Config) -> + meck:new(emqx_persistent_session, [passthrough, no_link]), + Config; +t_gc_session_gc_worker('end',_Config) -> + meck:unload(emqx_persistent_session), + ok. + +t_gc_session_gc_worker(Config) -> + STopic = ?config(stopic, Config), + SessionID = emqx_guid:gen(), + MsgDeleted = delivered_msg(msg_id(), SessionID, STopic), + MarkerNotDeleted = marker_msg(msg_id(), SessionID), + MarkerDeleted = marker_msg(guid(120 * 1000 * 1000), SessionID), + AbandonedNotDeleted = abandoned_session_msg(SessionID), + AbandonedDeleted = abandoned_session_msg(SessionID, 500 * 1000 * 1000), + meck:expect(emqx_persistent_session, delete_session_message, fun(_Key) -> ok end), + emqx_persistent_session_gc:session_gc_worker(delete, MsgDeleted), + emqx_persistent_session_gc:session_gc_worker(marker, MarkerNotDeleted), + emqx_persistent_session_gc:session_gc_worker(marker, MarkerDeleted), + emqx_persistent_session_gc:session_gc_worker(abandoned, AbandonedDeleted), + emqx_persistent_session_gc:session_gc_worker(abandoned, AbandonedNotDeleted), + History = meck:history(emqx_persistent_session, self()), + DeleteCalls = [ Key || {_Pid, {_, delete_session_message, [Key]}, _Result} + <- History], + ?assertEqual(lists:sort([MsgDeleted, AbandonedDeleted, MarkerDeleted]), + lists:sort(DeleteCalls)), + ok. + +t_gc_message_gc(Config) -> + Topic = ?config(topic, Config), + ClientID = ?config(client_id, Config), + Store = ?config(msg_store, Config), + NewMsgs = [emqx_message:make(ClientID, Topic, integer_to_binary(P)) + || P <- lists:seq(6, 10)], + Retain = 60 * 1000, + emqx_config:put(?msg_retain, Retain), + Msgs1 = [emqx_message:make(ClientID, Topic, integer_to_binary(P)) + || P <- lists:seq(1, 5)], + OldMsgs = [M#message{id = guid(Retain*1000)} || M <- Msgs1], + ets:insert(Store, NewMsgs ++ OldMsgs), + ?assertEqual(lists:sort(OldMsgs ++ NewMsgs), ets:tab2list(Store)), + ok = emqx_persistent_session_gc:message_gc_worker(), + ?assertEqual(lists:sort(NewMsgs), ets:tab2list(Store)), + ok. + +split(List) -> + split(List, [], []). + +split([], L1, L2) -> + {L1, L2}; +split([H], L1, L2) -> + {[H|L1], L2}; +split([H1, H2|Left], L1, L2) -> + split(Left, [H1|L1], [H2|L2]). + diff --git a/apps/emqx/test/emqx_proper_types.erl b/apps/emqx/test/emqx_proper_types.erl index 4b072e2d7..6462fffed 100644 --- a/apps/emqx/test/emqx_proper_types.erl +++ b/apps/emqx/test/emqx_proper_types.erl @@ -100,6 +100,8 @@ clientinfo() -> %% See emqx_session:session() type define sessioninfo() -> ?LET(Session, {session, + sessionid(), % id + boolean(), % is_persistent subscriptions(), % subscriptions non_neg_integer(), % max_subscriptions boolean(), % upgrade_qos @@ -114,6 +116,9 @@ sessioninfo() -> }, emqx_session:info(Session)). +sessionid() -> + emqx_guid:gen(). + subscriptions() -> ?LET(L, list({topic(), subopts()}), maps:from_list(L)). diff --git a/apps/emqx/test/emqx_trie_SUITE.erl b/apps/emqx/test/emqx_trie_SUITE.erl index 00d64877c..769674abc 100644 --- a/apps/emqx/test/emqx_trie_SUITE.erl +++ b/apps/emqx/test/emqx_trie_SUITE.erl @@ -183,7 +183,7 @@ t_delete3(_) -> ?TRIE:delete(<<"sensor/+/unknown">>) end), ?assertEqual([], ?TRIE:match(<<"sensor">>)), - ?assertEqual([], ?TRIE:lookup_topic(<<"sensor/+">>)). + ?assertEqual([], ?TRIE:lookup_topic(<<"sensor/+">>, ?TRIE)). clear_tables() -> emqx_trie:clear_tables().