fix(emqx_sn): race_condition when discarding
This commit is contained in:
parent
4885171e4f
commit
98edbc39af
|
@ -202,7 +202,7 @@ idle(cast, {incoming, ?SN_PUBLISH_MSG(#mqtt_sn_flags{qos = ?QOS_NEG1,
|
||||||
}, TopicId, _MsgId, Data)},
|
}, TopicId, _MsgId, Data)},
|
||||||
State = #state{clientid = ClientId, registry = Registry}) ->
|
State = #state{clientid = ClientId, registry = Registry}) ->
|
||||||
TopicName = case (TopicIdType =:= ?SN_SHORT_TOPIC) of
|
TopicName = case (TopicIdType =:= ?SN_SHORT_TOPIC) of
|
||||||
false -> emqx_sn_registry:lookup_topic(Registry, ClientId, TopicId);
|
false -> emqx_sn_registry:lookup_topic(Registry, self(), TopicId);
|
||||||
true -> <<TopicId:16>>
|
true -> <<TopicId:16>>
|
||||||
end,
|
end,
|
||||||
_ = case TopicName =/= undefined of
|
_ = case TopicName =/= undefined of
|
||||||
|
@ -294,7 +294,7 @@ wait_for_will_msg(EventType, EventContent, State) ->
|
||||||
|
|
||||||
connected(cast, {incoming, ?SN_REGISTER_MSG(_TopicId, MsgId, TopicName)},
|
connected(cast, {incoming, ?SN_REGISTER_MSG(_TopicId, MsgId, TopicName)},
|
||||||
State = #state{clientid = ClientId, registry = Registry}) ->
|
State = #state{clientid = ClientId, registry = Registry}) ->
|
||||||
case emqx_sn_registry:register_topic(Registry, ClientId, TopicName) of
|
case emqx_sn_registry:register_topic(Registry, self(), TopicName) of
|
||||||
TopicId when is_integer(TopicId) ->
|
TopicId when is_integer(TopicId) ->
|
||||||
?LOG(debug, "register ClientId=~p, TopicName=~p, TopicId=~p", [ClientId, TopicName, TopicId]),
|
?LOG(debug, "register ClientId=~p, TopicName=~p, TopicId=~p", [ClientId, TopicName, TopicId]),
|
||||||
send_message(?SN_REGACK_MSG(TopicId, MsgId, ?SN_RC_ACCEPTED), State);
|
send_message(?SN_REGACK_MSG(TopicId, MsgId, ?SN_RC_ACCEPTED), State);
|
||||||
|
@ -586,10 +586,9 @@ handle_event(EventType, EventContent, StateName, State) ->
|
||||||
[StateName, {EventType, EventContent}]),
|
[StateName, {EventType, EventContent}]),
|
||||||
{keep_state, State}.
|
{keep_state, State}.
|
||||||
|
|
||||||
terminate(Reason, _StateName, #state{clientid = ClientId,
|
terminate(Reason, _StateName, #state{channel = Channel,
|
||||||
channel = Channel,
|
|
||||||
registry = Registry}) ->
|
registry = Registry}) ->
|
||||||
emqx_sn_registry:unregister_topic(Registry, ClientId),
|
emqx_sn_registry:unregister_topic(Registry, self()),
|
||||||
case Channel =:= undefined of
|
case Channel =:= undefined of
|
||||||
true -> ok;
|
true -> ok;
|
||||||
false -> emqx_channel:terminate(Reason, Channel)
|
false -> emqx_channel:terminate(Reason, Channel)
|
||||||
|
@ -724,12 +723,10 @@ mqtt2sn(?UNSUBACK_PACKET(MsgId), _State)->
|
||||||
?SN_UNSUBACK_MSG(MsgId);
|
?SN_UNSUBACK_MSG(MsgId);
|
||||||
|
|
||||||
mqtt2sn(?PUBLISH_PACKET(QoS, Topic, PacketId, Payload), #state{registry = Registry}) ->
|
mqtt2sn(?PUBLISH_PACKET(QoS, Topic, PacketId, Payload), #state{registry = Registry}) ->
|
||||||
NewPacketId = if
|
NewPacketId = if QoS =:= ?QOS_0 -> 0;
|
||||||
QoS =:= ?QOS_0 -> 0;
|
true -> PacketId
|
||||||
true -> PacketId
|
|
||||||
end,
|
end,
|
||||||
ClientId = get(clientid),
|
{TopicIdType, TopicContent} = case emqx_sn_registry:lookup_topic_id(Registry, self(), Topic) of
|
||||||
{TopicIdType, TopicContent} = case emqx_sn_registry:lookup_topic_id(Registry, ClientId, Topic) of
|
|
||||||
{predef, PredefTopicId} ->
|
{predef, PredefTopicId} ->
|
||||||
{?SN_PREDEFINED_TOPIC, PredefTopicId};
|
{?SN_PREDEFINED_TOPIC, PredefTopicId};
|
||||||
TopicId when is_integer(TopicId) ->
|
TopicId when is_integer(TopicId) ->
|
||||||
|
@ -830,7 +827,6 @@ do_connect(ClientId, CleanStart, WillFlag, Duration, State) ->
|
||||||
keepalive = Duration,
|
keepalive = Duration,
|
||||||
properties = OnlyOneInflight
|
properties = OnlyOneInflight
|
||||||
},
|
},
|
||||||
put(clientid, ClientId),
|
|
||||||
case WillFlag of
|
case WillFlag of
|
||||||
true -> send_message(?SN_WILLTOPICREQ_MSG(), State),
|
true -> send_message(?SN_WILLTOPICREQ_MSG(), State),
|
||||||
NState = State#state{connpkt = ConnPkt,
|
NState = State#state{connpkt = ConnPkt,
|
||||||
|
@ -847,7 +843,6 @@ do_connect(ClientId, CleanStart, WillFlag, Duration, State) ->
|
||||||
|
|
||||||
do_2nd_connect(Flags, Duration, ClientId, State = #state{sockname = Sockname,
|
do_2nd_connect(Flags, Duration, ClientId, State = #state{sockname = Sockname,
|
||||||
peername = Peername,
|
peername = Peername,
|
||||||
clientid = OldClientId,
|
|
||||||
registry = Registry,
|
registry = Registry,
|
||||||
channel = Channel}) ->
|
channel = Channel}) ->
|
||||||
emqx_logger:set_metadata_clientid(ClientId),
|
emqx_logger:set_metadata_clientid(ClientId),
|
||||||
|
@ -855,7 +850,7 @@ do_2nd_connect(Flags, Duration, ClientId, State = #state{sockname = Sockname,
|
||||||
NChannel = case CleanStart of
|
NChannel = case CleanStart of
|
||||||
true ->
|
true ->
|
||||||
emqx_channel:terminate(normal, Channel),
|
emqx_channel:terminate(normal, Channel),
|
||||||
emqx_sn_registry:unregister_topic(Registry, OldClientId),
|
emqx_sn_registry:unregister_topic(Registry, self()),
|
||||||
emqx_channel:init(#{socktype => udp,
|
emqx_channel:init(#{socktype => udp,
|
||||||
sockname => Sockname,
|
sockname => Sockname,
|
||||||
peername => Peername,
|
peername => Peername,
|
||||||
|
@ -868,8 +863,8 @@ do_2nd_connect(Flags, Duration, ClientId, State = #state{sockname = Sockname,
|
||||||
do_connect(ClientId, CleanStart, Will, Duration, NState).
|
do_connect(ClientId, CleanStart, Will, Duration, NState).
|
||||||
|
|
||||||
handle_subscribe(?SN_NORMAL_TOPIC, TopicName, QoS, MsgId,
|
handle_subscribe(?SN_NORMAL_TOPIC, TopicName, QoS, MsgId,
|
||||||
State=#state{clientid = ClientId, registry = Registry}) ->
|
State=#state{registry = Registry}) ->
|
||||||
case emqx_sn_registry:register_topic(Registry, ClientId, TopicName) of
|
case emqx_sn_registry:register_topic(Registry, self(), TopicName) of
|
||||||
{error, too_large} ->
|
{error, too_large} ->
|
||||||
ok = send_message(?SN_SUBACK_MSG(#mqtt_sn_flags{qos = QoS},
|
ok = send_message(?SN_SUBACK_MSG(#mqtt_sn_flags{qos = QoS},
|
||||||
?SN_INVALID_TOPIC_ID,
|
?SN_INVALID_TOPIC_ID,
|
||||||
|
@ -883,8 +878,8 @@ handle_subscribe(?SN_NORMAL_TOPIC, TopicName, QoS, MsgId,
|
||||||
end;
|
end;
|
||||||
|
|
||||||
handle_subscribe(?SN_PREDEFINED_TOPIC, TopicId, QoS, MsgId,
|
handle_subscribe(?SN_PREDEFINED_TOPIC, TopicId, QoS, MsgId,
|
||||||
State = #state{clientid = ClientId, registry = Registry}) ->
|
State = #state{registry = Registry}) ->
|
||||||
case emqx_sn_registry:lookup_topic(Registry, ClientId, TopicId) of
|
case emqx_sn_registry:lookup_topic(Registry, self(), TopicId) of
|
||||||
undefined ->
|
undefined ->
|
||||||
ok = send_message(?SN_SUBACK_MSG(#mqtt_sn_flags{qos = QoS},
|
ok = send_message(?SN_SUBACK_MSG(#mqtt_sn_flags{qos = QoS},
|
||||||
TopicId,
|
TopicId,
|
||||||
|
@ -913,8 +908,8 @@ handle_unsubscribe(?SN_NORMAL_TOPIC, TopicId, MsgId, State) ->
|
||||||
proto_unsubscribe(TopicId, MsgId, State);
|
proto_unsubscribe(TopicId, MsgId, State);
|
||||||
|
|
||||||
handle_unsubscribe(?SN_PREDEFINED_TOPIC, TopicId, MsgId,
|
handle_unsubscribe(?SN_PREDEFINED_TOPIC, TopicId, MsgId,
|
||||||
State = #state{clientid = ClientId, registry = Registry}) ->
|
State = #state{registry = Registry}) ->
|
||||||
case emqx_sn_registry:lookup_topic(Registry, ClientId, TopicId) of
|
case emqx_sn_registry:lookup_topic(Registry, self(), TopicId) of
|
||||||
undefined ->
|
undefined ->
|
||||||
ok = send_message(?SN_UNSUBACK_MSG(MsgId), State),
|
ok = send_message(?SN_UNSUBACK_MSG(MsgId), State),
|
||||||
{keep_state, State};
|
{keep_state, State};
|
||||||
|
@ -938,10 +933,10 @@ do_publish(?SN_NORMAL_TOPIC, TopicName, Data, Flags, MsgId, State) ->
|
||||||
<<TopicId:16>> = TopicName,
|
<<TopicId:16>> = TopicName,
|
||||||
do_publish(?SN_PREDEFINED_TOPIC, TopicId, Data, Flags, MsgId, State);
|
do_publish(?SN_PREDEFINED_TOPIC, TopicId, Data, Flags, MsgId, State);
|
||||||
do_publish(?SN_PREDEFINED_TOPIC, TopicId, Data, Flags, MsgId,
|
do_publish(?SN_PREDEFINED_TOPIC, TopicId, Data, Flags, MsgId,
|
||||||
State=#state{clientid = ClientId, registry = Registry}) ->
|
State=#state{registry = Registry}) ->
|
||||||
#mqtt_sn_flags{qos = QoS, dup = Dup, retain = Retain} = Flags,
|
#mqtt_sn_flags{qos = QoS, dup = Dup, retain = Retain} = Flags,
|
||||||
NewQoS = get_corrected_qos(QoS),
|
NewQoS = get_corrected_qos(QoS),
|
||||||
case emqx_sn_registry:lookup_topic(Registry, ClientId, TopicId) of
|
case emqx_sn_registry:lookup_topic(Registry, self(), TopicId) of
|
||||||
undefined ->
|
undefined ->
|
||||||
(NewQoS =/= ?QOS_0) andalso send_message(?SN_PUBACK_MSG(TopicId, MsgId, ?SN_RC_INVALID_TOPIC_ID), State),
|
(NewQoS =/= ?QOS_0) andalso send_message(?SN_PUBACK_MSG(TopicId, MsgId, ?SN_RC_INVALID_TOPIC_ID), State),
|
||||||
{keep_state, State};
|
{keep_state, State};
|
||||||
|
@ -979,12 +974,12 @@ do_publish_will(#state{will_msg = WillMsg, clientid = ClientId}) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
do_puback(TopicId, MsgId, ReturnCode, StateName,
|
do_puback(TopicId, MsgId, ReturnCode, StateName,
|
||||||
State=#state{clientid = ClientId, registry = Registry}) ->
|
State=#state{registry = Registry}) ->
|
||||||
case ReturnCode of
|
case ReturnCode of
|
||||||
?SN_RC_ACCEPTED ->
|
?SN_RC_ACCEPTED ->
|
||||||
handle_incoming(?PUBACK_PACKET(MsgId), StateName, State);
|
handle_incoming(?PUBACK_PACKET(MsgId), StateName, State);
|
||||||
?SN_RC_INVALID_TOPIC_ID ->
|
?SN_RC_INVALID_TOPIC_ID ->
|
||||||
case emqx_sn_registry:lookup_topic(Registry, ClientId, TopicId) of
|
case emqx_sn_registry:lookup_topic(Registry, self(), TopicId) of
|
||||||
undefined -> ok;
|
undefined -> ok;
|
||||||
TopicName ->
|
TopicName ->
|
||||||
%%notice that this TopicName maybe normal or predefined,
|
%%notice that this TopicName maybe normal or predefined,
|
||||||
|
@ -1072,24 +1067,24 @@ handle_outgoing(Packets, State) when is_list(Packets) ->
|
||||||
lists:foreach(fun(Packet) -> handle_outgoing(Packet, State) end, Packets);
|
lists:foreach(fun(Packet) -> handle_outgoing(Packet, State) end, Packets);
|
||||||
|
|
||||||
handle_outgoing(PubPkt = ?PUBLISH_PACKET(QoS, TopicName, PacketId, Payload),
|
handle_outgoing(PubPkt = ?PUBLISH_PACKET(QoS, TopicName, PacketId, Payload),
|
||||||
State = #state{clientid = ClientId, registry = Registry}) ->
|
State = #state{registry = Registry}) ->
|
||||||
#mqtt_packet{header = #mqtt_packet_header{dup = Dup, retain = Retain}} = PubPkt,
|
#mqtt_packet{header = #mqtt_packet_header{dup = Dup, retain = Retain}} = PubPkt,
|
||||||
MsgId = message_id(PacketId),
|
MsgId = message_id(PacketId),
|
||||||
?LOG(debug, "Handle outgoing: ~0p", [PubPkt]),
|
?LOG(debug, "Handle outgoing: ~0p", [PubPkt]),
|
||||||
|
|
||||||
(emqx_sn_registry:lookup_topic_id(Registry, ClientId, TopicName) == undefined)
|
(emqx_sn_registry:lookup_topic_id(Registry, self(), TopicName) == undefined)
|
||||||
andalso (byte_size(TopicName) =/= 2)
|
andalso (byte_size(TopicName) =/= 2)
|
||||||
andalso register_and_notify_client(TopicName, Payload, Dup, QoS,
|
andalso register_and_notify_client(TopicName, Payload, Dup, QoS,
|
||||||
Retain, MsgId, ClientId, State),
|
Retain, MsgId, State),
|
||||||
|
|
||||||
send_message(mqtt2sn(PubPkt, State), State);
|
send_message(mqtt2sn(PubPkt, State), State);
|
||||||
|
|
||||||
handle_outgoing(Packet, State) ->
|
handle_outgoing(Packet, State) ->
|
||||||
send_message(mqtt2sn(Packet, State), State).
|
send_message(mqtt2sn(Packet, State), State).
|
||||||
|
|
||||||
register_and_notify_client(TopicName, Payload, Dup, QoS, Retain, MsgId, ClientId,
|
register_and_notify_client(TopicName, Payload, Dup, QoS, Retain, MsgId,
|
||||||
State = #state{registry = Registry}) ->
|
State = #state{registry = Registry}) ->
|
||||||
TopicId = emqx_sn_registry:register_topic(Registry, ClientId, TopicName),
|
TopicId = emqx_sn_registry:register_topic(Registry, self(), TopicName),
|
||||||
?LOG(debug, "Register TopicId=~p, TopicName=~p, Payload=~p, Dup=~p, QoS=~p, "
|
?LOG(debug, "Register TopicId=~p, TopicName=~p, Payload=~p, Dup=~p, QoS=~p, "
|
||||||
"Retain=~p, MsgId=~p", [TopicId, TopicName, Payload, Dup, QoS, Retain, MsgId]),
|
"Retain=~p, MsgId=~p", [TopicId, TopicName, Payload, Dup, QoS, Retain, MsgId]),
|
||||||
send_register(TopicName, TopicId, MsgId, State).
|
send_register(TopicName, TopicId, MsgId, State).
|
||||||
|
|
|
@ -61,10 +61,10 @@ stop({_Tab, Pid}) ->
|
||||||
gen_server:stop(Pid, normal, infinity).
|
gen_server:stop(Pid, normal, infinity).
|
||||||
|
|
||||||
-spec(register_topic(registry(), binary(), binary()) -> integer() | {error, term()}).
|
-spec(register_topic(registry(), binary(), binary()) -> integer() | {error, term()}).
|
||||||
register_topic({_, Pid}, ClientId, TopicName) when is_binary(TopicName) ->
|
register_topic({_, Pid}, ClientPid, TopicName) when is_binary(TopicName) ->
|
||||||
case emqx_topic:wildcard(TopicName) of
|
case emqx_topic:wildcard(TopicName) of
|
||||||
false ->
|
false ->
|
||||||
gen_server:call(Pid, {register, ClientId, TopicName});
|
gen_server:call(Pid, {register, ClientPid, TopicName});
|
||||||
%% TopicId: in case of “accepted” the value that will be used as topic
|
%% TopicId: in case of “accepted” the value that will be used as topic
|
||||||
%% id by the gateway when sending PUBLISH messages to the client (not
|
%% id by the gateway when sending PUBLISH messages to the client (not
|
||||||
%% relevant in case of subscriptions to a short topic name or to a topic
|
%% relevant in case of subscriptions to a short topic name or to a topic
|
||||||
|
@ -73,10 +73,10 @@ register_topic({_, Pid}, ClientId, TopicName) when is_binary(TopicName) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec(lookup_topic(registry(), binary(), pos_integer()) -> undefined | binary()).
|
-spec(lookup_topic(registry(), binary(), pos_integer()) -> undefined | binary()).
|
||||||
lookup_topic({Tab, _Pid}, ClientId, TopicId) when is_integer(TopicId) ->
|
lookup_topic({Tab, _Pid}, ClientPid, TopicId) when is_integer(TopicId) ->
|
||||||
case lookup_element(Tab, {predef, TopicId}, 2) of
|
case lookup_element(Tab, {predef, TopicId}, 2) of
|
||||||
undefined ->
|
undefined ->
|
||||||
lookup_element(Tab, {ClientId, TopicId}, 2);
|
lookup_element(Tab, {ClientPid, TopicId}, 2);
|
||||||
Topic -> Topic
|
Topic -> Topic
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
@ -84,10 +84,10 @@ lookup_topic({Tab, _Pid}, ClientId, TopicId) when is_integer(TopicId) ->
|
||||||
-> undefined
|
-> undefined
|
||||||
| pos_integer()
|
| pos_integer()
|
||||||
| {predef, integer()}).
|
| {predef, integer()}).
|
||||||
lookup_topic_id({Tab, _Pid}, ClientId, TopicName) when is_binary(TopicName) ->
|
lookup_topic_id({Tab, _Pid}, ClientPid, TopicName) when is_binary(TopicName) ->
|
||||||
case lookup_element(Tab, {predef, TopicName}, 2) of
|
case lookup_element(Tab, {predef, TopicName}, 2) of
|
||||||
undefined ->
|
undefined ->
|
||||||
lookup_element(Tab, {ClientId, TopicName}, 2);
|
lookup_element(Tab, {ClientPid, TopicName}, 2);
|
||||||
TopicId ->
|
TopicId ->
|
||||||
{predef, TopicId}
|
{predef, TopicId}
|
||||||
end.
|
end.
|
||||||
|
@ -97,16 +97,16 @@ lookup_element(Tab, Key, Pos) ->
|
||||||
try ets:lookup_element(Tab, Key, Pos) catch error:badarg -> undefined end.
|
try ets:lookup_element(Tab, Key, Pos) catch error:badarg -> undefined end.
|
||||||
|
|
||||||
-spec(unregister_topic(registry(), binary()) -> ok).
|
-spec(unregister_topic(registry(), binary()) -> ok).
|
||||||
unregister_topic({_Tab, Pid}, ClientId) ->
|
unregister_topic({_Tab, Pid}, ClientPid) ->
|
||||||
gen_server:call(Pid, {unregister, ClientId}).
|
gen_server:call(Pid, {unregister, ClientPid}).
|
||||||
|
|
||||||
%%-----------------------------------------------------------------------------
|
%%-----------------------------------------------------------------------------
|
||||||
|
|
||||||
init([Tab, PredefTopics]) ->
|
init([Tab, PredefTopics]) ->
|
||||||
%% {predef, TopicId} -> TopicName
|
%% {predef, TopicId} -> TopicName
|
||||||
%% {predef, TopicName} -> TopicId
|
%% {predef, TopicName} -> TopicId
|
||||||
%% {ClientId, TopicId} -> TopicName
|
%% {ClientPid, TopicId} -> TopicName
|
||||||
%% {ClientId, TopicName} -> TopicId
|
%% {ClientPid, TopicName} -> TopicId
|
||||||
_ = ets:new(Tab, [set, public, named_table, {read_concurrency, true}]),
|
_ = ets:new(Tab, [set, public, named_table, {read_concurrency, true}]),
|
||||||
MaxPredefId = lists:foldl(
|
MaxPredefId = lists:foldl(
|
||||||
fun({TopicId, TopicName}, AccId) ->
|
fun({TopicId, TopicName}, AccId) ->
|
||||||
|
@ -116,27 +116,27 @@ init([Tab, PredefTopics]) ->
|
||||||
end, 0, PredefTopics),
|
end, 0, PredefTopics),
|
||||||
{ok, #state{tab = Tab, max_predef_topic_id = MaxPredefId}}.
|
{ok, #state{tab = Tab, max_predef_topic_id = MaxPredefId}}.
|
||||||
|
|
||||||
handle_call({register, ClientId, TopicName}, _From,
|
handle_call({register, ClientPid, TopicName}, _From,
|
||||||
State = #state{tab = Tab, max_predef_topic_id = PredefId}) ->
|
State = #state{tab = Tab, max_predef_topic_id = PredefId}) ->
|
||||||
case lookup_topic_id({Tab, self()}, ClientId, TopicName) of
|
case lookup_topic_id({Tab, self()}, ClientPid, TopicName) of
|
||||||
{predef, PredefTopicId} when is_integer(PredefTopicId) ->
|
{predef, PredefTopicId} when is_integer(PredefTopicId) ->
|
||||||
{reply, PredefTopicId, State};
|
{reply, PredefTopicId, State};
|
||||||
TopicId when is_integer(TopicId) ->
|
TopicId when is_integer(TopicId) ->
|
||||||
{reply, TopicId, State};
|
{reply, TopicId, State};
|
||||||
undefined ->
|
undefined ->
|
||||||
case next_topic_id(Tab, PredefId, ClientId) of
|
case next_topic_id(Tab, PredefId, ClientPid) of
|
||||||
TopicId when TopicId >= 16#FFFF ->
|
TopicId when TopicId >= 16#FFFF ->
|
||||||
{reply, {error, too_large}, State};
|
{reply, {error, too_large}, State};
|
||||||
TopicId ->
|
TopicId ->
|
||||||
_ = ets:insert(Tab, {{ClientId, next_topic_id}, TopicId + 1}),
|
_ = ets:insert(Tab, {{ClientPid, next_topic_id}, TopicId + 1}),
|
||||||
_ = ets:insert(Tab, {{ClientId, TopicName}, TopicId}),
|
_ = ets:insert(Tab, {{ClientPid, TopicName}, TopicId}),
|
||||||
_ = ets:insert(Tab, {{ClientId, TopicId}, TopicName}),
|
_ = ets:insert(Tab, {{ClientPid, TopicId}, TopicName}),
|
||||||
{reply, TopicId, State}
|
{reply, TopicId, State}
|
||||||
end
|
end
|
||||||
end;
|
end;
|
||||||
|
|
||||||
handle_call({unregister, ClientId}, _From, State = #state{tab = Tab}) ->
|
handle_call({unregister, ClientPid}, _From, State = #state{tab = Tab}) ->
|
||||||
ets:match_delete(Tab, {{ClientId, '_'}, '_'}),
|
ets:match_delete(Tab, {{ClientPid, '_'}, '_'}),
|
||||||
{reply, ok, State};
|
{reply, ok, State};
|
||||||
|
|
||||||
handle_call(Req, _From, State) ->
|
handle_call(Req, _From, State) ->
|
||||||
|
@ -159,8 +159,8 @@ code_change(_OldVsn, State, _Extra) ->
|
||||||
|
|
||||||
%%-----------------------------------------------------------------------------
|
%%-----------------------------------------------------------------------------
|
||||||
|
|
||||||
next_topic_id(Tab, PredefId, ClientId) ->
|
next_topic_id(Tab, PredefId, ClientPid) ->
|
||||||
case ets:lookup(Tab, {ClientId, next_topic_id}) of
|
case ets:lookup(Tab, {ClientPid, next_topic_id}) of
|
||||||
[{_, Id}] -> Id;
|
[{_, Id}] -> Id;
|
||||||
[] -> PredefId + 1
|
[] -> PredefId + 1
|
||||||
end.
|
end.
|
||||||
|
|
Loading…
Reference in New Issue