From b0ed953708817fc09c286aea0d1c5eab992fffd8 Mon Sep 17 00:00:00 2001 From: Gilbert Wong Date: Thu, 30 Aug 2018 13:56:55 +0800 Subject: [PATCH] fix emqx_session:unsubscribe bug --- src/emqx_session.erl | 175 ++++++++++++++++++++++--------------------- 1 file changed, 90 insertions(+), 85 deletions(-) diff --git a/src/emqx_session.erl b/src/emqx_session.erl index ab9096f23..0ac1cf59d 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -62,87 +62,87 @@ -import(emqx_zone, [get_env/2, get_env/3]). -record(state, { - %% Clean Start Flag - clean_start = false :: boolean(), + %% Clean Start Flag + clean_start = false :: boolean(), - %% Client Binding: local | remote - binding = local :: local | remote, + %% Client Binding: local | remote + binding = local :: local | remote, - %% ClientId: Identifier of Session - client_id :: binary(), + %% ClientId: Identifier of Session + client_id :: binary(), - %% Username - username :: binary() | undefined, + %% Username + username :: binary() | undefined, - %% Connection pid binding with session - conn_pid :: pid(), + %% Connection pid binding with session + conn_pid :: pid(), - %% Old Connection Pid that has been kickout - old_conn_pid :: pid(), + %% Old Connection Pid that has been kickout + old_conn_pid :: pid(), - %% Next packet id of the session - next_pkt_id = 1 :: emqx_mqtt_types:packet_id(), + %% Next packet id of the session + next_pkt_id = 1 :: emqx_mqtt_types:packet_id(), - %% Max subscriptions - max_subscriptions :: non_neg_integer(), + %% Max subscriptions + max_subscriptions :: non_neg_integer(), - %% Client’s Subscriptions. - subscriptions :: map(), + %% Client’s Subscriptions. + subscriptions :: map(), - %% Upgrade QoS? - upgrade_qos = false :: boolean(), + %% Upgrade QoS? + upgrade_qos = false :: boolean(), - %% Client <- Broker: Inflight QoS1, QoS2 messages sent to the client but unacked. - inflight :: emqx_inflight:inflight(), + %% Client <- Broker: Inflight QoS1, QoS2 messages sent to the client but unacked. + inflight :: emqx_inflight:inflight(), - %% Max Inflight Size. DEPRECATED: Get from inflight - %% max_inflight = 32 :: non_neg_integer(), + %% Max Inflight Size. DEPRECATED: Get from inflight + %% max_inflight = 32 :: non_neg_integer(), - %% Retry interval for redelivering QoS1/2 messages - retry_interval = 20000 :: timeout(), + %% Retry interval for redelivering QoS1/2 messages + retry_interval = 20000 :: timeout(), - %% Retry Timer - retry_timer :: reference() | undefined, + %% Retry Timer + retry_timer :: reference() | undefined, - %% All QoS1, QoS2 messages published to when client is disconnected. - %% QoS 1 and QoS 2 messages pending transmission to the Client. - %% - %% Optionally, QoS 0 messages pending transmission to the Client. - mqueue :: emqx_mqueue:mqueue(), + %% All QoS1, QoS2 messages published to when client is disconnected. + %% QoS 1 and QoS 2 messages pending transmission to the Client. + %% + %% Optionally, QoS 0 messages pending transmission to the Client. + mqueue :: emqx_mqueue:mqueue(), - %% Client -> Broker: Inflight QoS2 messages received from client and waiting for pubrel. - awaiting_rel :: map(), + %% Client -> Broker: Inflight QoS2 messages received from client and waiting for pubrel. + awaiting_rel :: map(), - %% Max Packets Awaiting PUBREL - max_awaiting_rel = 100 :: non_neg_integer(), + %% Max Packets Awaiting PUBREL + max_awaiting_rel = 100 :: non_neg_integer(), - %% Awaiting PUBREL Timeout - await_rel_timeout = 20000 :: timeout(), + %% Awaiting PUBREL Timeout + await_rel_timeout = 20000 :: timeout(), - %% Awaiting PUBREL Timer - await_rel_timer :: reference() | undefined, + %% Awaiting PUBREL Timer + await_rel_timer :: reference() | undefined, - %% Session Expiry Interval - expiry_interval = 7200000 :: timeout(), + %% Session Expiry Interval + expiry_interval = 7200000 :: timeout(), - %% Expired Timer - expiry_timer :: reference() | undefined, + %% Expired Timer + expiry_timer :: reference() | undefined, - %% Enable Stats - enable_stats :: boolean(), + %% Enable Stats + enable_stats :: boolean(), - %% Stats timer - stats_timer :: reference() | undefined, + %% Stats timer + stats_timer :: reference() | undefined, - %% TODO: - deliver_stats = 0, + %% TODO: + deliver_stats = 0, - %% TODO: - enqueue_stats = 0, + %% TODO: + enqueue_stats = 0, - %% Created at - created_at :: erlang:timestamp() - }). + %% Created at + created_at :: erlang:timestamp() + }). -define(TIMEOUT, 60000). @@ -284,7 +284,12 @@ pubcomp(SPid, PacketId, ReasonCode) -> -spec(unsubscribe(pid(), emqx_types:topic_table()) -> ok). unsubscribe(SPid, RawTopicFilters) when is_list(RawTopicFilters) -> - unsubscribe(SPid, undefined, #{}, lists:map(fun emqx_topic:parse/1, RawTopicFilters)). + TopicFilters = lists:map(fun({RawTopic, Opts}) -> + emqx_topic:parse(RawTopic, Opts); + (RawTopic) -> + emqx_topic:parse(RawTopic) + end, RawTopicFilters), + unsubscribe(SPid, undefined, #{}, TopicFilters). -spec(unsubscribe(pid(), emqx_mqtt_types:packet_id(), emqx_mqtt_types:properties(), emqx_mqtt_types:topic_filters()) -> ok). @@ -424,20 +429,20 @@ handle_call(Req, _From, State) -> handle_cast({subscribe, FromPid, {PacketId, _Properties, TopicFilters}}, State = #state{client_id = ClientId, subscriptions = Subscriptions}) -> {ReasonCodes, Subscriptions1} = - lists:foldr(fun({Topic, SubOpts = #{qos := QoS}}, {RcAcc, SubMap}) -> - {[QoS|RcAcc], case maps:find(Topic, SubMap) of - {ok, SubOpts} -> - SubMap; - {ok, _SubOpts} -> - emqx_broker:set_subopts(Topic, {self(), ClientId}, SubOpts), - emqx_hooks:run('session.subscribed', [#{client_id => ClientId}, Topic, SubOpts]), - maps:put(Topic, SubOpts, SubMap); - error -> - emqx_broker:subscribe(Topic, ClientId, SubOpts), - emqx_hooks:run('session.subscribed', [#{client_id => ClientId}, Topic, SubOpts]), - maps:put(Topic, SubOpts, SubMap) - end} - end, {[], Subscriptions}, TopicFilters), + lists:foldr(fun({Topic, SubOpts = #{qos := QoS}}, {RcAcc, SubMap}) -> + {[QoS|RcAcc], case maps:find(Topic, SubMap) of + {ok, SubOpts} -> + SubMap; + {ok, _SubOpts} -> + emqx_broker:set_subopts(Topic, {self(), ClientId}, SubOpts), + emqx_hooks:run('session.subscribed', [#{client_id => ClientId}, Topic, SubOpts]), + maps:put(Topic, SubOpts, SubMap); + error -> + emqx_broker:subscribe(Topic, ClientId, SubOpts), + emqx_hooks:run('session.subscribed', [#{client_id => ClientId}, Topic, SubOpts]), + maps:put(Topic, SubOpts, SubMap) + end} + end, {[], Subscriptions}, TopicFilters), suback(FromPid, PacketId, ReasonCodes), {noreply, State#state{subscriptions = Subscriptions1}}; @@ -445,16 +450,16 @@ handle_cast({subscribe, FromPid, {PacketId, _Properties, TopicFilters}}, handle_cast({unsubscribe, From, {PacketId, _Properties, TopicFilters}}, State = #state{client_id = ClientId, subscriptions = Subscriptions}) -> {ReasonCodes, Subscriptions1} = - lists:foldr(fun({Topic, _SubOpts}, {Acc, SubMap}) -> - case maps:find(Topic, SubMap) of - {ok, SubOpts} -> - ok = emqx_broker:unsubscribe(Topic, ClientId), - emqx_hooks:run('session.unsubscribed', [#{client_id => ClientId}, Topic, SubOpts]), - {[?RC_SUCCESS|Acc], maps:remove(Topic, SubMap)}; - error -> - {[?RC_NO_SUBSCRIPTION_EXISTED|Acc], SubMap} - end - end, {[], Subscriptions}, TopicFilters), + lists:foldr(fun({Topic, _SubOpts}, {Acc, SubMap}) -> + case maps:find(Topic, SubMap) of + {ok, SubOpts} -> + ok = emqx_broker:unsubscribe(Topic, ClientId), + emqx_hooks:run('session.unsubscribed', [#{client_id => ClientId}, Topic, SubOpts]), + {[?RC_SUCCESS|Acc], maps:remove(Topic, SubMap)}; + error -> + {[?RC_NO_SUBSCRIPTION_EXISTED|Acc], SubMap} + end + end, {[], Subscriptions}, TopicFilters), unsuback(From, PacketId, ReasonCodes), {noreply, State#state{subscriptions = Subscriptions1}}; @@ -524,7 +529,7 @@ handle_cast(Msg, State) -> %% Batch dispatch handle_info({dispatch, Topic, Msgs}, State) when is_list(Msgs) -> {noreply, lists:foldl(fun(Msg, NewState) -> - element(2, handle_info({dispatch, Topic, Msg}, NewState)) + element(2, handle_info({dispatch, Topic, Msg}, NewState)) end, State, Msgs)}; %% Dispatch message @@ -684,7 +689,7 @@ sortfun(inflight) -> sortfun(awaiting_rel) -> fun({_, #message{timestamp = Ts1}}, {_, #message{timestamp = Ts2}}) -> - Ts1 < Ts2 + Ts1 < Ts2 end. %%------------------------------------------------------------------------------ @@ -726,7 +731,7 @@ dispatch(Msg = #message{qos = ?QOS0}, State) -> inc_stats(deliver, State); dispatch(Msg = #message{qos = QoS}, State = #state{next_pkt_id = PacketId, inflight = Inflight}) - when QoS =:= ?QOS1 orelse QoS =:= ?QOS2 -> + when QoS =:= ?QOS1 orelse QoS =:= ?QOS2 -> case emqx_inflight:is_full(Inflight) of true -> enqueue_msg(Msg, State); @@ -824,7 +829,7 @@ dequeue2(State = #state{mqueue = Q}) -> %% Ensure timers ensure_await_rel_timer(State = #state{await_rel_timer = undefined, await_rel_timeout = Timeout}) -> - State#state{await_rel_timer = emqx_misc:start_timer(Timeout, check_awaiting_rel)}; + State#state{await_rel_timer = emqx_misc:start_timer(Timeout, check_awaiting_rel)}; ensure_await_rel_timer(State) -> State.