Merge subscription sharding

This commit is contained in:
Feng Lee 2018-12-10 10:28:01 +08:00
commit faac09eac9
21 changed files with 542 additions and 495 deletions

View File

@ -36,7 +36,7 @@ CT_SUITES = emqx emqx_client emqx_zone emqx_banned emqx_session \
emqx_mqtt_props emqx_mqueue emqx_net emqx_pqueue emqx_router emqx_sm \
emqx_tables emqx_time emqx_topic emqx_trie emqx_vm emqx_mountpoint \
emqx_listeners emqx_protocol emqx_pool emqx_shared_sub emqx_bridge \
emqx_hooks emqx_batch
emqx_hooks emqx_batch emqx_sequence
CT_NODE_NAME = emqxct@127.0.0.1
CT_OPTS = -cover test/ct.cover.spec -erl_args -name $(CT_NODE_NAME)

View File

@ -26,7 +26,6 @@
%% PubSub management API
-export([topics/0, subscriptions/1, subscribers/1, subscribed/2]).
-export([get_subopts/2, set_subopts/3]).
%% Hooks API
-export([hook/2, hook/3, hook/4, unhook/2, run_hooks/2, run_hooks/3]).
@ -70,20 +69,18 @@ is_running(Node) ->
subscribe(Topic) ->
emqx_broker:subscribe(iolist_to_binary(Topic)).
-spec(subscribe(emqx_topic:topic() | string(), emqx_types:subid() | pid()) -> ok).
-spec(subscribe(emqx_topic:topic() | string(), emqx_types:subid() | emqx_types:subopts()) -> ok).
subscribe(Topic, SubId) when is_atom(SubId); is_binary(SubId)->
emqx_broker:subscribe(iolist_to_binary(Topic), SubId);
subscribe(Topic, SubPid) when is_pid(SubPid) ->
emqx_broker:subscribe(iolist_to_binary(Topic), SubPid).
subscribe(Topic, SubOpts) when is_map(SubOpts) ->
emqx_broker:subscribe(iolist_to_binary(Topic), SubOpts).
-spec(subscribe(emqx_topic:topic() | string(), emqx_types:subid() | pid(),
emqx_types:subopts()) -> ok).
subscribe(Topic, SubId, Options) when is_atom(SubId); is_binary(SubId)->
emqx_broker:subscribe(iolist_to_binary(Topic), SubId, Options);
subscribe(Topic, SubPid, Options) when is_pid(SubPid)->
emqx_broker:subscribe(iolist_to_binary(Topic), SubPid, Options).
-spec(subscribe(emqx_topic:topic() | string(),
emqx_types:subid() | pid(), emqx_types:subopts()) -> ok).
subscribe(Topic, SubId, SubOpts) when (is_atom(SubId) orelse is_binary(SubId)), is_map(SubOpts) ->
emqx_broker:subscribe(iolist_to_binary(Topic), SubId, SubOpts).
-spec(publish(emqx_types:message()) -> {ok, emqx_types:deliver_results()}).
-spec(publish(emqx_types:message()) -> emqx_types:deliver_results()).
publish(Msg) ->
emqx_broker:publish(Msg).
@ -91,26 +88,14 @@ publish(Msg) ->
unsubscribe(Topic) ->
emqx_broker:unsubscribe(iolist_to_binary(Topic)).
-spec(unsubscribe(emqx_topic:topic() | string(), emqx_types:subid() | pid()) -> ok).
unsubscribe(Topic, SubId) when is_atom(SubId); is_binary(SubId) ->
emqx_broker:unsubscribe(iolist_to_binary(Topic), SubId);
unsubscribe(Topic, SubPid) when is_pid(SubPid) ->
emqx_broker:unsubscribe(iolist_to_binary(Topic), SubPid).
-spec(unsubscribe(emqx_topic:topic() | string(), emqx_types:subid()) -> ok).
unsubscribe(Topic, SubId) ->
emqx_broker:unsubscribe(iolist_to_binary(Topic), SubId).
%%------------------------------------------------------------------------------
%% PubSub management API
%%------------------------------------------------------------------------------
-spec(get_subopts(emqx_topic:topic() | string(), emqx_types:subscriber())
-> emqx_types:subopts()).
get_subopts(Topic, Subscriber) ->
emqx_broker:get_subopts(iolist_to_binary(Topic), Subscriber).
-spec(set_subopts(emqx_topic:topic() | string(), emqx_types:subscriber(),
emqx_types:subopts()) -> boolean()).
set_subopts(Topic, Subscriber, Options) when is_map(Options) ->
emqx_broker:set_subopts(iolist_to_binary(Topic), Subscriber, Options).
-spec(topics() -> list(emqx_topic:topic())).
topics() -> emqx_router:topics().
@ -118,15 +103,15 @@ topics() -> emqx_router:topics().
subscribers(Topic) ->
emqx_broker:subscribers(iolist_to_binary(Topic)).
-spec(subscriptions(emqx_types:subscriber()) -> [{emqx_topic:topic(), emqx_types:subopts()}]).
subscriptions(Subscriber) ->
emqx_broker:subscriptions(Subscriber).
-spec(subscriptions(pid()) -> [{emqx_topic:topic(), emqx_types:subopts()}]).
subscriptions(SubPid) when is_pid(SubPid) ->
emqx_broker:subscriptions(SubPid).
-spec(subscribed(emqx_topic:topic() | string(), pid() | emqx_types:subid()) -> boolean()).
subscribed(Topic, SubPid) when is_pid(SubPid) ->
emqx_broker:subscribed(iolist_to_binary(Topic), SubPid);
subscribed(Topic, SubId) when is_atom(SubId); is_binary(SubId) ->
emqx_broker:subscribed(iolist_to_binary(Topic), SubId).
-spec(subscribed(pid() | emqx_types:subid(), emqx_topic:topic() | string()) -> boolean()).
subscribed(SubPid, Topic) when is_pid(SubPid) ->
emqx_broker:subscribed(SubPid, iolist_to_binary(Topic));
subscribed(SubId, Topic) when is_atom(SubId); is_binary(SubId) ->
emqx_broker:subscribed(SubId, iolist_to_binary(Topic)).
%%------------------------------------------------------------------------------
%% Hooks API

View File

