diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index 38e3513cb..5154d8462 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -73,10 +73,14 @@ oom_policy :: emqx_oom:oom_policy(), %% Connected connected :: boolean(), + %% Connected at connected_at :: erlang:timestamp(), disconnected_at :: erlang:timestamp(), - %% Takeover/Resume + %% Takeover + takeover :: boolean(), + %% Resume resuming :: boolean(), + %% Pending delivers when takeovering pendings :: list() }). @@ -125,7 +129,10 @@ init(ConnInfo, Options) -> gc_state = GcState, oom_policy = OomPolicy, timers = #{stats_timer => StatsTimer}, - connected = false + connected = false, + takeover = false, + resuming = false, + pendings = [] }. peer_cert_as_username(Options) -> @@ -234,7 +241,7 @@ handle_in(?CONNECT_PACKET(ConnPkt), Channel) -> handle_out({connack, ReasonCode}, NChannel) end; -handle_in(Packet = ?PUBLISH_PACKET(QoS, Topic, PacketId), Channel = #channel{protocol = Protocol}) -> +handle_in(Packet = ?PUBLISH_PACKET(_QoS, Topic, _PacketId), Channel = #channel{protocol = Protocol}) -> case pipeline([fun validate_packet/2, fun process_alias/2, fun check_publish/2], Packet, Channel) of @@ -372,9 +379,15 @@ handle_in(Packet, Channel) -> process_connect(ConnPkt, Channel) -> case open_session(ConnPkt, Channel) of - {ok, Session, SP} -> + {ok, #{session := Session, present := false}} -> NChannel = Channel#channel{session = Session}, - handle_out({connack, ?RC_SUCCESS, sp(SP)}, NChannel); + handle_out({connack, ?RC_SUCCESS, sp(false)}, NChannel); + {ok, #{session := Session, present := true, pendings := Pendings}} -> + NPendings = lists:usort(lists:append(Pendings, emqx_misc:drain_deliver())), + NChannel = Channel#channel{session = Session, + resuming = true, + pendings = NPendings}, + handle_out({connack, ?RC_SUCCESS, sp(true)}, NChannel); {error, Reason} -> %% TODO: Unknown error? ?LOG(error, "Failed to open session: ~p", [Reason]), @@ -474,8 +487,17 @@ handle_out({connack, ?RC_SUCCESS, SP}, Channel = #channel{client = Client}) -> fun enrich_server_keepalive/2, fun enrich_assigned_clientid/2 ], #{}, Channel), - NChannel = ensure_keepalive(AckProps, ensure_connected(Channel)), - {ok, ?CONNACK_PACKET(?RC_SUCCESS, SP, AckProps), NChannel}; + AckPacket = ?CONNACK_PACKET(?RC_SUCCESS, SP, AckProps), + Channel1 = ensure_keepalive(AckProps, ensure_connected(Channel)), + case maybe_resume_session(Channel1) of + ignore -> {ok, AckPacket, Channel1}; + {ok, Publishes, NSession} -> + Channel2 = Channel1#channel{session = NSession, + resuming = false, + pendings = []}, + {ok, Packets, _} = handle_out({publish, Publishes}, Channel2), + {ok, [AckPacket|Packets], Channel2} + end; handle_out({connack, ReasonCode}, Channel = #channel{client = Client, protocol = Protocol @@ -489,9 +511,12 @@ handle_out({connack, ReasonCode}, Channel = #channel{client = Client, Reason = emqx_reason_codes:name(ReasonCode1, ProtoVer), {stop, {shutdown, Reason}, ?CONNACK_PACKET(ReasonCode1), Channel}; -handle_out({deliver, Delivers}, Channel = #channel{resuming = true, - pendings = Pendings - }) -> +handle_out({deliver, Delivers}, Channel = #channel{session = Session, + connected = false}) -> + {ok, Channel#channel{session = emqx_session:enqueue(Delivers, Session)}}; + +handle_out({deliver, Delivers}, Channel = #channel{takeover = true, + pendings = Pendings}) -> {ok, Channel#channel{pendings = lists:append(Pendings, Delivers)}}; handle_out({deliver, Delivers}, Channel = #channel{session = Session}) -> @@ -571,20 +596,18 @@ handle_out({Type, Data}, Channel) -> %% Handle call %%-------------------------------------------------------------------- -%%-------------------------------------------------------------------- -%% Takeover session -%%-------------------------------------------------------------------- - +%% Session Takeover handle_call({takeover, 'begin'}, Channel = #channel{session = Session}) -> - {ok, Session, Channel#channel{resuming = true}}; + {ok, Session, Channel#channel{takeover = true}}; handle_call({takeover, 'end'}, Channel = #channel{session = Session, pendings = Pendings}) -> ok = emqx_session:takeover(Session), - {stop, {shutdown, takeovered}, Pendings, Channel}; + AllPendings = lists:append(emqx_misc:drain_deliver(), Pendings), + {stop, {shutdown, takeovered}, AllPendings, Channel}; handle_call(Req, Channel) -> - ?LOG(error, "Unexpected call: Req", [Req]), + ?LOG(error, "Unexpected call: ~p", [Req]), {ok, ignored, Channel}. %%-------------------------------------------------------------------- @@ -1110,6 +1133,19 @@ ensure_keepalive_timer(Interval, Channel = #channel{client = #{zone := Zone}}) - Keepalive = emqx_keepalive:init(round(timer:seconds(Interval) * Backoff)), ensure_timer(alive_timer, Channel#channel{keepalive = Keepalive}). +maybe_resume_session(#channel{resuming = false}) -> + ignore; +maybe_resume_session(#channel{session = Session, + resuming = true, + pendings = Pendings}) -> + {ok, Publishes, Session1} = emqx_session:redeliver(Session), + case emqx_session:deliver(Pendings, Session1) of + {ok, Session2} -> + {ok, Publishes, Session2}; + {ok, More, Session2} -> + {ok, lists:append(Publishes, More), Session2} + end. + %%-------------------------------------------------------------------- %% Is ACL enabled? %%-------------------------------------------------------------------- diff --git a/src/emqx_cm.erl b/src/emqx_cm.erl index 1e5842ec4..9c3b58068 100644 --- a/src/emqx_cm.erl +++ b/src/emqx_cm.erl @@ -157,11 +157,15 @@ set_chan_stats(ClientId, ChanPid, Stats) -> %% @doc Open a session. -spec(open_session(boolean(), emqx_types:client(), map()) - -> {ok, emqx_session:session()} | {error, Reason :: term()}). + -> {ok, #{session := emqx_session:session(), + present := boolean(), + pendings => list()}} + | {error, Reason :: term()}). open_session(true, Client = #{client_id := ClientId}, Options) -> CleanStart = fun(_) -> ok = discard_session(ClientId), - {ok, emqx_session:init(Client, Options), false} + Session = emqx_session:init(Client, Options), + {ok, #{session => Session, present => false}} end, emqx_cm_locker:trans(ClientId, CleanStart); @@ -169,12 +173,14 @@ open_session(false, Client = #{client_id := ClientId}, Options) -> ResumeStart = fun(_) -> case takeover_session(ClientId) of {ok, ConnMod, ChanPid, Session} -> - NSession = emqx_session:resume(ClientId, Session), + ok = emqx_session:resume(ClientId, Session), Pendings = ConnMod:takeover(ChanPid, 'end'), - io:format("Pending Delivers: ~p~n", [Pendings]), - {ok, NSession, true}; + {ok, #{session => Session, + present => true, + pendings => Pendings}}; {error, not_found} -> - {ok, emqx_session:init(Client, Options), false} + Session = emqx_session:init(Client, Options), + {ok, #{session => Session, present => false}} end end, emqx_cm_locker:trans(ClientId, ResumeStart). diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index b6cdf5f8b..a3cbc1157 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -255,18 +255,8 @@ connected(cast, {incoming, Packet = ?PACKET(?CONNECT)}, State) -> connected(cast, {incoming, Packet}, State) when is_record(Packet, mqtt_packet) -> handle_incoming(Packet, fun keep_state/1, State); -connected(info, Deliver = {deliver, _Topic, _Msg}, - State = #state{chan_state = ChanState}) -> - Delivers = emqx_misc:drain_deliver([Deliver]), - case emqx_channel:handle_out({deliver, Delivers}, ChanState) of - {ok, NChanState} -> - keep_state(State#state{chan_state = NChanState}); - {ok, Packets, NChanState} -> - NState = State#state{chan_state = NChanState}, - handle_outgoing(Packets, fun keep_state/1, NState); - {stop, Reason, NChanState} -> - stop(Reason, State#state{chan_state = NChanState}) - end; +connected(info, Deliver = {deliver, _Topic, _Msg}, State) -> + handle_deliver(emqx_misc:drain_deliver([Deliver]), State); connected(EventType, Content, State) -> ?HANDLE(EventType, Content, State). @@ -279,6 +269,9 @@ disconnected(enter, _, _State) -> %% CleanStart is true keep_state_and_data; +disconnected(info, Deliver = {deliver, _Topic, _Msg}, State) -> + handle_deliver([Deliver], State); + disconnected(EventType, Content, State) -> ?HANDLE(EventType, Content, State). @@ -469,6 +462,20 @@ handle_incoming(Packet = ?PACKET(Type), SuccFun, handle_outgoing(OutPacket, Shutdown, State#state{chan_state = NChanState}) end. +%%------------------------------------------------------------------- +%% Handle deliver + +handle_deliver(Delivers, State = #state{chan_state = ChanState}) -> + case emqx_channel:handle_out({deliver, Delivers}, ChanState) of + {ok, NChanState} -> + keep_state(State#state{chan_state = NChanState}); + {ok, Packets, NChanState} -> + NState = State#state{chan_state = NChanState}, + handle_outgoing(Packets, fun keep_state/1, NState); + {stop, Reason, NChanState} -> + stop(Reason, State#state{chan_state = NChanState}) + end. + %%-------------------------------------------------------------------- %% Handle outgoing packets diff --git a/src/emqx_misc.erl b/src/emqx_misc.erl index 94f859a82..a325dc94b 100644 --- a/src/emqx_misc.erl +++ b/src/emqx_misc.erl @@ -29,7 +29,8 @@ , proc_stats/1 ]). --export([ drain_deliver/1 +-export([ drain_deliver/0 + , drain_deliver/1 , drain_down/1 ]). @@ -96,6 +97,9 @@ proc_stats(Pid) -> end. %% @doc Drain delivers from the channel's mailbox. +drain_deliver() -> + drain_deliver([]). + drain_deliver(Acc) -> receive Deliver = {deliver, _Topic, _Msg} -> diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 6f2df1b8c..cddaabe35 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -72,11 +72,13 @@ ]). -export([ deliver/2 + , enqueue/2 , retry/1 ]). -export([ takeover/1 , resume/2 + , redeliver/1 ]). -export([expire/2]). @@ -264,18 +266,30 @@ takeover(#session{subscriptions = Subs}) -> ok = emqx_broker:unsubscribe(TopicFilter) end, maps:to_list(Subs)). --spec(resume(emqx_types:client_id(), session()) -> session()). -resume(ClientId, Session = #session{subscriptions = Subs}) -> +-spec(resume(emqx_types:client_id(), session()) -> ok). +resume(ClientId, #session{subscriptions = Subs}) -> ?LOG(info, "Session is resumed."), - %% 1. Subscribe again - ok = lists:foreach(fun({TopicFilter, SubOpts}) -> - ok = emqx_broker:subscribe(TopicFilter, ClientId, SubOpts) - end, maps:to_list(Subs)), + %% 1. Subscribe again. + lists:foreach(fun({TopicFilter, SubOpts}) -> + ok = emqx_broker:subscribe(TopicFilter, ClientId, SubOpts) + end, maps:to_list(Subs)). %% 2. Run hooks. - ok = emqx_hooks:run('session.resumed', [#{client_id => ClientId}, attrs(Session)]), + %% ok = emqx_hooks:run('session.resumed', [#{client_id => ClientId}, attrs(Session)]), %% TODO: 3. Redeliver: Replay delivery and Dequeue pending messages - %% noreply(dequeue(retry_delivery(true, State1))); - Session. + %%Session. + +redeliver(Session = #session{inflight = Inflight}) -> + Publishes = lists:map(fun({PacketId, {pubrel, _Ts}}) -> + {pubrel, PacketId, ?RC_SUCCESS}; + ({PacketId, {Msg, _Ts}}) -> + {publish, PacketId, Msg} + end, emqx_inflight:to_list(Inflight)), + case dequeue(Session) of + {ok, NSession} -> + {ok, Publishes, NSession}; + {ok, More, NSession} -> + {ok, lists:append(Publishes, More), NSession} + end. %%-------------------------------------------------------------------- %% Client -> Broker: SUBSCRIBE @@ -501,7 +515,13 @@ deliver([Msg = #message{qos = QoS}|More], Acc, deliver(More, [Publish|Acc], next_pkt_id(Session1)) end. -enqueue(Msg, Session = #session{mqueue = Q}) -> +enqueue(Delivers, Session = #session{subscriptions = Subs}) + when is_list(Delivers) -> + Msgs = [enrich(get_subopts(Topic, Subs), Msg, Session) + || {deliver, Topic, Msg} <- Delivers], + lists:foldl(fun enqueue/2, Session, Msgs); + +enqueue(Msg, Session = #session{mqueue = Q}) when is_record(Msg, message) -> emqx_pd:update_counter(enqueue_stats, 1), {Dropped, NewQ} = emqx_mqueue:in(Msg, Q), if