fix emqx_session:unsubscribe bug

This commit is contained in:
Gilbert Wong 2018-08-30 13:56:55 +08:00
parent db76177228
commit b0ed953708
1 changed files with 90 additions and 85 deletions

View File

@ -62,87 +62,87 @@
-import(emqx_zone, [get_env/2, get_env/3]). -import(emqx_zone, [get_env/2, get_env/3]).
-record(state, { -record(state, {
%% Clean Start Flag %% Clean Start Flag
clean_start = false :: boolean(), clean_start = false :: boolean(),
%% Client Binding: local | remote %% Client Binding: local | remote
binding = local :: local | remote, binding = local :: local | remote,
%% ClientId: Identifier of Session %% ClientId: Identifier of Session
client_id :: binary(), client_id :: binary(),
%% Username %% Username
username :: binary() | undefined, username :: binary() | undefined,
%% Connection pid binding with session %% Connection pid binding with session
conn_pid :: pid(), conn_pid :: pid(),
%% Old Connection Pid that has been kickout %% Old Connection Pid that has been kickout
old_conn_pid :: pid(), old_conn_pid :: pid(),
%% Next packet id of the session %% Next packet id of the session
next_pkt_id = 1 :: emqx_mqtt_types:packet_id(), next_pkt_id = 1 :: emqx_mqtt_types:packet_id(),
%% Max subscriptions %% Max subscriptions
max_subscriptions :: non_neg_integer(), max_subscriptions :: non_neg_integer(),
%% Clients Subscriptions. %% Clients Subscriptions.
subscriptions :: map(), subscriptions :: map(),
%% Upgrade QoS? %% Upgrade QoS?
upgrade_qos = false :: boolean(), upgrade_qos = false :: boolean(),
%% Client <- Broker: Inflight QoS1, QoS2 messages sent to the client but unacked. %% Client <- Broker: Inflight QoS1, QoS2 messages sent to the client but unacked.
inflight :: emqx_inflight:inflight(), inflight :: emqx_inflight:inflight(),
%% Max Inflight Size. DEPRECATED: Get from inflight %% Max Inflight Size. DEPRECATED: Get from inflight
%% max_inflight = 32 :: non_neg_integer(), %% max_inflight = 32 :: non_neg_integer(),
%% Retry interval for redelivering QoS1/2 messages %% Retry interval for redelivering QoS1/2 messages
retry_interval = 20000 :: timeout(), retry_interval = 20000 :: timeout(),
%% Retry Timer %% Retry Timer
retry_timer :: reference() | undefined, retry_timer :: reference() | undefined,
%% All QoS1, QoS2 messages published to when client is disconnected. %% All QoS1, QoS2 messages published to when client is disconnected.
%% QoS 1 and QoS 2 messages pending transmission to the Client. %% QoS 1 and QoS 2 messages pending transmission to the Client.
%% %%
%% Optionally, QoS 0 messages pending transmission to the Client. %% Optionally, QoS 0 messages pending transmission to the Client.
mqueue :: emqx_mqueue:mqueue(), mqueue :: emqx_mqueue:mqueue(),
%% Client -> Broker: Inflight QoS2 messages received from client and waiting for pubrel. %% Client -> Broker: Inflight QoS2 messages received from client and waiting for pubrel.
awaiting_rel :: map(), awaiting_rel :: map(),
%% Max Packets Awaiting PUBREL %% Max Packets Awaiting PUBREL
max_awaiting_rel = 100 :: non_neg_integer(), max_awaiting_rel = 100 :: non_neg_integer(),
%% Awaiting PUBREL Timeout %% Awaiting PUBREL Timeout
await_rel_timeout = 20000 :: timeout(), await_rel_timeout = 20000 :: timeout(),
%% Awaiting PUBREL Timer %% Awaiting PUBREL Timer
await_rel_timer :: reference() | undefined, await_rel_timer :: reference() | undefined,
%% Session Expiry Interval %% Session Expiry Interval
expiry_interval = 7200000 :: timeout(), expiry_interval = 7200000 :: timeout(),
%% Expired Timer %% Expired Timer
expiry_timer :: reference() | undefined, expiry_timer :: reference() | undefined,
%% Enable Stats %% Enable Stats
enable_stats :: boolean(), enable_stats :: boolean(),
%% Stats timer %% Stats timer
stats_timer :: reference() | undefined, stats_timer :: reference() | undefined,
%% TODO: %% TODO:
deliver_stats = 0, deliver_stats = 0,
%% TODO: %% TODO:
enqueue_stats = 0, enqueue_stats = 0,
%% Created at %% Created at
created_at :: erlang:timestamp() created_at :: erlang:timestamp()
}). }).
-define(TIMEOUT, 60000). -define(TIMEOUT, 60000).
@ -284,7 +284,12 @@ pubcomp(SPid, PacketId, ReasonCode) ->
-spec(unsubscribe(pid(), emqx_types:topic_table()) -> ok). -spec(unsubscribe(pid(), emqx_types:topic_table()) -> ok).
unsubscribe(SPid, RawTopicFilters) when is_list(RawTopicFilters) -> unsubscribe(SPid, RawTopicFilters) when is_list(RawTopicFilters) ->
unsubscribe(SPid, undefined, #{}, lists:map(fun emqx_topic:parse/1, RawTopicFilters)). TopicFilters = lists:map(fun({RawTopic, Opts}) ->
emqx_topic:parse(RawTopic, Opts);
(RawTopic) ->
emqx_topic:parse(RawTopic)
end, RawTopicFilters),
unsubscribe(SPid, undefined, #{}, TopicFilters).
-spec(unsubscribe(pid(), emqx_mqtt_types:packet_id(), -spec(unsubscribe(pid(), emqx_mqtt_types:packet_id(),
emqx_mqtt_types:properties(), emqx_mqtt_types:topic_filters()) -> ok). emqx_mqtt_types:properties(), emqx_mqtt_types:topic_filters()) -> ok).
@ -424,20 +429,20 @@ handle_call(Req, _From, State) ->
handle_cast({subscribe, FromPid, {PacketId, _Properties, TopicFilters}}, handle_cast({subscribe, FromPid, {PacketId, _Properties, TopicFilters}},
State = #state{client_id = ClientId, subscriptions = Subscriptions}) -> State = #state{client_id = ClientId, subscriptions = Subscriptions}) ->
{ReasonCodes, Subscriptions1} = {ReasonCodes, Subscriptions1} =
lists:foldr(fun({Topic, SubOpts = #{qos := QoS}}, {RcAcc, SubMap}) -> lists:foldr(fun({Topic, SubOpts = #{qos := QoS}}, {RcAcc, SubMap}) ->
{[QoS|RcAcc], case maps:find(Topic, SubMap) of {[QoS|RcAcc], case maps:find(Topic, SubMap) of
{ok, SubOpts} -> {ok, SubOpts} ->
SubMap; SubMap;
{ok, _SubOpts} -> {ok, _SubOpts} ->
emqx_broker:set_subopts(Topic, {self(), ClientId}, SubOpts), emqx_broker:set_subopts(Topic, {self(), ClientId}, SubOpts),
emqx_hooks:run('session.subscribed', [#{client_id => ClientId}, Topic, SubOpts]), emqx_hooks:run('session.subscribed', [#{client_id => ClientId}, Topic, SubOpts]),
maps:put(Topic, SubOpts, SubMap); maps:put(Topic, SubOpts, SubMap);
error -> error ->
emqx_broker:subscribe(Topic, ClientId, SubOpts), emqx_broker:subscribe(Topic, ClientId, SubOpts),
emqx_hooks:run('session.subscribed', [#{client_id => ClientId}, Topic, SubOpts]), emqx_hooks:run('session.subscribed', [#{client_id => ClientId}, Topic, SubOpts]),
maps:put(Topic, SubOpts, SubMap) maps:put(Topic, SubOpts, SubMap)
end} end}
end, {[], Subscriptions}, TopicFilters), end, {[], Subscriptions}, TopicFilters),
suback(FromPid, PacketId, ReasonCodes), suback(FromPid, PacketId, ReasonCodes),
{noreply, State#state{subscriptions = Subscriptions1}}; {noreply, State#state{subscriptions = Subscriptions1}};
@ -445,16 +450,16 @@ handle_cast({subscribe, FromPid, {PacketId, _Properties, TopicFilters}},
handle_cast({unsubscribe, From, {PacketId, _Properties, TopicFilters}}, handle_cast({unsubscribe, From, {PacketId, _Properties, TopicFilters}},
State = #state{client_id = ClientId, subscriptions = Subscriptions}) -> State = #state{client_id = ClientId, subscriptions = Subscriptions}) ->
{ReasonCodes, Subscriptions1} = {ReasonCodes, Subscriptions1} =
lists:foldr(fun({Topic, _SubOpts}, {Acc, SubMap}) -> lists:foldr(fun({Topic, _SubOpts}, {Acc, SubMap}) ->
case maps:find(Topic, SubMap) of case maps:find(Topic, SubMap) of
{ok, SubOpts} -> {ok, SubOpts} ->
ok = emqx_broker:unsubscribe(Topic, ClientId), ok = emqx_broker:unsubscribe(Topic, ClientId),
emqx_hooks:run('session.unsubscribed', [#{client_id => ClientId}, Topic, SubOpts]), emqx_hooks:run('session.unsubscribed', [#{client_id => ClientId}, Topic, SubOpts]),
{[?RC_SUCCESS|Acc], maps:remove(Topic, SubMap)}; {[?RC_SUCCESS|Acc], maps:remove(Topic, SubMap)};
error -> error ->
{[?RC_NO_SUBSCRIPTION_EXISTED|Acc], SubMap} {[?RC_NO_SUBSCRIPTION_EXISTED|Acc], SubMap}
end end
end, {[], Subscriptions}, TopicFilters), end, {[], Subscriptions}, TopicFilters),
unsuback(From, PacketId, ReasonCodes), unsuback(From, PacketId, ReasonCodes),
{noreply, State#state{subscriptions = Subscriptions1}}; {noreply, State#state{subscriptions = Subscriptions1}};
@ -524,7 +529,7 @@ handle_cast(Msg, State) ->
%% Batch dispatch %% Batch dispatch
handle_info({dispatch, Topic, Msgs}, State) when is_list(Msgs) -> handle_info({dispatch, Topic, Msgs}, State) when is_list(Msgs) ->
{noreply, lists:foldl(fun(Msg, NewState) -> {noreply, lists:foldl(fun(Msg, NewState) ->
element(2, handle_info({dispatch, Topic, Msg}, NewState)) element(2, handle_info({dispatch, Topic, Msg}, NewState))
end, State, Msgs)}; end, State, Msgs)};
%% Dispatch message %% Dispatch message
@ -684,7 +689,7 @@ sortfun(inflight) ->
sortfun(awaiting_rel) -> sortfun(awaiting_rel) ->
fun({_, #message{timestamp = Ts1}}, fun({_, #message{timestamp = Ts1}},
{_, #message{timestamp = Ts2}}) -> {_, #message{timestamp = Ts2}}) ->
Ts1 < Ts2 Ts1 < Ts2
end. end.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
@ -726,7 +731,7 @@ dispatch(Msg = #message{qos = ?QOS0}, State) ->
inc_stats(deliver, State); inc_stats(deliver, State);
dispatch(Msg = #message{qos = QoS}, State = #state{next_pkt_id = PacketId, inflight = Inflight}) dispatch(Msg = #message{qos = QoS}, State = #state{next_pkt_id = PacketId, inflight = Inflight})
when QoS =:= ?QOS1 orelse QoS =:= ?QOS2 -> when QoS =:= ?QOS1 orelse QoS =:= ?QOS2 ->
case emqx_inflight:is_full(Inflight) of case emqx_inflight:is_full(Inflight) of
true -> true ->
enqueue_msg(Msg, State); enqueue_msg(Msg, State);
@ -824,7 +829,7 @@ dequeue2(State = #state{mqueue = Q}) ->
%% Ensure timers %% Ensure timers
ensure_await_rel_timer(State = #state{await_rel_timer = undefined, await_rel_timeout = Timeout}) -> ensure_await_rel_timer(State = #state{await_rel_timer = undefined, await_rel_timeout = Timeout}) ->
State#state{await_rel_timer = emqx_misc:start_timer(Timeout, check_awaiting_rel)}; State#state{await_rel_timer = emqx_misc:start_timer(Timeout, check_awaiting_rel)};
ensure_await_rel_timer(State) -> ensure_await_rel_timer(State) ->
State. State.