Implement subscription sharding.
1. Improve the design router, broker and shared_sub 2. New ets tables' design for subscription sharding
This commit is contained in:
parent
bce1ddc5c4
commit
36e7d63d66
|
@ -148,7 +148,7 @@ stop() ->
|
||||||
%%-----------------------------------------------------------------------------
|
%%-----------------------------------------------------------------------------
|
||||||
|
|
||||||
init([]) ->
|
init([]) ->
|
||||||
_ = emqx_tables:new(?TAB, [set, protected, {read_concurrency, true}]),
|
ok = emqx_tables:new(?TAB, [set, protected, {read_concurrency, true}]),
|
||||||
{ok, #{}}.
|
{ok, #{}}.
|
||||||
|
|
||||||
handle_call({register_mod, Type, Mod, Opts, Seq}, _From, State) ->
|
handle_call({register_mod, Type, Mod, Opts, Seq}, _From, State) ->
|
||||||
|
|
|
@ -45,7 +45,7 @@ all_rules() ->
|
||||||
|
|
||||||
-spec(init([File :: string()]) -> {ok, #{}}).
|
-spec(init([File :: string()]) -> {ok, #{}}).
|
||||||
init([File]) ->
|
init([File]) ->
|
||||||
_ = emqx_tables:new(?ACL_RULE_TAB, [set, public, {read_concurrency, true}]),
|
ok = emqx_tables:new(?ACL_RULE_TAB, [set, public, {read_concurrency, true}]),
|
||||||
true = load_rules_from_file(File),
|
true = load_rules_from_file(File),
|
||||||
{ok, #{acl_file => File}}.
|
{ok, #{acl_file => File}}.
|
||||||
|
|
||||||
|
|
|
@ -23,7 +23,7 @@
|
||||||
-export([unsubscribe/1, unsubscribe/2]).
|
-export([unsubscribe/1, unsubscribe/2]).
|
||||||
-export([subscriber_down/1]).
|
-export([subscriber_down/1]).
|
||||||
-export([publish/1, safe_publish/1]).
|
-export([publish/1, safe_publish/1]).
|
||||||
-export([dispatch/2, dispatch/3]).
|
-export([dispatch/2]).
|
||||||
-export([subscriptions/1, subscribers/1, subscribed/2]).
|
-export([subscriptions/1, subscribers/1, subscribed/2]).
|
||||||
-export([get_subopts/2, set_subopts/2]).
|
-export([get_subopts/2, set_subopts/2]).
|
||||||
-export([topics/0]).
|
-export([topics/0]).
|
||||||
|
@ -34,8 +34,6 @@
|
||||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
|
||||||
code_change/3]).
|
code_change/3]).
|
||||||
|
|
||||||
-define(SHARD, 1024).
|
|
||||||
-define(TIMEOUT, 60000).
|
|
||||||
-define(BROKER, ?MODULE).
|
-define(BROKER, ?MODULE).
|
||||||
|
|
||||||
%% ETS tables
|
%% ETS tables
|
||||||
|
@ -44,33 +42,36 @@
|
||||||
-define(SUBSCRIBER, emqx_subscriber).
|
-define(SUBSCRIBER, emqx_subscriber).
|
||||||
-define(SUBSCRIPTION, emqx_subscription).
|
-define(SUBSCRIPTION, emqx_subscription).
|
||||||
|
|
||||||
%% Gards
|
%% Guards
|
||||||
-define(is_subid(Id), (is_binary(Id) orelse is_atom(Id))).
|
-define(is_subid(Id), (is_binary(Id) orelse is_atom(Id))).
|
||||||
|
|
||||||
-spec(start_link(atom(), pos_integer()) -> emqx_types:startlink_ret()).
|
-spec(start_link(atom(), pos_integer()) -> emqx_types:startlink_ret()).
|
||||||
start_link(Pool, Id) ->
|
start_link(Pool, Id) ->
|
||||||
_ = create_tabs(),
|
_ = create_tabs(),
|
||||||
gen_server:start_link({local, emqx_misc:proc_name(?BROKER, Id)}, ?MODULE, [Pool, Id], []).
|
Name = emqx_misc:proc_name(?BROKER, Id),
|
||||||
|
gen_server:start_link({local, Name}, ?MODULE, [Pool, Id], []).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% Create tabs
|
%% Create tabs
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
-spec(create_tabs() -> ok).
|
||||||
create_tabs() ->
|
create_tabs() ->
|
||||||
TabOpts = [public, {read_concurrency, true}, {write_concurrency, true}],
|
TabOpts = [public, {read_concurrency, true}, {write_concurrency, true}],
|
||||||
|
|
||||||
%% SubId: SubId -> SubPid1, SubPid2,...
|
%% SubId: SubId -> SubPid
|
||||||
_ = emqx_tables:new(?SUBID, [bag | TabOpts]),
|
ok = emqx_tables:new(?SUBID, [set | TabOpts]),
|
||||||
|
|
||||||
%% SubOption: {SubPid, Topic} -> SubOption
|
%% SubOption: {SubPid, Topic} -> SubOption
|
||||||
_ = emqx_tables:new(?SUBOPTION, [set | TabOpts]),
|
ok = emqx_tables:new(?SUBOPTION, [set | TabOpts]),
|
||||||
|
|
||||||
%% Subscription: SubPid -> Topic1, Topic2, Topic3, ...
|
%% Subscription: SubPid -> Topic1, Topic2, Topic3, ...
|
||||||
%% duplicate_bag: o(1) insert
|
%% duplicate_bag: o(1) insert
|
||||||
_ = emqx_tables:new(?SUBSCRIPTION, [duplicate_bag | TabOpts]),
|
ok = emqx_tables:new(?SUBSCRIPTION, [duplicate_bag | TabOpts]),
|
||||||
|
|
||||||
%% Subscriber: Topic -> SubPid1, SubPid2, SubPid3, ...
|
%% Subscriber: Topic -> SubPid1, SubPid2, SubPid3, ...
|
||||||
%% duplicate_bag: o(1) insert
|
%% duplicate_bag: o(1) insert
|
||||||
emqx_tables:new(?SUBSCRIBER, [duplicate_bag | TabOpts]).
|
ok = emqx_tables:new(?SUBSCRIBER, [duplicate_bag | TabOpts]).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% Subscribe API
|
%% Subscribe API
|
||||||
|
@ -92,14 +93,23 @@ subscribe(Topic, SubId, SubOpts) when is_binary(Topic), ?is_subid(SubId), is_map
|
||||||
case ets:member(?SUBOPTION, {SubPid, Topic}) of
|
case ets:member(?SUBOPTION, {SubPid, Topic}) of
|
||||||
false ->
|
false ->
|
||||||
ok = emqx_broker_helper:monitor(SubPid, SubId),
|
ok = emqx_broker_helper:monitor(SubPid, SubId),
|
||||||
Group = maps:get(share, SubOpts, undefined),
|
|
||||||
%% true = ets:insert(?SUBID, {SubId, SubPid}),
|
%% true = ets:insert(?SUBID, {SubId, SubPid}),
|
||||||
true = ets:insert(?SUBSCRIPTION, {SubPid, Topic}),
|
true = ets:insert(?SUBSCRIPTION, {SubPid, Topic}),
|
||||||
%% SeqId = emqx_broker_helper:create_seq(Topic),
|
case maps:get(share, SubOpts, undefined) of
|
||||||
true = ets:insert(?SUBSCRIBER, {Topic, shared(Group, SubPid)}),
|
undefined ->
|
||||||
true = ets:insert(?SUBOPTION, {{SubPid, Topic}, SubOpts}),
|
Shard = emqx_broker_helper:get_shard(SubPid, Topic),
|
||||||
ok = emqx_shared_sub:subscribe(Group, Topic, SubPid),
|
case Shard of
|
||||||
call(pick(Topic), {subscribe, Group, Topic});
|
0 -> true = ets:insert(?SUBSCRIBER, {Topic, SubPid});
|
||||||
|
I -> true = ets:insert(?SUBSCRIBER, {{shard, Topic, I}, SubPid}),
|
||||||
|
true = ets:insert(?SUBSCRIBER, {Topic, {shard, I}})
|
||||||
|
end,
|
||||||
|
SubOpts1 = maps:put(shard, Shard, SubOpts),
|
||||||
|
true = ets:insert(?SUBOPTION, {{SubPid, Topic}, SubOpts1}),
|
||||||
|
call(pick({Topic, Shard}), {subscribe, Topic});
|
||||||
|
Group -> %% Shared subscription
|
||||||
|
true = ets:insert(?SUBOPTION, {{SubPid, Topic}, SubOpts}),
|
||||||
|
emqx_shared_sub:subscribe(Group, Topic, SubPid)
|
||||||
|
end;
|
||||||
true -> ok
|
true -> ok
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
@ -112,12 +122,21 @@ unsubscribe(Topic) when is_binary(Topic) ->
|
||||||
SubPid = self(),
|
SubPid = self(),
|
||||||
case ets:lookup(?SUBOPTION, {SubPid, Topic}) of
|
case ets:lookup(?SUBOPTION, {SubPid, Topic}) of
|
||||||
[{_, SubOpts}] ->
|
[{_, SubOpts}] ->
|
||||||
Group = maps:get(share, SubOpts, undefined),
|
_ = emqx_broker_helper:reclaim_seq(Topic),
|
||||||
|
case maps:get(share, SubOpts, undefined) of
|
||||||
|
undefined ->
|
||||||
|
case maps:get(shared, SubOpts, 0) of
|
||||||
|
0 -> true = ets:delete_object(?SUBSCRIBER, {Topic, SubPid}),
|
||||||
|
ok = cast(pick(Topic), {unsubscribed, Topic});
|
||||||
|
I -> true = ets:delete_object(?SUBSCRIBER, {{shard, Topic, I}, SubPid}),
|
||||||
|
ok = cast(pick({Topic, I}), {unsubscribed, Topic, I})
|
||||||
|
end;
|
||||||
|
Group ->
|
||||||
|
ok = emqx_shared_sub:unsubscribe(Group, Topic, SubPid)
|
||||||
|
end,
|
||||||
true = ets:delete_object(?SUBSCRIPTION, {SubPid, Topic}),
|
true = ets:delete_object(?SUBSCRIPTION, {SubPid, Topic}),
|
||||||
true = ets:delete_object(?SUBSCRIBER, {Topic, shared(Group, SubPid)}),
|
%%true = ets:delete_object(?SUBID, {SubId, SubPid}),
|
||||||
true = ets:delete(?SUBOPTION, {SubPid, Topic}),
|
true = ets:delete(?SUBOPTION, {SubPid, Topic});
|
||||||
ok = emqx_shared_sub:unsubscribe(Group, Topic, SubPid),
|
|
||||||
call(pick(Topic), {unsubscribe, Group, Topic});
|
|
||||||
[] -> ok
|
[] -> ok
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
@ -207,22 +226,23 @@ dispatch(Topic, Delivery = #delivery{message = Msg, results = Results}) ->
|
||||||
emqx_hooks:run('message.dropped', [#{node => node()}, Msg]),
|
emqx_hooks:run('message.dropped', [#{node => node()}, Msg]),
|
||||||
inc_dropped_cnt(Topic),
|
inc_dropped_cnt(Topic),
|
||||||
Delivery;
|
Delivery;
|
||||||
[SubPid] -> %% optimize?
|
[Sub] -> %% optimize?
|
||||||
dispatch(SubPid, Topic, Msg),
|
dispatch(Sub, Topic, Msg),
|
||||||
Delivery#delivery{results = [{dispatch, Topic, 1}|Results]};
|
Delivery#delivery{results = [{dispatch, Topic, 1}|Results]};
|
||||||
SubPids ->
|
Subs ->
|
||||||
Count = lists:foldl(fun(SubPid, Acc) ->
|
Count = lists:foldl(
|
||||||
dispatch(SubPid, Topic, Msg), Acc + 1
|
fun(Sub, Acc) ->
|
||||||
end, 0, SubPids),
|
dispatch(Sub, Topic, Msg), Acc + 1
|
||||||
|
end, 0, Subs),
|
||||||
Delivery#delivery{results = [{dispatch, Topic, Count}|Results]}
|
Delivery#delivery{results = [{dispatch, Topic, Count}|Results]}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
dispatch(SubPid, Topic, Msg) when is_pid(SubPid) ->
|
dispatch(SubPid, Topic, Msg) when is_pid(SubPid) ->
|
||||||
SubPid ! {dispatch, Topic, Msg},
|
SubPid ! {dispatch, Topic, Msg};
|
||||||
true;
|
dispatch({shard, I}, Topic, Msg) ->
|
||||||
%% TODO: how to optimize the share sub?
|
lists:foreach(fun(SubPid) ->
|
||||||
dispatch({share, _Group, _SubPid}, _Topic, _Msg) ->
|
SubPid ! {dispatch, Topic, Msg}
|
||||||
false.
|
end, safe_lookup_element(?SUBSCRIBER, {share, Topic, I}, [])).
|
||||||
|
|
||||||
inc_dropped_cnt(<<"$SYS/", _/binary>>) ->
|
inc_dropped_cnt(<<"$SYS/", _/binary>>) ->
|
||||||
ok;
|
ok;
|
||||||
|
@ -240,17 +260,20 @@ subscribers(Topic) ->
|
||||||
-spec(subscriber_down(pid()) -> true).
|
-spec(subscriber_down(pid()) -> true).
|
||||||
subscriber_down(SubPid) ->
|
subscriber_down(SubPid) ->
|
||||||
lists:foreach(
|
lists:foreach(
|
||||||
fun(Sub = {_, Topic}) ->
|
fun(Sub = {_Pid, Topic}) ->
|
||||||
case ets:lookup(?SUBOPTION, Sub) of
|
case ets:lookup(?SUBOPTION, Sub) of
|
||||||
[{_, SubOpts}] ->
|
[{_, SubOpts}] ->
|
||||||
Group = maps:get(share, SubOpts, undefined),
|
_ = emqx_broker_helper:reclaim_seq(Topic),
|
||||||
true = ets:delete_object(?SUBSCRIBER, {Topic, shared(Group, SubPid)}),
|
case maps:get(shared, SubOpts, 0) of
|
||||||
true = ets:delete(?SUBOPTION, Sub),
|
0 -> true = ets:delete_object(?SUBSCRIBER, {Topic, SubPid}),
|
||||||
gen_server:cast(pick(Topic), {unsubscribe, Group, Topic});
|
ok = cast(pick(Topic), {unsubscribed, Topic});
|
||||||
|
I -> true = ets:delete_object(?SUBSCRIBER, {{shard, Topic, I}, SubPid}),
|
||||||
|
ok = cast(pick({Topic, I}), {unsubscribed, Topic, I})
|
||||||
|
end;
|
||||||
[] -> ok
|
[] -> ok
|
||||||
end
|
end
|
||||||
end, ets:lookup(?SUBSCRIPTION, SubPid)),
|
end, ets:lookup(?SUBSCRIPTION, SubPid)),
|
||||||
ets:delete(?SUBSCRIPTION, SubPid).
|
true = ets:delete(?SUBSCRIPTION, SubPid).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% Management APIs
|
%% Management APIs
|
||||||
|
@ -305,11 +328,14 @@ safe_update_stats(Tab, Stat, MaxStat) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% Pick and call
|
%% call, cast, pick
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
call(Broker, Req) ->
|
call(Broker, Req) ->
|
||||||
gen_server:call(Broker, Req, ?TIMEOUT).
|
gen_server:call(Broker, Req).
|
||||||
|
|
||||||
|
cast(Broker, Msg) ->
|
||||||
|
gen_server:cast(Broker, Msg).
|
||||||
|
|
||||||
%% Pick a broker
|
%% Pick a broker
|
||||||
pick(Topic) ->
|
pick(Topic) ->
|
||||||
|
@ -320,24 +346,41 @@ pick(Topic) ->
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
init([Pool, Id]) ->
|
init([Pool, Id]) ->
|
||||||
|
_ = emqx_router:set_mode(protected),
|
||||||
true = gproc_pool:connect_worker(Pool, {Pool, Id}),
|
true = gproc_pool:connect_worker(Pool, {Pool, Id}),
|
||||||
{ok, #{pool => Pool, id => Id}}.
|
{ok, #{pool => Pool, id => Id}}.
|
||||||
|
|
||||||
handle_call({subscribe, Group, Topic}, _From, State) ->
|
handle_call({subscribe, Topic}, _From, State) ->
|
||||||
Ok = emqx_router:add_route(Topic, dest(Group)),
|
case get(Topic) of
|
||||||
{reply, Ok, State};
|
undefined ->
|
||||||
|
_ = put(Topic, true),
|
||||||
handle_call({unsubscribe, Group, Topic}, _From, State) ->
|
emqx_router:add_route(Topic);
|
||||||
Ok = case ets:member(?SUBSCRIBER, Topic) of
|
true -> ok
|
||||||
false -> emqx_router:delete_route(Topic, dest(Group));
|
end,
|
||||||
true -> ok
|
{reply, ok, State};
|
||||||
end,
|
|
||||||
{reply, Ok, State};
|
|
||||||
|
|
||||||
handle_call(Req, _From, State) ->
|
handle_call(Req, _From, State) ->
|
||||||
emqx_logger:error("[Broker] unexpected call: ~p", [Req]),
|
emqx_logger:error("[Broker] unexpected call: ~p", [Req]),
|
||||||
{reply, ignored, State}.
|
{reply, ignored, State}.
|
||||||
|
|
||||||
|
handle_cast({unsubscribed, Topic}, State) ->
|
||||||
|
case ets:member(?SUBSCRIBER, Topic) of
|
||||||
|
false ->
|
||||||
|
_ = erase(Topic),
|
||||||
|
emqx_router:delete_route(Topic);
|
||||||
|
true -> ok
|
||||||
|
end,
|
||||||
|
{noreply, State};
|
||||||
|
|
||||||
|
handle_cast({unsubscribed, Topic, I}, State) ->
|
||||||
|
case ets:member(?SUBSCRIBER, {shard, Topic, I}) of
|
||||||
|
false ->
|
||||||
|
true = ets:delete_object(?SUBSCRIBER, {Topic, {shard, I}}),
|
||||||
|
cast(pick(Topic), {unsubscribed, Topic});
|
||||||
|
true -> ok
|
||||||
|
end,
|
||||||
|
{noreply, State};
|
||||||
|
|
||||||
handle_cast(Msg, State) ->
|
handle_cast(Msg, State) ->
|
||||||
emqx_logger:error("[Broker] unexpected cast: ~p", [Msg]),
|
emqx_logger:error("[Broker] unexpected cast: ~p", [Msg]),
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
@ -356,9 +399,3 @@ code_change(_OldVsn, State, _Extra) ->
|
||||||
%% Internal functions
|
%% Internal functions
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
dest(undefined) -> node();
|
|
||||||
dest(Group) -> {Group, node()}.
|
|
||||||
|
|
||||||
shared(undefined, Name) -> Name;
|
|
||||||
shared(Group, Name) -> {share, Group, Name}.
|
|
||||||
|
|
||||||
|
|
|
@ -20,6 +20,7 @@
|
||||||
|
|
||||||
-export([start_link/0]).
|
-export([start_link/0]).
|
||||||
-export([monitor/2]).
|
-export([monitor/2]).
|
||||||
|
-export([get_shard/2]).
|
||||||
-export([create_seq/1, reclaim_seq/1]).
|
-export([create_seq/1, reclaim_seq/1]).
|
||||||
|
|
||||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
|
||||||
|
@ -46,6 +47,13 @@ monitor(SubPid, SubId) when is_pid(SubPid) ->
|
||||||
error(subid_conflict)
|
error(subid_conflict)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
-spec(get_shard(pid(), emqx_topic:topic()) -> non_neg_integer()).
|
||||||
|
get_shard(SubPid, Topic) ->
|
||||||
|
case create_seq(Topic) of
|
||||||
|
Seq when Seq =< 1024 -> 0;
|
||||||
|
_Seq -> erlang:phash2(SubPid, ets:lookup_element(?SUBSEQ, shards, 2))
|
||||||
|
end.
|
||||||
|
|
||||||
-spec(create_seq(emqx_topic:topic()) -> emqx_sequence:seqid()).
|
-spec(create_seq(emqx_topic:topic()) -> emqx_sequence:seqid()).
|
||||||
create_seq(Topic) ->
|
create_seq(Topic) ->
|
||||||
emqx_sequence:nextval(?SUBSEQ, Topic).
|
emqx_sequence:nextval(?SUBSEQ, Topic).
|
||||||
|
@ -60,9 +68,11 @@ reclaim_seq(Topic) ->
|
||||||
|
|
||||||
init([]) ->
|
init([]) ->
|
||||||
%% SubSeq: Topic -> SeqId
|
%% SubSeq: Topic -> SeqId
|
||||||
_ = emqx_sequence:create(?SUBSEQ),
|
ok = emqx_sequence:create(?SUBSEQ),
|
||||||
|
%% Shards: CPU * 32
|
||||||
|
true = ets:insert(?SUBSEQ, {shards, emqx_vm:schedulers() * 32}),
|
||||||
%% SubMon: SubPid -> SubId
|
%% SubMon: SubPid -> SubId
|
||||||
_ = emqx_tables:new(?SUBMON, [set, protected, {read_concurrency, true}]),
|
ok = emqx_tables:new(?SUBMON, [set, protected, {read_concurrency, true}]),
|
||||||
%% Stats timer
|
%% Stats timer
|
||||||
emqx_stats:update_interval(broker_stats, fun emqx_broker:stats_fun/0),
|
emqx_stats:update_interval(broker_stats, fun emqx_broker:stats_fun/0),
|
||||||
{ok, #state{pmon = emqx_pmon:new()}, hibernate}.
|
{ok, #state{pmon = emqx_pmon:new()}, hibernate}.
|
||||||
|
|
|
@ -125,9 +125,9 @@ notify(Msg) ->
|
||||||
|
|
||||||
init([]) ->
|
init([]) ->
|
||||||
TabOpts = [public, set, {write_concurrency, true}],
|
TabOpts = [public, set, {write_concurrency, true}],
|
||||||
_ = emqx_tables:new(?CONN_TAB, [{read_concurrency, true} | TabOpts]),
|
ok = emqx_tables:new(?CONN_TAB, [{read_concurrency, true} | TabOpts]),
|
||||||
_ = emqx_tables:new(?CONN_ATTRS_TAB, TabOpts),
|
ok = emqx_tables:new(?CONN_ATTRS_TAB, TabOpts),
|
||||||
_ = emqx_tables:new(?CONN_STATS_TAB, TabOpts),
|
ok = emqx_tables:new(?CONN_STATS_TAB, TabOpts),
|
||||||
ok = emqx_stats:update_interval(cm_stats, fun ?MODULE:update_conn_stats/0),
|
ok = emqx_stats:update_interval(cm_stats, fun ?MODULE:update_conn_stats/0),
|
||||||
{ok, #{conn_pmon => emqx_pmon:new()}}.
|
{ok, #{conn_pmon => emqx_pmon:new()}}.
|
||||||
|
|
||||||
|
|
|
@ -96,7 +96,7 @@ usage() ->
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
init([]) ->
|
init([]) ->
|
||||||
_ = emqx_tables:new(?TAB, [ordered_set, protected]),
|
ok = emqx_tables:new(?TAB, [protected, ordered_set]),
|
||||||
{ok, #state{seq = 0}}.
|
{ok, #state{seq = 0}}.
|
||||||
|
|
||||||
handle_call(Req, _From, State) ->
|
handle_call(Req, _From, State) ->
|
||||||
|
|
|
@ -139,7 +139,7 @@ lookup(HookPoint) ->
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
init([]) ->
|
init([]) ->
|
||||||
_ = emqx_tables:new(?TAB, [{keypos, #hook.name}, {read_concurrency, true}]),
|
ok = emqx_tables:new(?TAB, [{keypos, #hook.name}, {read_concurrency, true}]),
|
||||||
{ok, #{}}.
|
{ok, #{}}.
|
||||||
|
|
||||||
handle_call({add, HookPoint, Callback = #callback{action = Action}}, _From, State) ->
|
handle_call({add, HookPoint, Callback = #callback{action = Action}}, _From, State) ->
|
||||||
|
|
|
@ -285,7 +285,7 @@ qos_sent(?QOS_2) ->
|
||||||
|
|
||||||
init([]) ->
|
init([]) ->
|
||||||
% Create metrics table
|
% Create metrics table
|
||||||
_ = emqx_tables:new(?TAB, [set, public, {write_concurrency, true}]),
|
ok = emqx_tables:new(?TAB, [public, set, {write_concurrency, true}]),
|
||||||
lists:foreach(fun new/1, ?BYTES_METRICS ++ ?PACKET_METRICS ++ ?MESSAGE_METRICS),
|
lists:foreach(fun new/1, ?BYTES_METRICS ++ ?PACKET_METRICS ++ ?MESSAGE_METRICS),
|
||||||
{ok, #{}, hibernate}.
|
{ok, #{}, hibernate}.
|
||||||
|
|
||||||
|
|
|
@ -28,23 +28,22 @@
|
||||||
-export([start_link/2]).
|
-export([start_link/2]).
|
||||||
|
|
||||||
%% Route APIs
|
%% Route APIs
|
||||||
-export([add_route/1, add_route/2, add_route/3]).
|
-export([add_route/1, add_route/2]).
|
||||||
-export([get_routes/1]).
|
-export([get_routes/1]).
|
||||||
-export([del_route/1, del_route/2, del_route/3]).
|
-export([delete_route/1, delete_route/2]).
|
||||||
-export([has_routes/1, match_routes/1, print_routes/1]).
|
-export([has_routes/1, match_routes/1, print_routes/1]).
|
||||||
-export([topics/0]).
|
-export([topics/0]).
|
||||||
|
|
||||||
|
%% Mode
|
||||||
|
-export([set_mode/1, get_mode/0]).
|
||||||
|
|
||||||
%% gen_server callbacks
|
%% gen_server callbacks
|
||||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
|
||||||
code_change/3]).
|
code_change/3]).
|
||||||
|
|
||||||
-type(destination() :: node() | {binary(), node()}).
|
-type(destination() :: node() | {binary(), node()}).
|
||||||
|
|
||||||
-record(batch, {enabled, timer, pending}).
|
|
||||||
-record(state, {pool, id, batch :: #batch{}}).
|
|
||||||
|
|
||||||
-define(ROUTE, emqx_route).
|
-define(ROUTE, emqx_route).
|
||||||
-define(BATCH(Enabled), #batch{enabled = Enabled}).
|
|
||||||
-define(BATCH(Enabled, Pending), #batch{enabled = Enabled, pending = Pending}).
|
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% Mnesia bootstrap
|
%% Mnesia bootstrap
|
||||||
|
@ -62,49 +61,66 @@ mnesia(copy) ->
|
||||||
ok = ekka_mnesia:copy_table(?ROUTE).
|
ok = ekka_mnesia:copy_table(?ROUTE).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% Strat a router
|
%% Start a router
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
-spec(start_link(atom(), pos_integer()) -> {ok, pid()} | ignore | {error, term()}).
|
-spec(start_link(atom(), pos_integer()) -> emqx_types:startlink_ret()).
|
||||||
start_link(Pool, Id) ->
|
start_link(Pool, Id) ->
|
||||||
gen_server:start_link({local, emqx_misc:proc_name(?MODULE, Id)},
|
Name = emqx_misc:proc_name(?MODULE, Id),
|
||||||
?MODULE, [Pool, Id], [{hibernate_after, 2000}]).
|
gen_server:start_link({local, Name}, ?MODULE, [Pool, Id], [{hibernate_after, 1000}]).
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% Route APIs
|
%% Route APIs
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
-spec(add_route(emqx_topic:topic() | emqx_types:route()) -> ok).
|
-spec(add_route(emqx_topic:topic() | emqx_types:route()) -> ok | {error, term()}).
|
||||||
add_route(Topic) when is_binary(Topic) ->
|
add_route(Topic) when is_binary(Topic) ->
|
||||||
add_route(#route{topic = Topic, dest = node()});
|
add_route(#route{topic = Topic, dest = node()});
|
||||||
add_route(Route = #route{topic = Topic}) ->
|
add_route(Route = #route{topic = Topic}) ->
|
||||||
cast(pick(Topic), {add_route, Route}).
|
case get_mode() of
|
||||||
|
protected -> do_add_route(Route);
|
||||||
|
undefined -> call(pick(Topic), {add_route, Route})
|
||||||
|
end.
|
||||||
|
|
||||||
-spec(add_route(emqx_topic:topic(), destination()) -> ok).
|
-spec(add_route(emqx_topic:topic(), destination()) -> ok | {error, term()}).
|
||||||
add_route(Topic, Dest) when is_binary(Topic) ->
|
add_route(Topic, Dest) when is_binary(Topic) ->
|
||||||
add_route(#route{topic = Topic, dest = Dest}).
|
add_route(#route{topic = Topic, dest = Dest}).
|
||||||
|
|
||||||
-spec(add_route({pid(), reference()}, emqx_topic:topic(), destination()) -> ok).
|
%% @private
|
||||||
add_route(From, Topic, Dest) when is_binary(Topic) ->
|
do_add_route(Route = #route{topic = Topic, dest = Dest}) ->
|
||||||
cast(pick(Topic), {add_route, From, #route{topic = Topic, dest = Dest}}).
|
case lists:member(Route, get_routes(Topic)) of
|
||||||
|
true -> ok;
|
||||||
|
false ->
|
||||||
|
ok = emqx_router_helper:monitor(Dest),
|
||||||
|
case emqx_topic:wildcard(Topic) of
|
||||||
|
true -> trans(fun add_trie_route/1, [Route]);
|
||||||
|
false -> add_direct_route(Route)
|
||||||
|
end
|
||||||
|
end.
|
||||||
|
|
||||||
-spec(get_routes(emqx_topic:topic()) -> [emqx_types:route()]).
|
-spec(get_routes(emqx_topic:topic()) -> [emqx_types:route()]).
|
||||||
get_routes(Topic) ->
|
get_routes(Topic) ->
|
||||||
ets:lookup(?ROUTE, Topic).
|
ets:lookup(?ROUTE, Topic).
|
||||||
|
|
||||||
-spec(del_route(emqx_topic:topic() | emqx_types:route()) -> ok).
|
-spec(delete_route(emqx_topic:topic() | emqx_types:route()) -> ok | {error, term()}).
|
||||||
del_route(Topic) when is_binary(Topic) ->
|
delete_route(Topic) when is_binary(Topic) ->
|
||||||
del_route(#route{topic = Topic, dest = node()});
|
delete_route(#route{topic = Topic, dest = node()});
|
||||||
del_route(Route = #route{topic = Topic}) ->
|
delete_route(Route = #route{topic = Topic}) ->
|
||||||
cast(pick(Topic), {del_route, Route}).
|
case get_mode() of
|
||||||
|
protected -> do_delete_route(Route);
|
||||||
|
undefined -> call(pick(Topic), {delete_route, Route})
|
||||||
|
end.
|
||||||
|
|
||||||
-spec(del_route(emqx_topic:topic(), destination()) -> ok).
|
-spec(delete_route(emqx_topic:topic(), destination()) -> ok | {error, term()}).
|
||||||
del_route(Topic, Dest) when is_binary(Topic) ->
|
delete_route(Topic, Dest) when is_binary(Topic) ->
|
||||||
del_route(#route{topic = Topic, dest = Dest}).
|
delete_route(#route{topic = Topic, dest = Dest}).
|
||||||
|
|
||||||
-spec(del_route({pid(), reference()}, emqx_topic:topic(), destination()) -> ok).
|
%% @private
|
||||||
del_route(From, Topic, Dest) when is_binary(Topic) ->
|
do_delete_route(Route = #route{topic = Topic}) ->
|
||||||
cast(pick(Topic), {del_route, From, #route{topic = Topic, dest = Dest}}).
|
case emqx_topic:wildcard(Topic) of
|
||||||
|
true -> trans(fun del_trie_route/1, [Route]);
|
||||||
|
false -> del_direct_route(Route)
|
||||||
|
end.
|
||||||
|
|
||||||
-spec(has_routes(emqx_topic:topic()) -> boolean()).
|
-spec(has_routes(emqx_topic:topic()) -> boolean()).
|
||||||
has_routes(Topic) when is_binary(Topic) ->
|
has_routes(Topic) when is_binary(Topic) ->
|
||||||
|
@ -127,8 +143,15 @@ print_routes(Topic) ->
|
||||||
io:format("~s -> ~s~n", [To, Dest])
|
io:format("~s -> ~s~n", [To, Dest])
|
||||||
end, match_routes(Topic)).
|
end, match_routes(Topic)).
|
||||||
|
|
||||||
cast(Router, Msg) ->
|
-spec(set_mode(protected | atom()) -> any()).
|
||||||
gen_server:cast(Router, Msg).
|
set_mode(Mode) when is_atom(Mode) ->
|
||||||
|
put('$router_mode', Mode).
|
||||||
|
|
||||||
|
-spec(get_mode() -> protected | undefined | atom()).
|
||||||
|
get_mode() -> get('$router_mode').
|
||||||
|
|
||||||
|
call(Router, Msg) ->
|
||||||
|
gen_server:call(Router, Msg, infinity).
|
||||||
|
|
||||||
pick(Topic) ->
|
pick(Topic) ->
|
||||||
gproc_pool:pick_worker(router, Topic).
|
gproc_pool:pick_worker(router, Topic).
|
||||||
|
@ -138,71 +161,28 @@ pick(Topic) ->
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
init([Pool, Id]) ->
|
init([Pool, Id]) ->
|
||||||
rand:seed(exsplus, erlang:timestamp()),
|
true = gproc_pool:connect_worker(Pool, {Pool, Id}),
|
||||||
gproc_pool:connect_worker(Pool, {Pool, Id}),
|
{ok, #{pool => Pool, id => Id}}.
|
||||||
Batch = #batch{enabled = emqx_config:get_env(route_batch_clean, false),
|
|
||||||
pending = sets:new()},
|
handle_call({add_route, Route}, _From, State) ->
|
||||||
{ok, ensure_batch_timer(#state{pool = Pool, id = Id, batch = Batch})}.
|
{reply, do_add_route(Route), State};
|
||||||
|
|
||||||
|
handle_call({delete_route, Route}, _From, State) ->
|
||||||
|
{reply, do_delete_route(Route), State};
|
||||||
|
|
||||||
handle_call(Req, _From, State) ->
|
handle_call(Req, _From, State) ->
|
||||||
emqx_logger:error("[Router] unexpected call: ~p", [Req]),
|
emqx_logger:error("[Router] unexpected call: ~p", [Req]),
|
||||||
{reply, ignored, State}.
|
{reply, ignored, State}.
|
||||||
|
|
||||||
handle_cast({add_route, From, Route}, State) ->
|
|
||||||
{noreply, NewState} = handle_cast({add_route, Route}, State),
|
|
||||||
_ = gen_server:reply(From, ok),
|
|
||||||
{noreply, NewState};
|
|
||||||
|
|
||||||
handle_cast({add_route, Route = #route{topic = Topic, dest = Dest}}, State) ->
|
|
||||||
case lists:member(Route, get_routes(Topic)) of
|
|
||||||
true -> ok;
|
|
||||||
false ->
|
|
||||||
ok = emqx_router_helper:monitor(Dest),
|
|
||||||
case emqx_topic:wildcard(Topic) of
|
|
||||||
true -> log(trans(fun add_trie_route/1, [Route]));
|
|
||||||
false -> add_direct_route(Route)
|
|
||||||
end
|
|
||||||
end,
|
|
||||||
{noreply, State};
|
|
||||||
|
|
||||||
handle_cast({del_route, From, Route}, State) ->
|
|
||||||
{noreply, NewState} = handle_cast({del_route, Route}, State),
|
|
||||||
_ = gen_server:reply(From, ok),
|
|
||||||
{noreply, NewState};
|
|
||||||
|
|
||||||
handle_cast({del_route, Route = #route{topic = Topic, dest = Dest}}, State) when is_tuple(Dest) ->
|
|
||||||
{noreply, case emqx_topic:wildcard(Topic) of
|
|
||||||
true -> log(trans(fun del_trie_route/1, [Route])),
|
|
||||||
State;
|
|
||||||
false -> del_direct_route(Route, State)
|
|
||||||
end};
|
|
||||||
|
|
||||||
handle_cast({del_route, Route = #route{topic = Topic}}, State) ->
|
|
||||||
%% Confirm if there are still subscribers...
|
|
||||||
{noreply, case ets:member(emqx_subscriber, Topic) of
|
|
||||||
true -> State;
|
|
||||||
false ->
|
|
||||||
case emqx_topic:wildcard(Topic) of
|
|
||||||
true -> log(trans(fun del_trie_route/1, [Route])),
|
|
||||||
State;
|
|
||||||
false -> del_direct_route(Route, State)
|
|
||||||
end
|
|
||||||
end};
|
|
||||||
|
|
||||||
handle_cast(Msg, State) ->
|
handle_cast(Msg, State) ->
|
||||||
emqx_logger:error("[Router] unexpected cast: ~p", [Msg]),
|
emqx_logger:error("[Router] unexpected cast: ~p", [Msg]),
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
handle_info({timeout, _TRef, batch_delete}, State = #state{batch = Batch}) ->
|
|
||||||
_ = del_direct_routes(sets:to_list(Batch#batch.pending)),
|
|
||||||
{noreply, ensure_batch_timer(State#state{batch = ?BATCH(true, sets:new())}), hibernate};
|
|
||||||
|
|
||||||
handle_info(Info, State) ->
|
handle_info(Info, State) ->
|
||||||
emqx_logger:error("[Router] unexpected info: ~p", [Info]),
|
emqx_logger:error("[Router] unexpected info: ~p", [Info]),
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
terminate(_Reason, #state{pool = Pool, id = Id, batch = Batch}) ->
|
terminate(_Reason, #{pool := Pool, id := Id}) ->
|
||||||
_ = cacel_batch_timer(Batch),
|
|
||||||
gproc_pool:disconnect_worker(Pool, {Pool, Id}).
|
gproc_pool:disconnect_worker(Pool, {Pool, Id}).
|
||||||
|
|
||||||
code_change(_OldVsn, State, _Extra) ->
|
code_change(_OldVsn, State, _Extra) ->
|
||||||
|
@ -212,17 +192,6 @@ code_change(_OldVsn, State, _Extra) ->
|
||||||
%% Internal functions
|
%% Internal functions
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
ensure_batch_timer(State = #state{batch = #batch{enabled = false}}) ->
|
|
||||||
State;
|
|
||||||
ensure_batch_timer(State = #state{batch = Batch}) ->
|
|
||||||
TRef = erlang:start_timer(50 + rand:uniform(50), self(), batch_delete),
|
|
||||||
State#state{batch = Batch#batch{timer = TRef}}.
|
|
||||||
|
|
||||||
cacel_batch_timer(#batch{enabled = false}) ->
|
|
||||||
ok;
|
|
||||||
cacel_batch_timer(#batch{enabled = true, timer = TRef}) ->
|
|
||||||
catch erlang:cancel_timer(TRef).
|
|
||||||
|
|
||||||
add_direct_route(Route) ->
|
add_direct_route(Route) ->
|
||||||
mnesia:async_dirty(fun mnesia:write/3, [?ROUTE, Route, sticky_write]).
|
mnesia:async_dirty(fun mnesia:write/3, [?ROUTE, Route, sticky_write]).
|
||||||
|
|
||||||
|
@ -233,25 +202,9 @@ add_trie_route(Route = #route{topic = Topic}) ->
|
||||||
end,
|
end,
|
||||||
mnesia:write(?ROUTE, Route, sticky_write).
|
mnesia:write(?ROUTE, Route, sticky_write).
|
||||||
|
|
||||||
del_direct_route(Route, State = #state{batch = ?BATCH(false)}) ->
|
|
||||||
del_direct_route(Route), State;
|
|
||||||
del_direct_route(Route, State = #state{batch = Batch = ?BATCH(true, Pending)}) ->
|
|
||||||
State#state{batch = Batch#batch{pending = sets:add_element(Route, Pending)}}.
|
|
||||||
|
|
||||||
del_direct_route(Route) ->
|
del_direct_route(Route) ->
|
||||||
mnesia:async_dirty(fun mnesia:delete_object/3, [?ROUTE, Route, sticky_write]).
|
mnesia:async_dirty(fun mnesia:delete_object/3, [?ROUTE, Route, sticky_write]).
|
||||||
|
|
||||||
del_direct_routes([]) ->
|
|
||||||
ok;
|
|
||||||
del_direct_routes(Routes) ->
|
|
||||||
DelFun = fun(R = #route{topic = Topic}) ->
|
|
||||||
case ets:member(emqx_subscriber, Topic) of
|
|
||||||
true -> ok;
|
|
||||||
false -> mnesia:delete_object(?ROUTE, R, sticky_write)
|
|
||||||
end
|
|
||||||
end,
|
|
||||||
mnesia:async_dirty(fun lists:foreach/2, [DelFun, Routes]).
|
|
||||||
|
|
||||||
del_trie_route(Route = #route{topic = Topic}) ->
|
del_trie_route(Route = #route{topic = Topic}) ->
|
||||||
case mnesia:wread({?ROUTE, Topic}) of
|
case mnesia:wread({?ROUTE, Topic}) of
|
||||||
[Route] -> %% Remove route and trie
|
[Route] -> %% Remove route and trie
|
||||||
|
@ -270,7 +223,3 @@ trans(Fun, Args) ->
|
||||||
{aborted, Error} -> {error, Error}
|
{aborted, Error} -> {error, Error}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
log(ok) -> ok;
|
|
||||||
log({error, Reason}) ->
|
|
||||||
emqx_logger:error("[Router] mnesia aborted: ~p", [Reason]).
|
|
||||||
|
|
||||||
|
|
|
@ -25,8 +25,7 @@
|
||||||
%% @doc Create a sequence.
|
%% @doc Create a sequence.
|
||||||
-spec(create(name()) -> ok).
|
-spec(create(name()) -> ok).
|
||||||
create(Name) ->
|
create(Name) ->
|
||||||
_ = ets:new(Name, [set, public, named_table, {write_concurrency, true}]),
|
emqx_tables:new(Name, [public, set, {write_concurrency, true}]).
|
||||||
ok.
|
|
||||||
|
|
||||||
%% @doc Next value of the sequence.
|
%% @doc Next value of the sequence.
|
||||||
-spec(nextval(name(), key()) -> seqid()).
|
-spec(nextval(name(), key()) -> seqid()).
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
-behaviour(gen_server).
|
-behaviour(gen_server).
|
||||||
|
|
||||||
-include("emqx.hrl").
|
-include("emqx.hrl").
|
||||||
|
-include("emqx_mqtt.hrl").
|
||||||
|
|
||||||
%% Mnesia bootstrap
|
%% Mnesia bootstrap
|
||||||
-export([mnesia/1]).
|
-export([mnesia/1]).
|
||||||
|
@ -27,7 +28,8 @@
|
||||||
-export([start_link/0]).
|
-export([start_link/0]).
|
||||||
|
|
||||||
-export([subscribe/3, unsubscribe/3]).
|
-export([subscribe/3, unsubscribe/3]).
|
||||||
-export([dispatch/3, maybe_ack/1, maybe_nack_dropped/1, nack_no_connection/1, is_ack_required/1]).
|
-export([dispatch/3]).
|
||||||
|
-export([maybe_ack/1, maybe_nack_dropped/1, nack_no_connection/1, is_ack_required/1]).
|
||||||
|
|
||||||
%% for testing
|
%% for testing
|
||||||
-export([subscribers/2]).
|
-export([subscribers/2]).
|
||||||
|
@ -38,6 +40,7 @@
|
||||||
|
|
||||||
-define(SERVER, ?MODULE).
|
-define(SERVER, ?MODULE).
|
||||||
-define(TAB, emqx_shared_subscription).
|
-define(TAB, emqx_shared_subscription).
|
||||||
|
-define(SHARED_SUBS, emqx_shared_subscriber).
|
||||||
-define(ALIVE_SUBS, emqx_alive_shared_subscribers).
|
-define(ALIVE_SUBS, emqx_alive_shared_subscribers).
|
||||||
-define(SHARED_SUB_QOS1_DISPATCH_TIMEOUT_SECONDS, 5).
|
-define(SHARED_SUB_QOS1_DISPATCH_TIMEOUT_SECONDS, 5).
|
||||||
-define(ack, shared_sub_ack).
|
-define(ack, shared_sub_ack).
|
||||||
|
@ -48,8 +51,6 @@
|
||||||
-record(state, {pmon}).
|
-record(state, {pmon}).
|
||||||
-record(emqx_shared_subscription, {group, topic, subpid}).
|
-record(emqx_shared_subscription, {group, topic, subpid}).
|
||||||
|
|
||||||
-include("emqx_mqtt.hrl").
|
|
||||||
|
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% Mnesia bootstrap
|
%% Mnesia bootstrap
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
@ -72,16 +73,11 @@ mnesia(copy) ->
|
||||||
start_link() ->
|
start_link() ->
|
||||||
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
|
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
|
||||||
|
|
||||||
subscribe(undefined, _Topic, _SubPid) ->
|
|
||||||
ok;
|
|
||||||
subscribe(Group, Topic, SubPid) when is_pid(SubPid) ->
|
subscribe(Group, Topic, SubPid) when is_pid(SubPid) ->
|
||||||
mnesia:dirty_write(?TAB, record(Group, Topic, SubPid)),
|
gen_server:call(?SERVER, {subscribe, Group, Topic, SubPid}).
|
||||||
gen_server:cast(?SERVER, {monitor, SubPid}).
|
|
||||||
|
|
||||||
unsubscribe(undefined, _Topic, _SubPid) ->
|
|
||||||
ok;
|
|
||||||
unsubscribe(Group, Topic, SubPid) when is_pid(SubPid) ->
|
unsubscribe(Group, Topic, SubPid) when is_pid(SubPid) ->
|
||||||
mnesia:dirty_delete_object(?TAB, record(Group, Topic, SubPid)).
|
gen_server:call(?SERVER, {unsubscribe, Group, Topic, SubPid}).
|
||||||
|
|
||||||
record(Group, Topic, SubPid) ->
|
record(Group, Topic, SubPid) ->
|
||||||
#emqx_shared_subscription{group = Group, topic = Topic, subpid = SubPid}.
|
#emqx_shared_subscription{group = Group, topic = Topic, subpid = SubPid}.
|
||||||
|
@ -251,14 +247,16 @@ do_pick_subscriber(Group, Topic, round_robin, _ClientId, Count) ->
|
||||||
subscribers(Group, Topic) ->
|
subscribers(Group, Topic) ->
|
||||||
ets:select(?TAB, [{{emqx_shared_subscription, Group, Topic, '$1'}, [], ['$1']}]).
|
ets:select(?TAB, [{{emqx_shared_subscription, Group, Topic, '$1'}, [], ['$1']}]).
|
||||||
|
|
||||||
%%-----------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% gen_server callbacks
|
%% gen_server callbacks
|
||||||
%%-----------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
init([]) ->
|
init([]) ->
|
||||||
{atomic, PMon} = mnesia:transaction(fun init_monitors/0),
|
_ = emqx_router:set_mode(protected),
|
||||||
mnesia:subscribe({table, ?TAB, simple}),
|
mnesia:subscribe({table, ?TAB, simple}),
|
||||||
ets:new(?ALIVE_SUBS, [named_table, {read_concurrency, true}, protected]),
|
{atomic, PMon} = mnesia:transaction(fun init_monitors/0),
|
||||||
|
ok = emqx_tables:new(?SHARED_SUBS, [protected, bag]),
|
||||||
|
ok = emqx_tables:new(?ALIVE_SUBS, [protected, set, {read_concurrency, true}]),
|
||||||
{ok, update_stats(#state{pmon = PMon})}.
|
{ok, update_stats(#state{pmon = PMon})}.
|
||||||
|
|
||||||
init_monitors() ->
|
init_monitors() ->
|
||||||
|
@ -267,14 +265,29 @@ init_monitors() ->
|
||||||
emqx_pmon:monitor(SubPid, Mon)
|
emqx_pmon:monitor(SubPid, Mon)
|
||||||
end, emqx_pmon:new(), ?TAB).
|
end, emqx_pmon:new(), ?TAB).
|
||||||
|
|
||||||
|
handle_call({subscribe, Group, Topic, SubPid}, _From, State = #state{pmon = PMon}) ->
|
||||||
|
mnesia:dirty_write(?TAB, record(Group, Topic, SubPid)),
|
||||||
|
case ets:member(?SHARED_SUBS, {Group, Topic}) of
|
||||||
|
true -> ok;
|
||||||
|
false -> ok = emqx_router:add_route(Topic, {Group, node()})
|
||||||
|
end,
|
||||||
|
ok = maybe_insert_alive_tab(SubPid),
|
||||||
|
true = ets:insert(?SHARED_SUBS, {{Group, Topic}, SubPid}),
|
||||||
|
{reply, ok, update_stats(State#state{pmon = emqx_pmon:monitor(SubPid, PMon)})};
|
||||||
|
|
||||||
|
handle_call({unsubscribe, Group, Topic, SubPid}, _From, State) ->
|
||||||
|
mnesia:dirty_delete_object(?TAB, record(Group, Topic, SubPid)),
|
||||||
|
true = ets:delete_object(?SHARED_SUBS, {{Group, Topic}, SubPid}),
|
||||||
|
case ets:member(?SHARED_SUBS, {Group, Topic}) of
|
||||||
|
true -> ok;
|
||||||
|
false -> ok = emqx_router:delete_route(Topic, {Group, node()})
|
||||||
|
end,
|
||||||
|
{reply, ok, State};
|
||||||
|
|
||||||
handle_call(Req, _From, State) ->
|
handle_call(Req, _From, State) ->
|
||||||
emqx_logger:error("[SharedSub] unexpected call: ~p", [Req]),
|
emqx_logger:error("[SharedSub] unexpected call: ~p", [Req]),
|
||||||
{reply, ignored, State}.
|
{reply, ignored, State}.
|
||||||
|
|
||||||
handle_cast({monitor, SubPid}, State= #state{pmon = PMon}) ->
|
|
||||||
NewPmon = emqx_pmon:monitor(SubPid, PMon),
|
|
||||||
ok = maybe_insert_alive_tab(SubPid),
|
|
||||||
{noreply, update_stats(State#state{pmon = NewPmon})};
|
|
||||||
handle_cast(Msg, State) ->
|
handle_cast(Msg, State) ->
|
||||||
emqx_logger:error("[SharedSub] unexpected cast: ~p", [Msg]),
|
emqx_logger:error("[SharedSub] unexpected cast: ~p", [Msg]),
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
@ -316,12 +329,18 @@ maybe_insert_alive_tab(Pid) when is_pid(Pid) -> ets:insert(?ALIVE_SUBS, {Pid}),
|
||||||
cleanup_down(SubPid) ->
|
cleanup_down(SubPid) ->
|
||||||
?IS_LOCAL_PID(SubPid) orelse ets:delete(?ALIVE_SUBS, SubPid),
|
?IS_LOCAL_PID(SubPid) orelse ets:delete(?ALIVE_SUBS, SubPid),
|
||||||
lists:foreach(
|
lists:foreach(
|
||||||
fun(Record) ->
|
fun(Record = #emqx_shared_subscription{topic = Topic, group = Group}) ->
|
||||||
mnesia:dirty_delete_object(?TAB, Record)
|
ok = mnesia:dirty_delete_object(?TAB, Record),
|
||||||
end,mnesia:dirty_match_object(#emqx_shared_subscription{_ = '_', subpid = SubPid})).
|
true = ets:delete_object(?SHARED_SUBS, {{Group, Topic}, SubPid}),
|
||||||
|
case ets:member(?SHARED_SUBS, {Group, Topic}) of
|
||||||
|
true -> ok;
|
||||||
|
false -> ok = emqx_router:delete_route(Topic, {Group, node()})
|
||||||
|
end
|
||||||
|
end, mnesia:dirty_match_object(#emqx_shared_subscription{_ = '_', subpid = SubPid})).
|
||||||
|
|
||||||
update_stats(State) ->
|
update_stats(State) ->
|
||||||
emqx_stats:setstat('subscriptions/shared/count', 'subscriptions/shared/max', ets:info(?TAB, size)), State.
|
emqx_stats:setstat('subscriptions/shared/count', 'subscriptions/shared/max', ets:info(?TAB, size)),
|
||||||
|
State.
|
||||||
|
|
||||||
%% Return 'true' if the subscriber process is alive AND not in the failed list
|
%% Return 'true' if the subscriber process is alive AND not in the failed list
|
||||||
is_active_sub(Pid, FailedSubs) ->
|
is_active_sub(Pid, FailedSubs) ->
|
||||||
|
|
|
@ -202,10 +202,10 @@ notify(Event) ->
|
||||||
|
|
||||||
init([]) ->
|
init([]) ->
|
||||||
TabOpts = [public, set, {write_concurrency, true}],
|
TabOpts = [public, set, {write_concurrency, true}],
|
||||||
_ = emqx_tables:new(?SESSION_TAB, [{read_concurrency, true} | TabOpts]),
|
ok = emqx_tables:new(?SESSION_TAB, [{read_concurrency, true} | TabOpts]),
|
||||||
_ = emqx_tables:new(?SESSION_P_TAB, TabOpts),
|
ok = emqx_tables:new(?SESSION_P_TAB, TabOpts),
|
||||||
_ = emqx_tables:new(?SESSION_ATTRS_TAB, TabOpts),
|
ok = emqx_tables:new(?SESSION_ATTRS_TAB, TabOpts),
|
||||||
_ = emqx_tables:new(?SESSION_STATS_TAB, TabOpts),
|
ok = emqx_tables:new(?SESSION_STATS_TAB, TabOpts),
|
||||||
emqx_stats:update_interval(sm_stats, fun ?MODULE:stats_fun/0),
|
emqx_stats:update_interval(sm_stats, fun ?MODULE:stats_fun/0),
|
||||||
{ok, #{session_pmon => emqx_pmon:new()}}.
|
{ok, #{session_pmon => emqx_pmon:new()}}.
|
||||||
|
|
||||||
|
|
|
@ -152,7 +152,7 @@ cast(Msg) ->
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
init(#{tick_ms := TickMs}) ->
|
init(#{tick_ms := TickMs}) ->
|
||||||
_ = emqx_tables:new(?TAB, [set, public, {write_concurrency, true}]),
|
ok = emqx_tables:new(?TAB, [public, set, {write_concurrency, true}]),
|
||||||
Stats = lists:append([?CONNECTION_STATS, ?SESSION_STATS, ?PUBSUB_STATS,
|
Stats = lists:append([?CONNECTION_STATS, ?SESSION_STATS, ?PUBSUB_STATS,
|
||||||
?ROUTE_STATS, ?RETAINED_STATS]),
|
?ROUTE_STATS, ?RETAINED_STATS]),
|
||||||
true = ets:insert(?TAB, [{Name, 0} || Name <- Stats]),
|
true = ets:insert(?TAB, [{Name, 0} || Name <- Stats]),
|
||||||
|
|
|
@ -17,10 +17,12 @@
|
||||||
-export([new/2]).
|
-export([new/2]).
|
||||||
|
|
||||||
%% Create a named_table ets.
|
%% Create a named_table ets.
|
||||||
|
-spec(new(atom(), list()) -> ok).
|
||||||
new(Tab, Opts) ->
|
new(Tab, Opts) ->
|
||||||
case ets:info(Tab, name) of
|
case ets:info(Tab, name) of
|
||||||
undefined ->
|
undefined ->
|
||||||
ets:new(Tab, lists:usort([named_table | Opts]));
|
_ = ets:new(Tab, lists:usort([named_table | Opts])),
|
||||||
Tab -> Tab
|
ok;
|
||||||
|
Tab -> ok
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
|
|
@ -68,7 +68,7 @@ stop() ->
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
init([]) ->
|
init([]) ->
|
||||||
_ = emqx_tables:new(?TAB, [set, {read_concurrency, true}]),
|
ok = emqx_tables:new(?TAB, [set, {read_concurrency, true}]),
|
||||||
{ok, element(2, handle_info(reload, #{timer => undefined}))}.
|
{ok, element(2, handle_info(reload, #{timer => undefined}))}.
|
||||||
|
|
||||||
handle_call(force_reload, _From, State) ->
|
handle_call(force_reload, _From, State) ->
|
||||||
|
|
Loading…
Reference in New Issue