diff --git a/apps/emqttd/src/emqttd_mnesia.erl b/apps/emqttd/src/emqttd_mnesia.erl index b0b6807ff..890210292 100644 --- a/apps/emqttd/src/emqttd_mnesia.erl +++ b/apps/emqttd/src/emqttd_mnesia.erl @@ -32,6 +32,8 @@ -export([start/0, cluster/1]). +-export([create_table/2, copy_table/1]). + start() -> case init_schema() of ok -> @@ -109,10 +111,8 @@ create_table(Table, Attrs) -> %% @end %%------------------------------------------------------------------------------ copy_tables() -> - ok = copy_table(topic), - ok = copy_table(topic_trie), - ok = copy_table(topic_trie_node), - ok = copy_table(topic_subscriber), + ok = emqttd_trie:mnesia(create), + ok = emqttd_pubsub:mnesia(create), ok = copy_table(message_retained). copy_table(Table) -> @@ -177,6 +177,4 @@ wait_for_mnesia(stop) -> {error, mnesia_unexpectedly_starting} end. - - diff --git a/apps/emqttd/src/emqttd_pubsub.erl b/apps/emqttd/src/emqttd_pubsub.erl index 744cf007d..40bd5e509 100644 --- a/apps/emqttd/src/emqttd_pubsub.erl +++ b/apps/emqttd/src/emqttd_pubsub.erl @@ -48,7 +48,8 @@ -export([start_link/0]). -export([create/1, - subscribe/1, unsubscribe/1, + subscribe/1, subscribe/2, + unsubscribe/1, publish/1, publish/2, %local node dispatch/2, match/1]). @@ -236,14 +237,15 @@ handle_cast(Msg, State) -> handle_info({mnesia_table_event, {write, #topic_subscriber{subpid = Pid}, _ActivityId}}, State = #state{submap = SubMap}) -> + NewSubMap = case maps:is_key(Pid, SubMap) of - false -> - maps:put(Pid, erlang:monitor(process, Pid)); - true -> - ignore + false -> + maps:put(Pid, erlang:monitor(process, Pid), SubMap); + true -> + SubMap end, setstats(subscribers), - {noreply, State}; + {noreply, State#state{submap = NewSubMap}}; handle_info({mnesia_table_event, {write, #topic{}, _ActivityId}}, State) -> %%TODO: this is not right when clusterd. @@ -301,7 +303,7 @@ code_change(_OldVsn, State, _Extra) -> %%% Internal functions %%%============================================================================= insert_topic(Topic = #topic{name = Name}) -> - case mnesia:wread(topic, Name) of + case mnesia:wread({topic, Name}) of [] -> ok = emqttd_trie:insert(Name), mnesia:write(Topic); @@ -316,15 +318,14 @@ insert_subscriber(Subscriber) -> mnesia:write(Subscriber). try_remove_topic(Topic = #topic{name = Name}) -> - %%TODO: is this ok in transaction? - case ets:member(topic_subscriber, Name) of - false -> + case mnesia:read({topic_subscriber, Name}) of + [] -> mnesia:delete_object(Topic), case mnesia:read(topic, Name) of [] -> emqttd_trie:delete(Name); _ -> ok end; - true -> + _ -> ok end. @@ -338,4 +339,3 @@ setstats(subscribers) -> setstats(dropped) -> emqttd_metrics:inc('messages/dropped'). - diff --git a/apps/emqttd/src/emqttd_trie.erl b/apps/emqttd/src/emqttd_trie.erl index f86768048..ee80d6f99 100644 --- a/apps/emqttd/src/emqttd_trie.erl +++ b/apps/emqttd/src/emqttd_trie.erl @@ -157,7 +157,7 @@ add_path({Node, Word, Child}) -> Edge = #trie_edge{node_id=Node, word=Word}, case mnesia:read(trie_node, Node) of [TrieNode = #trie_node{edge_count=Count}] -> - case mnesia:wread(trie, Edge) of + case mnesia:wread({trie, Edge}) of [] -> mnesia:write(TrieNode#trie_node{edge_count=Count+1}), mnesia:write(#trie{edge=Edge, node_id=Child});