From b4d981daf2a549554e8e76f88b9b3c0676779932 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Tue, 4 Dec 2018 15:59:24 +0800 Subject: [PATCH 1/7] Add a sequence module to generate index for subscription sharding --- Makefile | 2 +- src/emqx_sequence.erl | 58 ++++++++++++++++++++++++++++++++++++ test/emqx_sequence_SUITE.erl | 37 +++++++++++++++++++++++ 3 files changed, 96 insertions(+), 1 deletion(-) create mode 100644 src/emqx_sequence.erl create mode 100644 test/emqx_sequence_SUITE.erl diff --git a/Makefile b/Makefile index 26bcf22ce..2c1693813 100644 --- a/Makefile +++ b/Makefile @@ -36,7 +36,7 @@ CT_SUITES = emqx emqx_client emqx_zone emqx_banned emqx_session \ emqx_mqtt_props emqx_mqueue emqx_net emqx_pqueue emqx_router emqx_sm \ emqx_tables emqx_time emqx_topic emqx_trie emqx_vm emqx_mountpoint \ emqx_listeners emqx_protocol emqx_pool emqx_shared_sub emqx_bridge \ - emqx_hooks emqx_batch + emqx_hooks emqx_batch emqx_sequence CT_NODE_NAME = emqxct@127.0.0.1 CT_OPTS = -cover test/ct.cover.spec -erl_args -name $(CT_NODE_NAME) diff --git a/src/emqx_sequence.erl b/src/emqx_sequence.erl new file mode 100644 index 000000000..62a882294 --- /dev/null +++ b/src/emqx_sequence.erl @@ -0,0 +1,58 @@ +%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +-module(emqx_sequence). + +-export([create/0, create/1]). +-export([generate/1, generate/2]). +-export([reclaim/1, reclaim/2]). + +-type(key() :: term()). +-type(seqid() :: non_neg_integer()). + +-define(DEFAULT_TAB, ?MODULE). + +%% @doc Create a sequence. +-spec(create() -> ok). +create() -> + create(?DEFAULT_TAB). + +-spec(create(atom()) -> ok). +create(Tab) -> + _ = ets:new(Tab, [set, public, named_table, {write_concurrency, true}]), + ok. + +%% @doc Generate a sequence id. +-spec(generate(key()) -> seqid()). +generate(Key) -> + generate(?DEFAULT_TAB, Key). + +-spec(generate(atom(), key()) -> seqid()). +generate(Tab, Key) -> + ets:update_counter(Tab, Key, {2, 1}, {Key, 0}). + +%% @doc Reclaim a sequence id. +-spec(reclaim(key()) -> seqid()). +reclaim(Key) -> + reclaim(?DEFAULT_TAB, Key). + +-spec(reclaim(atom(), key()) -> seqid()). +reclaim(Tab, Key) -> + try ets:update_counter(Tab, Key, {2, -1, 0, 0}) of + 0 -> ets:delete_object(Tab, {Key, 0}), 0; + I -> I + catch + error:badarg -> 0 + end. + diff --git a/test/emqx_sequence_SUITE.erl b/test/emqx_sequence_SUITE.erl new file mode 100644 index 000000000..999a95723 --- /dev/null +++ b/test/emqx_sequence_SUITE.erl @@ -0,0 +1,37 @@ +%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +-module(emqx_sequence_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("eunit/include/eunit.hrl"). + +-import(emqx_sequence, [generate/1, reclaim/1]). + +all() -> + [sequence_generate]. + +sequence_generate(_) -> + ok = emqx_sequence:create(), + ?assertEqual(1, generate(key)), + ?assertEqual(2, generate(key)), + ?assertEqual(3, generate(key)), + ?assertEqual(2, reclaim(key)), + ?assertEqual(1, reclaim(key)), + ?assertEqual(0, reclaim(key)), + ?assertEqual(false, ets:member(emqx_sequence, key)), + ?assertEqual(1, generate(key)). + From bce1ddc5c4d226e7f0e1ae6648ab5d1da54c12ff Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Thu, 6 Dec 2018 18:45:07 +0800 Subject: [PATCH 2/7] Implement a hash-based subscription sharding --- src/emqx.erl | 53 ++--- src/emqx_broker.erl | 444 ++++++++++++++--------------------- src/emqx_broker_helper.erl | 75 +++--- src/emqx_broker_sup.erl | 51 ++-- src/emqx_local_bridge.erl | 2 +- src/emqx_sequence.erl | 54 +++-- src/emqx_session.erl | 2 +- test/emqx_sequence_SUITE.erl | 20 +- 8 files changed, 299 insertions(+), 402 deletions(-) diff --git a/src/emqx.erl b/src/emqx.erl index 72f1d6f81..3792cc4f8 100644 --- a/src/emqx.erl +++ b/src/emqx.erl @@ -26,7 +26,6 @@ %% PubSub management API -export([topics/0, subscriptions/1, subscribers/1, subscribed/2]). --export([get_subopts/2, set_subopts/3]). %% Hooks API -export([hook/2, hook/3, hook/4, unhook/2, run_hooks/2, run_hooks/3]). @@ -70,20 +69,18 @@ is_running(Node) -> subscribe(Topic) -> emqx_broker:subscribe(iolist_to_binary(Topic)). --spec(subscribe(emqx_topic:topic() | string(), emqx_types:subid() | pid()) -> ok). +-spec(subscribe(emqx_topic:topic() | string(), emqx_types:subid() | emqx_types:subopts()) -> ok). subscribe(Topic, SubId) when is_atom(SubId); is_binary(SubId)-> emqx_broker:subscribe(iolist_to_binary(Topic), SubId); -subscribe(Topic, SubPid) when is_pid(SubPid) -> - emqx_broker:subscribe(iolist_to_binary(Topic), SubPid). +subscribe(Topic, SubOpts) when is_map(SubOpts) -> + emqx_broker:subscribe(iolist_to_binary(Topic), SubOpts). --spec(subscribe(emqx_topic:topic() | string(), emqx_types:subid() | pid(), - emqx_types:subopts()) -> ok). -subscribe(Topic, SubId, Options) when is_atom(SubId); is_binary(SubId)-> - emqx_broker:subscribe(iolist_to_binary(Topic), SubId, Options); -subscribe(Topic, SubPid, Options) when is_pid(SubPid)-> - emqx_broker:subscribe(iolist_to_binary(Topic), SubPid, Options). +-spec(subscribe(emqx_topic:topic() | string(), + emqx_types:subid() | pid(), emqx_types:subopts()) -> ok). +subscribe(Topic, SubId, SubOpts) when (is_atom(SubId) orelse is_binary(SubId)), is_map(SubOpts) -> + emqx_broker:subscribe(iolist_to_binary(Topic), SubId, SubOpts). --spec(publish(emqx_types:message()) -> {ok, emqx_types:deliver_results()}). +-spec(publish(emqx_types:message()) -> emqx_types:deliver_results()). publish(Msg) -> emqx_broker:publish(Msg). @@ -91,26 +88,14 @@ publish(Msg) -> unsubscribe(Topic) -> emqx_broker:unsubscribe(iolist_to_binary(Topic)). --spec(unsubscribe(emqx_topic:topic() | string(), emqx_types:subid() | pid()) -> ok). -unsubscribe(Topic, SubId) when is_atom(SubId); is_binary(SubId) -> - emqx_broker:unsubscribe(iolist_to_binary(Topic), SubId); -unsubscribe(Topic, SubPid) when is_pid(SubPid) -> - emqx_broker:unsubscribe(iolist_to_binary(Topic), SubPid). +-spec(unsubscribe(emqx_topic:topic() | string(), emqx_types:subid()) -> ok). +unsubscribe(Topic, SubId) -> + emqx_broker:unsubscribe(iolist_to_binary(Topic), SubId). %%------------------------------------------------------------------------------ %% PubSub management API %%------------------------------------------------------------------------------ --spec(get_subopts(emqx_topic:topic() | string(), emqx_types:subscriber()) - -> emqx_types:subopts()). -get_subopts(Topic, Subscriber) -> - emqx_broker:get_subopts(iolist_to_binary(Topic), Subscriber). - --spec(set_subopts(emqx_topic:topic() | string(), emqx_types:subscriber(), - emqx_types:subopts()) -> boolean()). -set_subopts(Topic, Subscriber, Options) when is_map(Options) -> - emqx_broker:set_subopts(iolist_to_binary(Topic), Subscriber, Options). - -spec(topics() -> list(emqx_topic:topic())). topics() -> emqx_router:topics(). @@ -118,15 +103,15 @@ topics() -> emqx_router:topics(). subscribers(Topic) -> emqx_broker:subscribers(iolist_to_binary(Topic)). --spec(subscriptions(emqx_types:subscriber()) -> [{emqx_topic:topic(), emqx_types:subopts()}]). -subscriptions(Subscriber) -> - emqx_broker:subscriptions(Subscriber). +-spec(subscriptions(pid()) -> [{emqx_topic:topic(), emqx_types:subopts()}]). +subscriptions(SubPid) when is_pid(SubPid) -> + emqx_broker:subscriptions(SubPid). --spec(subscribed(emqx_topic:topic() | string(), pid() | emqx_types:subid()) -> boolean()). -subscribed(Topic, SubPid) when is_pid(SubPid) -> - emqx_broker:subscribed(iolist_to_binary(Topic), SubPid); -subscribed(Topic, SubId) when is_atom(SubId); is_binary(SubId) -> - emqx_broker:subscribed(iolist_to_binary(Topic), SubId). +-spec(subscribed(pid() | emqx_types:subid(), emqx_topic:topic() | string()) -> boolean()). +subscribed(SubPid, Topic) when is_pid(SubPid) -> + emqx_broker:subscribed(SubPid, iolist_to_binary(Topic)); +subscribed(SubId, Topic) when is_atom(SubId); is_binary(SubId) -> + emqx_broker:subscribed(SubId, iolist_to_binary(Topic)). %%------------------------------------------------------------------------------ %% Hooks API diff --git a/src/emqx_broker.erl b/src/emqx_broker.erl index ca4a86c87..3a3cde1fa 100644 --- a/src/emqx_broker.erl +++ b/src/emqx_broker.erl @@ -19,150 +19,130 @@ -include("emqx.hrl"). -export([start_link/2]). --export([subscribe/1, subscribe/2, subscribe/3, subscribe/4]). --export([multi_subscribe/1, multi_subscribe/2, multi_subscribe/3]). +-export([subscribe/1, subscribe/2, subscribe/3]). +-export([unsubscribe/1, unsubscribe/2]). +-export([subscriber_down/1]). -export([publish/1, safe_publish/1]). --export([unsubscribe/1, unsubscribe/2, unsubscribe/3]). --export([multi_unsubscribe/1, multi_unsubscribe/2, multi_unsubscribe/3]). -export([dispatch/2, dispatch/3]). -export([subscriptions/1, subscribers/1, subscribed/2]). --export([get_subopts/2, set_subopts/3]). +-export([get_subopts/2, set_subopts/2]). -export([topics/0]). +%% Stats fun +-export([stats_fun/0]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --ifdef(TEST). --compile(export_all). --compile(nowarn_export_all). --endif. - --record(state, {pool, id, submap, submon}). --record(subscribe, {topic, subpid, subid, subopts = #{}}). --record(unsubscribe, {topic, subpid, subid}). - -%% The default request timeout +-define(SHARD, 1024). -define(TIMEOUT, 60000). -define(BROKER, ?MODULE). %% ETS tables --define(SUBOPTION, emqx_suboption). --define(SUBSCRIBER, emqx_subscriber). +-define(SUBID, emqx_subid). +-define(SUBOPTION, emqx_suboption). +-define(SUBSCRIBER, emqx_subscriber). -define(SUBSCRIPTION, emqx_subscription). +%% Gards -define(is_subid(Id), (is_binary(Id) orelse is_atom(Id))). --spec(start_link(atom(), pos_integer()) -> {ok, pid()} | ignore | {error, term()}). +-spec(start_link(atom(), pos_integer()) -> emqx_types:startlink_ret()). start_link(Pool, Id) -> - gen_server:start_link({local, emqx_misc:proc_name(?MODULE, Id)}, ?MODULE, - [Pool, Id], [{hibernate_after, 2000}]). + _ = create_tabs(), + gen_server:start_link({local, emqx_misc:proc_name(?BROKER, Id)}, ?MODULE, [Pool, Id], []). %%------------------------------------------------------------------------------ -%% Subscribe +%% Create tabs +%%------------------------------------------------------------------------------ + +create_tabs() -> + TabOpts = [public, {read_concurrency, true}, {write_concurrency, true}], + + %% SubId: SubId -> SubPid1, SubPid2,... + _ = emqx_tables:new(?SUBID, [bag | TabOpts]), + %% SubOption: {SubPid, Topic} -> SubOption + _ = emqx_tables:new(?SUBOPTION, [set | TabOpts]), + + %% Subscription: SubPid -> Topic1, Topic2, Topic3, ... + %% duplicate_bag: o(1) insert + _ = emqx_tables:new(?SUBSCRIPTION, [duplicate_bag | TabOpts]), + + %% Subscriber: Topic -> SubPid1, SubPid2, SubPid3, ... + %% duplicate_bag: o(1) insert + emqx_tables:new(?SUBSCRIBER, [duplicate_bag | TabOpts]). + +%%------------------------------------------------------------------------------ +%% Subscribe API %%------------------------------------------------------------------------------ -spec(subscribe(emqx_topic:topic()) -> ok). subscribe(Topic) when is_binary(Topic) -> - subscribe(Topic, self()). + subscribe(Topic, undefined). --spec(subscribe(emqx_topic:topic(), pid() | emqx_types:subid()) -> ok). -subscribe(Topic, SubPid) when is_binary(Topic), is_pid(SubPid) -> - subscribe(Topic, SubPid, undefined); +-spec(subscribe(emqx_topic:topic(), emqx_types:subid() | emqx_types:subopts()) -> ok). subscribe(Topic, SubId) when is_binary(Topic), ?is_subid(SubId) -> - subscribe(Topic, self(), SubId). + subscribe(Topic, SubId, #{}); +subscribe(Topic, SubOpts) when is_binary(Topic), is_map(SubOpts) -> + subscribe(Topic, undefined, SubOpts). --spec(subscribe(emqx_topic:topic(), pid() | emqx_types:subid(), - emqx_types:subid() | emqx_types:subopts()) -> ok). -subscribe(Topic, SubPid, SubId) when is_binary(Topic), is_pid(SubPid), ?is_subid(SubId) -> - subscribe(Topic, SubPid, SubId, #{qos => 0}); -subscribe(Topic, SubPid, SubOpts) when is_binary(Topic), is_pid(SubPid), is_map(SubOpts) -> - subscribe(Topic, SubPid, undefined, SubOpts); +-spec(subscribe(emqx_topic:topic(), emqx_types:subid(), emqx_types:subopts()) -> ok). subscribe(Topic, SubId, SubOpts) when is_binary(Topic), ?is_subid(SubId), is_map(SubOpts) -> - subscribe(Topic, self(), SubId, SubOpts). - --spec(subscribe(emqx_topic:topic(), pid(), emqx_types:subid(), emqx_types:subopts()) -> ok). -subscribe(Topic, SubPid, SubId, SubOpts) when is_binary(Topic), is_pid(SubPid), - ?is_subid(SubId), is_map(SubOpts) -> - Broker = pick(SubPid), - SubReq = #subscribe{topic = Topic, subpid = SubPid, subid = SubId, subopts = SubOpts}, - wait_for_reply(async_call(Broker, SubReq), ?TIMEOUT). - --spec(multi_subscribe(emqx_types:topic_table()) -> ok). -multi_subscribe(TopicTable) when is_list(TopicTable) -> - multi_subscribe(TopicTable, self()). - --spec(multi_subscribe(emqx_types:topic_table(), pid() | emqx_types:subid()) -> ok). -multi_subscribe(TopicTable, SubPid) when is_pid(SubPid) -> - multi_subscribe(TopicTable, SubPid, undefined); -multi_subscribe(TopicTable, SubId) when ?is_subid(SubId) -> - multi_subscribe(TopicTable, self(), SubId). - --spec(multi_subscribe(emqx_types:topic_table(), pid(), emqx_types:subid()) -> ok). -multi_subscribe(TopicTable, SubPid, SubId) when is_pid(SubPid), ?is_subid(SubId) -> - Broker = pick(SubPid), - SubReq = fun(Topic, SubOpts) -> - #subscribe{topic = Topic, subpid = SubPid, subid = SubId, subopts = SubOpts} - end, - wait_for_replies([async_call(Broker, SubReq(Topic, SubOpts)) - || {Topic, SubOpts} <- TopicTable], ?TIMEOUT). + SubPid = self(), + case ets:member(?SUBOPTION, {SubPid, Topic}) of + false -> + ok = emqx_broker_helper:monitor(SubPid, SubId), + Group = maps:get(share, SubOpts, undefined), + %% true = ets:insert(?SUBID, {SubId, SubPid}), + true = ets:insert(?SUBSCRIPTION, {SubPid, Topic}), + %% SeqId = emqx_broker_helper:create_seq(Topic), + true = ets:insert(?SUBSCRIBER, {Topic, shared(Group, SubPid)}), + true = ets:insert(?SUBOPTION, {{SubPid, Topic}, SubOpts}), + ok = emqx_shared_sub:subscribe(Group, Topic, SubPid), + call(pick(Topic), {subscribe, Group, Topic}); + true -> ok + end. %%------------------------------------------------------------------------------ -%% Unsubscribe +%% Unsubscribe API %%------------------------------------------------------------------------------ -spec(unsubscribe(emqx_topic:topic()) -> ok). unsubscribe(Topic) when is_binary(Topic) -> - unsubscribe(Topic, self()). + SubPid = self(), + case ets:lookup(?SUBOPTION, {SubPid, Topic}) of + [{_, SubOpts}] -> + Group = maps:get(share, SubOpts, undefined), + true = ets:delete_object(?SUBSCRIPTION, {SubPid, Topic}), + true = ets:delete_object(?SUBSCRIBER, {Topic, shared(Group, SubPid)}), + true = ets:delete(?SUBOPTION, {SubPid, Topic}), + ok = emqx_shared_sub:unsubscribe(Group, Topic, SubPid), + call(pick(Topic), {unsubscribe, Group, Topic}); + [] -> ok + end. --spec(unsubscribe(emqx_topic:topic(), pid() | emqx_types:subid()) -> ok). -unsubscribe(Topic, SubPid) when is_binary(Topic), is_pid(SubPid) -> - unsubscribe(Topic, SubPid, undefined); -unsubscribe(Topic, SubId) when is_binary(Topic), ?is_subid(SubId) -> - unsubscribe(Topic, self(), SubId). - --spec(unsubscribe(emqx_topic:topic(), pid(), emqx_types:subid()) -> ok). -unsubscribe(Topic, SubPid, SubId) when is_binary(Topic), is_pid(SubPid), ?is_subid(SubId) -> - Broker = pick(SubPid), - UnsubReq = #unsubscribe{topic = Topic, subpid = SubPid, subid = SubId}, - wait_for_reply(async_call(Broker, UnsubReq), ?TIMEOUT). - --spec(multi_unsubscribe([emqx_topic:topic()]) -> ok). -multi_unsubscribe(Topics) -> - multi_unsubscribe(Topics, self()). - --spec(multi_unsubscribe([emqx_topic:topic()], pid() | emqx_types:subid()) -> ok). -multi_unsubscribe(Topics, SubPid) when is_pid(SubPid) -> - multi_unsubscribe(Topics, SubPid, undefined); -multi_unsubscribe(Topics, SubId) when ?is_subid(SubId) -> - multi_unsubscribe(Topics, self(), SubId). - --spec(multi_unsubscribe([emqx_topic:topic()], pid(), emqx_types:subid()) -> ok). -multi_unsubscribe(Topics, SubPid, SubId) when is_pid(SubPid), ?is_subid(SubId) -> - Broker = pick(SubPid), - UnsubReq = fun(Topic) -> - #unsubscribe{topic = Topic, subpid = SubPid, subid = SubId} - end, - wait_for_replies([async_call(Broker, UnsubReq(Topic)) || Topic <- Topics], ?TIMEOUT). +-spec(unsubscribe(emqx_topic:topic(), emqx_types:subid()) -> ok). +unsubscribe(Topic, _SubId) when is_binary(Topic) -> + unsubscribe(Topic). %%------------------------------------------------------------------------------ %% Publish %%------------------------------------------------------------------------------ --spec(publish(emqx_types:message()) -> {ok, emqx_types:deliver_results()}). +-spec(publish(emqx_types:message()) -> emqx_types:deliver_results()). publish(Msg) when is_record(Msg, message) -> _ = emqx_tracer:trace(publish, Msg), - {ok, case emqx_hooks:run('message.publish', [], Msg) of - {ok, Msg1 = #message{topic = Topic}} -> - Delivery = route(aggre(emqx_router:match_routes(Topic)), delivery(Msg1)), - Delivery#delivery.results; - {stop, _} -> - emqx_logger:warning("Stop publishing: ~s", [emqx_message:format(Msg)]), - [] - end}. + case emqx_hooks:run('message.publish', [], Msg) of + {ok, Msg1 = #message{topic = Topic}} -> + Delivery = route(aggre(emqx_router:match_routes(Topic)), delivery(Msg1)), + Delivery#delivery.results; + {stop, _} -> + emqx_logger:warning("Stop publishing: ~s", [emqx_message:format(Msg)]), + [] + end. --spec(safe_publish(emqx_types:message()) -> ok). %% Called internally +-spec(safe_publish(emqx_types:message()) -> ok). safe_publish(Msg) when is_record(Msg, message) -> try publish(Msg) @@ -227,98 +207,113 @@ dispatch(Topic, Delivery = #delivery{message = Msg, results = Results}) -> emqx_hooks:run('message.dropped', [#{node => node()}, Msg]), inc_dropped_cnt(Topic), Delivery; - [Sub] -> %% optimize? - dispatch(Sub, Topic, Msg), + [SubPid] -> %% optimize? + dispatch(SubPid, Topic, Msg), Delivery#delivery{results = [{dispatch, Topic, 1}|Results]}; - Subscribers -> - Count = lists:foldl(fun(Sub, Acc) -> - dispatch(Sub, Topic, Msg), Acc + 1 - end, 0, Subscribers), + SubPids -> + Count = lists:foldl(fun(SubPid, Acc) -> + dispatch(SubPid, Topic, Msg), Acc + 1 + end, 0, SubPids), Delivery#delivery{results = [{dispatch, Topic, Count}|Results]} end. -dispatch({SubPid, _SubId}, Topic, Msg) when is_pid(SubPid) -> - SubPid ! {dispatch, Topic, Msg}; -dispatch({share, _Group, _Sub}, _Topic, _Msg) -> - ignored. +dispatch(SubPid, Topic, Msg) when is_pid(SubPid) -> + SubPid ! {dispatch, Topic, Msg}, + true; +%% TODO: how to optimize the share sub? +dispatch({share, _Group, _SubPid}, _Topic, _Msg) -> + false. inc_dropped_cnt(<<"$SYS/", _/binary>>) -> ok; inc_dropped_cnt(_Topic) -> emqx_metrics:inc('messages/dropped'). --spec(subscribers(emqx_topic:topic()) -> [emqx_types:subscriber()]). +-spec(subscribers(emqx_topic:topic()) -> [pid()]). subscribers(Topic) -> - try ets:lookup_element(?SUBSCRIBER, Topic, 2) catch error:badarg -> [] end. + safe_lookup_element(?SUBSCRIBER, Topic, []). --spec(subscriptions(emqx_types:subscriber()) +%%------------------------------------------------------------------------------ +%% Subscriber is down +%%------------------------------------------------------------------------------ + +-spec(subscriber_down(pid()) -> true). +subscriber_down(SubPid) -> + lists:foreach( + fun(Sub = {_, Topic}) -> + case ets:lookup(?SUBOPTION, Sub) of + [{_, SubOpts}] -> + Group = maps:get(share, SubOpts, undefined), + true = ets:delete_object(?SUBSCRIBER, {Topic, shared(Group, SubPid)}), + true = ets:delete(?SUBOPTION, Sub), + gen_server:cast(pick(Topic), {unsubscribe, Group, Topic}); + [] -> ok + end + end, ets:lookup(?SUBSCRIPTION, SubPid)), + ets:delete(?SUBSCRIPTION, SubPid). + +%%------------------------------------------------------------------------------ +%% Management APIs +%%------------------------------------------------------------------------------ + +-spec(subscriptions(pid() | emqx_types:subid()) -> [{emqx_topic:topic(), emqx_types:subopts()}]). -subscriptions(Subscriber) -> - lists:map(fun({_, {share, _Group, Topic}}) -> - subscription(Topic, Subscriber); - ({_, Topic}) -> - subscription(Topic, Subscriber) - end, ets:lookup(?SUBSCRIPTION, Subscriber)). +subscriptions(SubPid) -> + [{Topic, safe_lookup_element(?SUBOPTION, {SubPid, Topic}, #{})} + || Topic <- safe_lookup_element(?SUBSCRIPTION, SubPid, [])]. -subscription(Topic, Subscriber) -> - {Topic, ets:lookup_element(?SUBOPTION, {Topic, Subscriber}, 2)}. +-spec(subscribed(pid(), emqx_topic:topic()) -> boolean()). +subscribed(SubPid, Topic) when is_pid(SubPid) -> + ets:member(?SUBOPTION, {SubPid, Topic}); +subscribed(SubId, Topic) when ?is_subid(SubId) -> + %%FIXME:... SubId -> SubPid + ets:member(?SUBOPTION, {SubId, Topic}). --spec(subscribed(emqx_topic:topic(), pid() | emqx_types:subid() | emqx_types:subscriber()) -> boolean()). -subscribed(Topic, SubPid) when is_binary(Topic), is_pid(SubPid) -> - case ets:match_object(?SUBOPTION, {{Topic, {SubPid, '_'}}, '_'}, 1) of - {Match, _} -> - length(Match) >= 1; - '$end_of_table' -> - false - end; -subscribed(Topic, SubId) when is_binary(Topic), ?is_subid(SubId) -> - case ets:match_object(?SUBOPTION, {{Topic, {'_', SubId}}, '_'}, 1) of - {Match, _} -> - length(Match) >= 1; - '$end_of_table' -> - false - end; -subscribed(Topic, {SubPid, SubId}) when is_binary(Topic), is_pid(SubPid), ?is_subid(SubId) -> - ets:member(?SUBOPTION, {Topic, {SubPid, SubId}}). +-spec(get_subopts(pid(), emqx_topic:topic()) -> emqx_types:subopts()). +get_subopts(SubPid, Topic) when is_pid(SubPid), is_binary(Topic) -> + safe_lookup_element(?SUBOPTION, {SubPid, Topic}, #{}). --spec(get_subopts(emqx_topic:topic(), emqx_types:subscriber()) -> emqx_types:subopts()). -get_subopts(Topic, Subscriber) when is_binary(Topic) -> - try ets:lookup_element(?SUBOPTION, {Topic, Subscriber}, 2) - catch error:badarg -> [] - end. - --spec(set_subopts(emqx_topic:topic(), emqx_types:subscriber(), emqx_types:subopts()) -> boolean()). -set_subopts(Topic, Subscriber, Opts) when is_binary(Topic), is_map(Opts) -> - case ets:lookup(?SUBOPTION, {Topic, Subscriber}) of +-spec(set_subopts(emqx_topic:topic(), emqx_types:subopts()) -> boolean()). +set_subopts(Topic, NewOpts) when is_binary(Topic), is_map(NewOpts) -> + Sub = {self(), Topic}, + case ets:lookup(?SUBOPTION, Sub) of [{_, OldOpts}] -> - ets:insert(?SUBOPTION, {{Topic, Subscriber}, maps:merge(OldOpts, Opts)}); + ets:insert(?SUBOPTION, {Sub, maps:merge(OldOpts, NewOpts)}); [] -> false end. -async_call(Broker, Req) -> - From = {self(), Tag = make_ref()}, - ok = gen_server:cast(Broker, {From, Req}), - Tag. +-spec(topics() -> [emqx_topic:topic()]). +topics() -> + emqx_router:topics(). -wait_for_replies(Tags, Timeout) -> - lists:foreach( - fun(Tag) -> - wait_for_reply(Tag, Timeout) - end, Tags). +safe_lookup_element(Tab, Key, Def) -> + try ets:lookup_element(Tab, Key, 2) catch error:badarg -> Def end. -wait_for_reply(Tag, Timeout) -> - receive - {Tag, Reply} -> Reply - after Timeout -> - exit(timeout) +%%------------------------------------------------------------------------------ +%% Stats fun +%%------------------------------------------------------------------------------ + +stats_fun() -> + safe_update_stats(?SUBSCRIBER, 'subscribers/count', 'subscribers/max'), + safe_update_stats(?SUBSCRIPTION, 'subscriptions/count', 'subscriptions/max'), + safe_update_stats(?SUBOPTION, 'suboptions/count', 'suboptions/max'). + +safe_update_stats(Tab, Stat, MaxStat) -> + case ets:info(Tab, size) of + undefined -> ok; + Size -> emqx_stats:setstat(Stat, MaxStat, Size) end. -%% Pick a broker -pick(SubPid) when is_pid(SubPid) -> - gproc_pool:pick_worker(broker, SubPid). +%%------------------------------------------------------------------------------ +%% Pick and call +%%------------------------------------------------------------------------------ --spec(topics() -> [emqx_topic:topic()]). -topics() -> emqx_router:topics(). +call(Broker, Req) -> + gen_server:call(Broker, Req, ?TIMEOUT). + +%% Pick a broker +pick(Topic) -> + gproc_pool:pick_worker(broker, Topic). %%------------------------------------------------------------------------------ %% gen_server callbacks @@ -326,61 +321,32 @@ topics() -> emqx_router:topics(). init([Pool, Id]) -> true = gproc_pool:connect_worker(Pool, {Pool, Id}), - {ok, #state{pool = Pool, id = Id, submap = #{}, submon = emqx_pmon:new()}}. + {ok, #{pool => Pool, id => Id}}. + +handle_call({subscribe, Group, Topic}, _From, State) -> + Ok = emqx_router:add_route(Topic, dest(Group)), + {reply, Ok, State}; + +handle_call({unsubscribe, Group, Topic}, _From, State) -> + Ok = case ets:member(?SUBSCRIBER, Topic) of + false -> emqx_router:delete_route(Topic, dest(Group)); + true -> ok + end, + {reply, Ok, State}; handle_call(Req, _From, State) -> emqx_logger:error("[Broker] unexpected call: ~p", [Req]), {reply, ignored, State}. -handle_cast({From, #subscribe{topic = Topic, subpid = SubPid, subid = SubId, subopts = SubOpts}}, State) -> - Subscriber = {SubPid, SubId}, - case ets:member(?SUBOPTION, {Topic, Subscriber}) of - false -> - resubscribe(From, {Subscriber, SubOpts, Topic}, State); - true -> - case ets:lookup_element(?SUBOPTION, {Topic, Subscriber}, 2) =:= SubOpts of - true -> - gen_server:reply(From, ok), - {noreply, State}; - false -> - resubscribe(From, {Subscriber, SubOpts, Topic}, State) - end - end; - -handle_cast({From, #unsubscribe{topic = Topic, subpid = SubPid, subid = SubId}}, State) -> - Subscriber = {SubPid, SubId}, - case ets:lookup(?SUBOPTION, {Topic, Subscriber}) of - [{_, SubOpts}] -> - Group = maps:get(share, SubOpts, undefined), - true = do_unsubscribe(Group, Topic, Subscriber), - emqx_shared_sub:unsubscribe(Group, Topic, SubPid), - case ets:member(?SUBSCRIBER, Topic) of - false -> emqx_router:del_route(From, Topic, dest(Group)); - true -> gen_server:reply(From, ok) - end; - [] -> gen_server:reply(From, ok) - end, - {noreply, State}; - handle_cast(Msg, State) -> emqx_logger:error("[Broker] unexpected cast: ~p", [Msg]), {noreply, State}. -handle_info({'DOWN', _MRef, process, SubPid, Reason}, State = #state{submap = SubMap}) -> - case maps:find(SubPid, SubMap) of - {ok, SubIds} -> - lists:foreach(fun(SubId) -> subscriber_down({SubPid, SubId}) end, SubIds), - {noreply, demonitor_subscriber(SubPid, State)}; - error -> - emqx_logger:error("unexpected 'DOWN': ~p, reason: ~p", [SubPid, Reason]), - {noreply, State} - end; - handle_info(Info, State) -> emqx_logger:error("[Broker] unexpected info: ~p", [Info]), {noreply, State}. -terminate(_Reason, #state{pool = Pool, id = Id}) -> +terminate(_Reason, #{pool := Pool, id := Id}) -> gproc_pool:disconnect_worker(Pool, {Pool, Id}). code_change(_OldVsn, State, _Extra) -> @@ -390,69 +356,9 @@ code_change(_OldVsn, State, _Extra) -> %% Internal functions %%------------------------------------------------------------------------------ -resubscribe(From, {Subscriber, SubOpts, Topic}, State) -> - {SubPid, _} = Subscriber, - Group = maps:get(share, SubOpts, undefined), - true = do_subscribe(Group, Topic, Subscriber, SubOpts), - emqx_shared_sub:subscribe(Group, Topic, SubPid), - emqx_router:add_route(From, Topic, dest(Group)), - {noreply, monitor_subscriber(Subscriber, State)}. - -insert_subscriber(Group, Topic, Subscriber) -> - Subscribers = subscribers(Topic), - case lists:member(Subscriber, Subscribers) of - false -> - ets:insert(?SUBSCRIBER, {Topic, shared(Group, Subscriber)}); - _ -> - ok - end. - -do_subscribe(Group, Topic, Subscriber, SubOpts) -> - ets:insert(?SUBSCRIPTION, {Subscriber, shared(Group, Topic)}), - insert_subscriber(Group, Topic, Subscriber), - ets:insert(?SUBOPTION, {{Topic, Subscriber}, SubOpts}). - -do_unsubscribe(Group, Topic, Subscriber) -> - ets:delete_object(?SUBSCRIPTION, {Subscriber, shared(Group, Topic)}), - ets:delete_object(?SUBSCRIBER, {Topic, shared(Group, Subscriber)}), - ets:delete(?SUBOPTION, {Topic, Subscriber}). - -subscriber_down(Subscriber) -> - Topics = lists:map(fun({_, {share, Group, Topic}}) -> - {Topic, Group}; - ({_, Topic}) -> - {Topic, undefined} - end, ets:lookup(?SUBSCRIPTION, Subscriber)), - lists:foreach(fun({Topic, undefined}) -> - true = do_unsubscribe(undefined, Topic, Subscriber), - ets:member(?SUBSCRIBER, Topic) orelse emqx_router:del_route(Topic, dest(undefined)); - ({Topic, Group}) -> - true = do_unsubscribe(Group, Topic, Subscriber), - Groups = groups(Topic), - case lists:member(Group, lists:usort(Groups)) of - true -> ok; - false -> emqx_router:del_route(Topic, dest(Group)) - end - end, Topics). - -monitor_subscriber({SubPid, SubId}, State = #state{submap = SubMap, submon = SubMon}) -> - UpFun = fun(SubIds) -> lists:usort([SubId|SubIds]) end, - State#state{submap = maps:update_with(SubPid, UpFun, [SubId], SubMap), - submon = emqx_pmon:monitor(SubPid, SubMon)}. - -demonitor_subscriber(SubPid, State = #state{submap = SubMap, submon = SubMon}) -> - State#state{submap = maps:remove(SubPid, SubMap), - submon = emqx_pmon:demonitor(SubPid, SubMon)}. - dest(undefined) -> node(); dest(Group) -> {Group, node()}. shared(undefined, Name) -> Name; shared(Group, Name) -> {share, Group, Name}. -groups(Topic) -> - lists:foldl(fun({_, {share, Group, _}}, Acc) -> - [Group | Acc]; - ({_, _}, Acc) -> - Acc - end, [], ets:lookup(?SUBSCRIBER, Topic)). diff --git a/src/emqx_broker_helper.erl b/src/emqx_broker_helper.erl index e597a233e..35fe06f0d 100644 --- a/src/emqx_broker_helper.erl +++ b/src/emqx_broker_helper.erl @@ -16,63 +16,82 @@ -behaviour(gen_server). --export([start_link/0]). --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). +-compile({no_auto_import, [monitor/2]}). -%% internal export --export([stats_fun/0]). +-export([start_link/0]). +-export([monitor/2]). +-export([create_seq/1, reclaim_seq/1]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + code_change/3]). -define(HELPER, ?MODULE). +-define(SUBMON, emqx_submon). +-define(SUBSEQ, emqx_subseq). --record(state, {}). +-record(state, {pmon :: emqx_pmon:pmon()}). --spec(start_link() -> {ok, pid()} | ignore | {error, any()}). +-spec(start_link() -> emqx_types:startlink_ret()). start_link() -> gen_server:start_link({local, ?HELPER}, ?MODULE, [], []). +-spec(monitor(pid(), emqx_types:subid()) -> ok). +monitor(SubPid, SubId) when is_pid(SubPid) -> + case ets:lookup(?SUBMON, SubPid) of + [] -> + gen_server:cast(?HELPER, {monitor, SubPid, SubId}); + [{_, SubId}] -> + ok; + _Other -> + error(subid_conflict) + end. + +-spec(create_seq(emqx_topic:topic()) -> emqx_sequence:seqid()). +create_seq(Topic) -> + emqx_sequence:nextval(?SUBSEQ, Topic). + +-spec(reclaim_seq(emqx_topic:topic()) -> emqx_sequence:seqid()). +reclaim_seq(Topic) -> + emqx_sequence:reclaim(?SUBSEQ, Topic). + %%------------------------------------------------------------------------------ %% gen_server callbacks %%------------------------------------------------------------------------------ init([]) -> - %% Use M:F/A for callback, not anonymous function because - %% fun M:F/A is small, also no badfun risk during hot beam reload - emqx_stats:update_interval(broker_stats, fun ?MODULE:stats_fun/0), - {ok, #state{}, hibernate}. + %% SubSeq: Topic -> SeqId + _ = emqx_sequence:create(?SUBSEQ), + %% SubMon: SubPid -> SubId + _ = emqx_tables:new(?SUBMON, [set, protected, {read_concurrency, true}]), + %% Stats timer + emqx_stats:update_interval(broker_stats, fun emqx_broker:stats_fun/0), + {ok, #state{pmon = emqx_pmon:new()}, hibernate}. handle_call(Req, _From, State) -> emqx_logger:error("[BrokerHelper] unexpected call: ~p", [Req]), {reply, ignored, State}. +handle_cast({monitor, SubPid, SubId}, State = #state{pmon = PMon}) -> + true = ets:insert(?SUBMON, {SubPid, SubId}), + {noreply, State#state{pmon = emqx_pmon:monitor(SubPid, PMon)}}; + handle_cast(Msg, State) -> emqx_logger:error("[BrokerHelper] unexpected cast: ~p", [Msg]), {noreply, State}. +handle_info({'DOWN', _MRef, process, SubPid, _Reason}, State = #state{pmon = PMon}) -> + true = ets:delete(?SUBMON, SubPid), + ok = emqx_pool:async_submit(fun emqx_broker:subscriber_down/1, [SubPid]), + {noreply, State#state{pmon = emqx_pmon:erase(SubPid, PMon)}}; + handle_info(Info, State) -> emqx_logger:error("[BrokerHelper] unexpected info: ~p", [Info]), {noreply, State}. terminate(_Reason, #state{}) -> + _ = emqx_sequence:delete(?SUBSEQ), emqx_stats:cancel_update(broker_stats). code_change(_OldVsn, State, _Extra) -> {ok, State}. -%%------------------------------------------------------------------------------ -%% Internal functions -%%------------------------------------------------------------------------------ - -stats_fun() -> - safe_update_stats(emqx_subscriber, - 'subscribers/count', 'subscribers/max'), - safe_update_stats(emqx_subscription, - 'subscriptions/count', 'subscriptions/max'), - safe_update_stats(emqx_suboptions, - 'suboptions/count', 'suboptions/max'). - -safe_update_stats(Tab, Stat, MaxStat) -> - case ets:info(Tab, size) of - undefined -> ok; - Size -> emqx_stats:setstat(Stat, MaxStat, Size) - end. - diff --git a/src/emqx_broker_sup.erl b/src/emqx_broker_sup.erl index 51f6e72aa..a511e4154 100644 --- a/src/emqx_broker_sup.erl +++ b/src/emqx_broker_sup.erl @@ -20,8 +20,6 @@ -export([init/1]). --define(TAB_OPTS, [public, {read_concurrency, true}, {write_concurrency, true}]). - start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). @@ -30,39 +28,26 @@ start_link() -> %%------------------------------------------------------------------------------ init([]) -> - %% Create the pubsub tables - ok = lists:foreach(fun create_tab/1, [subscription, subscriber, suboption]), - + %% Broker pool + PoolSize = emqx_vm:schedulers() * 2, + BrokerPool = emqx_pool_sup:spec(emqx_broker_pool, + [broker, hash, PoolSize, + {emqx_broker, start_link, []}]), %% Shared subscription - SharedSub = {shared_sub, {emqx_shared_sub, start_link, []}, - permanent, 5000, worker, [emqx_shared_sub]}, + SharedSub = #{id => shared_sub, + start => {emqx_shared_sub, start_link, []}, + restart => permanent, + shutdown => 2000, + type => worker, + modules => [emqx_shared_sub]}, %% Broker helper - Helper = {broker_helper, {emqx_broker_helper, start_link, []}, - permanent, 5000, worker, [emqx_broker_helper]}, + Helper = #{id => helper, + start => {emqx_broker_helper, start_link, []}, + restart => permanent, + shutdown => 2000, + type => worker, + modules => [emqx_broker_helper]}, - %% Broker pool - BrokerPool = emqx_pool_sup:spec(emqx_broker_pool, - [broker, hash, emqx_vm:schedulers() * 2, - {emqx_broker, start_link, []}]), - - {ok, {{one_for_all, 0, 1}, [SharedSub, Helper, BrokerPool]}}. - -%%------------------------------------------------------------------------------ -%% Create tables -%%------------------------------------------------------------------------------ - -create_tab(suboption) -> - %% Suboption: {Topic, Sub} -> [{qos, 1}] - emqx_tables:new(emqx_suboption, [set | ?TAB_OPTS]); - -create_tab(subscriber) -> - %% Subscriber: Topic -> Sub1, Sub2, Sub3, ..., SubN - %% duplicate_bag: o(1) insert - emqx_tables:new(emqx_subscriber, [duplicate_bag | ?TAB_OPTS]); - -create_tab(subscription) -> - %% Subscription: Sub -> Topic1, Topic2, Topic3, ..., TopicN - %% bag: o(n) insert - emqx_tables:new(emqx_subscription, [bag | ?TAB_OPTS]). + {ok, {{one_for_all, 0, 1}, [BrokerPool, SharedSub, Helper]}}. diff --git a/src/emqx_local_bridge.erl b/src/emqx_local_bridge.erl index 7c4e7cea1..df2dda686 100644 --- a/src/emqx_local_bridge.erl +++ b/src/emqx_local_bridge.erl @@ -61,7 +61,7 @@ init([Pool, Id, Node, Topic, Options]) -> true -> true = erlang:monitor_node(Node, true), Group = iolist_to_binary(["$bridge:", atom_to_list(Node), ":", Topic]), - emqx_broker:subscribe(Topic, self(), #{share => Group, qos => ?QOS_0}), + emqx_broker:subscribe(Topic, #{share => Group, qos => ?QOS_0}), State = parse_opts(Options, #state{node = Node, subtopic = Topic}), MQueue = emqx_mqueue:init(#{max_len => State#state.max_queue_len, store_qos0 => true}), diff --git a/src/emqx_sequence.erl b/src/emqx_sequence.erl index 62a882294..c4fc0fd08 100644 --- a/src/emqx_sequence.erl +++ b/src/emqx_sequence.erl @@ -14,45 +14,47 @@ -module(emqx_sequence). --export([create/0, create/1]). --export([generate/1, generate/2]). --export([reclaim/1, reclaim/2]). +-export([create/1, nextval/2, currval/2, reclaim/2, delete/1]). -type(key() :: term()). +-type(name() :: atom()). -type(seqid() :: non_neg_integer()). --define(DEFAULT_TAB, ?MODULE). +-export_type([seqid/0]). %% @doc Create a sequence. --spec(create() -> ok). -create() -> - create(?DEFAULT_TAB). - --spec(create(atom()) -> ok). -create(Tab) -> - _ = ets:new(Tab, [set, public, named_table, {write_concurrency, true}]), +-spec(create(name()) -> ok). +create(Name) -> + _ = ets:new(Name, [set, public, named_table, {write_concurrency, true}]), ok. -%% @doc Generate a sequence id. --spec(generate(key()) -> seqid()). -generate(Key) -> - generate(?DEFAULT_TAB, Key). +%% @doc Next value of the sequence. +-spec(nextval(name(), key()) -> seqid()). +nextval(Name, Key) -> + ets:update_counter(Name, Key, {2, 1}, {Key, 0}). --spec(generate(atom(), key()) -> seqid()). -generate(Tab, Key) -> - ets:update_counter(Tab, Key, {2, 1}, {Key, 0}). +%% @doc Current value of the sequence. +-spec(currval(name(), key()) -> seqid()). +currval(Name, Key) -> + try ets:lookup_element(Name, Key, 2) + catch + error:badarg -> 0 + end. %% @doc Reclaim a sequence id. --spec(reclaim(key()) -> seqid()). -reclaim(Key) -> - reclaim(?DEFAULT_TAB, Key). - --spec(reclaim(atom(), key()) -> seqid()). -reclaim(Tab, Key) -> - try ets:update_counter(Tab, Key, {2, -1, 0, 0}) of - 0 -> ets:delete_object(Tab, {Key, 0}), 0; +-spec(reclaim(name(), key()) -> seqid()). +reclaim(Name, Key) -> + try ets:update_counter(Name, Key, {2, -1, 0, 0}) of + 0 -> ets:delete_object(Name, {Key, 0}), 0; I -> I catch error:badarg -> 0 end. +%% @doc Delete the sequence. +delete(Name) -> + case ets:info(Name, name) of + Name -> ets:delete(Name); + undefined -> false + end. + diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 12718b9fc..262a9a7a8 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -465,7 +465,7 @@ handle_cast({subscribe, FromPid, {PacketId, _Properties, TopicFilters}}, emqx_hooks:run('session.subscribed', [#{client_id => ClientId}, Topic, SubOpts#{first => false}]), SubMap; {ok, _SubOpts} -> - emqx_broker:set_subopts(Topic, {self(), ClientId}, SubOpts), + emqx_broker:set_subopts(Topic, SubOpts), %% Why??? emqx_hooks:run('session.subscribed', [#{client_id => ClientId}, Topic, SubOpts#{first => false}]), maps:put(Topic, SubOpts, SubMap); diff --git a/test/emqx_sequence_SUITE.erl b/test/emqx_sequence_SUITE.erl index 999a95723..f37b60d76 100644 --- a/test/emqx_sequence_SUITE.erl +++ b/test/emqx_sequence_SUITE.erl @@ -19,19 +19,19 @@ -include_lib("eunit/include/eunit.hrl"). --import(emqx_sequence, [generate/1, reclaim/1]). +-import(emqx_sequence, [nextval/2, reclaim/2]). all() -> [sequence_generate]. sequence_generate(_) -> - ok = emqx_sequence:create(), - ?assertEqual(1, generate(key)), - ?assertEqual(2, generate(key)), - ?assertEqual(3, generate(key)), - ?assertEqual(2, reclaim(key)), - ?assertEqual(1, reclaim(key)), - ?assertEqual(0, reclaim(key)), - ?assertEqual(false, ets:member(emqx_sequence, key)), - ?assertEqual(1, generate(key)). + ok = emqx_sequence:create(seqtab), + ?assertEqual(1, nextval(seqtab, key)), + ?assertEqual(2, nextval(seqtab, key)), + ?assertEqual(3, nextval(seqtab, key)), + ?assertEqual(2, reclaim(seqtab, key)), + ?assertEqual(1, reclaim(seqtab, key)), + ?assertEqual(0, reclaim(seqtab, key)), + ?assertEqual(false, ets:member(seqtab, key)), + ?assertEqual(1, nextval(seqtab, key)). From 36e7d63d66b47d9c6ba6f4de8d33a60b4ad0be31 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Fri, 7 Dec 2018 18:20:09 +0800 Subject: [PATCH 3/7] Implement subscription sharding. 1. Improve the design router, broker and shared_sub 2. New ets tables' design for subscription sharding --- src/emqx_access_control.erl | 2 +- src/emqx_acl_internal.erl | 2 +- src/emqx_broker.erl | 149 ++++++++++++++++++------------ src/emqx_broker_helper.erl | 14 ++- src/emqx_cm.erl | 6 +- src/emqx_ctl.erl | 2 +- src/emqx_hooks.erl | 2 +- src/emqx_metrics.erl | 2 +- src/emqx_router.erl | 175 +++++++++++++----------------------- src/emqx_sequence.erl | 3 +- src/emqx_shared_sub.erl | 63 ++++++++----- src/emqx_sm.erl | 8 +- src/emqx_stats.erl | 2 +- src/emqx_tables.erl | 6 +- src/emqx_zone.erl | 2 +- 15 files changed, 227 insertions(+), 211 deletions(-) diff --git a/src/emqx_access_control.erl b/src/emqx_access_control.erl index 1b9d76937..06ea86633 100644 --- a/src/emqx_access_control.erl +++ b/src/emqx_access_control.erl @@ -148,7 +148,7 @@ stop() -> %%----------------------------------------------------------------------------- init([]) -> - _ = emqx_tables:new(?TAB, [set, protected, {read_concurrency, true}]), + ok = emqx_tables:new(?TAB, [set, protected, {read_concurrency, true}]), {ok, #{}}. handle_call({register_mod, Type, Mod, Opts, Seq}, _From, State) -> diff --git a/src/emqx_acl_internal.erl b/src/emqx_acl_internal.erl index eee7e6c18..f8b995096 100644 --- a/src/emqx_acl_internal.erl +++ b/src/emqx_acl_internal.erl @@ -45,7 +45,7 @@ all_rules() -> -spec(init([File :: string()]) -> {ok, #{}}). init([File]) -> - _ = emqx_tables:new(?ACL_RULE_TAB, [set, public, {read_concurrency, true}]), + ok = emqx_tables:new(?ACL_RULE_TAB, [set, public, {read_concurrency, true}]), true = load_rules_from_file(File), {ok, #{acl_file => File}}. diff --git a/src/emqx_broker.erl b/src/emqx_broker.erl index 3a3cde1fa..0a9264489 100644 --- a/src/emqx_broker.erl +++ b/src/emqx_broker.erl @@ -23,7 +23,7 @@ -export([unsubscribe/1, unsubscribe/2]). -export([subscriber_down/1]). -export([publish/1, safe_publish/1]). --export([dispatch/2, dispatch/3]). +-export([dispatch/2]). -export([subscriptions/1, subscribers/1, subscribed/2]). -export([get_subopts/2, set_subopts/2]). -export([topics/0]). @@ -34,8 +34,6 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --define(SHARD, 1024). --define(TIMEOUT, 60000). -define(BROKER, ?MODULE). %% ETS tables @@ -44,33 +42,36 @@ -define(SUBSCRIBER, emqx_subscriber). -define(SUBSCRIPTION, emqx_subscription). -%% Gards +%% Guards -define(is_subid(Id), (is_binary(Id) orelse is_atom(Id))). -spec(start_link(atom(), pos_integer()) -> emqx_types:startlink_ret()). start_link(Pool, Id) -> _ = create_tabs(), - gen_server:start_link({local, emqx_misc:proc_name(?BROKER, Id)}, ?MODULE, [Pool, Id], []). + Name = emqx_misc:proc_name(?BROKER, Id), + gen_server:start_link({local, Name}, ?MODULE, [Pool, Id], []). %%------------------------------------------------------------------------------ %% Create tabs %%------------------------------------------------------------------------------ +-spec(create_tabs() -> ok). create_tabs() -> TabOpts = [public, {read_concurrency, true}, {write_concurrency, true}], - %% SubId: SubId -> SubPid1, SubPid2,... - _ = emqx_tables:new(?SUBID, [bag | TabOpts]), + %% SubId: SubId -> SubPid + ok = emqx_tables:new(?SUBID, [set | TabOpts]), + %% SubOption: {SubPid, Topic} -> SubOption - _ = emqx_tables:new(?SUBOPTION, [set | TabOpts]), + ok = emqx_tables:new(?SUBOPTION, [set | TabOpts]), %% Subscription: SubPid -> Topic1, Topic2, Topic3, ... %% duplicate_bag: o(1) insert - _ = emqx_tables:new(?SUBSCRIPTION, [duplicate_bag | TabOpts]), + ok = emqx_tables:new(?SUBSCRIPTION, [duplicate_bag | TabOpts]), %% Subscriber: Topic -> SubPid1, SubPid2, SubPid3, ... %% duplicate_bag: o(1) insert - emqx_tables:new(?SUBSCRIBER, [duplicate_bag | TabOpts]). + ok = emqx_tables:new(?SUBSCRIBER, [duplicate_bag | TabOpts]). %%------------------------------------------------------------------------------ %% Subscribe API @@ -92,14 +93,23 @@ subscribe(Topic, SubId, SubOpts) when is_binary(Topic), ?is_subid(SubId), is_map case ets:member(?SUBOPTION, {SubPid, Topic}) of false -> ok = emqx_broker_helper:monitor(SubPid, SubId), - Group = maps:get(share, SubOpts, undefined), %% true = ets:insert(?SUBID, {SubId, SubPid}), true = ets:insert(?SUBSCRIPTION, {SubPid, Topic}), - %% SeqId = emqx_broker_helper:create_seq(Topic), - true = ets:insert(?SUBSCRIBER, {Topic, shared(Group, SubPid)}), - true = ets:insert(?SUBOPTION, {{SubPid, Topic}, SubOpts}), - ok = emqx_shared_sub:subscribe(Group, Topic, SubPid), - call(pick(Topic), {subscribe, Group, Topic}); + case maps:get(share, SubOpts, undefined) of + undefined -> + Shard = emqx_broker_helper:get_shard(SubPid, Topic), + case Shard of + 0 -> true = ets:insert(?SUBSCRIBER, {Topic, SubPid}); + I -> true = ets:insert(?SUBSCRIBER, {{shard, Topic, I}, SubPid}), + true = ets:insert(?SUBSCRIBER, {Topic, {shard, I}}) + end, + SubOpts1 = maps:put(shard, Shard, SubOpts), + true = ets:insert(?SUBOPTION, {{SubPid, Topic}, SubOpts1}), + call(pick({Topic, Shard}), {subscribe, Topic}); + Group -> %% Shared subscription + true = ets:insert(?SUBOPTION, {{SubPid, Topic}, SubOpts}), + emqx_shared_sub:subscribe(Group, Topic, SubPid) + end; true -> ok end. @@ -112,12 +122,21 @@ unsubscribe(Topic) when is_binary(Topic) -> SubPid = self(), case ets:lookup(?SUBOPTION, {SubPid, Topic}) of [{_, SubOpts}] -> - Group = maps:get(share, SubOpts, undefined), + _ = emqx_broker_helper:reclaim_seq(Topic), + case maps:get(share, SubOpts, undefined) of + undefined -> + case maps:get(shared, SubOpts, 0) of + 0 -> true = ets:delete_object(?SUBSCRIBER, {Topic, SubPid}), + ok = cast(pick(Topic), {unsubscribed, Topic}); + I -> true = ets:delete_object(?SUBSCRIBER, {{shard, Topic, I}, SubPid}), + ok = cast(pick({Topic, I}), {unsubscribed, Topic, I}) + end; + Group -> + ok = emqx_shared_sub:unsubscribe(Group, Topic, SubPid) + end, true = ets:delete_object(?SUBSCRIPTION, {SubPid, Topic}), - true = ets:delete_object(?SUBSCRIBER, {Topic, shared(Group, SubPid)}), - true = ets:delete(?SUBOPTION, {SubPid, Topic}), - ok = emqx_shared_sub:unsubscribe(Group, Topic, SubPid), - call(pick(Topic), {unsubscribe, Group, Topic}); + %%true = ets:delete_object(?SUBID, {SubId, SubPid}), + true = ets:delete(?SUBOPTION, {SubPid, Topic}); [] -> ok end. @@ -207,22 +226,23 @@ dispatch(Topic, Delivery = #delivery{message = Msg, results = Results}) -> emqx_hooks:run('message.dropped', [#{node => node()}, Msg]), inc_dropped_cnt(Topic), Delivery; - [SubPid] -> %% optimize? - dispatch(SubPid, Topic, Msg), + [Sub] -> %% optimize? + dispatch(Sub, Topic, Msg), Delivery#delivery{results = [{dispatch, Topic, 1}|Results]}; - SubPids -> - Count = lists:foldl(fun(SubPid, Acc) -> - dispatch(SubPid, Topic, Msg), Acc + 1 - end, 0, SubPids), + Subs -> + Count = lists:foldl( + fun(Sub, Acc) -> + dispatch(Sub, Topic, Msg), Acc + 1 + end, 0, Subs), Delivery#delivery{results = [{dispatch, Topic, Count}|Results]} end. dispatch(SubPid, Topic, Msg) when is_pid(SubPid) -> - SubPid ! {dispatch, Topic, Msg}, - true; -%% TODO: how to optimize the share sub? -dispatch({share, _Group, _SubPid}, _Topic, _Msg) -> - false. + SubPid ! {dispatch, Topic, Msg}; +dispatch({shard, I}, Topic, Msg) -> + lists:foreach(fun(SubPid) -> + SubPid ! {dispatch, Topic, Msg} + end, safe_lookup_element(?SUBSCRIBER, {share, Topic, I}, [])). inc_dropped_cnt(<<"$SYS/", _/binary>>) -> ok; @@ -240,17 +260,20 @@ subscribers(Topic) -> -spec(subscriber_down(pid()) -> true). subscriber_down(SubPid) -> lists:foreach( - fun(Sub = {_, Topic}) -> + fun(Sub = {_Pid, Topic}) -> case ets:lookup(?SUBOPTION, Sub) of [{_, SubOpts}] -> - Group = maps:get(share, SubOpts, undefined), - true = ets:delete_object(?SUBSCRIBER, {Topic, shared(Group, SubPid)}), - true = ets:delete(?SUBOPTION, Sub), - gen_server:cast(pick(Topic), {unsubscribe, Group, Topic}); + _ = emqx_broker_helper:reclaim_seq(Topic), + case maps:get(shared, SubOpts, 0) of + 0 -> true = ets:delete_object(?SUBSCRIBER, {Topic, SubPid}), + ok = cast(pick(Topic), {unsubscribed, Topic}); + I -> true = ets:delete_object(?SUBSCRIBER, {{shard, Topic, I}, SubPid}), + ok = cast(pick({Topic, I}), {unsubscribed, Topic, I}) + end; [] -> ok end end, ets:lookup(?SUBSCRIPTION, SubPid)), - ets:delete(?SUBSCRIPTION, SubPid). + true = ets:delete(?SUBSCRIPTION, SubPid). %%------------------------------------------------------------------------------ %% Management APIs @@ -305,11 +328,14 @@ safe_update_stats(Tab, Stat, MaxStat) -> end. %%------------------------------------------------------------------------------ -%% Pick and call +%% call, cast, pick %%------------------------------------------------------------------------------ call(Broker, Req) -> - gen_server:call(Broker, Req, ?TIMEOUT). + gen_server:call(Broker, Req). + +cast(Broker, Msg) -> + gen_server:cast(Broker, Msg). %% Pick a broker pick(Topic) -> @@ -320,24 +346,41 @@ pick(Topic) -> %%------------------------------------------------------------------------------ init([Pool, Id]) -> + _ = emqx_router:set_mode(protected), true = gproc_pool:connect_worker(Pool, {Pool, Id}), {ok, #{pool => Pool, id => Id}}. -handle_call({subscribe, Group, Topic}, _From, State) -> - Ok = emqx_router:add_route(Topic, dest(Group)), - {reply, Ok, State}; - -handle_call({unsubscribe, Group, Topic}, _From, State) -> - Ok = case ets:member(?SUBSCRIBER, Topic) of - false -> emqx_router:delete_route(Topic, dest(Group)); - true -> ok - end, - {reply, Ok, State}; +handle_call({subscribe, Topic}, _From, State) -> + case get(Topic) of + undefined -> + _ = put(Topic, true), + emqx_router:add_route(Topic); + true -> ok + end, + {reply, ok, State}; handle_call(Req, _From, State) -> emqx_logger:error("[Broker] unexpected call: ~p", [Req]), {reply, ignored, State}. +handle_cast({unsubscribed, Topic}, State) -> + case ets:member(?SUBSCRIBER, Topic) of + false -> + _ = erase(Topic), + emqx_router:delete_route(Topic); + true -> ok + end, + {noreply, State}; + +handle_cast({unsubscribed, Topic, I}, State) -> + case ets:member(?SUBSCRIBER, {shard, Topic, I}) of + false -> + true = ets:delete_object(?SUBSCRIBER, {Topic, {shard, I}}), + cast(pick(Topic), {unsubscribed, Topic}); + true -> ok + end, + {noreply, State}; + handle_cast(Msg, State) -> emqx_logger:error("[Broker] unexpected cast: ~p", [Msg]), {noreply, State}. @@ -356,9 +399,3 @@ code_change(_OldVsn, State, _Extra) -> %% Internal functions %%------------------------------------------------------------------------------ -dest(undefined) -> node(); -dest(Group) -> {Group, node()}. - -shared(undefined, Name) -> Name; -shared(Group, Name) -> {share, Group, Name}. - diff --git a/src/emqx_broker_helper.erl b/src/emqx_broker_helper.erl index 35fe06f0d..d3e7f9d37 100644 --- a/src/emqx_broker_helper.erl +++ b/src/emqx_broker_helper.erl @@ -20,6 +20,7 @@ -export([start_link/0]). -export([monitor/2]). +-export([get_shard/2]). -export([create_seq/1, reclaim_seq/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, @@ -46,6 +47,13 @@ monitor(SubPid, SubId) when is_pid(SubPid) -> error(subid_conflict) end. +-spec(get_shard(pid(), emqx_topic:topic()) -> non_neg_integer()). +get_shard(SubPid, Topic) -> + case create_seq(Topic) of + Seq when Seq =< 1024 -> 0; + _Seq -> erlang:phash2(SubPid, ets:lookup_element(?SUBSEQ, shards, 2)) + end. + -spec(create_seq(emqx_topic:topic()) -> emqx_sequence:seqid()). create_seq(Topic) -> emqx_sequence:nextval(?SUBSEQ, Topic). @@ -60,9 +68,11 @@ reclaim_seq(Topic) -> init([]) -> %% SubSeq: Topic -> SeqId - _ = emqx_sequence:create(?SUBSEQ), + ok = emqx_sequence:create(?SUBSEQ), + %% Shards: CPU * 32 + true = ets:insert(?SUBSEQ, {shards, emqx_vm:schedulers() * 32}), %% SubMon: SubPid -> SubId - _ = emqx_tables:new(?SUBMON, [set, protected, {read_concurrency, true}]), + ok = emqx_tables:new(?SUBMON, [set, protected, {read_concurrency, true}]), %% Stats timer emqx_stats:update_interval(broker_stats, fun emqx_broker:stats_fun/0), {ok, #state{pmon = emqx_pmon:new()}, hibernate}. diff --git a/src/emqx_cm.erl b/src/emqx_cm.erl index 19892b386..6756cf02b 100644 --- a/src/emqx_cm.erl +++ b/src/emqx_cm.erl @@ -125,9 +125,9 @@ notify(Msg) -> init([]) -> TabOpts = [public, set, {write_concurrency, true}], - _ = emqx_tables:new(?CONN_TAB, [{read_concurrency, true} | TabOpts]), - _ = emqx_tables:new(?CONN_ATTRS_TAB, TabOpts), - _ = emqx_tables:new(?CONN_STATS_TAB, TabOpts), + ok = emqx_tables:new(?CONN_TAB, [{read_concurrency, true} | TabOpts]), + ok = emqx_tables:new(?CONN_ATTRS_TAB, TabOpts), + ok = emqx_tables:new(?CONN_STATS_TAB, TabOpts), ok = emqx_stats:update_interval(cm_stats, fun ?MODULE:update_conn_stats/0), {ok, #{conn_pmon => emqx_pmon:new()}}. diff --git a/src/emqx_ctl.erl b/src/emqx_ctl.erl index 17166a014..c00556eb7 100644 --- a/src/emqx_ctl.erl +++ b/src/emqx_ctl.erl @@ -96,7 +96,7 @@ usage() -> %%------------------------------------------------------------------------------ init([]) -> - _ = emqx_tables:new(?TAB, [ordered_set, protected]), + ok = emqx_tables:new(?TAB, [protected, ordered_set]), {ok, #state{seq = 0}}. handle_call(Req, _From, State) -> diff --git a/src/emqx_hooks.erl b/src/emqx_hooks.erl index 073c12870..b2eb0d6f4 100644 --- a/src/emqx_hooks.erl +++ b/src/emqx_hooks.erl @@ -139,7 +139,7 @@ lookup(HookPoint) -> %%------------------------------------------------------------------------------ init([]) -> - _ = emqx_tables:new(?TAB, [{keypos, #hook.name}, {read_concurrency, true}]), + ok = emqx_tables:new(?TAB, [{keypos, #hook.name}, {read_concurrency, true}]), {ok, #{}}. handle_call({add, HookPoint, Callback = #callback{action = Action}}, _From, State) -> diff --git a/src/emqx_metrics.erl b/src/emqx_metrics.erl index caf862146..b4b3a1307 100644 --- a/src/emqx_metrics.erl +++ b/src/emqx_metrics.erl @@ -285,7 +285,7 @@ qos_sent(?QOS_2) -> init([]) -> % Create metrics table - _ = emqx_tables:new(?TAB, [set, public, {write_concurrency, true}]), + ok = emqx_tables:new(?TAB, [public, set, {write_concurrency, true}]), lists:foreach(fun new/1, ?BYTES_METRICS ++ ?PACKET_METRICS ++ ?MESSAGE_METRICS), {ok, #{}, hibernate}. diff --git a/src/emqx_router.erl b/src/emqx_router.erl index 941c004f7..313adc475 100644 --- a/src/emqx_router.erl +++ b/src/emqx_router.erl @@ -28,23 +28,22 @@ -export([start_link/2]). %% Route APIs --export([add_route/1, add_route/2, add_route/3]). +-export([add_route/1, add_route/2]). -export([get_routes/1]). --export([del_route/1, del_route/2, del_route/3]). +-export([delete_route/1, delete_route/2]). -export([has_routes/1, match_routes/1, print_routes/1]). -export([topics/0]). + +%% Mode +-export([set_mode/1, get_mode/0]). + %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -type(destination() :: node() | {binary(), node()}). --record(batch, {enabled, timer, pending}). --record(state, {pool, id, batch :: #batch{}}). - -define(ROUTE, emqx_route). --define(BATCH(Enabled), #batch{enabled = Enabled}). --define(BATCH(Enabled, Pending), #batch{enabled = Enabled, pending = Pending}). %%------------------------------------------------------------------------------ %% Mnesia bootstrap @@ -62,49 +61,66 @@ mnesia(copy) -> ok = ekka_mnesia:copy_table(?ROUTE). %%------------------------------------------------------------------------------ -%% Strat a router +%% Start a router %%------------------------------------------------------------------------------ --spec(start_link(atom(), pos_integer()) -> {ok, pid()} | ignore | {error, term()}). +-spec(start_link(atom(), pos_integer()) -> emqx_types:startlink_ret()). start_link(Pool, Id) -> - gen_server:start_link({local, emqx_misc:proc_name(?MODULE, Id)}, - ?MODULE, [Pool, Id], [{hibernate_after, 2000}]). + Name = emqx_misc:proc_name(?MODULE, Id), + gen_server:start_link({local, Name}, ?MODULE, [Pool, Id], [{hibernate_after, 1000}]). %%------------------------------------------------------------------------------ %% Route APIs %%------------------------------------------------------------------------------ --spec(add_route(emqx_topic:topic() | emqx_types:route()) -> ok). +-spec(add_route(emqx_topic:topic() | emqx_types:route()) -> ok | {error, term()}). add_route(Topic) when is_binary(Topic) -> add_route(#route{topic = Topic, dest = node()}); add_route(Route = #route{topic = Topic}) -> - cast(pick(Topic), {add_route, Route}). + case get_mode() of + protected -> do_add_route(Route); + undefined -> call(pick(Topic), {add_route, Route}) + end. --spec(add_route(emqx_topic:topic(), destination()) -> ok). +-spec(add_route(emqx_topic:topic(), destination()) -> ok | {error, term()}). add_route(Topic, Dest) when is_binary(Topic) -> add_route(#route{topic = Topic, dest = Dest}). --spec(add_route({pid(), reference()}, emqx_topic:topic(), destination()) -> ok). -add_route(From, Topic, Dest) when is_binary(Topic) -> - cast(pick(Topic), {add_route, From, #route{topic = Topic, dest = Dest}}). +%% @private +do_add_route(Route = #route{topic = Topic, dest = Dest}) -> + case lists:member(Route, get_routes(Topic)) of + true -> ok; + false -> + ok = emqx_router_helper:monitor(Dest), + case emqx_topic:wildcard(Topic) of + true -> trans(fun add_trie_route/1, [Route]); + false -> add_direct_route(Route) + end + end. -spec(get_routes(emqx_topic:topic()) -> [emqx_types:route()]). get_routes(Topic) -> ets:lookup(?ROUTE, Topic). --spec(del_route(emqx_topic:topic() | emqx_types:route()) -> ok). -del_route(Topic) when is_binary(Topic) -> - del_route(#route{topic = Topic, dest = node()}); -del_route(Route = #route{topic = Topic}) -> - cast(pick(Topic), {del_route, Route}). +-spec(delete_route(emqx_topic:topic() | emqx_types:route()) -> ok | {error, term()}). +delete_route(Topic) when is_binary(Topic) -> + delete_route(#route{topic = Topic, dest = node()}); +delete_route(Route = #route{topic = Topic}) -> + case get_mode() of + protected -> do_delete_route(Route); + undefined -> call(pick(Topic), {delete_route, Route}) + end. --spec(del_route(emqx_topic:topic(), destination()) -> ok). -del_route(Topic, Dest) when is_binary(Topic) -> - del_route(#route{topic = Topic, dest = Dest}). +-spec(delete_route(emqx_topic:topic(), destination()) -> ok | {error, term()}). +delete_route(Topic, Dest) when is_binary(Topic) -> + delete_route(#route{topic = Topic, dest = Dest}). --spec(del_route({pid(), reference()}, emqx_topic:topic(), destination()) -> ok). -del_route(From, Topic, Dest) when is_binary(Topic) -> - cast(pick(Topic), {del_route, From, #route{topic = Topic, dest = Dest}}). +%% @private +do_delete_route(Route = #route{topic = Topic}) -> + case emqx_topic:wildcard(Topic) of + true -> trans(fun del_trie_route/1, [Route]); + false -> del_direct_route(Route) + end. -spec(has_routes(emqx_topic:topic()) -> boolean()). has_routes(Topic) when is_binary(Topic) -> @@ -127,8 +143,15 @@ print_routes(Topic) -> io:format("~s -> ~s~n", [To, Dest]) end, match_routes(Topic)). -cast(Router, Msg) -> - gen_server:cast(Router, Msg). +-spec(set_mode(protected | atom()) -> any()). +set_mode(Mode) when is_atom(Mode) -> + put('$router_mode', Mode). + +-spec(get_mode() -> protected | undefined | atom()). +get_mode() -> get('$router_mode'). + +call(Router, Msg) -> + gen_server:call(Router, Msg, infinity). pick(Topic) -> gproc_pool:pick_worker(router, Topic). @@ -138,71 +161,28 @@ pick(Topic) -> %%------------------------------------------------------------------------------ init([Pool, Id]) -> - rand:seed(exsplus, erlang:timestamp()), - gproc_pool:connect_worker(Pool, {Pool, Id}), - Batch = #batch{enabled = emqx_config:get_env(route_batch_clean, false), - pending = sets:new()}, - {ok, ensure_batch_timer(#state{pool = Pool, id = Id, batch = Batch})}. + true = gproc_pool:connect_worker(Pool, {Pool, Id}), + {ok, #{pool => Pool, id => Id}}. + +handle_call({add_route, Route}, _From, State) -> + {reply, do_add_route(Route), State}; + +handle_call({delete_route, Route}, _From, State) -> + {reply, do_delete_route(Route), State}; handle_call(Req, _From, State) -> emqx_logger:error("[Router] unexpected call: ~p", [Req]), {reply, ignored, State}. -handle_cast({add_route, From, Route}, State) -> - {noreply, NewState} = handle_cast({add_route, Route}, State), - _ = gen_server:reply(From, ok), - {noreply, NewState}; - -handle_cast({add_route, Route = #route{topic = Topic, dest = Dest}}, State) -> - case lists:member(Route, get_routes(Topic)) of - true -> ok; - false -> - ok = emqx_router_helper:monitor(Dest), - case emqx_topic:wildcard(Topic) of - true -> log(trans(fun add_trie_route/1, [Route])); - false -> add_direct_route(Route) - end - end, - {noreply, State}; - -handle_cast({del_route, From, Route}, State) -> - {noreply, NewState} = handle_cast({del_route, Route}, State), - _ = gen_server:reply(From, ok), - {noreply, NewState}; - -handle_cast({del_route, Route = #route{topic = Topic, dest = Dest}}, State) when is_tuple(Dest) -> - {noreply, case emqx_topic:wildcard(Topic) of - true -> log(trans(fun del_trie_route/1, [Route])), - State; - false -> del_direct_route(Route, State) - end}; - -handle_cast({del_route, Route = #route{topic = Topic}}, State) -> - %% Confirm if there are still subscribers... - {noreply, case ets:member(emqx_subscriber, Topic) of - true -> State; - false -> - case emqx_topic:wildcard(Topic) of - true -> log(trans(fun del_trie_route/1, [Route])), - State; - false -> del_direct_route(Route, State) - end - end}; - handle_cast(Msg, State) -> emqx_logger:error("[Router] unexpected cast: ~p", [Msg]), {noreply, State}. -handle_info({timeout, _TRef, batch_delete}, State = #state{batch = Batch}) -> - _ = del_direct_routes(sets:to_list(Batch#batch.pending)), - {noreply, ensure_batch_timer(State#state{batch = ?BATCH(true, sets:new())}), hibernate}; - handle_info(Info, State) -> emqx_logger:error("[Router] unexpected info: ~p", [Info]), {noreply, State}. -terminate(_Reason, #state{pool = Pool, id = Id, batch = Batch}) -> - _ = cacel_batch_timer(Batch), +terminate(_Reason, #{pool := Pool, id := Id}) -> gproc_pool:disconnect_worker(Pool, {Pool, Id}). code_change(_OldVsn, State, _Extra) -> @@ -212,17 +192,6 @@ code_change(_OldVsn, State, _Extra) -> %% Internal functions %%------------------------------------------------------------------------------ -ensure_batch_timer(State = #state{batch = #batch{enabled = false}}) -> - State; -ensure_batch_timer(State = #state{batch = Batch}) -> - TRef = erlang:start_timer(50 + rand:uniform(50), self(), batch_delete), - State#state{batch = Batch#batch{timer = TRef}}. - -cacel_batch_timer(#batch{enabled = false}) -> - ok; -cacel_batch_timer(#batch{enabled = true, timer = TRef}) -> - catch erlang:cancel_timer(TRef). - add_direct_route(Route) -> mnesia:async_dirty(fun mnesia:write/3, [?ROUTE, Route, sticky_write]). @@ -233,25 +202,9 @@ add_trie_route(Route = #route{topic = Topic}) -> end, mnesia:write(?ROUTE, Route, sticky_write). -del_direct_route(Route, State = #state{batch = ?BATCH(false)}) -> - del_direct_route(Route), State; -del_direct_route(Route, State = #state{batch = Batch = ?BATCH(true, Pending)}) -> - State#state{batch = Batch#batch{pending = sets:add_element(Route, Pending)}}. - del_direct_route(Route) -> mnesia:async_dirty(fun mnesia:delete_object/3, [?ROUTE, Route, sticky_write]). -del_direct_routes([]) -> - ok; -del_direct_routes(Routes) -> - DelFun = fun(R = #route{topic = Topic}) -> - case ets:member(emqx_subscriber, Topic) of - true -> ok; - false -> mnesia:delete_object(?ROUTE, R, sticky_write) - end - end, - mnesia:async_dirty(fun lists:foreach/2, [DelFun, Routes]). - del_trie_route(Route = #route{topic = Topic}) -> case mnesia:wread({?ROUTE, Topic}) of [Route] -> %% Remove route and trie @@ -270,7 +223,3 @@ trans(Fun, Args) -> {aborted, Error} -> {error, Error} end. -log(ok) -> ok; -log({error, Reason}) -> - emqx_logger:error("[Router] mnesia aborted: ~p", [Reason]). - diff --git a/src/emqx_sequence.erl b/src/emqx_sequence.erl index c4fc0fd08..022531df5 100644 --- a/src/emqx_sequence.erl +++ b/src/emqx_sequence.erl @@ -25,8 +25,7 @@ %% @doc Create a sequence. -spec(create(name()) -> ok). create(Name) -> - _ = ets:new(Name, [set, public, named_table, {write_concurrency, true}]), - ok. + emqx_tables:new(Name, [public, set, {write_concurrency, true}]). %% @doc Next value of the sequence. -spec(nextval(name(), key()) -> seqid()). diff --git a/src/emqx_shared_sub.erl b/src/emqx_shared_sub.erl index ebe6d51f8..d1d0d921d 100644 --- a/src/emqx_shared_sub.erl +++ b/src/emqx_shared_sub.erl @@ -17,6 +17,7 @@ -behaviour(gen_server). -include("emqx.hrl"). +-include("emqx_mqtt.hrl"). %% Mnesia bootstrap -export([mnesia/1]). @@ -27,7 +28,8 @@ -export([start_link/0]). -export([subscribe/3, unsubscribe/3]). --export([dispatch/3, maybe_ack/1, maybe_nack_dropped/1, nack_no_connection/1, is_ack_required/1]). +-export([dispatch/3]). +-export([maybe_ack/1, maybe_nack_dropped/1, nack_no_connection/1, is_ack_required/1]). %% for testing -export([subscribers/2]). @@ -38,6 +40,7 @@ -define(SERVER, ?MODULE). -define(TAB, emqx_shared_subscription). +-define(SHARED_SUBS, emqx_shared_subscriber). -define(ALIVE_SUBS, emqx_alive_shared_subscribers). -define(SHARED_SUB_QOS1_DISPATCH_TIMEOUT_SECONDS, 5). -define(ack, shared_sub_ack). @@ -48,8 +51,6 @@ -record(state, {pmon}). -record(emqx_shared_subscription, {group, topic, subpid}). --include("emqx_mqtt.hrl"). - %%------------------------------------------------------------------------------ %% Mnesia bootstrap %%------------------------------------------------------------------------------ @@ -72,16 +73,11 @@ mnesia(copy) -> start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). -subscribe(undefined, _Topic, _SubPid) -> - ok; subscribe(Group, Topic, SubPid) when is_pid(SubPid) -> - mnesia:dirty_write(?TAB, record(Group, Topic, SubPid)), - gen_server:cast(?SERVER, {monitor, SubPid}). + gen_server:call(?SERVER, {subscribe, Group, Topic, SubPid}). -unsubscribe(undefined, _Topic, _SubPid) -> - ok; unsubscribe(Group, Topic, SubPid) when is_pid(SubPid) -> - mnesia:dirty_delete_object(?TAB, record(Group, Topic, SubPid)). + gen_server:call(?SERVER, {unsubscribe, Group, Topic, SubPid}). record(Group, Topic, SubPid) -> #emqx_shared_subscription{group = Group, topic = Topic, subpid = SubPid}. @@ -251,14 +247,16 @@ do_pick_subscriber(Group, Topic, round_robin, _ClientId, Count) -> subscribers(Group, Topic) -> ets:select(?TAB, [{{emqx_shared_subscription, Group, Topic, '$1'}, [], ['$1']}]). -%%----------------------------------------------------------------------------- +%%------------------------------------------------------------------------------ %% gen_server callbacks -%%----------------------------------------------------------------------------- +%%------------------------------------------------------------------------------ init([]) -> - {atomic, PMon} = mnesia:transaction(fun init_monitors/0), + _ = emqx_router:set_mode(protected), mnesia:subscribe({table, ?TAB, simple}), - ets:new(?ALIVE_SUBS, [named_table, {read_concurrency, true}, protected]), + {atomic, PMon} = mnesia:transaction(fun init_monitors/0), + ok = emqx_tables:new(?SHARED_SUBS, [protected, bag]), + ok = emqx_tables:new(?ALIVE_SUBS, [protected, set, {read_concurrency, true}]), {ok, update_stats(#state{pmon = PMon})}. init_monitors() -> @@ -267,14 +265,29 @@ init_monitors() -> emqx_pmon:monitor(SubPid, Mon) end, emqx_pmon:new(), ?TAB). +handle_call({subscribe, Group, Topic, SubPid}, _From, State = #state{pmon = PMon}) -> + mnesia:dirty_write(?TAB, record(Group, Topic, SubPid)), + case ets:member(?SHARED_SUBS, {Group, Topic}) of + true -> ok; + false -> ok = emqx_router:add_route(Topic, {Group, node()}) + end, + ok = maybe_insert_alive_tab(SubPid), + true = ets:insert(?SHARED_SUBS, {{Group, Topic}, SubPid}), + {reply, ok, update_stats(State#state{pmon = emqx_pmon:monitor(SubPid, PMon)})}; + +handle_call({unsubscribe, Group, Topic, SubPid}, _From, State) -> + mnesia:dirty_delete_object(?TAB, record(Group, Topic, SubPid)), + true = ets:delete_object(?SHARED_SUBS, {{Group, Topic}, SubPid}), + case ets:member(?SHARED_SUBS, {Group, Topic}) of + true -> ok; + false -> ok = emqx_router:delete_route(Topic, {Group, node()}) + end, + {reply, ok, State}; + handle_call(Req, _From, State) -> emqx_logger:error("[SharedSub] unexpected call: ~p", [Req]), {reply, ignored, State}. -handle_cast({monitor, SubPid}, State= #state{pmon = PMon}) -> - NewPmon = emqx_pmon:monitor(SubPid, PMon), - ok = maybe_insert_alive_tab(SubPid), - {noreply, update_stats(State#state{pmon = NewPmon})}; handle_cast(Msg, State) -> emqx_logger:error("[SharedSub] unexpected cast: ~p", [Msg]), {noreply, State}. @@ -316,12 +329,18 @@ maybe_insert_alive_tab(Pid) when is_pid(Pid) -> ets:insert(?ALIVE_SUBS, {Pid}), cleanup_down(SubPid) -> ?IS_LOCAL_PID(SubPid) orelse ets:delete(?ALIVE_SUBS, SubPid), lists:foreach( - fun(Record) -> - mnesia:dirty_delete_object(?TAB, Record) - end,mnesia:dirty_match_object(#emqx_shared_subscription{_ = '_', subpid = SubPid})). + fun(Record = #emqx_shared_subscription{topic = Topic, group = Group}) -> + ok = mnesia:dirty_delete_object(?TAB, Record), + true = ets:delete_object(?SHARED_SUBS, {{Group, Topic}, SubPid}), + case ets:member(?SHARED_SUBS, {Group, Topic}) of + true -> ok; + false -> ok = emqx_router:delete_route(Topic, {Group, node()}) + end + end, mnesia:dirty_match_object(#emqx_shared_subscription{_ = '_', subpid = SubPid})). update_stats(State) -> - emqx_stats:setstat('subscriptions/shared/count', 'subscriptions/shared/max', ets:info(?TAB, size)), State. + emqx_stats:setstat('subscriptions/shared/count', 'subscriptions/shared/max', ets:info(?TAB, size)), + State. %% Return 'true' if the subscriber process is alive AND not in the failed list is_active_sub(Pid, FailedSubs) -> diff --git a/src/emqx_sm.erl b/src/emqx_sm.erl index 8f9a3e3cb..d178a8ae7 100644 --- a/src/emqx_sm.erl +++ b/src/emqx_sm.erl @@ -202,10 +202,10 @@ notify(Event) -> init([]) -> TabOpts = [public, set, {write_concurrency, true}], - _ = emqx_tables:new(?SESSION_TAB, [{read_concurrency, true} | TabOpts]), - _ = emqx_tables:new(?SESSION_P_TAB, TabOpts), - _ = emqx_tables:new(?SESSION_ATTRS_TAB, TabOpts), - _ = emqx_tables:new(?SESSION_STATS_TAB, TabOpts), + ok = emqx_tables:new(?SESSION_TAB, [{read_concurrency, true} | TabOpts]), + ok = emqx_tables:new(?SESSION_P_TAB, TabOpts), + ok = emqx_tables:new(?SESSION_ATTRS_TAB, TabOpts), + ok = emqx_tables:new(?SESSION_STATS_TAB, TabOpts), emqx_stats:update_interval(sm_stats, fun ?MODULE:stats_fun/0), {ok, #{session_pmon => emqx_pmon:new()}}. diff --git a/src/emqx_stats.erl b/src/emqx_stats.erl index 61ff6cbc3..790c397b9 100644 --- a/src/emqx_stats.erl +++ b/src/emqx_stats.erl @@ -152,7 +152,7 @@ cast(Msg) -> %%------------------------------------------------------------------------------ init(#{tick_ms := TickMs}) -> - _ = emqx_tables:new(?TAB, [set, public, {write_concurrency, true}]), + ok = emqx_tables:new(?TAB, [public, set, {write_concurrency, true}]), Stats = lists:append([?CONNECTION_STATS, ?SESSION_STATS, ?PUBSUB_STATS, ?ROUTE_STATS, ?RETAINED_STATS]), true = ets:insert(?TAB, [{Name, 0} || Name <- Stats]), diff --git a/src/emqx_tables.erl b/src/emqx_tables.erl index 330c87d9c..9b3ebfeae 100644 --- a/src/emqx_tables.erl +++ b/src/emqx_tables.erl @@ -17,10 +17,12 @@ -export([new/2]). %% Create a named_table ets. +-spec(new(atom(), list()) -> ok). new(Tab, Opts) -> case ets:info(Tab, name) of undefined -> - ets:new(Tab, lists:usort([named_table | Opts])); - Tab -> Tab + _ = ets:new(Tab, lists:usort([named_table | Opts])), + ok; + Tab -> ok end. diff --git a/src/emqx_zone.erl b/src/emqx_zone.erl index dd183dbdf..d119abe52 100644 --- a/src/emqx_zone.erl +++ b/src/emqx_zone.erl @@ -68,7 +68,7 @@ stop() -> %%------------------------------------------------------------------------------ init([]) -> - _ = emqx_tables:new(?TAB, [set, {read_concurrency, true}]), + ok = emqx_tables:new(?TAB, [set, {read_concurrency, true}]), {ok, element(2, handle_info(reload, #{timer => undefined}))}. handle_call(force_reload, _From, State) -> From 5e53eaeee5b65d2d1ec0c0721d5cdddc311527f4 Mon Sep 17 00:00:00 2001 From: turtled Date: Sat, 8 Dec 2018 09:56:00 +0800 Subject: [PATCH 4/7] rename shard shared --- src/emqx_broker.erl | 29 ++++++++++++++++------------- src/emqx_broker_helper.erl | 12 ++++++------ 2 files changed, 22 insertions(+), 19 deletions(-) diff --git a/src/emqx_broker.erl b/src/emqx_broker.erl index 0a9264489..380e16c42 100644 --- a/src/emqx_broker.erl +++ b/src/emqx_broker.erl @@ -71,7 +71,7 @@ create_tabs() -> %% Subscriber: Topic -> SubPid1, SubPid2, SubPid3, ... %% duplicate_bag: o(1) insert - ok = emqx_tables:new(?SUBSCRIBER, [duplicate_bag | TabOpts]). + ok = emqx_tables:new(?SUBSCRIBER, [bag | TabOpts]). %%------------------------------------------------------------------------------ %% Subscribe API @@ -97,15 +97,16 @@ subscribe(Topic, SubId, SubOpts) when is_binary(Topic), ?is_subid(SubId), is_map true = ets:insert(?SUBSCRIPTION, {SubPid, Topic}), case maps:get(share, SubOpts, undefined) of undefined -> - Shard = emqx_broker_helper:get_shard(SubPid, Topic), - case Shard of + Shared = emqx_broker_helper:get_shared(SubPid, Topic), + case Shared of 0 -> true = ets:insert(?SUBSCRIBER, {Topic, SubPid}); - I -> true = ets:insert(?SUBSCRIBER, {{shard, Topic, I}, SubPid}), - true = ets:insert(?SUBSCRIBER, {Topic, {shard, I}}) + I -> + true = ets:insert(?SUBSCRIBER, {{shared, Topic, I}, SubPid}), + true = ets:insert(?SUBSCRIBER, {Topic, {shared, I}}) end, - SubOpts1 = maps:put(shard, Shard, SubOpts), + SubOpts1 = maps:put(shared, Shared, SubOpts), true = ets:insert(?SUBOPTION, {{SubPid, Topic}, SubOpts1}), - call(pick({Topic, Shard}), {subscribe, Topic}); + call(pick({Topic, Shared}), {subscribe, Topic}); Group -> %% Shared subscription true = ets:insert(?SUBOPTION, {{SubPid, Topic}, SubOpts}), emqx_shared_sub:subscribe(Group, Topic, SubPid) @@ -128,7 +129,7 @@ unsubscribe(Topic) when is_binary(Topic) -> case maps:get(shared, SubOpts, 0) of 0 -> true = ets:delete_object(?SUBSCRIBER, {Topic, SubPid}), ok = cast(pick(Topic), {unsubscribed, Topic}); - I -> true = ets:delete_object(?SUBSCRIBER, {{shard, Topic, I}, SubPid}), + I -> true = ets:delete_object(?SUBSCRIBER, {{shared, Topic, I}, SubPid}), ok = cast(pick({Topic, I}), {unsubscribed, Topic, I}) end; Group -> @@ -239,10 +240,11 @@ dispatch(Topic, Delivery = #delivery{message = Msg, results = Results}) -> dispatch(SubPid, Topic, Msg) when is_pid(SubPid) -> SubPid ! {dispatch, Topic, Msg}; -dispatch({shard, I}, Topic, Msg) -> +dispatch({shared, I}, Topic, Msg) -> + lists:foreach(fun(SubPid) -> SubPid ! {dispatch, Topic, Msg} - end, safe_lookup_element(?SUBSCRIBER, {share, Topic, I}, [])). + end, safe_lookup_element(?SUBSCRIBER, {shared, Topic, I}, [])). inc_dropped_cnt(<<"$SYS/", _/binary>>) -> ok; @@ -267,7 +269,8 @@ subscriber_down(SubPid) -> case maps:get(shared, SubOpts, 0) of 0 -> true = ets:delete_object(?SUBSCRIBER, {Topic, SubPid}), ok = cast(pick(Topic), {unsubscribed, Topic}); - I -> true = ets:delete_object(?SUBSCRIBER, {{shard, Topic, I}, SubPid}), + I -> true = ets:delete_object(?SUBSCRIBER, {Topic, {shared, I}}), + true = ets:delete_object(?SUBSCRIBER, {{shared, Topic, I}, SubPid}), ok = cast(pick({Topic, I}), {unsubscribed, Topic, I}) end; [] -> ok @@ -373,9 +376,9 @@ handle_cast({unsubscribed, Topic}, State) -> {noreply, State}; handle_cast({unsubscribed, Topic, I}, State) -> - case ets:member(?SUBSCRIBER, {shard, Topic, I}) of + case ets:member(?SUBSCRIBER, {shared, Topic, I}) of false -> - true = ets:delete_object(?SUBSCRIBER, {Topic, {shard, I}}), + true = ets:delete_object(?SUBSCRIBER, {Topic, {shared, I}}), cast(pick(Topic), {unsubscribed, Topic}); true -> ok end, diff --git a/src/emqx_broker_helper.erl b/src/emqx_broker_helper.erl index d3e7f9d37..6830b4d32 100644 --- a/src/emqx_broker_helper.erl +++ b/src/emqx_broker_helper.erl @@ -20,7 +20,7 @@ -export([start_link/0]). -export([monitor/2]). --export([get_shard/2]). +-export([get_shared/2]). -export([create_seq/1, reclaim_seq/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, @@ -47,11 +47,11 @@ monitor(SubPid, SubId) when is_pid(SubPid) -> error(subid_conflict) end. --spec(get_shard(pid(), emqx_topic:topic()) -> non_neg_integer()). -get_shard(SubPid, Topic) -> +-spec(get_shared(pid(), emqx_topic:topic()) -> non_neg_integer()). +get_shared(SubPid, Topic) -> case create_seq(Topic) of Seq when Seq =< 1024 -> 0; - _Seq -> erlang:phash2(SubPid, ets:lookup_element(?SUBSEQ, shards, 2)) + _Seq -> erlang:phash2(SubPid, ets:lookup_element(?SUBSEQ, shareds, 2)) end. -spec(create_seq(emqx_topic:topic()) -> emqx_sequence:seqid()). @@ -69,8 +69,8 @@ reclaim_seq(Topic) -> init([]) -> %% SubSeq: Topic -> SeqId ok = emqx_sequence:create(?SUBSEQ), - %% Shards: CPU * 32 - true = ets:insert(?SUBSEQ, {shards, emqx_vm:schedulers() * 32}), + %% Shareds: CPU * 32 + true = ets:insert(?SUBSEQ, {shareds, emqx_vm:schedulers() * 32}), %% SubMon: SubPid -> SubId ok = emqx_tables:new(?SUBMON, [set, protected, {read_concurrency, true}]), %% Stats timer From ba897e51f9f27de998ba638c69d65c33b8578ce1 Mon Sep 17 00:00:00 2001 From: turtled Date: Sat, 8 Dec 2018 10:26:50 +0800 Subject: [PATCH 5/7] Subscriber down clear emqx_suboption table --- src/emqx_broker.erl | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/emqx_broker.erl b/src/emqx_broker.erl index 380e16c42..5f3e63059 100644 --- a/src/emqx_broker.erl +++ b/src/emqx_broker.erl @@ -272,7 +272,8 @@ subscriber_down(SubPid) -> I -> true = ets:delete_object(?SUBSCRIBER, {Topic, {shared, I}}), true = ets:delete_object(?SUBSCRIBER, {{shared, Topic, I}, SubPid}), ok = cast(pick({Topic, I}), {unsubscribed, Topic, I}) - end; + end, + ets:delete(?SUBOPTION, Sub); [] -> ok end end, ets:lookup(?SUBSCRIPTION, SubPid)), From d1be51d398afdf46133baeff0b13320b92721e93 Mon Sep 17 00:00:00 2001 From: turtled Date: Sat, 8 Dec 2018 10:52:15 +0800 Subject: [PATCH 6/7] Format code --- src/emqx_broker.erl | 29 ++++++++++++++++++++--------- 1 file changed, 20 insertions(+), 9 deletions(-) diff --git a/src/emqx_broker.erl b/src/emqx_broker.erl index 5f3e63059..b9332ffed 100644 --- a/src/emqx_broker.erl +++ b/src/emqx_broker.erl @@ -127,10 +127,16 @@ unsubscribe(Topic) when is_binary(Topic) -> case maps:get(share, SubOpts, undefined) of undefined -> case maps:get(shared, SubOpts, 0) of - 0 -> true = ets:delete_object(?SUBSCRIBER, {Topic, SubPid}), - ok = cast(pick(Topic), {unsubscribed, Topic}); - I -> true = ets:delete_object(?SUBSCRIBER, {{shared, Topic, I}, SubPid}), - ok = cast(pick({Topic, I}), {unsubscribed, Topic, I}) + 0 -> + true = ets:delete_object(?SUBSCRIBER, {Topic, SubPid}), + ok = cast(pick(Topic), {unsubscribed, Topic}); + I -> + true = ets:delete_object(?SUBSCRIBER, {{shared, Topic, I}, SubPid}), + case ets:member(emqx_subscriber, {shared, Topic, I}) of + true -> ok; + false -> ets:delete_object(?SUBSCRIBER, {Topic, {shared, I}}) + end, + ok = cast(pick({Topic, I}), {unsubscribed, Topic, I}) end; Group -> ok = emqx_shared_sub:unsubscribe(Group, Topic, SubPid) @@ -267,11 +273,16 @@ subscriber_down(SubPid) -> [{_, SubOpts}] -> _ = emqx_broker_helper:reclaim_seq(Topic), case maps:get(shared, SubOpts, 0) of - 0 -> true = ets:delete_object(?SUBSCRIBER, {Topic, SubPid}), - ok = cast(pick(Topic), {unsubscribed, Topic}); - I -> true = ets:delete_object(?SUBSCRIBER, {Topic, {shared, I}}), - true = ets:delete_object(?SUBSCRIBER, {{shared, Topic, I}, SubPid}), - ok = cast(pick({Topic, I}), {unsubscribed, Topic, I}) + 0 -> + true = ets:delete_object(?SUBSCRIBER, {Topic, SubPid}), + ok = cast(pick(Topic), {unsubscribed, Topic}); + I -> + true = ets:delete_object(?SUBSCRIBER, {{shared, Topic, I}, SubPid}), + case ets:member(emqx_subscriber, {shared, Topic, I}) of + true -> ok; + false -> ets:delete_object(?SUBSCRIBER, {Topic, {shared, I}}) + end, + ok = cast(pick({Topic, I}), {unsubscribed, Topic, I}) end, ets:delete(?SUBOPTION, Sub); [] -> ok From 5164d0d6a57e3be187d3d5dfee1b605dbd4f688f Mon Sep 17 00:00:00 2001 From: turtled Date: Sat, 8 Dec 2018 11:40:08 +0800 Subject: [PATCH 7/7] Fix unsubscribe fail and rename shared -> shard --- src/emqx_broker.erl | 43 +++++++++++++++++++------------------- src/emqx_broker_helper.erl | 12 +++++------ 2 files changed, 28 insertions(+), 27 deletions(-) diff --git a/src/emqx_broker.erl b/src/emqx_broker.erl index b9332ffed..6556a59e5 100644 --- a/src/emqx_broker.erl +++ b/src/emqx_broker.erl @@ -97,19 +97,19 @@ subscribe(Topic, SubId, SubOpts) when is_binary(Topic), ?is_subid(SubId), is_map true = ets:insert(?SUBSCRIPTION, {SubPid, Topic}), case maps:get(share, SubOpts, undefined) of undefined -> - Shared = emqx_broker_helper:get_shared(SubPid, Topic), - case Shared of + Shard = emqx_broker_helper:get_shard(SubPid, Topic), + case Shard of 0 -> true = ets:insert(?SUBSCRIBER, {Topic, SubPid}); I -> - true = ets:insert(?SUBSCRIBER, {{shared, Topic, I}, SubPid}), - true = ets:insert(?SUBSCRIBER, {Topic, {shared, I}}) + true = ets:insert(?SUBSCRIBER, {{shard, Topic, I}, SubPid}), + true = ets:insert(?SUBSCRIBER, {Topic, {shard, I}}) end, - SubOpts1 = maps:put(shared, Shared, SubOpts), + SubOpts1 = maps:put(shard, Shard, SubOpts), true = ets:insert(?SUBOPTION, {{SubPid, Topic}, SubOpts1}), - call(pick({Topic, Shared}), {subscribe, Topic}); - Group -> %% Shared subscription + call(pick({Topic, Shard}), {subscribe, Topic}); + Group -> %% Shard subscription true = ets:insert(?SUBOPTION, {{SubPid, Topic}, SubOpts}), - emqx_shared_sub:subscribe(Group, Topic, SubPid) + emqx_shard_sub:subscribe(Group, Topic, SubPid) end; true -> ok end. @@ -126,15 +126,15 @@ unsubscribe(Topic) when is_binary(Topic) -> _ = emqx_broker_helper:reclaim_seq(Topic), case maps:get(share, SubOpts, undefined) of undefined -> - case maps:get(shared, SubOpts, 0) of + case maps:get(shard, SubOpts, 0) of 0 -> true = ets:delete_object(?SUBSCRIBER, {Topic, SubPid}), ok = cast(pick(Topic), {unsubscribed, Topic}); I -> - true = ets:delete_object(?SUBSCRIBER, {{shared, Topic, I}, SubPid}), - case ets:member(emqx_subscriber, {shared, Topic, I}) of + true = ets:delete_object(?SUBSCRIBER, {{shard, Topic, I}, SubPid}), + case ets:member(emqx_subscriber, {shard, Topic, I}) of true -> ok; - false -> ets:delete_object(?SUBSCRIBER, {Topic, {shared, I}}) + false -> ets:delete_object(?SUBSCRIBER, {Topic, {shard, I}}) end, ok = cast(pick({Topic, I}), {unsubscribed, Topic, I}) end; @@ -143,7 +143,8 @@ unsubscribe(Topic) when is_binary(Topic) -> end, true = ets:delete_object(?SUBSCRIPTION, {SubPid, Topic}), %%true = ets:delete_object(?SUBID, {SubId, SubPid}), - true = ets:delete(?SUBOPTION, {SubPid, Topic}); + true = ets:delete(?SUBOPTION, {SubPid, Topic}), + ok; [] -> ok end. @@ -246,11 +247,11 @@ dispatch(Topic, Delivery = #delivery{message = Msg, results = Results}) -> dispatch(SubPid, Topic, Msg) when is_pid(SubPid) -> SubPid ! {dispatch, Topic, Msg}; -dispatch({shared, I}, Topic, Msg) -> +dispatch({shard, I}, Topic, Msg) -> lists:foreach(fun(SubPid) -> SubPid ! {dispatch, Topic, Msg} - end, safe_lookup_element(?SUBSCRIBER, {shared, Topic, I}, [])). + end, safe_lookup_element(?SUBSCRIBER, {shard, Topic, I}, [])). inc_dropped_cnt(<<"$SYS/", _/binary>>) -> ok; @@ -272,15 +273,15 @@ subscriber_down(SubPid) -> case ets:lookup(?SUBOPTION, Sub) of [{_, SubOpts}] -> _ = emqx_broker_helper:reclaim_seq(Topic), - case maps:get(shared, SubOpts, 0) of + case maps:get(shard, SubOpts, 0) of 0 -> true = ets:delete_object(?SUBSCRIBER, {Topic, SubPid}), ok = cast(pick(Topic), {unsubscribed, Topic}); I -> - true = ets:delete_object(?SUBSCRIBER, {{shared, Topic, I}, SubPid}), - case ets:member(emqx_subscriber, {shared, Topic, I}) of + true = ets:delete_object(?SUBSCRIBER, {{shard, Topic, I}, SubPid}), + case ets:member(emqx_subscriber, {shard, Topic, I}) of true -> ok; - false -> ets:delete_object(?SUBSCRIBER, {Topic, {shared, I}}) + false -> ets:delete_object(?SUBSCRIBER, {Topic, {shard, I}}) end, ok = cast(pick({Topic, I}), {unsubscribed, Topic, I}) end, @@ -388,9 +389,9 @@ handle_cast({unsubscribed, Topic}, State) -> {noreply, State}; handle_cast({unsubscribed, Topic, I}, State) -> - case ets:member(?SUBSCRIBER, {shared, Topic, I}) of + case ets:member(?SUBSCRIBER, {shard, Topic, I}) of false -> - true = ets:delete_object(?SUBSCRIBER, {Topic, {shared, I}}), + true = ets:delete_object(?SUBSCRIBER, {Topic, {shard, I}}), cast(pick(Topic), {unsubscribed, Topic}); true -> ok end, diff --git a/src/emqx_broker_helper.erl b/src/emqx_broker_helper.erl index 6830b4d32..d3e7f9d37 100644 --- a/src/emqx_broker_helper.erl +++ b/src/emqx_broker_helper.erl @@ -20,7 +20,7 @@ -export([start_link/0]). -export([monitor/2]). --export([get_shared/2]). +-export([get_shard/2]). -export([create_seq/1, reclaim_seq/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, @@ -47,11 +47,11 @@ monitor(SubPid, SubId) when is_pid(SubPid) -> error(subid_conflict) end. --spec(get_shared(pid(), emqx_topic:topic()) -> non_neg_integer()). -get_shared(SubPid, Topic) -> +-spec(get_shard(pid(), emqx_topic:topic()) -> non_neg_integer()). +get_shard(SubPid, Topic) -> case create_seq(Topic) of Seq when Seq =< 1024 -> 0; - _Seq -> erlang:phash2(SubPid, ets:lookup_element(?SUBSEQ, shareds, 2)) + _Seq -> erlang:phash2(SubPid, ets:lookup_element(?SUBSEQ, shards, 2)) end. -spec(create_seq(emqx_topic:topic()) -> emqx_sequence:seqid()). @@ -69,8 +69,8 @@ reclaim_seq(Topic) -> init([]) -> %% SubSeq: Topic -> SeqId ok = emqx_sequence:create(?SUBSEQ), - %% Shareds: CPU * 32 - true = ets:insert(?SUBSEQ, {shareds, emqx_vm:schedulers() * 32}), + %% Shards: CPU * 32 + true = ets:insert(?SUBSEQ, {shards, emqx_vm:schedulers() * 32}), %% SubMon: SubPid -> SubId ok = emqx_tables:new(?SUBMON, [set, protected, {read_concurrency, true}]), %% Stats timer