@ -148,7 +148,7 @@ stop() ->
%%-----------------------------------------------------------------------------
init([]) ->
_ = emqx_tables:new(?TAB, [set, protected, {read_concurrency, true}]),
ok = emqx_tables:new(?TAB, [set, protected, {read_concurrency, true}]),
{ok, #{}}.
handle_call({register_mod, Type, Mod, Opts, Seq}, _From, State) ->

View File

@ -45,7 +45,7 @@ all_rules() ->
-spec(init([File :: string()]) -> {ok, #{}}).
init([File]) ->
_ = emqx_tables:new(?ACL_RULE_TAB, [set, public, {read_concurrency, true}]),
ok = emqx_tables:new(?ACL_RULE_TAB, [set, public, {read_concurrency, true}]),
true = load_rules_from_file(File),
{ok, #{acl_file => File}}.

View File

@ -19,15 +19,16 @@
-include("emqx.hrl").
-export([start_link/2]).
-export([subscribe/1, subscribe/2, subscribe/3, subscribe/4]).
-export([multi_subscribe/1, multi_subscribe/2, multi_subscribe/3]).
-export([subscribe/1, subscribe/2, subscribe/3]).
-export([unsubscribe/1, unsubscribe/2]).
-export([subscriber_down/1]).
-export([publish/1, safe_publish/1]).
-export([unsubscribe/1, unsubscribe/2, unsubscribe/3]).
-export([multi_unsubscribe/1, multi_unsubscribe/2, multi_unsubscribe/3]).
-export([dispatch/2, dispatch/3]).
-export([dispatch/2]).
-export([subscriptions/1, subscribers/1, subscribed/2]).
-export([get_subopts/2, set_subopts/3]).
-export([get_subopts/2, set_subopts/2]).
-export([topics/0]).
%% Stats fun
-export([stats_fun/0]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
@ -38,131 +39,142 @@
-compile(nowarn_export_all).
-endif.
-record(state, {pool, id, submap, submon}).
-record(subscribe, {topic, subpid, subid, subopts = #{}}).
-record(unsubscribe, {topic, subpid, subid}).
%% The default request timeout
-define(TIMEOUT, 60000).
-define(BROKER, ?MODULE).
%% ETS tables
-define(SUBOPTION, emqx_suboption).
-define(SUBSCRIBER, emqx_subscriber).
-define(SUBID, emqx_subid).
-define(SUBOPTION, emqx_suboption).
-define(SUBSCRIBER, emqx_subscriber).
-define(SUBSCRIPTION, emqx_subscription).
%% Guards
-define(is_subid(Id), (is_binary(Id) orelse is_atom(Id))).
-spec(start_link(atom(), pos_integer()) -> {ok, pid()} | ignore | {error, term()}).
-spec(start_link(atom(), pos_integer()) -> emqx_types:startlink_ret()).
start_link(Pool, Id) ->
gen_server:start_link({local, emqx_misc:proc_name(?MODULE, Id)}, ?MODULE,
[Pool, Id], [{hibernate_after, 1000}]).
_ = create_tabs(),
Name = emqx_misc:proc_name(?BROKER, Id),
gen_server:start_link({local, Name}, ?MODULE, [Pool, Id], []).
%%------------------------------------------------------------------------------
%% Subscribe
%% Create tabs
%%------------------------------------------------------------------------------
-spec(create_tabs() -> ok).
create_tabs() ->
TabOpts = [public, {read_concurrency, true}, {write_concurrency, true}],
%% SubId: SubId -> SubPid
ok = emqx_tables:new(?SUBID, [set | TabOpts]),
%% SubOption: {SubPid, Topic} -> SubOption
ok = emqx_tables:new(?SUBOPTION, [set | TabOpts]),
%% Subscription: SubPid -> Topic1, Topic2, Topic3, ...
%% duplicate_bag: o(1) insert
ok = emqx_tables:new(?SUBSCRIPTION, [duplicate_bag | TabOpts]),
%% Subscriber: Topic -> SubPid1, SubPid2, SubPid3, ...
%% duplicate_bag: o(1) insert
ok = emqx_tables:new(?SUBSCRIBER, [bag | TabOpts]).
%%------------------------------------------------------------------------------
%% Subscribe API
%%------------------------------------------------------------------------------
-spec(subscribe(emqx_topic:topic()) -> ok).
subscribe(Topic) when is_binary(Topic) ->
subscribe(Topic, self()).
subscribe(Topic, undefined).
-spec(subscribe(emqx_topic:topic(), pid() | emqx_types:subid()) -> ok).
subscribe(Topic, SubPid) when is_binary(Topic), is_pid(SubPid) ->
subscribe(Topic, SubPid, undefined);
-spec(subscribe(emqx_topic:topic(), emqx_types:subid() | emqx_types:subopts()) -> ok).
subscribe(Topic, SubId) when is_binary(Topic), ?is_subid(SubId) ->
subscribe(Topic, self(), SubId).
subscribe(Topic, SubId, #{});
subscribe(Topic, SubOpts) when is_binary(Topic), is_map(SubOpts) ->
subscribe(Topic, undefined, SubOpts).
-spec(subscribe(emqx_topic:topic(), pid() | emqx_types:subid(),
emqx_types:subid() | emqx_types:subopts()) -> ok).
subscribe(Topic, SubPid, SubId) when is_binary(Topic), is_pid(SubPid), ?is_subid(SubId) ->
subscribe(Topic, SubPid, SubId, #{qos => 0});
subscribe(Topic, SubPid, SubOpts) when is_binary(Topic), is_pid(SubPid), is_map(SubOpts) ->
subscribe(Topic, SubPid, undefined, SubOpts);
-spec(subscribe(emqx_topic:topic(), emqx_types:subid(), emqx_types:subopts()) -> ok).
subscribe(Topic, SubId, SubOpts) when is_binary(Topic), ?is_subid(SubId), is_map(SubOpts) ->
subscribe(Topic, self(), SubId, SubOpts).
-spec(subscribe(emqx_topic:topic(), pid(), emqx_types:subid(), emqx_types:subopts()) -> ok).
subscribe(Topic, SubPid, SubId, SubOpts) when is_binary(Topic), is_pid(SubPid),
?is_subid(SubId), is_map(SubOpts) ->
Broker = pick(SubPid),
SubReq = #subscribe{topic = Topic, subpid = SubPid, subid = SubId, subopts = SubOpts},
wait_for_reply(async_call(Broker, SubReq), ?TIMEOUT).
-spec(multi_subscribe(emqx_types:topic_table()) -> ok).
multi_subscribe(TopicTable) when is_list(TopicTable) ->
multi_subscribe(TopicTable, self()).
-spec(multi_subscribe(emqx_types:topic_table(), pid() | emqx_types:subid()) -> ok).
multi_subscribe(TopicTable, SubPid) when is_pid(SubPid) ->
multi_subscribe(TopicTable, SubPid, undefined);
multi_subscribe(TopicTable, SubId) when ?is_subid(SubId) ->
multi_subscribe(TopicTable, self(), SubId).
-spec(multi_subscribe(emqx_types:topic_table(), pid(), emqx_types:subid()) -> ok).
multi_subscribe(TopicTable, SubPid, SubId) when is_pid(SubPid), ?is_subid(SubId) ->
Broker = pick(SubPid),
SubReq = fun(Topic, SubOpts) ->
#subscribe{topic = Topic, subpid = SubPid, subid = SubId, subopts = SubOpts}
end,
wait_for_replies([async_call(Broker, SubReq(Topic, SubOpts))
|| {Topic, SubOpts} <- TopicTable], ?TIMEOUT).
SubPid = self(),
case ets:member(?SUBOPTION, {SubPid, Topic}) of
false ->
ok = emqx_broker_helper:monitor(SubPid, SubId),
%% true = ets:insert(?SUBID, {SubId, SubPid}),
true = ets:insert(?SUBSCRIPTION, {SubPid, Topic}),
case maps:get(share, SubOpts, undefined) of
undefined ->
Shard = emqx_broker_helper:get_shard(SubPid, Topic),
case Shard of
0 -> true = ets:insert(?SUBSCRIBER, {Topic, SubPid});
I ->
true = ets:insert(?SUBSCRIBER, {{shard, Topic, I}, SubPid}),
true = ets:insert(?SUBSCRIBER, {Topic, {shard, I}})
end,
SubOpts1 = maps:put(shard, Shard, SubOpts),
true = ets:insert(?SUBOPTION, {{SubPid, Topic}, SubOpts1}),
call(pick({Topic, Shard}), {subscribe, Topic});
Group -> %% Shard subscription
true = ets:insert(?SUBOPTION, {{SubPid, Topic}, SubOpts}),
emqx_shard_sub:subscribe(Group, Topic, SubPid)
end;
true -> ok
end.
%%------------------------------------------------------------------------------
%% Unsubscribe
%% Unsubscribe API
%%------------------------------------------------------------------------------
-spec(unsubscribe(emqx_topic:topic()) -> ok).
unsubscribe(Topic) when is_binary(Topic) ->
unsubscribe(Topic, self()).
SubPid = self(),
case ets:lookup(?SUBOPTION, {SubPid, Topic}) of
[{_, SubOpts}] ->
_ = emqx_broker_helper:reclaim_seq(Topic),
case maps:get(share, SubOpts, undefined) of
undefined ->
case maps:get(shard, SubOpts, 0) of
0 ->
true = ets:delete_object(?SUBSCRIBER, {Topic, SubPid}),
ok = cast(pick(Topic), {unsubscribed, Topic});
I ->
true = ets:delete_object(?SUBSCRIBER, {{shard, Topic, I}, SubPid}),
case ets:member(emqx_subscriber, {shard, Topic, I}) of
true -> ok;
false -> ets:delete_object(?SUBSCRIBER, {Topic, {shard, I}})
end,
ok = cast(pick({Topic, I}), {unsubscribed, Topic, I})
end;
Group ->
ok = emqx_shared_sub:unsubscribe(Group, Topic, SubPid)
end,
true = ets:delete_object(?SUBSCRIPTION, {SubPid, Topic}),
%%true = ets:delete_object(?SUBID, {SubId, SubPid}),
true = ets:delete(?SUBOPTION, {SubPid, Topic}),
ok;
[] -> ok
end.
-spec(unsubscribe(emqx_topic:topic(), pid() | emqx_types:subid()) -> ok).
unsubscribe(Topic, SubPid) when is_binary(Topic), is_pid(SubPid) ->
unsubscribe(Topic, SubPid, undefined);
unsubscribe(Topic, SubId) when is_binary(Topic), ?is_subid(SubId) ->
unsubscribe(Topic, self(), SubId).
-spec(unsubscribe(emqx_topic:topic(), pid(), emqx_types:subid()) -> ok).
unsubscribe(Topic, SubPid, SubId) when is_binary(Topic), is_pid(SubPid), ?is_subid(SubId) ->
Broker = pick(SubPid),
UnsubReq = #unsubscribe{topic = Topic, subpid = SubPid, subid = SubId},
wait_for_reply(async_call(Broker, UnsubReq), ?TIMEOUT).
-spec(multi_unsubscribe([emqx_topic:topic()]) -> ok).
multi_unsubscribe(Topics) ->
multi_unsubscribe(Topics, self()).
-spec(multi_unsubscribe([emqx_topic:topic()], pid() | emqx_types:subid()) -> ok).
multi_unsubscribe(Topics, SubPid) when is_pid(SubPid) ->
multi_unsubscribe(Topics, SubPid, undefined);
multi_unsubscribe(Topics, SubId) when ?is_subid(SubId) ->
multi_unsubscribe(Topics, self(), SubId).
-spec(multi_unsubscribe([emqx_topic:topic()], pid(), emqx_types:subid()) -> ok).
multi_unsubscribe(Topics, SubPid, SubId) when is_pid(SubPid), ?is_subid(SubId) ->
Broker = pick(SubPid),
UnsubReq = fun(Topic) ->
#unsubscribe{topic = Topic, subpid = SubPid, subid = SubId}
end,
wait_for_replies([async_call(Broker, UnsubReq(Topic)) || Topic <- Topics], ?TIMEOUT).
-spec(unsubscribe(emqx_topic:topic(), emqx_types:subid()) -> ok).
unsubscribe(Topic, _SubId) when is_binary(Topic) ->
unsubscribe(Topic).
%%------------------------------------------------------------------------------
%% Publish
%%------------------------------------------------------------------------------
-spec(publish(emqx_types:message()) -> {ok, emqx_types:deliver_results()}).
-spec(publish(emqx_types:message()) -> emqx_types:deliver_results()).
publish(Msg) when is_record(Msg, message) ->
_ = emqx_tracer:trace(publish, Msg),
{ok, case emqx_hooks:run('message.publish', [], Msg) of
{ok, Msg1 = #message{topic = Topic}} ->
Delivery = route(aggre(emqx_router:match_routes(Topic)), delivery(Msg1)),
Delivery#delivery.results;
{stop, _} ->
emqx_logger:warning("Stop publishing: ~s", [emqx_message:format(Msg)]),
[]
end}.
case emqx_hooks:run('message.publish', [], Msg) of
{ok, Msg1 = #message{topic = Topic}} ->
Delivery = route(aggre(emqx_router:match_routes(Topic)), delivery(Msg1)),
Delivery#delivery.results;
{stop, _} ->
emqx_logger:warning("Stop publishing: ~s", [emqx_message:format(Msg)]),
[]
end.
-spec(safe_publish(emqx_types:message()) -> ok).
%% Called internally
-spec(safe_publish(emqx_types:message()) -> ok).
safe_publish(Msg) when is_record(Msg, message) ->
try
publish(Msg)
@ -230,134 +242,163 @@ dispatch(Topic, Delivery = #delivery{message = Msg, results = Results}) ->
[Sub] -> %% optimize?
dispatch(Sub, Topic, Msg),
Delivery#delivery{results = [{dispatch, Topic, 1}|Results]};
Subscribers ->
Count = lists:foldl(fun(Sub, Acc) ->
dispatch(Sub, Topic, Msg), Acc + 1
end, 0, Subscribers),
Subs ->
Count = lists:foldl(
fun(Sub, Acc) ->
dispatch(Sub, Topic, Msg), Acc + 1
end, 0, Subs),
Delivery#delivery{results = [{dispatch, Topic, Count}|Results]}
end.
dispatch({SubPid, _SubId}, Topic, Msg) when is_pid(SubPid) ->
dispatch(SubPid, Topic, Msg) when is_pid(SubPid) ->
SubPid ! {dispatch, Topic, Msg};
dispatch({share, _Group, _Sub}, _Topic, _Msg) ->
ignored.
dispatch({shard, I}, Topic, Msg) ->
lists:foreach(fun(SubPid) ->
SubPid ! {dispatch, Topic, Msg}
end, safe_lookup_element(?SUBSCRIBER, {shard, Topic, I}, [])).
inc_dropped_cnt(<<"$SYS/", _/binary>>) ->
ok;
inc_dropped_cnt(_Topic) ->
emqx_metrics:inc('messages/dropped').
-spec(subscribers(emqx_topic:topic()) -> [emqx_types:subscriber()]).
-spec(subscribers(emqx_topic:topic()) -> [pid()]).
subscribers(Topic) ->
try ets:lookup_element(?SUBSCRIBER, Topic, 2) catch error:badarg -> [] end.
safe_lookup_element(?SUBSCRIBER, Topic, []).
-spec(subscriptions(emqx_types:subscriber())
%%------------------------------------------------------------------------------
%% Subscriber is down
%%------------------------------------------------------------------------------
-spec(subscriber_down(pid()) -> true).
subscriber_down(SubPid) ->
lists:foreach(
fun(Sub = {_Pid, Topic}) ->
case ets:lookup(?SUBOPTION, Sub) of
[{_, SubOpts}] ->
_ = emqx_broker_helper:reclaim_seq(Topic),
case maps:get(shard, SubOpts, 0) of
0 ->
true = ets:delete_object(?SUBSCRIBER, {Topic, SubPid}),
ok = cast(pick(Topic), {unsubscribed, Topic});
I ->
true = ets:delete_object(?SUBSCRIBER, {{shard, Topic, I}, SubPid}),
case ets:member(emqx_subscriber, {shard, Topic, I}) of
true -> ok;
false -> ets:delete_object(?SUBSCRIBER, {Topic, {shard, I}})
end,
ok = cast(pick({Topic, I}), {unsubscribed, Topic, I})
end,
ets:delete(?SUBOPTION, Sub);
[] -> ok
end
end, ets:lookup(?SUBSCRIPTION, SubPid)),
true = ets:delete(?SUBSCRIPTION, SubPid).
%%------------------------------------------------------------------------------
%% Management APIs
%%------------------------------------------------------------------------------
-spec(subscriptions(pid() | emqx_types:subid())
-> [{emqx_topic:topic(), emqx_types:subopts()}]).
subscriptions(Subscriber) ->
lists:map(fun({_, {share, _Group, Topic}}) ->
subscription(Topic, Subscriber);
({_, Topic}) ->
subscription(Topic, Subscriber)
end, ets:lookup(?SUBSCRIPTION, Subscriber)).
subscriptions(SubPid) ->
[{Topic, safe_lookup_element(?SUBOPTION, {SubPid, Topic}, #{})}
|| Topic <- safe_lookup_element(?SUBSCRIPTION, SubPid, [])].
subscription(Topic, Subscriber) ->
{Topic, ets:lookup_element(?SUBOPTION, {Topic, Subscriber}, 2)}.
-spec(subscribed(pid(), emqx_topic:topic()) -> boolean()).
subscribed(SubPid, Topic) when is_pid(SubPid) ->
ets:member(?SUBOPTION, {SubPid, Topic});
subscribed(SubId, Topic) when ?is_subid(SubId) ->
%%FIXME:... SubId -> SubPid
ets:member(?SUBOPTION, {SubId, Topic}).
-spec(subscribed(emqx_topic:topic(), pid() | emqx_types:subid() | emqx_types:subscriber()) -> boolean()).
subscribed(Topic, SubPid) when is_binary(Topic), is_pid(SubPid) ->
case ets:match_object(?SUBOPTION, {{Topic, {SubPid, '_'}}, '_'}, 1) of
{Match, _} ->
length(Match) >= 1;
'$end_of_table' ->
false
end;
subscribed(Topic, SubId) when is_binary(Topic), ?is_subid(SubId) ->
case ets:match_object(?SUBOPTION, {{Topic, {'_', SubId}}, '_'}, 1) of
{Match, _} ->
length(Match) >= 1;
'$end_of_table' ->
false
end;
subscribed(Topic, {SubPid, SubId}) when is_binary(Topic), is_pid(SubPid), ?is_subid(SubId) ->
ets:member(?SUBOPTION, {Topic, {SubPid, SubId}}).
-spec(get_subopts(pid(), emqx_topic:topic()) -> emqx_types:subopts()).
get_subopts(SubPid, Topic) when is_pid(SubPid), is_binary(Topic) ->
safe_lookup_element(?SUBOPTION, {SubPid, Topic}, #{}).
-spec(get_subopts(emqx_topic:topic(), emqx_types:subscriber()) -> emqx_types:subopts()).
get_subopts(Topic, Subscriber) when is_binary(Topic) ->
try ets:lookup_element(?SUBOPTION, {Topic, Subscriber}, 2)
catch error:badarg -> []
end.
-spec(set_subopts(emqx_topic:topic(), emqx_types:subscriber(), emqx_types:subopts()) -> boolean()).
set_subopts(Topic, Subscriber, Opts) when is_binary(Topic), is_map(Opts) ->
case ets:lookup(?SUBOPTION, {Topic, Subscriber}) of
-spec(set_subopts(emqx_topic:topic(), emqx_types:subopts()) -> boolean()).
set_subopts(Topic, NewOpts) when is_binary(Topic), is_map(NewOpts) ->
Sub = {self(), Topic},
case ets:lookup(?SUBOPTION, Sub) of
[{_, OldOpts}] ->
ets:insert(?SUBOPTION, {{Topic, Subscriber}, maps:merge(OldOpts, Opts)});
ets:insert(?SUBOPTION, {Sub, maps:merge(OldOpts, NewOpts)});
[] -> false
end.
async_call(Broker, Req) ->
From = {self(), Tag = make_ref()},
ok = gen_server:cast(Broker, {From, Req}),
Tag.
-spec(topics() -> [emqx_topic:topic()]).
topics() ->
emqx_router:topics().
wait_for_replies(Tags, Timeout) ->
lists:foreach(
fun(Tag) ->
wait_for_reply(Tag, Timeout)
end, Tags).
safe_lookup_element(Tab, Key, Def) ->
try ets:lookup_element(Tab, Key, 2) catch error:badarg -> Def end.
wait_for_reply(Tag, Timeout) ->
receive
{Tag, Reply} -> Reply
after Timeout ->
exit(timeout)
%%------------------------------------------------------------------------------
%% Stats fun
%%------------------------------------------------------------------------------
stats_fun() ->
safe_update_stats(?SUBSCRIBER, 'subscribers/count', 'subscribers/max'),
safe_update_stats(?SUBSCRIPTION, 'subscriptions/count', 'subscriptions/max'),
safe_update_stats(?SUBOPTION, 'suboptions/count', 'suboptions/max').
safe_update_stats(Tab, Stat, MaxStat) ->
case ets:info(Tab, size) of
undefined -> ok;
Size -> emqx_stats:setstat(Stat, MaxStat, Size)
end.
%% Pick a broker
pick(SubPid) when is_pid(SubPid) ->
gproc_pool:pick_worker(broker, SubPid).
%%------------------------------------------------------------------------------
%% call, cast, pick
%%------------------------------------------------------------------------------
-spec(topics() -> [emqx_topic:topic()]).
topics() -> emqx_router:topics().
call(Broker, Req) ->
gen_server:call(Broker, Req).
cast(Broker, Msg) ->
gen_server:cast(Broker, Msg).
%% Pick a broker
pick(Topic) ->
gproc_pool:pick_worker(broker, Topic).
%%------------------------------------------------------------------------------
%% gen_server callbacks
%%------------------------------------------------------------------------------
init([Pool, Id]) ->
_ = emqx_router:set_mode(protected),
true = gproc_pool:connect_worker(Pool, {Pool, Id}),
{ok, #state{pool = Pool, id = Id, submap = #{}, submon = emqx_pmon:new()}}.
{ok, #{pool => Pool, id => Id}}.
handle_call({subscribe, Topic}, _From, State) ->
case get(Topic) of
undefined ->
_ = put(Topic, true),
emqx_router:add_route(Topic);
true -> ok
end,
{reply, ok, State};
handle_call(Req, _From, State) ->
emqx_logger:error("[Broker] unexpected call: ~p", [Req]),
{reply, ignored, State}.
handle_cast({From, #subscribe{topic = Topic, subpid = SubPid, subid = SubId, subopts = SubOpts}}, State) ->
Subscriber = {SubPid, SubId},
case ets:member(?SUBOPTION, {Topic, Subscriber}) of
handle_cast({unsubscribed, Topic}, State) ->
case ets:member(?SUBSCRIBER, Topic) of
false ->
Group = maps:get(share, SubOpts, undefined),
true = do_subscribe(Group, Topic, Subscriber, SubOpts),
emqx_shared_sub:subscribe(Group, Topic, SubPid),
emqx_router:add_route(From, Topic, dest(Group)),
{noreply, monitor_subscriber(Subscriber, State)};
true ->
gen_server:reply(From, ok),
{noreply, State}
end;
_ = erase(Topic),
emqx_router:delete_route(Topic);
true -> ok
end,
{noreply, State};
handle_cast({From, #unsubscribe{topic = Topic, subpid = SubPid, subid = SubId}}, State) ->
Subscriber = {SubPid, SubId},
case ets:lookup(?SUBOPTION, {Topic, Subscriber}) of
[{_, SubOpts}] ->
Group = maps:get(share, SubOpts, undefined),
true = do_unsubscribe(Group, Topic, Subscriber),
emqx_shared_sub:unsubscribe(Group, Topic, SubPid),
case ets:member(?SUBSCRIBER, Topic) of
false -> emqx_router:del_route(From, Topic, dest(Group));
true -> gen_server:reply(From, ok)
end;
[] -> gen_server:reply(From, ok)
handle_cast({unsubscribed, Topic, I}, State) ->
case ets:member(?SUBSCRIBER, {shard, Topic, I}) of
false ->
true = ets:delete_object(?SUBSCRIBER, {Topic, {shard, I}}),
cast(pick(Topic), {unsubscribed, Topic});
true -> ok
end,
{noreply, State};
@ -365,21 +406,11 @@ handle_cast(Msg, State) ->
emqx_logger:error("[Broker] unexpected cast: ~p", [Msg]),
{noreply, State}.
handle_info({'DOWN', _MRef, process, SubPid, Reason}, State = #state{submap = SubMap}) ->
case maps:find(SubPid, SubMap) of
{ok, SubIds} ->
lists:foreach(fun(SubId) -> subscriber_down({SubPid, SubId}) end, SubIds),
{noreply, demonitor_subscriber(SubPid, State)};
error ->
emqx_logger:error("unexpected 'DOWN': ~p, reason: ~p", [SubPid, Reason]),
{noreply, State}
end;
handle_info(Info, State) ->
emqx_logger:error("[Broker] unexpected info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, #state{pool = Pool, id = Id}) ->
terminate(_Reason, #{pool := Pool, id := Id}) ->
gproc_pool:disconnect_worker(Pool, {Pool, Id}).
code_change(_OldVsn, State, _Extra) ->
@ -389,52 +420,3 @@ code_change(_OldVsn, State, _Extra) ->
%% Internal functions
%%------------------------------------------------------------------------------
do_subscribe(Group, Topic, Subscriber, SubOpts) ->
ets:insert(?SUBSCRIPTION, {Subscriber, shared(Group, Topic)}),
ets:insert(?SUBSCRIBER, {Topic, shared(Group, Subscriber)}),
ets:insert(?SUBOPTION, {{Topic, Subscriber}, SubOpts}).
do_unsubscribe(Group, Topic, Subscriber) ->
ets:delete_object(?SUBSCRIPTION, {Subscriber, shared(Group, Topic)}),
ets:delete_object(?SUBSCRIBER, {Topic, shared(Group, Subscriber)}),
ets:delete(?SUBOPTION, {Topic, Subscriber}).
subscriber_down(Subscriber) ->
Topics = lists:map(fun({_, {share, Group, Topic}}) ->
{Topic, Group};
({_, Topic}) ->
{Topic, undefined}
end, ets:lookup(?SUBSCRIPTION, Subscriber)),
lists:foreach(fun({Topic, undefined}) ->
true = do_unsubscribe(undefined, Topic, Subscriber),
ets:member(?SUBSCRIBER, Topic) orelse emqx_router:del_route(Topic, dest(undefined));
({Topic, Group}) ->
true = do_unsubscribe(Group, Topic, Subscriber),
Groups = groups(Topic),
case lists:member(Group, lists:usort(Groups)) of
true -> ok;
false -> emqx_router:del_route(Topic, dest(Group))
end
end, Topics).
monitor_subscriber({SubPid, SubId}, State = #state{submap = SubMap, submon = SubMon}) ->
UpFun = fun(SubIds) -> lists:usort([SubId|SubIds]) end,
State#state{submap = maps:update_with(SubPid, UpFun, [SubId], SubMap),
submon = emqx_pmon:monitor(SubPid, SubMon)}.
demonitor_subscriber(SubPid, State = #state{submap = SubMap, submon = SubMon}) ->
State#state{submap = maps:remove(SubPid, SubMap),
submon = emqx_pmon:demonitor(SubPid, SubMon)}.
dest(undefined) -> node();
dest(Group) -> {Group, node()}.
shared(undefined, Name) -> Name;
shared(Group, Name) -> {share, Group, Name}.
groups(Topic) ->
lists:foldl(fun({_, {share, Group, _}}, Acc) ->
[Group | Acc];
({_, _}, Acc) ->
Acc
end, [], ets:lookup(?SUBSCRIBER, Topic)).

View File

@ -16,63 +16,92 @@
-behaviour(gen_server).
-export([start_link/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
-compile({no_auto_import, [monitor/2]}).
%% internal export
-export([stats_fun/0]).
-export([start_link/0]).
-export([monitor/2]).
-export([get_shard/2]).
-export([create_seq/1, reclaim_seq/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
-define(HELPER, ?MODULE).
-define(SUBMON, emqx_submon).
-define(SUBSEQ, emqx_subseq).
-record(state, {}).
-record(state, {pmon :: emqx_pmon:pmon()}).
-spec(start_link() -> {ok, pid()} | ignore | {error, any()}).
-spec(start_link() -> emqx_types:startlink_ret()).
start_link() ->
gen_server:start_link({local, ?HELPER}, ?MODULE, [], []).
-spec(monitor(pid(), emqx_types:subid()) -> ok).
monitor(SubPid, SubId) when is_pid(SubPid) ->
case ets:lookup(?SUBMON, SubPid) of
[] ->
gen_server:cast(?HELPER, {monitor, SubPid, SubId});
[{_, SubId}] ->
ok;
_Other ->
error(subid_conflict)
end.
-spec(get_shard(pid(), emqx_topic:topic()) -> non_neg_integer()).
get_shard(SubPid, Topic) ->
case create_seq(Topic) of
Seq when Seq =< 1024 -> 0;
_Seq -> erlang:phash2(SubPid, ets:lookup_element(?SUBSEQ, shards, 2))
end.
-spec(create_seq(emqx_topic:topic()) -> emqx_sequence:seqid()).
create_seq(Topic) ->
emqx_sequence:nextval(?SUBSEQ, Topic).
-spec(reclaim_seq(emqx_topic:topic()) -> emqx_sequence:seqid()).
reclaim_seq(Topic) ->
emqx_sequence:reclaim(?SUBSEQ, Topic).
%%------------------------------------------------------------------------------
%% gen_server callbacks
%%------------------------------------------------------------------------------
init([]) ->
%% Use M:F/A for callback, not anonymous function because
%% fun M:F/A is small, also no badfun risk during hot beam reload
emqx_stats:update_interval(broker_stats, fun ?MODULE:stats_fun/0),
{ok, #state{}, hibernate}.
%% SubSeq: Topic -> SeqId
ok = emqx_sequence:create(?SUBSEQ),
%% Shards: CPU * 32
true = ets:insert(?SUBSEQ, {shards, emqx_vm:schedulers() * 32}),
%% SubMon: SubPid -> SubId
ok = emqx_tables:new(?SUBMON, [set, protected, {read_concurrency, true}]),
%% Stats timer
emqx_stats:update_interval(broker_stats, fun emqx_broker:stats_fun/0),
{ok, #state{pmon = emqx_pmon:new()}, hibernate}.
handle_call(Req, _From, State) ->
emqx_logger:error("[BrokerHelper] unexpected call: ~p", [Req]),
{reply, ignored, State}.
handle_cast({monitor, SubPid, SubId}, State = #state{pmon = PMon}) ->
true = ets:insert(?SUBMON, {SubPid, SubId}),
{noreply, State#state{pmon = emqx_pmon:monitor(SubPid, PMon)}};
handle_cast(Msg, State) ->
emqx_logger:error("[BrokerHelper] unexpected cast: ~p", [Msg]),
{noreply, State}.
handle_info({'DOWN', _MRef, process, SubPid, _Reason}, State = #state{pmon = PMon}) ->
true = ets:delete(?SUBMON, SubPid),
ok = emqx_pool:async_submit(fun emqx_broker:subscriber_down/1, [SubPid]),
{noreply, State#state{pmon = emqx_pmon:erase(SubPid, PMon)}};
handle_info(Info, State) ->
emqx_logger:error("[BrokerHelper] unexpected info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, #state{}) ->
_ = emqx_sequence:delete(?SUBSEQ),
emqx_stats:cancel_update(broker_stats).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%------------------------------------------------------------------------------
%% Internal functions
%%------------------------------------------------------------------------------
stats_fun() ->
safe_update_stats(emqx_subscriber,
'subscribers/count', 'subscribers/max'),
safe_update_stats(emqx_subscription,
'subscriptions/count', 'subscriptions/max'),
safe_update_stats(emqx_suboptions,
'suboptions/count', 'suboptions/max').
safe_update_stats(Tab, Stat, MaxStat) ->
case ets:info(Tab, size) of
undefined -> ok;
Size -> emqx_stats:setstat(Stat, MaxStat, Size)
end.

View File

@ -20,8 +20,6 @@
-export([init/1]).
-define(TAB_OPTS, [public, {read_concurrency, true}, {write_concurrency, true}]).
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
@ -30,39 +28,26 @@ start_link() ->
%%------------------------------------------------------------------------------
init([]) ->
%% Create the pubsub tables
ok = lists:foreach(fun create_tab/1, [subscription, subscriber, suboption]),
%% Broker pool
PoolSize = emqx_vm:schedulers() * 2,
BrokerPool = emqx_pool_sup:spec(emqx_broker_pool,
[broker, hash, PoolSize,
{emqx_broker, start_link, []}]),
%% Shared subscription
SharedSub = {shared_sub, {emqx_shared_sub, start_link, []},
permanent, 5000, worker, [emqx_shared_sub]},
SharedSub = #{id => shared_sub,
start => {emqx_shared_sub, start_link, []},
restart => permanent,
shutdown => 2000,
type => worker,
modules => [emqx_shared_sub]},
%% Broker helper
Helper = {broker_helper, {emqx_broker_helper, start_link, []},
permanent, 5000, worker, [emqx_broker_helper]},
Helper = #{id => helper,
start => {emqx_broker_helper, start_link, []},
restart => permanent,
shutdown => 2000,
type => worker,
modules => [emqx_broker_helper]},
%% Broker pool
BrokerPool = emqx_pool_sup:spec(emqx_broker_pool,
[broker, hash, emqx_vm:schedulers() * 2,
{emqx_broker, start_link, []}]),
{ok, {{one_for_all, 0, 1}, [SharedSub, Helper, BrokerPool]}}.
%%------------------------------------------------------------------------------
%% Create tables
%%------------------------------------------------------------------------------
create_tab(suboption) ->
%% Suboption: {Topic, Sub} -> [{qos, 1}]
emqx_tables:new(emqx_suboption, [set | ?TAB_OPTS]);
create_tab(subscriber) ->
%% Subscriber: Topic -> Sub1, Sub2, Sub3, ..., SubN
%% duplicate_bag: o(1) insert
emqx_tables:new(emqx_subscriber, [duplicate_bag | ?TAB_OPTS]);
create_tab(subscription) ->
%% Subscription: Sub -> Topic1, Topic2, Topic3, ..., TopicN
%% bag: o(n) insert
emqx_tables:new(emqx_subscription, [bag | ?TAB_OPTS]).
{ok, {{one_for_all, 0, 1}, [BrokerPool, SharedSub, Helper]}}.

View File

@ -125,9 +125,9 @@ notify(Msg) ->
init([]) ->
TabOpts = [public, set, {write_concurrency, true}],
_ = emqx_tables:new(?CONN_TAB, [{read_concurrency, true} | TabOpts]),
_ = emqx_tables:new(?CONN_ATTRS_TAB, TabOpts),
_ = emqx_tables:new(?CONN_STATS_TAB, TabOpts),
ok = emqx_tables:new(?CONN_TAB, [{read_concurrency, true} | TabOpts]),
ok = emqx_tables:new(?CONN_ATTRS_TAB, TabOpts),
ok = emqx_tables:new(?CONN_STATS_TAB, TabOpts),
ok = emqx_stats:update_interval(cm_stats, fun ?MODULE:update_conn_stats/0),
{ok, #{conn_pmon => emqx_pmon:new()}}.

View File

@ -96,7 +96,7 @@ usage() ->
%%------------------------------------------------------------------------------
init([]) ->
_ = emqx_tables:new(?TAB, [ordered_set, protected]),
ok = emqx_tables:new(?TAB, [protected, ordered_set]),
{ok, #state{seq = 0}}.
handle_call(Req, _From, State) ->

View File

@ -139,7 +139,7 @@ lookup(HookPoint) ->
%%------------------------------------------------------------------------------
init([]) ->
_ = emqx_tables:new(?TAB, [{keypos, #hook.name}, {read_concurrency, true}]),
ok = emqx_tables:new(?TAB, [{keypos, #hook.name}, {read_concurrency, true}]),
{ok, #{}}.
handle_call({add, HookPoint, Callback = #callback{action = Action}}, _From, State) ->

View File

@ -61,7 +61,7 @@ init([Pool, Id, Node, Topic, Options]) ->
true ->
true = erlang:monitor_node(Node, true),
Group = iolist_to_binary(["$bridge:", atom_to_list(Node), ":", Topic]),
emqx_broker:subscribe(Topic, self(), #{share => Group, qos => ?QOS_0}),
emqx_broker:subscribe(Topic, #{share => Group, qos => ?QOS_0}),
State = parse_opts(Options, #state{node = Node, subtopic = Topic}),
MQueue = emqx_mqueue:init(#{max_len => State#state.max_queue_len,
store_qos0 => true}),

View File

@ -285,7 +285,7 @@ qos_sent(?QOS_2) ->
init([]) ->
% Create metrics table
_ = emqx_tables:new(?TAB, [set, public, {write_concurrency, true}]),
ok = emqx_tables:new(?TAB, [public, set, {write_concurrency, true}]),
lists:foreach(fun new/1, ?BYTES_METRICS ++ ?PACKET_METRICS ++ ?MESSAGE_METRICS),
{ok, #{}, hibernate}.

View File

@ -28,23 +28,22 @@
-export([start_link/2]).
%% Route APIs
-export([add_route/1, add_route/2, add_route/3]).
-export([add_route/1, add_route/2]).
-export([get_routes/1]).
-export([del_route/1, del_route/2, del_route/3]).
-export([delete_route/1, delete_route/2]).
-export([has_routes/1, match_routes/1, print_routes/1]).
-export([topics/0]).
%% Mode
-export([set_mode/1, get_mode/0]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
-type(destination() :: node() | {binary(), node()}).
-record(batch, {enabled, timer, pending}).
-record(state, {pool, id, batch :: #batch{}}).
-define(ROUTE, emqx_route).
-define(BATCH(Enabled), #batch{enabled = Enabled}).
-define(BATCH(Enabled, Pending), #batch{enabled = Enabled, pending = Pending}).
%%------------------------------------------------------------------------------
%% Mnesia bootstrap
@ -62,49 +61,66 @@ mnesia(copy) ->
ok = ekka_mnesia:copy_table(?ROUTE).
%%------------------------------------------------------------------------------
%% Strat a router
%% Start a router
%%------------------------------------------------------------------------------
-spec(start_link(atom(), pos_integer()) -> {ok, pid()} | ignore | {error, term()}).
-spec(start_link(atom(), pos_integer()) -> emqx_types:startlink_ret()).
start_link(Pool, Id) ->
gen_server:start_link({local, emqx_misc:proc_name(?MODULE, Id)},
?MODULE, [Pool, Id], [{hibernate_after, 1000}]).
Name = emqx_misc:proc_name(?MODULE, Id),
gen_server:start_link({local, Name}, ?MODULE, [Pool, Id], [{hibernate_after, 1000}]).
%%------------------------------------------------------------------------------
%% Route APIs
%%------------------------------------------------------------------------------
-spec(add_route(emqx_topic:topic() | emqx_types:route()) -> ok).
-spec(add_route(emqx_topic:topic() | emqx_types:route()) -> ok | {error, term()}).
add_route(Topic) when is_binary(Topic) ->
add_route(#route{topic = Topic, dest = node()});
add_route(Route = #route{topic = Topic}) ->
cast(pick(Topic), {add_route, Route}).
case get_mode() of
protected -> do_add_route(Route);
undefined -> call(pick(Topic), {add_route, Route})
end.
-spec(add_route(emqx_topic:topic(), destination()) -> ok).
-spec(add_route(emqx_topic:topic(), destination()) -> ok | {error, term()}).
add_route(Topic, Dest) when is_binary(Topic) ->
add_route(#route{topic = Topic, dest = Dest}).
-spec(add_route({pid(), reference()}, emqx_topic:topic(), destination()) -> ok).
add_route(From, Topic, Dest) when is_binary(Topic) ->
cast(pick(Topic), {add_route, From, #route{topic = Topic, dest = Dest}}).
%% @private
do_add_route(Route = #route{topic = Topic, dest = Dest}) ->
case lists:member(Route, get_routes(Topic)) of
true -> ok;
false ->
ok = emqx_router_helper:monitor(Dest),
case emqx_topic:wildcard(Topic) of
true -> trans(fun add_trie_route/1, [Route]);
false -> add_direct_route(Route)
end
end.
-spec(get_routes(emqx_topic:topic()) -> [emqx_types:route()]).
get_routes(Topic) ->
ets:lookup(?ROUTE, Topic).
-spec(del_route(emqx_topic:topic() | emqx_types:route()) -> ok).
del_route(Topic) when is_binary(Topic) ->
del_route(#route{topic = Topic, dest = node()});
del_route(Route = #route{topic = Topic}) ->
cast(pick(Topic), {del_route, Route}).
-spec(delete_route(emqx_topic:topic() | emqx_types:route()) -> ok | {error, term()}).
delete_route(Topic) when is_binary(Topic) ->
delete_route(#route{topic = Topic, dest = node()});
delete_route(Route = #route{topic = Topic}) ->
case get_mode() of
protected -> do_delete_route(Route);
undefined -> call(pick(Topic), {delete_route, Route})
end.
-spec(del_route(emqx_topic:topic(), destination()) -> ok).
del_route(Topic, Dest) when is_binary(Topic) ->
del_route(#route{topic = Topic, dest = Dest}).
-spec(delete_route(emqx_topic:topic(), destination()) -> ok | {error, term()}).
delete_route(Topic, Dest) when is_binary(Topic) ->
delete_route(#route{topic = Topic, dest = Dest}).
-spec(del_route({pid(), reference()}, emqx_topic:topic(), destination()) -> ok).
del_route(From, Topic, Dest) when is_binary(Topic) ->
cast(pick(Topic), {del_route, From, #route{topic = Topic, dest = Dest}}).
%% @private
do_delete_route(Route = #route{topic = Topic}) ->
case emqx_topic:wildcard(Topic) of
true -> trans(fun del_trie_route/1, [Route]);
false -> del_direct_route(Route)
end.
-spec(has_routes(emqx_topic:topic()) -> boolean()).
has_routes(Topic) when is_binary(Topic) ->
@ -127,8 +143,15 @@ print_routes(Topic) ->
io:format("~s -> ~s~n", [To, Dest])
end, match_routes(Topic)).
cast(Router, Msg) ->
gen_server:cast(Router, Msg).
-spec(set_mode(protected | atom()) -> any()).
set_mode(Mode) when is_atom(Mode) ->
put('$router_mode', Mode).
-spec(get_mode() -> protected | undefined | atom()).
get_mode() -> get('$router_mode').
call(Router, Msg) ->
gen_server:call(Router, Msg, infinity).
pick(Topic) ->
gproc_pool:pick_worker(router, Topic).
@ -138,71 +161,28 @@ pick(Topic) ->
%%------------------------------------------------------------------------------
init([Pool, Id]) ->
rand:seed(exsplus, erlang:timestamp()),
gproc_pool:connect_worker(Pool, {Pool, Id}),
Batch = #batch{enabled = emqx_config:get_env(route_batch_clean, false),
pending = sets:new()},
{ok, ensure_batch_timer(#state{pool = Pool, id = Id, batch = Batch})}.
true = gproc_pool:connect_worker(Pool, {Pool, Id}),
{ok, #{pool => Pool, id => Id}}.
handle_call({add_route, Route}, _From, State) ->
{reply, do_add_route(Route), State};
handle_call({delete_route, Route}, _From, State) ->
{reply, do_delete_route(Route), State};
handle_call(Req, _From, State) ->
emqx_logger:error("[Router] unexpected call: ~p", [Req]),
{reply, ignored, State}.
handle_cast({add_route, From, Route}, State) ->
{noreply, NewState} = handle_cast({add_route, Route}, State),
_ = gen_server:reply(From, ok),
{noreply, NewState};
handle_cast({add_route, Route = #route{topic = Topic, dest = Dest}}, State) ->
case lists:member(Route, get_routes(Topic)) of
true -> ok;
false ->
ok = emqx_router_helper:monitor(Dest),
case emqx_topic:wildcard(Topic) of
true -> log(trans(fun add_trie_route/1, [Route]));
false -> add_direct_route(Route)
end
end,
{noreply, State};
handle_cast({del_route, From, Route}, State) ->
{noreply, NewState} = handle_cast({del_route, Route}, State),
_ = gen_server:reply(From, ok),
{noreply, NewState};
handle_cast({del_route, Route = #route{topic = Topic, dest = Dest}}, State) when is_tuple(Dest) ->
{noreply, case emqx_topic:wildcard(Topic) of
true -> log(trans(fun del_trie_route/1, [Route])),
State;
false -> del_direct_route(Route, State)
end};
handle_cast({del_route, Route = #route{topic = Topic}}, State) ->
%% Confirm if there are still subscribers...
{noreply, case ets:member(emqx_subscriber, Topic) of
true -> State;
false ->
case emqx_topic:wildcard(Topic) of
true -> log(trans(fun del_trie_route/1, [Route])),
State;
false -> del_direct_route(Route, State)
end
end};
handle_cast(Msg, State) ->
emqx_logger:error("[Router] unexpected cast: ~p", [Msg]),
{noreply, State}.
handle_info({timeout, _TRef, batch_delete}, State = #state{batch = Batch}) ->
_ = del_direct_routes(sets:to_list(Batch#batch.pending)),
{noreply, ensure_batch_timer(State#state{batch = ?BATCH(true, sets:new())}), hibernate};
handle_info(Info, State) ->
emqx_logger:error("[Router] unexpected info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, #state{pool = Pool, id = Id, batch = Batch}) ->
_ = cacel_batch_timer(Batch),
terminate(_Reason, #{pool := Pool, id := Id}) ->
gproc_pool:disconnect_worker(Pool, {Pool, Id}).
code_change(_OldVsn, State, _Extra) ->
@ -212,17 +192,6 @@ code_change(_OldVsn, State, _Extra) ->
%% Internal functions
%%------------------------------------------------------------------------------
ensure_batch_timer(State = #state{batch = #batch{enabled = false}}) ->
State;
ensure_batch_timer(State = #state{batch = Batch}) ->
TRef = erlang:start_timer(50 + rand:uniform(50), self(), batch_delete),
State#state{batch = Batch#batch{timer = TRef}}.
cacel_batch_timer(#batch{enabled = false}) ->
ok;
cacel_batch_timer(#batch{enabled = true, timer = TRef}) ->
catch erlang:cancel_timer(TRef).
add_direct_route(Route) ->
mnesia:async_dirty(fun mnesia:write/3, [?ROUTE, Route, sticky_write]).
@ -233,25 +202,9 @@ add_trie_route(Route = #route{topic = Topic}) ->
end,
mnesia:write(?ROUTE, Route, sticky_write).
del_direct_route(Route, State = #state{batch = ?BATCH(false)}) ->
del_direct_route(Route), State;
del_direct_route(Route, State = #state{batch = Batch = ?BATCH(true, Pending)}) ->
State#state{batch = Batch#batch{pending = sets:add_element(Route, Pending)}}.
del_direct_route(Route) ->
mnesia:async_dirty(fun mnesia:delete_object/3, [?ROUTE, Route, sticky_write]).
del_direct_routes([]) ->
ok;
del_direct_routes(Routes) ->
DelFun = fun(R = #route{topic = Topic}) ->
case ets:member(emqx_subscriber, Topic) of
true -> ok;
false -> mnesia:delete_object(?ROUTE, R, sticky_write)
end
end,
mnesia:async_dirty(fun lists:foreach/2, [DelFun, Routes]).
del_trie_route(Route = #route{topic = Topic}) ->
case mnesia:wread({?ROUTE, Topic}) of
[Route] -> %% Remove route and trie
@ -270,7 +223,3 @@ trans(Fun, Args) ->
{aborted, Error} -> {error, Error}
end.
log(ok) -> ok;
log({error, Reason}) ->
emqx_logger:error("[Router] mnesia aborted: ~p", [Reason]).

59
src/emqx_sequence.erl Normal file
View File

@ -0,0 +1,59 @@
%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
-module(emqx_sequence).
-export([create/1, nextval/2, currval/2, reclaim/2, delete/1]).
-type(key() :: term()).
-type(name() :: atom()).
-type(seqid() :: non_neg_integer()).
-export_type([seqid/0]).
%% @doc Create a sequence.
-spec(create(name()) -> ok).
create(Name) ->
emqx_tables:new(Name, [public, set, {write_concurrency, true}]).
%% @doc Next value of the sequence.
-spec(nextval(name(), key()) -> seqid()).
nextval(Name, Key) ->
ets:update_counter(Name, Key, {2, 1}, {Key, 0}).
%% @doc Current value of the sequence.
-spec(currval(name(), key()) -> seqid()).
currval(Name, Key) ->
try ets:lookup_element(Name, Key, 2)
catch
error:badarg -> 0
end.
%% @doc Reclaim a sequence id.
-spec(reclaim(name(), key()) -> seqid()).
reclaim(Name, Key) ->
try ets:update_counter(Name, Key, {2, -1, 0, 0}) of
0 -> ets:delete_object(Name, {Key, 0}), 0;
I -> I
catch
error:badarg -> 0
end.
%% @doc Delete the sequence.
delete(Name) ->
case ets:info(Name, name) of
Name -> ets:delete(Name);
undefined -> false
end.

View File

@ -465,7 +465,7 @@ handle_cast({subscribe, FromPid, {PacketId, _Properties, TopicFilters}},
emqx_hooks:run('session.subscribed', [#{client_id => ClientId}, Topic, SubOpts#{first => false}]),
SubMap;
{ok, _SubOpts} ->
emqx_broker:set_subopts(Topic, {self(), ClientId}, SubOpts),
emqx_broker:set_subopts(Topic, SubOpts),
%% Why???
emqx_hooks:run('session.subscribed', [#{client_id => ClientId}, Topic, SubOpts#{first => false}]),
maps:put(Topic, SubOpts, SubMap);

View File

@ -17,6 +17,7 @@
-behaviour(gen_server).
-include("emqx.hrl").
-include("emqx_mqtt.hrl").
%% Mnesia bootstrap
-export([mnesia/1]).
@ -27,7 +28,8 @@
-export([start_link/0]).
-export([subscribe/3, unsubscribe/3]).
-export([dispatch/3, maybe_ack/1, maybe_nack_dropped/1, nack_no_connection/1, is_ack_required/1]).
-export([dispatch/3]).
-export([maybe_ack/1, maybe_nack_dropped/1, nack_no_connection/1, is_ack_required/1]).
%% for testing
-export([subscribers/2]).
@ -38,6 +40,7 @@
-define(SERVER, ?MODULE).
-define(TAB, emqx_shared_subscription).
-define(SHARED_SUBS, emqx_shared_subscriber).
-define(ALIVE_SUBS, emqx_alive_shared_subscribers).
-define(SHARED_SUB_QOS1_DISPATCH_TIMEOUT_SECONDS, 5).
-define(ack, shared_sub_ack).
@ -48,8 +51,6 @@
-record(state, {pmon}).
-record(emqx_shared_subscription, {group, topic, subpid}).
-include("emqx_mqtt.hrl").
%%------------------------------------------------------------------------------
%% Mnesia bootstrap
%%------------------------------------------------------------------------------
@ -72,16 +73,11 @@ mnesia(copy) ->
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
subscribe(undefined, _Topic, _SubPid) ->
ok;
subscribe(Group, Topic, SubPid) when is_pid(SubPid) ->
mnesia:dirty_write(?TAB, record(Group, Topic, SubPid)),
gen_server:cast(?SERVER, {monitor, SubPid}).
gen_server:call(?SERVER, {subscribe, Group, Topic, SubPid}).
unsubscribe(undefined, _Topic, _SubPid) ->
ok;
unsubscribe(Group, Topic, SubPid) when is_pid(SubPid) ->
mnesia:dirty_delete_object(?TAB, record(Group, Topic, SubPid)).
gen_server:call(?SERVER, {unsubscribe, Group, Topic, SubPid}).
record(Group, Topic, SubPid) ->
#emqx_shared_subscription{group = Group, topic = Topic, subpid = SubPid}.
@ -251,14 +247,16 @@ do_pick_subscriber(Group, Topic, round_robin, _ClientId, Count) ->
subscribers(Group, Topic) ->
ets:select(?TAB, [{{emqx_shared_subscription, Group, Topic, '$1'}, [], ['$1']}]).
%%-----------------------------------------------------------------------------
%%------------------------------------------------------------------------------
%% gen_server callbacks
%%-----------------------------------------------------------------------------
%%------------------------------------------------------------------------------
init([]) ->
{atomic, PMon} = mnesia:transaction(fun init_monitors/0),
_ = emqx_router:set_mode(protected),
mnesia:subscribe({table, ?TAB, simple}),
ets:new(?ALIVE_SUBS, [named_table, {read_concurrency, true}, protected]),
{atomic, PMon} = mnesia:transaction(fun init_monitors/0),
ok = emqx_tables:new(?SHARED_SUBS, [protected, bag]),
ok = emqx_tables:new(?ALIVE_SUBS, [protected, set, {read_concurrency, true}]),
{ok, update_stats(#state{pmon = PMon})}.
init_monitors() ->
@ -267,14 +265,29 @@ init_monitors() ->
emqx_pmon:monitor(SubPid, Mon)
end, emqx_pmon:new(), ?TAB).
handle_call({subscribe, Group, Topic, SubPid}, _From, State = #state{pmon = PMon}) ->
mnesia:dirty_write(?TAB, record(Group, Topic, SubPid)),
case ets:member(?SHARED_SUBS, {Group, Topic}) of
true -> ok;
false -> ok = emqx_router:add_route(Topic, {Group, node()})
end,
ok = maybe_insert_alive_tab(SubPid),
true = ets:insert(?SHARED_SUBS, {{Group, Topic}, SubPid}),
{reply, ok, update_stats(State#state{pmon = emqx_pmon:monitor(SubPid, PMon)})};
handle_call({unsubscribe, Group, Topic, SubPid}, _From, State) ->
mnesia:dirty_delete_object(?TAB, record(Group, Topic, SubPid)),
true = ets:delete_object(?SHARED_SUBS, {{Group, Topic}, SubPid}),
case ets:member(?SHARED_SUBS, {Group, Topic}) of
true -> ok;
false -> ok = emqx_router:delete_route(Topic, {Group, node()})
end,
{reply, ok, State};
handle_call(Req, _From, State) ->
emqx_logger:error("[SharedSub] unexpected call: ~p", [Req]),
{reply, ignored, State}.
handle_cast({monitor, SubPid}, State= #state{pmon = PMon}) ->
NewPmon = emqx_pmon:monitor(SubPid, PMon),
ok = maybe_insert_alive_tab(SubPid),
{noreply, update_stats(State#state{pmon = NewPmon})};
handle_cast(Msg, State) ->
emqx_logger:error("[SharedSub] unexpected cast: ~p", [Msg]),
{noreply, State}.
@ -316,12 +329,18 @@ maybe_insert_alive_tab(Pid) when is_pid(Pid) -> ets:insert(?ALIVE_SUBS, {Pid}),
cleanup_down(SubPid) ->
?IS_LOCAL_PID(SubPid) orelse ets:delete(?ALIVE_SUBS, SubPid),
lists:foreach(
fun(Record) ->
mnesia:dirty_delete_object(?TAB, Record)
end,mnesia:dirty_match_object(#emqx_shared_subscription{_ = '_', subpid = SubPid})).
fun(Record = #emqx_shared_subscription{topic = Topic, group = Group}) ->
ok = mnesia:dirty_delete_object(?TAB, Record),
true = ets:delete_object(?SHARED_SUBS, {{Group, Topic}, SubPid}),
case ets:member(?SHARED_SUBS, {Group, Topic}) of
true -> ok;
false -> ok = emqx_router:delete_route(Topic, {Group, node()})
end
end, mnesia:dirty_match_object(#emqx_shared_subscription{_ = '_', subpid = SubPid})).
update_stats(State) ->
emqx_stats:setstat('subscriptions/shared/count', 'subscriptions/shared/max', ets:info(?TAB, size)), State.
emqx_stats:setstat('subscriptions/shared/count', 'subscriptions/shared/max', ets:info(?TAB, size)),
State.
%% Return 'true' if the subscriber process is alive AND not in the failed list
is_active_sub(Pid, FailedSubs) ->

View File

@ -202,10 +202,10 @@ notify(Event) ->
init([]) ->
TabOpts = [public, set, {write_concurrency, true}],
_ = emqx_tables:new(?SESSION_TAB, [{read_concurrency, true} | TabOpts]),
_ = emqx_tables:new(?SESSION_P_TAB, TabOpts),
_ = emqx_tables:new(?SESSION_ATTRS_TAB, TabOpts),
_ = emqx_tables:new(?SESSION_STATS_TAB, TabOpts),
ok = emqx_tables:new(?SESSION_TAB, [{read_concurrency, true} | TabOpts]),
ok = emqx_tables:new(?SESSION_P_TAB, TabOpts),
ok = emqx_tables:new(?SESSION_ATTRS_TAB, TabOpts),
ok = emqx_tables:new(?SESSION_STATS_TAB, TabOpts),
emqx_stats:update_interval(sm_stats, fun ?MODULE:stats_fun/0),
{ok, #{session_pmon => emqx_pmon:new()}}.

View File

@ -152,7 +152,7 @@ cast(Msg) ->
%%------------------------------------------------------------------------------
init(#{tick_ms := TickMs}) ->
_ = emqx_tables:new(?TAB, [set, public, {write_concurrency, true}]),
ok = emqx_tables:new(?TAB, [public, set, {write_concurrency, true}]),
Stats = lists:append([?CONNECTION_STATS, ?SESSION_STATS, ?PUBSUB_STATS,
?ROUTE_STATS, ?RETAINED_STATS]),
true = ets:insert(?TAB, [{Name, 0} || Name <- Stats]),

View File

@ -17,10 +17,12 @@
-export([new/2]).
%% Create a named_table ets.
-spec(new(atom(), list()) -> ok).
new(Tab, Opts) ->
case ets:info(Tab, name) of
undefined ->
ets:new(Tab, lists:usort([named_table | Opts]));
Tab -> Tab
_ = ets:new(Tab, lists:usort([named_table | Opts])),
ok;
Tab -> ok
end.

View File

@ -68,7 +68,7 @@ stop() ->
%%------------------------------------------------------------------------------
init([]) ->
_ = emqx_tables:new(?TAB, [set, {read_concurrency, true}]),
ok = emqx_tables:new(?TAB, [set, {read_concurrency, true}]),
{ok, element(2, handle_info(reload, #{timer => undefined}))}.
handle_call(force_reload, _From, State) ->

View File

@ -0,0 +1,37 @@
%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
-module(emqx_sequence_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
-import(emqx_sequence, [nextval/2, reclaim/2]).
all() ->
[sequence_generate].
sequence_generate(_) ->
ok = emqx_sequence:create(seqtab),
?assertEqual(1, nextval(seqtab, key)),
?assertEqual(2, nextval(seqtab, key)),
?assertEqual(3, nextval(seqtab, key)),
?assertEqual(2, reclaim(seqtab, key)),
?assertEqual(1, reclaim(seqtab, key)),
?assertEqual(0, reclaim(seqtab, key)),
?assertEqual(false, ets:member(seqtab, key)),
?assertEqual(1, nextval(seqtab, key)).