fix pubsub

This commit is contained in:
Ery Lee 2015-04-14 15:35:41 +08:00
parent d311a058cc
commit 47f99c5cca
3 changed files with 17 additions and 19 deletions

View File

@ -32,6 +32,8 @@
-export([start/0, cluster/1]). -export([start/0, cluster/1]).
-export([create_table/2, copy_table/1]).
start() -> start() ->
case init_schema() of case init_schema() of
ok -> ok ->
@ -109,10 +111,8 @@ create_table(Table, Attrs) ->
%% @end %% @end
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
copy_tables() -> copy_tables() ->
ok = copy_table(topic), ok = emqttd_trie:mnesia(create),
ok = copy_table(topic_trie), ok = emqttd_pubsub:mnesia(create),
ok = copy_table(topic_trie_node),
ok = copy_table(topic_subscriber),
ok = copy_table(message_retained). ok = copy_table(message_retained).
copy_table(Table) -> copy_table(Table) ->
@ -177,6 +177,4 @@ wait_for_mnesia(stop) ->
{error, mnesia_unexpectedly_starting} {error, mnesia_unexpectedly_starting}
end. end.

View File

@ -48,7 +48,8 @@
-export([start_link/0]). -export([start_link/0]).
-export([create/1, -export([create/1,
subscribe/1, unsubscribe/1, subscribe/1, subscribe/2,
unsubscribe/1,
publish/1, publish/2, publish/1, publish/2,
%local node %local node
dispatch/2, match/1]). dispatch/2, match/1]).
@ -236,14 +237,15 @@ handle_cast(Msg, State) ->
handle_info({mnesia_table_event, {write, #topic_subscriber{subpid = Pid}, _ActivityId}}, handle_info({mnesia_table_event, {write, #topic_subscriber{subpid = Pid}, _ActivityId}},
State = #state{submap = SubMap}) -> State = #state{submap = SubMap}) ->
NewSubMap =
case maps:is_key(Pid, SubMap) of case maps:is_key(Pid, SubMap) of
false -> false ->
maps:put(Pid, erlang:monitor(process, Pid)); maps:put(Pid, erlang:monitor(process, Pid), SubMap);
true -> true ->
ignore SubMap
end, end,
setstats(subscribers), setstats(subscribers),
{noreply, State}; {noreply, State#state{submap = NewSubMap}};
handle_info({mnesia_table_event, {write, #topic{}, _ActivityId}}, State) -> handle_info({mnesia_table_event, {write, #topic{}, _ActivityId}}, State) ->
%%TODO: this is not right when clusterd. %%TODO: this is not right when clusterd.
@ -301,7 +303,7 @@ code_change(_OldVsn, State, _Extra) ->
%%% Internal functions %%% Internal functions
%%%============================================================================= %%%=============================================================================
insert_topic(Topic = #topic{name = Name}) -> insert_topic(Topic = #topic{name = Name}) ->
case mnesia:wread(topic, Name) of case mnesia:wread({topic, Name}) of
[] -> [] ->
ok = emqttd_trie:insert(Name), ok = emqttd_trie:insert(Name),
mnesia:write(Topic); mnesia:write(Topic);
@ -316,15 +318,14 @@ insert_subscriber(Subscriber) ->
mnesia:write(Subscriber). mnesia:write(Subscriber).
try_remove_topic(Topic = #topic{name = Name}) -> try_remove_topic(Topic = #topic{name = Name}) ->
%%TODO: is this ok in transaction? case mnesia:read({topic_subscriber, Name}) of
case ets:member(topic_subscriber, Name) of [] ->
false ->
mnesia:delete_object(Topic), mnesia:delete_object(Topic),
case mnesia:read(topic, Name) of case mnesia:read(topic, Name) of
[] -> emqttd_trie:delete(Name); [] -> emqttd_trie:delete(Name);
_ -> ok _ -> ok
end; end;
true -> _ ->
ok ok
end. end.
@ -338,4 +339,3 @@ setstats(subscribers) ->
setstats(dropped) -> setstats(dropped) ->
emqttd_metrics:inc('messages/dropped'). emqttd_metrics:inc('messages/dropped').

View File

@ -157,7 +157,7 @@ add_path({Node, Word, Child}) ->
Edge = #trie_edge{node_id=Node, word=Word}, Edge = #trie_edge{node_id=Node, word=Word},
case mnesia:read(trie_node, Node) of case mnesia:read(trie_node, Node) of
[TrieNode = #trie_node{edge_count=Count}] -> [TrieNode = #trie_node{edge_count=Count}] ->
case mnesia:wread(trie, Edge) of case mnesia:wread({trie, Edge}) of
[] -> [] ->
mnesia:write(TrieNode#trie_node{edge_count=Count+1}), mnesia:write(TrieNode#trie_node{edge_count=Count+1}),
mnesia:write(#trie{edge=Edge, node_id=Child}); mnesia:write(#trie{edge=Edge, node_id=Child});