From bbcd2bffc5e711df4422f9f01ebdf68e8f5b0f83 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Sat, 7 Dec 2019 15:54:02 +0800 Subject: [PATCH] Improve the session module (#3082) Improve the session module --- etc/emqx.conf | 2 +- priv/emqx.schema | 2 +- src/emqx_channel.erl | 10 ++ src/emqx_inflight.erl | 5 + src/emqx_session.erl | 254 ++++++++++++++++++++++-------------------- 5 files changed, 148 insertions(+), 125 deletions(-) diff --git a/etc/emqx.conf b/etc/emqx.conf index 396b75332..560b901f2 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -679,7 +679,7 @@ zone.external.max_inflight = 32 ## Retry interval for QoS1/2 message delivering. ## ## Value: Duration -zone.external.retry_interval = 20s +zone.external.retry_interval = 30s ## Maximum QoS2 packets (Client -> Broker) awaiting PUBREL, 0 means no limit. ## diff --git a/priv/emqx.schema b/priv/emqx.schema index 253c4bb23..54e5f0f4c 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -847,7 +847,7 @@ end}. %% @doc Retry interval for redelivering QoS1/2 messages. {mapping, "zone.$name.retry_interval", "emqx.zones", [ - {default, "20s"}, + {default, "30s"}, {datatype, {duration, s}} ]}. diff --git a/src/emqx_channel.erl b/src/emqx_channel.erl index 204e3eb25..f2e875a26 100644 --- a/src/emqx_channel.erl +++ b/src/emqx_channel.erl @@ -563,6 +563,12 @@ handle_out(connack, {ReasonCode, _ConnPkt}, shutdown(emqx_reason_codes:name(ReasonCode), AckPacket, Channel); %% Optimize? 
+handle_out(publish, [], Channel) -> + {ok, Channel}; + +handle_out(publish, [{pubrel, PacketId}], Channel) -> + handle_out(pubrel, {PacketId, ?RC_SUCCESS}, Channel); + handle_out(publish, [Pub = {_PacketId, Msg}], Channel) -> case ignore_local(Msg, Channel) of true -> {ok, Channel}; @@ -649,6 +655,10 @@ handle_publish(Publishes, Channel) -> handle_publish([], Acc, _Channel) -> lists:reverse(Acc); +handle_publish([{pubrel, PacketId}|More], Acc, Channel) -> + Packet = ?PUBREL_PACKET(PacketId, ?RC_SUCCESS), + handle_publish(More, [Packet|Acc], Channel); + handle_publish([Pub = {_PacketId, Msg}|More], Acc, Channel) -> case ignore_local(Msg, Channel) of true -> handle_publish(More, Acc, Channel); diff --git a/src/emqx_inflight.erl b/src/emqx_inflight.erl index a51abdc68..a25a60e9b 100644 --- a/src/emqx_inflight.erl +++ b/src/emqx_inflight.erl @@ -29,6 +29,7 @@ , delete/2 , values/1 , to_list/1 + , to_list/2 , size/1 , max_size/1 , is_full/1 @@ -105,6 +106,10 @@ values(?Inflight(Tree)) -> to_list(?Inflight(Tree)) -> gb_trees:to_list(Tree). +-spec(to_list(fun(), inflight()) -> list({key(), term()})). +to_list(SortFun, ?Inflight(Tree)) -> + lists:sort(SortFun, gb_trees:to_list(Tree)). + -spec(window(inflight()) -> list()). window(Inflight = ?Inflight(Tree)) -> case gb_trees:is_empty(Tree) of diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 8fe30a310..a6917536a 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -85,7 +85,7 @@ -export([expire/2]). -%% Export for ct +%% Export for CT -export([set_field/3]). -export_type([session/0]). @@ -99,7 +99,8 @@ max_subscriptions :: non_neg_integer(), %% Upgrade QoS? upgrade_qos :: boolean(), - %% Client <- Broker: QoS1/2 messages sent to the client but unacked. + %% Client <- Broker: QoS1/2 messages sent to the client but + %% have not been acked yet. inflight :: emqx_inflight:inflight(), %% All QoS1/2 messages published to when client is disconnected, %% or QoS1/2 messages pending transmission to the Client. 
@@ -108,14 +109,14 @@ mqueue :: emqx_mqueue:mqueue(), %% Next packet id of the session next_pkt_id = 1 :: emqx_types:packet_id(), - %% Retry interval for redelivering QoS1/2 messages (Unit: second) + %% Retry interval for redelivering QoS1/2 messages (Unit: millisecond) retry_interval :: timeout(), - %% Client -> Broker: QoS2 messages received from client and - %% waiting for pubrel. + %% Client -> Broker: QoS2 messages received from the client, but + %% have not been completely acknowledged awaiting_rel :: map(), - %% Max Packets Awaiting PUBREL + %% Maximum number of awaiting QoS2 messages allowed max_awaiting_rel :: non_neg_integer(), - %% Awaiting PUBREL Timeout (Unit: second) + %% Awaiting PUBREL Timeout (Unit: millisecond) await_rel_timeout :: timeout(), %% Created at created_at :: pos_integer() @@ -125,6 +126,10 @@ -type(publish() :: {maybe(emqx_types:packet_id()), emqx_types:message()}). +-type(pubrel() :: {pubrel, emqx_types:packet_id()}). + +-type(replies() :: list(publish() | pubrel())). + -define(INFO_KEYS, [subscriptions, upgrade_qos, retry_interval, @@ -150,7 +155,6 @@ %% Init a Session %%-------------------------------------------------------------------- -%% @doc Init a session. -spec(init(emqx_types:clientinfo(), emqx_types:conninfo()) -> session()). init(#{zone := Zone}, #{receive_maximum := MaxInflight}) -> #session{max_subscriptions = get_env(Zone, max_subscriptions, 0), @@ -159,10 +163,10 @@ init(#{zone := Zone}, #{receive_maximum := MaxInflight}) -> inflight = emqx_inflight:new(MaxInflight), mqueue = init_mqueue(Zone), next_pkt_id = 1, - retry_interval = get_env(Zone, retry_interval, 0), + retry_interval = timer:seconds(get_env(Zone, retry_interval, 0)), awaiting_rel = #{}, max_awaiting_rel = get_env(Zone, max_awaiting_rel, 100), - await_rel_timeout = get_env(Zone, await_rel_timeout, 300), + await_rel_timeout = timer:seconds(get_env(Zone, await_rel_timeout, 300)), created_at = erlang:system_time(second) }. 
@@ -174,15 +178,15 @@ init_mqueue(Zone) -> default_priority => get_env(Zone, mqueue_default_priority, lowest) }). +%%-------------------------------------------------------------------- +%% Info, Stats +%%-------------------------------------------------------------------- + %% @doc Get infos of the session. -spec(info(session()) -> emqx_types:infos()). info(Session) -> maps:from_list(info(?INFO_KEYS, Session)). -%% @doc Get stats of the session. --spec(stats(session()) -> emqx_types:stats()). -stats(Session) -> info(?STATS_KEYS, Session). - info(Keys, Session) when is_list(Keys) -> [{Key, info(Key, Session)} || Key <- Keys]; info(subscriptions, #session{subscriptions = Subs}) -> @@ -200,7 +204,9 @@ info(inflight_cnt, #session{inflight = Inflight}) -> info(inflight_max, #session{inflight = Inflight}) -> emqx_inflight:max_size(Inflight); info(retry_interval, #session{retry_interval = Interval}) -> - Interval; + Interval div 1000; +info(mqueue, #session{mqueue = MQueue}) -> + MQueue; info(mqueue_len, #session{mqueue = MQueue}) -> emqx_mqueue:len(MQueue); info(mqueue_max, #session{mqueue = MQueue}) -> @@ -216,52 +222,20 @@ info(awaiting_rel_cnt, #session{awaiting_rel = AwaitingRel}) -> info(awaiting_rel_max, #session{max_awaiting_rel = Max}) -> Max; info(await_rel_timeout, #session{await_rel_timeout = Timeout}) -> - Timeout; + Timeout div 1000; info(created_at, #session{created_at = CreatedAt}) -> CreatedAt. -%% For tests -set_field(Name, Val, Channel) -> - Fields = record_info(fields, session), - Pos = emqx_misc:index_of(Name, Fields), - setelement(Pos+1, Channel, Val). - --spec(takeover(session()) -> ok). -takeover(#session{subscriptions = Subs}) -> - lists:foreach(fun({TopicFilter, _SubOpts}) -> - ok = emqx_broker:unsubscribe(TopicFilter) - end, maps:to_list(Subs)). - --spec(resume(emqx_types:clientid(), session()) -> ok). -resume(ClientId, #session{subscriptions = Subs}) -> - %% 1. Subscribe again. 
- lists:foreach(fun({TopicFilter, SubOpts}) -> - ok = emqx_broker:subscribe(TopicFilter, ClientId, SubOpts) - end, maps:to_list(Subs)). - %% 2. Run hooks. - %% ok = emqx_hooks:run('session.resumed', [#{clientid => ClientId}, attrs(Session)]), - %% TODO: 3. Redeliver: Replay delivery and Dequeue pending messages - %%Session. - -redeliver(Session = #session{inflight = Inflight}) -> - Publishes = lists:map(fun({PacketId, {pubrel, _Ts}}) -> - {pubrel, PacketId, ?RC_SUCCESS}; - ({PacketId, {Msg, _Ts}}) -> - {publish, PacketId, Msg} - end, emqx_inflight:to_list(Inflight)), - case dequeue(Session) of - {ok, NSession} -> - {ok, Publishes, NSession}; - {ok, More, NSession} -> - {ok, lists:append(Publishes, More), NSession} - end. +%% @doc Get stats of the session. +-spec(stats(session()) -> emqx_types:stats()). +stats(Session) -> info(?STATS_KEYS, Session). %%-------------------------------------------------------------------- %% Client -> Broker: SUBSCRIBE %%-------------------------------------------------------------------- --spec(subscribe(emqx_types:clientinfo(), emqx_types:topic(), emqx_types:subopts(), - session()) +-spec(subscribe(emqx_types:clientinfo(), emqx_types:topic(), + emqx_types:subopts(), session()) -> {ok, session()} | {error, emqx_types:reason_code()}). subscribe(ClientInfo = #{clientid := ClientId}, TopicFilter, SubOpts, Session = #session{subscriptions = Subs}) -> @@ -337,7 +311,7 @@ is_awaiting_full(#session{awaiting_rel = AwaitingRel, -spec(puback(emqx_types:packet_id(), session()) -> {ok, emqx_types:message(), session()} - | {ok, emqx_types:message(), list(publish()), session()} + | {ok, emqx_types:message(), replies(), session()} | {error, emqx_types:reason_code()}). 
puback(PacketId, Session = #session{inflight = Inflight}) -> case emqx_inflight:lookup(PacketId, Inflight) of @@ -366,7 +340,8 @@ return_with(Msg, {ok, Publishes, Session}) -> pubrec(PacketId, Session = #session{inflight = Inflight}) -> case emqx_inflight:lookup(PacketId, Inflight) of {value, {Msg, _Ts}} when is_record(Msg, message) -> - Inflight1 = emqx_inflight:update(PacketId, {pubrel, os:timestamp()}, Inflight), + Inflight1 = emqx_inflight:update( + PacketId, {pubrel, os:timestamp()}, Inflight), {ok, Msg, Session#session{inflight = Inflight1}}; {value, {pubrel, _Ts}} -> {error, ?RC_PACKET_IDENTIFIER_IN_USE}; @@ -386,14 +361,14 @@ pubrel(PacketId, Session = #session{awaiting_rel = AwaitingRel}) -> {ok, Session#session{awaiting_rel = AwaitingRel1}}; error -> {error, ?RC_PACKET_IDENTIFIER_NOT_FOUND} - end. + end. %%-------------------------------------------------------------------- %% Client -> Broker: PUBCOMP %%-------------------------------------------------------------------- -spec(pubcomp(emqx_types:packet_id(), session()) - -> {ok, session()} | {ok, list(publish()), session()} + -> {ok, session()} | {ok, replies(), session()} | {error, emqx_types:reason_code()}). pubcomp(PacketId, Session = #session{inflight = Inflight}) -> case emqx_inflight:lookup(PacketId, Inflight) of @@ -415,29 +390,19 @@ dequeue(Session = #session{inflight = Inflight, mqueue = Q}) -> true -> {ok, Session}; false -> {Msgs, Q1} = dequeue(batch_n(Inflight), [], Q), - deliver(lists:reverse(Msgs), [], Session#session{mqueue = Q1}) + deliver(Msgs, [], Session#session{mqueue = Q1}) end. 
-dequeue(Cnt, Msgs, Q) when Cnt =< 0 -> - {Msgs, Q}; +dequeue(0, Msgs, Q) -> + {lists:reverse(Msgs), Q}; dequeue(Cnt, Msgs, Q) -> case emqx_mqueue:out(Q) of - {empty, _Q} -> {Msgs, Q}; + {empty, _Q} -> dequeue(0, Msgs, Q); + {{value, Msg = #message{qos = ?QOS_0}}, Q1} -> + dequeue(Cnt, [Msg|Msgs], Q1); {{value, Msg}, Q1} -> - case emqx_message:is_expired(Msg) of - true -> - ok = emqx_metrics:inc('messages.expired'), - dequeue(Cnt-1, Msgs, Q1); - false -> - dequeue(Cnt-1, [Msg|Msgs], Q1) - end - end. - -batch_n(Inflight) -> - case emqx_inflight:max_size(Inflight) of - 0 -> ?DEFAULT_BATCH_N; - Sz -> Sz - emqx_inflight:size(Inflight) + dequeue(Cnt-1, [Msg|Msgs], Q1) end. %%-------------------------------------------------------------------- @@ -445,7 +410,7 @@ batch_n(Inflight) -> %%-------------------------------------------------------------------- -spec(deliver(list(emqx_types:deliver()), session()) - -> {ok, session()} | {ok, list(publish()), session()}). + -> {ok, session()} | {ok, replies(), session()}). deliver(Delivers, Session) -> Msgs = lists:map(enrich_fun(Session), Delivers), deliver(Msgs, [], Session). @@ -457,8 +422,8 @@ deliver([Msg = #message{qos = ?QOS_0}|More], Acc, Session) -> Publish = {undefined, maybe_ack(Msg)}, deliver(More, [Publish|Acc], Session); -deliver([Msg = #message{qos = QoS}|More], Acc, - Session = #session{next_pkt_id = PacketId, inflight = Inflight}) +deliver([Msg = #message{qos = QoS}|More], Acc, Session = + #session{next_pkt_id = PacketId, inflight = Inflight}) when QoS =:= ?QOS_1 orelse QoS =:= ?QOS_2 -> case emqx_inflight:is_full(Inflight) of true -> @@ -479,7 +444,6 @@ enqueue(Delivers, Session) when is_list(Delivers) -> Msgs = lists:map(enrich_fun(Session), Delivers), lists:foldl(fun enqueue/2, Session, Msgs); -%%TODO: how to handle the dropped msg? 
enqueue(Msg, Session = #session{mqueue = Q}) when is_record(Msg, message) -> {Dropped, NewQ} = emqx_mqueue:in(Msg, Q), if is_record(Dropped, message) -> @@ -504,14 +468,6 @@ maybe_nack(Msg) -> emqx_shared_sub:is_ack_required(Msg) andalso (ok == emqx_shared_sub:maybe_nack_dropped(Msg)). -%%-------------------------------------------------------------------- -%% Awaiting ACK for QoS1/QoS2 Messages -%%-------------------------------------------------------------------- - -await(PacketId, Msg, Session = #session{inflight = Inflight}) -> - Inflight1 = emqx_inflight:insert(PacketId, {Msg, os:timestamp()}, Inflight), - Session#session{inflight = Inflight1}. - get_subopts(Topic, SubMap) -> case maps:find(Topic, SubMap) of {ok, #{nl := Nl, qos := QoS, rap := Rap, subid := SubId}} -> @@ -542,47 +498,48 @@ enrich_subopts([{subid, SubId}|Opts], Msg, Session) -> Msg1 = emqx_message:set_header('Subscription-Identifier', SubId, Msg), enrich_subopts(Opts, Msg1, Session). +%%-------------------------------------------------------------------- +%% Awaiting ACK for QoS1/QoS2 Messages +%%-------------------------------------------------------------------- + +await(PacketId, Msg, Session = #session{inflight = Inflight}) -> + Inflight1 = emqx_inflight:insert(PacketId, {Msg, os:timestamp()}, Inflight), + Session#session{inflight = Inflight1}. + %%-------------------------------------------------------------------- %% Retry Delivery %%-------------------------------------------------------------------- -%% Redeliver at once if force is true +-spec(retry(session()) -> {ok, session()} | {ok, replies(), timeout(), session()}). 
retry(Session = #session{inflight = Inflight}) -> case emqx_inflight:is_empty(Inflight) of true -> {ok, Session}; false -> - SortFun = fun({_, {_, Ts1}}, {_, {_, Ts2}}) -> Ts1 < Ts2 end, - Msgs = lists:sort(SortFun, emqx_inflight:to_list(Inflight)), - retry_delivery(Msgs, os:timestamp(), [], Session) + retry_delivery(emqx_inflight:to_list(sort_fun(), Inflight), + [], os:timestamp(), Session) end. -retry_delivery([], _Now, Acc, Session) -> - %% Retry again... - {ok, lists:reverse(Acc), Session}; +retry_delivery([], Acc, _Now, Session = #session{retry_interval = Interval}) -> + {ok, lists:reverse(Acc), Interval, Session}; -retry_delivery([{PacketId, {Val, Ts}}|More], Now, Acc, - Session = #session{retry_interval = Interval, - inflight = Inflight}) -> - IntervalMs = Interval * 1000, - %% Microseconds -> MilliSeconds - Age = timer:now_diff(Now, Ts) div 1000, - if - Age >= IntervalMs -> - {Acc1, Inflight1} = retry_delivery(PacketId, Val, Now, Acc, Inflight), - retry_delivery(More, Now, Acc1, Session#session{inflight = Inflight1}); +retry_delivery([{PacketId, {Msg, Ts}}|More], Acc, Now, Session = + #session{retry_interval = Interval, inflight = Inflight}) -> + case (Age = age(Now, Ts)) >= Interval of true -> - {ok, lists:reverse(Acc), IntervalMs - max(0, Age), Session} + {Acc1, Inflight1} = retry_delivery(PacketId, Msg, Now, Acc, Inflight), + retry_delivery(More, Acc1, Now, Session#session{inflight = Inflight1}); + false -> + {ok, lists:reverse(Acc), Interval - max(0, Age), Session} end. 
retry_delivery(PacketId, Msg, Now, Acc, Inflight) when is_record(Msg, message) -> case emqx_message:is_expired(Msg) of true -> - ok = emqx_metrics:inc('messages.expired'), {Acc, emqx_inflight:delete(PacketId, Inflight)}; false -> Msg1 = emqx_message:set_flag(dup, true, Msg), - {[{PacketId, Msg1}|Acc], - emqx_inflight:update(PacketId, {Msg1, Now}, Inflight)} + Inflight1 = emqx_inflight:update(PacketId, {Msg1, Now}, Inflight), + {[{PacketId, Msg1}|Acc], Inflight1} end; retry_delivery(PacketId, pubrel, Now, Acc, Inflight) -> @@ -593,30 +550,60 @@ retry_delivery(PacketId, pubrel, Now, Acc, Inflight) -> %% Expire Awaiting Rel %%-------------------------------------------------------------------- +-spec(expire(awaiting_rel, session()) -> {ok, session()} | {ok, timeout(), session()}). expire(awaiting_rel, Session = #session{awaiting_rel = AwaitingRel}) -> case maps:size(AwaitingRel) of 0 -> {ok, Session}; - _ -> - AwaitingRel1 = lists:keysort(2, maps:to_list(AwaitingRel)), - expire_awaiting_rel(AwaitingRel1, os:timestamp(), Session) + _ -> expire_awaiting_rel(os:timestamp(), Session) end. 
-expire_awaiting_rel([], _Now, Session) -> - {ok, Session}; +expire_awaiting_rel(Now, Session = #session{awaiting_rel = AwaitingRel, + await_rel_timeout = Timeout}) -> -expire_awaiting_rel([{PacketId, Ts} | More], Now, - Session = #session{awaiting_rel = AwaitingRel, - await_rel_timeout = Timeout}) -> - case (timer:now_diff(Now, Ts) div 1000) of - Age when Age >= (Timeout * 1000) -> - ok = emqx_metrics:inc('messages.qos2.expired'), - ?LOG(warning, "Dropped qos2 packet ~s due to await_rel timeout", [PacketId]), - Session1 = Session#session{awaiting_rel = maps:remove(PacketId, AwaitingRel)}, - expire_awaiting_rel(More, Now, Session1); - Age -> - {ok, Timeout - max(0, Age), Session} + NotExpired = fun(_PacketId, Ts) -> age(Now, Ts) < Timeout end, + NSession = Session#session{awaiting_rel = maps:filter(NotExpired, AwaitingRel)}, + case maps:size(NSession#session.awaiting_rel) of %% maps:filter/2 returns a map, never [] + 0 -> {ok, NSession}; + _ -> {ok, Timeout, NSession} end. +%%-------------------------------------------------------------------- +%% Takeover, Resume and Redeliver +%%-------------------------------------------------------------------- + +-spec(takeover(session()) -> ok). +takeover(#session{subscriptions = Subs}) -> + lists:foreach(fun({TopicFilter, _SubOpts}) -> + ok = emqx_broker:unsubscribe(TopicFilter) + end, maps:to_list(Subs)). + +-spec(resume(emqx_types:clientid(), session()) -> ok). +resume(ClientId, #session{subscriptions = Subs}) -> + %% 1. Subscribe again. + lists:foreach(fun({TopicFilter, SubOpts}) -> + ok = emqx_broker:subscribe(TopicFilter, ClientId, SubOpts) + end, maps:to_list(Subs)). + %% 2. Run hooks. + %% ok = emqx_hooks:run('session.resumed', [#{clientid => ClientId}, attrs(Session)]), + %% TODO: 3. Redeliver: Replay delivery and Dequeue pending messages + %%Session. + +-spec(redeliver(session()) -> {ok, replies(), session()}). 
+redeliver(Session = #session{inflight = Inflight}) -> + Pubs = replay(Inflight), + case dequeue(Session) of + {ok, NSession} -> {ok, Pubs, NSession}; + {ok, More, NSession} -> + {ok, lists:append(Pubs, More), NSession} + end. + +replay(Inflight) -> + lists:map(fun({PacketId, {pubrel, _Ts}}) -> + {pubrel, PacketId}; + ({PacketId, {Msg, _Ts}}) -> + {PacketId, emqx_message:set_flag(dup, true, Msg)} + end, emqx_inflight:to_list(Inflight)). + %%-------------------------------------------------------------------- %% Next Packet Id %%-------------------------------------------------------------------- @@ -631,4 +618,25 @@ next_pkt_id(Session = #session{next_pkt_id = Id}) -> %% Helper functions %%-------------------------------------------------------------------- +-compile({inline, [sort_fun/0, batch_n/1, age/2]}). + +sort_fun() -> + fun({_, {_, Ts1}}, {_, {_, Ts2}}) -> Ts1 < Ts2 end. + +batch_n(Inflight) -> + case emqx_inflight:max_size(Inflight) of + 0 -> ?DEFAULT_BATCH_N; + Sz -> Sz - emqx_inflight:size(Inflight) + end. + +age(Now, Ts) -> timer:now_diff(Now, Ts) div 1000. + +%%-------------------------------------------------------------------- +%% For CT tests +%%-------------------------------------------------------------------- + +set_field(Name, Val, Channel) -> + Fields = record_info(fields, session), + Pos = emqx_misc:index_of(Name, Fields), + setelement(Pos+1, Channel, Val).