rewrite pubsub
This commit is contained in:
parent
a72fccf28d
commit
21d456fd1f
|
@ -81,7 +81,7 @@ init([Node, SubTopic, Options]) ->
|
|||
true ->
|
||||
true = erlang:monitor_node(Node, true),
|
||||
State = parse_opts(Options, #state{node = Node, subtopic = SubTopic}),
|
||||
emqttd_pubsub:subscribe({SubTopic, ?QOS_0}, self()),
|
||||
emqttd_pubsub:subscribe({SubTopic, ?QOS_0}),
|
||||
{ok, State};
|
||||
false ->
|
||||
{stop, {cannot_connect, Node}}
|
||||
|
|
|
@ -150,8 +150,8 @@ init([Options]) ->
|
|||
Topics = ?SYSTOP_CLIENTS ++ ?SYSTOP_SESSIONS ++ ?SYSTOP_PUBSUB,
|
||||
[ets:insert(?BROKER_TAB, {Topic, 0}) || Topic <- Topics],
|
||||
% Create $SYS Topics
|
||||
[{atomic, _} = create(systop(Topic)) || Topic <- ?SYSTOP_BROKERS],
|
||||
[{atomic, _} = create(systop(Topic)) || Topic <- Topics],
|
||||
[ok = create(systop(Topic)) || Topic <- ?SYSTOP_BROKERS],
|
||||
[ok = create(systop(Topic)) || Topic <- Topics],
|
||||
SysInterval = proplists:get_value(sys_interval, Options, 60),
|
||||
State = #state{started_at = os:timestamp(), sys_interval = SysInterval},
|
||||
Delay = if
|
||||
|
|
|
@ -182,7 +182,7 @@ init([Options]) ->
|
|||
% Init metrics
|
||||
[new_metric(Metric) || Metric <- Metrics],
|
||||
% $SYS Topics for metrics
|
||||
[{atomic, _} = emqttd_pubsub:create(systop(Topic)) || {_, Topic} <- Metrics],
|
||||
[ok = emqttd_pubsub:create(systop(Topic)) || {_, Topic} <- Metrics],
|
||||
PubInterval = proplists:get_value(pub_interval, Options, 60),
|
||||
Delay = if
|
||||
PubInterval == 0 -> 0;
|
||||
|
|
|
@ -109,7 +109,7 @@ topics() ->
|
|||
%%------------------------------------------------------------------------------
|
||||
-spec create(binary()) -> {atomic, Reason :: any()} | {aborted, Reason :: any()}.
|
||||
create(Topic) when is_binary(Topic) ->
|
||||
mnesia:transaction(fun trie_add/1, [Topic]).
|
||||
{atomic, ok} = mnesia:transaction(fun trie_add/1, [Topic]), ok.
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% @doc
|
||||
|
@ -128,21 +128,13 @@ subscribe(Topics = [{_Topic, _Qos}|_]) ->
|
|||
|
||||
subscribe([], _SubPid, Acc) ->
|
||||
{ok, lists:reverse(Acc)};
|
||||
%%TODO: check this function later.
|
||||
subscribe([{Topic, Qos}|Topics], SubPid, Acc) ->
|
||||
Subscriber = #topic_subscriber{topic=Topic, qos = Qos, subpid=SubPid},
|
||||
F = fun() ->
|
||||
case trie_add(Topic) of
|
||||
ok ->
|
||||
mnesia:write(Subscriber);
|
||||
{atomic, already_exist} ->
|
||||
mnesia:write(Subscriber);
|
||||
{aborted, Reason} ->
|
||||
{aborted, Reason}
|
||||
end
|
||||
end,
|
||||
F = fun() -> trie_add(Topic), mnesia:write(Subscriber) end,
|
||||
case mnesia:transaction(F) of
|
||||
ok -> subscribe(Topics, SubPid, [Qos|Acc]);
|
||||
{aborted, Reason} -> {error, {aborted, Reason}}
|
||||
{atomic, ok} -> subscribe(Topics, SubPid, [Qos|Acc]);
|
||||
Error -> {error, Error}
|
||||
end.
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
|
@ -153,14 +145,25 @@ subscribe([{Topic, Qos}|Topics], SubPid, Acc) ->
|
|||
%%------------------------------------------------------------------------------
|
||||
-spec unsubscribe(binary() | list(binary())) -> ok.
|
||||
unsubscribe(Topic) when is_binary(Topic) ->
|
||||
%% call mnesia directly
|
||||
unsubscribe([Topic]);
|
||||
|
||||
unsubscribe(Topics = [Topics|_]) when is_list(Topics) and is_binary(Topic) ->
|
||||
unsubscribe(Topics = [Topic|_]) when is_list(Topics) and is_binary(Topic) ->
|
||||
unsubscribe(Topics, self()).
|
||||
|
||||
%%TODO: check this function later.
|
||||
unsubscribe(Topics, SubPid) ->
|
||||
|
||||
F = fun() ->
|
||||
Subscribers = mnesia:index_read(topic_subscriber, SubPid, #topic_subscriber.subpid),
|
||||
lists:foreach(fun(Sub = #topic_subscriber{topic = Topic}) ->
|
||||
case lists:member(Topic, Topics) of
|
||||
true -> mneisa:delete_object(Sub);
|
||||
false -> ok
|
||||
end
|
||||
end, Subscribers)
|
||||
%TODO: try to remove topic??? if topic is dynamic...
|
||||
%%try_remove_topic(Topic)
|
||||
end,
|
||||
{atomic, _} = mneisa:transaction(F), ok.
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% @doc
|
||||
|
@ -191,7 +194,7 @@ publish(Topic, Msg) when is_binary(Topic) ->
|
|||
%%------------------------------------------------------------------------------
|
||||
-spec dispatch(Topic :: binary(), Msg :: mqtt_message()) -> non_neg_integer().
|
||||
dispatch(Topic, Msg = #mqtt_message{qos = Qos}) when is_binary(Topic) ->
|
||||
Subscribers = ets:lookup(topic_subscriber, Topic),
|
||||
Subscribers = mnesia:dirty_read(topic_subscriber, Topic),
|
||||
lists:foreach(
|
||||
fun(#topic_subscriber{qos = SubQos, subpid=SubPid}) ->
|
||||
Msg1 = if
|
||||
|
@ -235,7 +238,7 @@ init([]) ->
|
|||
{ram_copies, [node()]},
|
||||
{attributes, record_info(fields, topic)}]),
|
||||
mnesia:add_table_copy(topic, node(), ram_copies),
|
||||
|
||||
mnesia:subscribe({table, topic, simple}),
|
||||
%% local table, not shared with other table
|
||||
mnesia:create_table(topic_subscriber, [
|
||||
{type, bag},
|
||||
|
@ -244,6 +247,7 @@ init([]) ->
|
|||
{attributes, record_info(fields, topic_subscriber)},
|
||||
{index, [subpid]},
|
||||
{local_content, true}]),
|
||||
mnesia:subscribe({table, topic_subscriber, simple}),
|
||||
{ok, #state{}}.
|
||||
|
||||
handle_call(getstats, _From, State = #state{max_subs = Max}) ->
|
||||
|
@ -253,35 +257,45 @@ handle_call(getstats, _From, State = #state{max_subs = Max}) ->
|
|||
{reply, Stats, State};
|
||||
|
||||
handle_call(Req, _From, State) ->
|
||||
{stop, {badreq, Req}, State}.
|
||||
|
||||
handle_cast({unsubscribe, Topics, SubPid}, State) ->
|
||||
lists:foreach(fun(Topic) ->
|
||||
ets:match_delete(topic_subscriber, #topic_subscriber{topic=Topic, qos ='_', subpid=SubPid}),
|
||||
try_remove_topic(Topic)
|
||||
end, Topics),
|
||||
{noreply, setstats(State)};
|
||||
lager:error("Bad Req: ~p", [Req]),
|
||||
{reply, error, State}.
|
||||
|
||||
handle_cast(Msg, State) ->
|
||||
{stop, {badmsg, Msg}, State}.
|
||||
lager:error("Bad Msg: ~p", [Msg]),
|
||||
{noreply, State}.
|
||||
|
||||
handle_info({'DOWN', Mon, _Type, _Object, _Info}, State) ->
|
||||
case get({submon, Mon}) of
|
||||
undefined ->
|
||||
lager:error("unexpected 'DOWN': ~p", [Mon]);
|
||||
SubPid ->
|
||||
erase({submon, Mon}),
|
||||
erase({subscriber, SubPid}),
|
||||
Subs = ets:match_object(topic_subscriber, #topic_subscriber{subpid=SubPid, _='_'}),
|
||||
[ets:delete_object(topic_subscriber, Sub) || Sub <- Subs],
|
||||
[try_remove_topic(Topic) || #topic_subscriber{topic=Topic} <- Subs]
|
||||
end,
|
||||
%% a new record has been written.
|
||||
handle_info({mnesia_table_event, {write, #topic_subscriber{subpid = Pid}, _ActivityId}}, State) ->
|
||||
erlang:monitor(process, Pid),
|
||||
{noreply, setstats(State)};
|
||||
|
||||
%% {write, #topic{}, _ActivityId}
|
||||
%% {delete_object, _OldRecord, _ActivityId}
|
||||
%% {delete, {Tab, Key}, ActivityId}
|
||||
handle_info({mnesia_table_event, _Event}, State) ->
|
||||
{noreply, setstats(State)};
|
||||
|
||||
handle_info({'DOWN', _Mon, _Type, DownPid, _Info}, State) ->
|
||||
F = fun() ->
|
||||
%%TODO: how to read with write lock?
|
||||
[mnesia:delete_object(Sub) || Sub <- mnesia:index_read(topic_subscriber, DownPid, #topic_subscriber.subpid)]
|
||||
%%TODO: try to remove dynamic topics without subscribers
|
||||
%% [try_remove_topic(Topic) || #topic_subscriber{topic=Topic} <- Subs]
|
||||
end,
|
||||
case catch mnesia:transaction(F) of
|
||||
{atomic, _} -> ok;
|
||||
{aborted, Reason} -> lager:error("Failed to delete 'DOWN' subscriber ~p: ~p", [DownPid, Reason])
|
||||
end,
|
||||
{noreply, setstats(State)};
|
||||
|
||||
handle_info(Info, State) ->
|
||||
{stop, {badinfo, Info}, State}.
|
||||
lager:error("Bad Info: ~p", [Info]),
|
||||
{noreply, State}.
|
||||
|
||||
terminate(_Reason, _State) ->
|
||||
mnesia:unsubscribe({table, topic, simple}),
|
||||
mnesia:unsubscribe({table, topic_subscriber, simple}),
|
||||
%%TODO: clear topics belongs to this node???
|
||||
ok.
|
||||
|
||||
code_change(_OldVsn, State, _Extra) ->
|
||||
|
@ -291,21 +305,21 @@ code_change(_OldVsn, State, _Extra) ->
|
|||
%%% Internal functions
|
||||
%%%=============================================================================
|
||||
|
||||
try_remove_topic(Name) when is_binary(Name) ->
|
||||
case ets:member(topic_subscriber, Name) of
|
||||
false ->
|
||||
Topic = emqttd_topic:new(Name),
|
||||
Fun = fun() ->
|
||||
mnesia:delete_object(Topic),
|
||||
case mnesia:read(topic, Name) of
|
||||
[] -> trie_delete(Name);
|
||||
_ -> ignore
|
||||
end
|
||||
end,
|
||||
mnesia:transaction(Fun);
|
||||
true ->
|
||||
ok
|
||||
end.
|
||||
%%try_remove_topic(Name) when is_binary(Name) ->
|
||||
%% case ets:member(topic_subscriber, Name) of
|
||||
%% false ->
|
||||
%% Topic = emqttd_topic:new(Name),
|
||||
%% Fun = fun() ->
|
||||
%% mnesia:delete_object(Topic),
|
||||
%% case mnesia:read(topic, Name) of
|
||||
%% [] -> trie_delete(Name);
|
||||
%% _ -> ignore
|
||||
%% end
|
||||
%% end,
|
||||
%% mnesia:transaction(Fun);
|
||||
%% true ->
|
||||
%% ok
|
||||
%% end.
|
||||
|
||||
trie_add(Topic) when is_binary(Topic) ->
|
||||
mnesia:write(emqttd_topic:new(Topic)),
|
||||
|
@ -313,7 +327,7 @@ trie_add(Topic) when is_binary(Topic) ->
|
|||
[TrieNode=#topic_trie_node{topic=undefined}] ->
|
||||
mnesia:write(TrieNode#topic_trie_node{topic=Topic});
|
||||
[#topic_trie_node{topic=Topic}] ->
|
||||
{atomic, already_exist};
|
||||
ok;
|
||||
[] ->
|
||||
%add trie path
|
||||
[trie_add_path(Triple) || Triple <- emqttd_topic:triples(Topic)],
|
||||
|
@ -389,7 +403,7 @@ trie_delete_path([{NodeId, Word, _} | RestPath]) ->
|
|||
|
||||
setstats(State = #state{max_subs = Max}) ->
|
||||
emqttd_broker:setstat('topics/count', mnesia:table_info(topic, size)),
|
||||
SubCount = ets:info(topic_subscriber, size),
|
||||
SubCount = mnesia:table_info(topic_subscriber, size),
|
||||
emqttd_broker:setstat('subscribers/count', SubCount),
|
||||
if
|
||||
SubCount > Max ->
|
||||
|
|
|
@ -185,7 +185,7 @@ subscribe(SessState = #session_state{client_id = ClientId, submap = SubMap}, Top
|
|||
_ -> lager:warning("~s resubscribe ~p", [ClientId, Resubs])
|
||||
end,
|
||||
SubMap1 = lists:foldl(fun({Name, Qos}, Acc) -> maps:put(Name, Qos, Acc) end, SubMap, Topics),
|
||||
{ok, GrantedQos} = emqttd_pubsub:subscribe(Topics, self()),
|
||||
{ok, GrantedQos} = emqttd_pubsub:subscribe(Topics),
|
||||
%%TODO: should be gen_event and notification...
|
||||
emqttd_server:subscribe([ Name || {Name, _} <- Topics ], self()),
|
||||
{ok, SessState#session_state{submap = SubMap1}, GrantedQos};
|
||||
|
@ -208,7 +208,7 @@ unsubscribe(SessState = #session_state{client_id = ClientId, submap = SubMap}, T
|
|||
BadUnsubs -> lager:warning("~s should not unsubscribe ~p", [ClientId, BadUnsubs])
|
||||
end,
|
||||
%%unsubscribe from topic tree
|
||||
ok = emqttd_pubsub:unsubscribe(Topics, self()),
|
||||
ok = emqttd_pubsub:unsubscribe(Topics),
|
||||
SubMap1 = lists:foldl(fun(Topic, Acc) -> maps:remove(Topic, Acc) end, SubMap, Topics),
|
||||
{ok, SessState#session_state{submap = SubMap1}};
|
||||
|
||||
|
|
Loading…
Reference in New Issue