506 lines
17 KiB
Erlang
506 lines
17 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2017-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
|
|
-module(emqx_broker).
|
|
|
|
-behaviour(gen_server).
|
|
|
|
-include("emqx.hrl").
|
|
-include("logger.hrl").
|
|
-include("types.hrl").
|
|
-include("emqx_mqtt.hrl").
|
|
|
|
-logger_header("[Broker]").
|
|
|
|
-export([start_link/2]).
|
|
|
|
%% PubSub
|
|
-export([ subscribe/1
|
|
, subscribe/2
|
|
, subscribe/3
|
|
]).
|
|
|
|
-export([unsubscribe/1]).
|
|
|
|
-export([subscriber_down/1]).
|
|
|
|
-export([ publish/1
|
|
, safe_publish/1
|
|
]).
|
|
|
|
-export([dispatch/2]).
|
|
|
|
%% PubSub Infos
|
|
-export([ subscriptions/1
|
|
, subscribers/1
|
|
, subscribed/2
|
|
]).
|
|
|
|
-export([ get_subopts/2
|
|
, set_subopts/2
|
|
]).
|
|
|
|
-export([topics/0]).
|
|
|
|
%% Stats fun
|
|
-export([stats_fun/0]).
|
|
|
|
%% gen_server callbacks
|
|
-export([ init/1
|
|
, handle_call/3
|
|
, handle_cast/2
|
|
, handle_info/2
|
|
, terminate/2
|
|
, code_change/3
|
|
]).
|
|
|
|
-import(emqx_tables, [lookup_value/2, lookup_value/3]).
|
|
|
|
-ifdef(TEST).
|
|
-compile(export_all).
|
|
-compile(nowarn_export_all).
|
|
-endif.
|
|
|
|
-define(BROKER, ?MODULE).
|
|
|
|
%% ETS tables for PubSub
|
|
-define(SUBOPTION, emqx_suboption).
|
|
-define(SUBSCRIBER, emqx_subscriber).
|
|
-define(SUBSCRIPTION, emqx_subscription).
|
|
|
|
%% Guards
|
|
-define(IS_SUBID(Id), (is_binary(Id) orelse is_atom(Id))).
|
|
|
|
%% @doc Start one broker worker.
%% The PubSub ETS tables are created first (in the caller's process), then
%% the gen_server is started and registered locally under a name built from
%% the module name and `Id'.
-spec(start_link(atom(), pos_integer()) -> startlink_ret()).
start_link(Pool, Id) ->
    ok = create_tabs(),
    Name = emqx_misc:proc_name(?BROKER, Id),
    gen_server:start_link({local, Name}, ?MODULE, [Pool, Id], []).
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% Create tabs
|
|
%%------------------------------------------------------------------------------
|
|
|
|
%% Create the three public ETS tables used for PubSub bookkeeping.
%% All of them share the same concurrency options; only the table type
%% differs.
-spec(create_tabs() -> ok).
create_tabs() ->
    Common = [public, {read_concurrency, true}, {write_concurrency, true}],
    Tables =
        [ %% {SubPid, Topic} -> SubOpts
          {?SUBOPTION, set}
          %% SubPid -> Topic1, Topic2, ... (duplicate_bag keeps inserts cheap)
        , {?SUBSCRIPTION, duplicate_bag}
          %% Topic -> SubPid1, SubPid2, ... (bag: duplicates are rejected)
        , {?SUBSCRIBER, bag}
        ],
    lists:foreach(
      fun({Tab, Type}) ->
              ok = emqx_tables:new(Tab, [Type | Common])
      end, Tables).
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% Subscribe API
|
|
%%------------------------------------------------------------------------------
|
|
|
|
%% @doc Subscribe to `Topic' without a subscriber id and with the default
%% subscription options.
-spec(subscribe(emqx_topic:topic()) -> ok).
subscribe(Topic) when is_binary(Topic) ->
    %% Same as subscribe(Topic, undefined): `undefined' is an atom and thus
    %% a valid (absent) subscriber id, which selects the default options.
    subscribe(Topic, undefined, ?DEFAULT_SUBOPTS).
|
|
|
|
%% @doc Subscribe to `Topic', giving either a subscriber id or a map of
%% subscription options as second argument. Whichever is not given is
%% defaulted: `undefined' for the id, `?DEFAULT_SUBOPTS' for the options.
-spec(subscribe(emqx_topic:topic(), emqx_types:subid() | emqx_types:subopts()) -> ok).
subscribe(Topic, SubOpts) when is_binary(Topic), is_map(SubOpts) ->
    subscribe(Topic, undefined, SubOpts);
subscribe(Topic, SubId) when is_binary(Topic), ?IS_SUBID(SubId) ->
    subscribe(Topic, SubId, ?DEFAULT_SUBOPTS).
|
|
|
|
%% @doc Subscribe the calling process to `Topic'.
%% The given options are laid over `?DEFAULT_SUBOPTS'. If the caller has no
%% entry for the topic yet, it is registered with the broker helper and a
%% new subscription is created; otherwise only the stored options are
%% updated. Always returns `ok' in the "already subscribed" case.
-spec(subscribe(emqx_topic:topic(), emqx_types:subid(), emqx_types:subopts()) -> ok).
subscribe(Topic, SubId, SubOpts0) when is_binary(Topic), ?IS_SUBID(SubId), is_map(SubOpts0) ->
    Merged = maps:merge(?DEFAULT_SUBOPTS, SubOpts0),
    %% Tracing sees the merged options, before the subid is attached.
    _ = emqx_trace:subscribe(Topic, SubId, Merged),
    Self = self(),
    Opts = with_subid(SubId, Merged),
    case ets:member(?SUBOPTION, {Self, Topic}) of
        true ->
            %% Existing subscription: refresh options, discard the boolean.
            _ = set_subopts(Self, Topic, Opts),
            ok;
        false ->
            %% New subscription.
            ok = emqx_broker_helper:register_sub(Self, SubId),
            do_subscribe(Topic, Self, Opts)
    end.
|
|
|
|
-compile({inline, [with_subid/2]}).
%% Attach the subscriber id to the options map under the `subid' key.
%% An `undefined' id leaves the options untouched.
with_subid(undefined, SubOpts) ->
    SubOpts;
with_subid(SubId, SubOpts) ->
    SubOpts#{subid => SubId}.
|
|
|
|
%% @private
%% Record the SubPid -> Topic relation, then continue with the plain or
%% shared subscription path depending on the `share' option.
do_subscribe(Topic, SubPid, SubOpts) ->
    true = ets:insert(?SUBSCRIPTION, {SubPid, Topic}),
    do_subscribe(maps:get(share, SubOpts, undefined), Topic, SubPid, SubOpts).
|
|
|
|
%% Plain (non-shared) subscription.
%% The broker helper decides which shard the subscriber belongs to:
%%   0     - the pid is stored directly under the topic;
%%   other - the pid is stored under {shard, Topic, Shard} and the shard
%%           number is remembered in the stored options.
%% In both cases a broker worker is then called synchronously.
do_subscribe(undefined, Topic, SubPid, SubOpts) ->
    OptKey = {SubPid, Topic},
    case emqx_broker_helper:get_sub_shard(SubPid, Topic) of
        0 ->
            true = ets:insert(?SUBSCRIBER, {Topic, SubPid}),
            true = ets:insert(?SUBOPTION, {OptKey, SubOpts}),
            call(pick(Topic), {subscribe, Topic});
        Shard ->
            true = ets:insert(?SUBSCRIBER, {{shard, Topic, Shard}, SubPid}),
            true = ets:insert(?SUBOPTION, {OptKey, SubOpts#{shard => Shard}}),
            call(pick({Topic, Shard}), {subscribe, Topic, Shard})
    end;

%% Shared subscription: store the options locally and hand the rest over to
%% emqx_shared_sub.
do_subscribe(Group, Topic, SubPid, SubOpts) ->
    true = ets:insert(?SUBOPTION, {{SubPid, Topic}, SubOpts}),
    emqx_shared_sub:subscribe(Group, Topic, SubPid).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Unsubscribe API
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% @doc Remove the calling process' subscription to `Topic'.
%% Does nothing (and returns `ok') when no options are stored for the
%% caller and topic.
-spec(unsubscribe(emqx_topic:topic()) -> ok).
unsubscribe(Topic) when is_binary(Topic) ->
    Self = self(),
    case ets:lookup(?SUBOPTION, {Self, Topic}) of
        [] ->
            ok;
        [{_Key, SubOpts}] ->
            emqx_trace:unsubscribe(Topic, SubOpts),
            _ = emqx_broker_helper:reclaim_seq(Topic),
            do_unsubscribe(Topic, Self, SubOpts)
    end.
|
|
|
|
%% Drop the stored options and the SubPid -> Topic relation, then continue
%% with the plain or shared clean-up path depending on the `share' option.
do_unsubscribe(Topic, SubPid, SubOpts) ->
    true = ets:delete(?SUBOPTION, {SubPid, Topic}),
    true = ets:delete_object(?SUBSCRIPTION, {SubPid, Topic}),
    do_unsubscribe(maps:get(share, SubOpts, undefined), Topic, SubPid, SubOpts).
|
|
|
|
%% No share group: clean up the local subscriber entry.
do_unsubscribe(undefined, Topic, SubPid, SubOpts) ->
    clean_subscribe(SubOpts, Topic, SubPid);
%% Share group present: delegate to emqx_shared_sub.
do_unsubscribe(Group, Topic, SubPid, _SubOpts) ->
    emqx_shared_sub:unsubscribe(Group, Topic, SubPid).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Publish
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% @doc Publish a message.
%% The message is traced, counted (unless it is a system message) and run
%% through the 'message.publish' hook chain with its dup flag cleared. If
%% the hooks mark it with `allow_publish => false' nothing is routed and
%% `[]' is returned; otherwise it is routed to all matching routes.
-spec(publish(emqx_types:message()) -> emqx_types:publish_result()).
publish(#message{} = Msg) ->
    _ = emqx_trace:publish(Msg),
    _ = emqx_message:is_sys(Msg) orelse emqx_metrics:inc('messages.publish'),
    Hooked = emqx_hooks:run_fold('message.publish', [], emqx_message:clean_dup(Msg)),
    case Hooked of
        #message{headers = #{allow_publish := false}} ->
            %% The log shows the message as it was handed in.
            ?LOG(notice, "Stop publishing: ~s", [emqx_message:format(Msg)]),
            [];
        #message{topic = Topic} = Msg1 ->
            Routes = aggre(emqx_router:match_routes(Topic)),
            route(Routes, delivery(Msg1))
    end.
|
|
|
|
%% @doc Publish a message without letting an exception escape.
%% Any exception raised by publish/1 is logged together with the message
%% and the stacktrace, and `[]' is returned instead. Called internally.
-spec(safe_publish(emqx_types:message()) -> emqx_types:publish_result()).
safe_publish(#message{} = Msg) ->
    try publish(Msg) of
        Result -> Result
    catch
        _Class:Reason:Stacktrace ->
            ?LOG(error, "Publish error: ~0p~n~s~n~0p",
                 [Reason, emqx_message:format(Msg), Stacktrace]),
            []
    end.
|
|
|
|
-compile({inline, [delivery/1]}).
%% Wrap a message into a #delivery{} record with the calling process as
%% sender.
delivery(Msg) ->
    #delivery{message = Msg, sender = self()}.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Route
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% Route a delivery to every entry of the (aggregated) route list.
%% With no routes the message is dropped and `[]' is returned. Otherwise
%% the routes are processed front to back and the per-route results are
%% returned in reverse route order.
-spec(route([emqx_types:route_entry()], emqx_types:delivery())
      -> emqx_types:publish_result()).
route([], #delivery{message = Msg}) ->
    drop_message(Msg),
    [];
route(Routes, Delivery) ->
    lists:reverse([do_route(Route, Delivery) || Route <- Routes]).
|
|
|
|
%% Run the 'message.dropped' hook with reason `no_subscribers' for the
%% local node, then bump the dropped-message counters.
drop_message(Msg) ->
    Context = #{node => node()},
    ok = emqx_hooks:run('message.dropped', [Msg, Context, no_subscribers]),
    ok = inc_dropped_cnt(Msg).
|
|
|
|
%% Deliver to a single route entry and tag the result with its destination.
%% Clause order matters: the local node must be tested before the generic
%% "any node" clause.
do_route({To, Node}, Delivery) when Node =:= node() ->
    %% Local node: dispatch directly.
    Result = dispatch(To, Delivery),
    {Node, To, Result};
do_route({To, Node}, Delivery) when is_atom(Node) ->
    %% Remote node: forward via RPC, `async' unless configured otherwise.
    RpcMode = emqx:get_env(rpc_mode, async),
    Result = forward(Node, To, Delivery, RpcMode),
    {Node, To, Result};
do_route({To, Group}, Delivery) when is_tuple(Group); is_binary(Group) ->
    %% Share group: let emqx_shared_sub pick the receiver.
    Result = emqx_shared_sub:dispatch(Group, To, Delivery),
    {share, To, Result}.
|
|
|
|
%% Aggregate #route{} records into {Topic, Dest} pairs, where Dest is a
%% node name for plain routes and the group for shared routes (the node
%% part of a shared destination is dropped).
%%
%% Dropping the node can make several shared routes collapse into the same
%% pair, so the result is de-duplicated whenever at least one shared route
%% was seen. The de-duplication is done once, after the whole list has been
%% walked, instead of re-sorting the accumulator for every shared route.
aggre([]) ->
    [];
aggre([#route{topic = To, dest = Node}]) when is_atom(Node) ->
    [{To, Node}];
aggre([#route{topic = To, dest = {Group, _Node}}]) ->
    [{To, Group}];
aggre(Routes) ->
    aggre(Routes, false, []).

%% Worker for aggre/1. `Dedup' turns true as soon as a shared route is met.
aggre([], false, Acc) ->
    Acc;
aggre([], true, Acc) ->
    lists:usort(Acc);
aggre([#route{topic = To, dest = Node} | Rest], Dedup, Acc) when is_atom(Node) ->
    aggre(Rest, Dedup, [{To, Node} | Acc]);
aggre([#route{topic = To, dest = {Group, _Node}} | Rest], _Dedup, Acc) ->
    aggre(Rest, true, [{To, Group} | Acc]).
|
|
|
|
%% @doc Forward a delivery to the broker on another node.
%% In `async' mode the message is cast via emqx_rpc; on success the
%% 'messages.forward' counter is bumped and the result of that call is
%% returned. In `sync' mode the remote dispatch/2 result is returned after
%% bumping the counter. An RPC failure is logged and reported as
%% `{error, badrpc}' in both modes.
-spec(forward(node(), emqx_types:topic(), emqx_types:delivery(), RpcMode::sync | async)
      -> emqx_types:deliver_result()).
forward(Node, To, Delivery, async) ->
    case emqx_rpc:cast(To, Node, ?BROKER, dispatch, [To, Delivery]) of
        true -> emqx_metrics:inc('messages.forward');
        {badrpc, Reason} ->
            ?LOG(error, "Async forward msg to ~s failed due to ~p", [Node, Reason]),
            {error, badrpc}
    end;

forward(Node, To, Delivery, sync) ->
    case emqx_rpc:call(To, Node, ?BROKER, dispatch, [To, Delivery]) of
        {badrpc, Reason} ->
            ?LOG(error, "Sync forward msg to ~s failed due to ~p", [Node, Reason]),
            {error, badrpc};
        Result ->
            emqx_metrics:inc('messages.forward'), Result
    end.
|
|
|
|
%% @doc Dispatch a delivery to all local subscribers of `Topic'.
%% Returns `{ok, N}' with the number of subscribers the message was sent
%% to; when that number is zero the message is dropped and
%% `{error, no_subscribers}' is returned.
-spec(dispatch(emqx_topic:topic(), emqx_types:delivery()) -> emqx_types:deliver_result()).
dispatch(Topic, #delivery{message = Msg}) ->
    Delivered = lists:sum([dispatch(Sub, Topic, Msg) || Sub <- subscribers(Topic)]),
    case Delivered of
        0 ->
            drop_message(Msg),
            {error, no_subscribers};
        _ ->
            {ok, Delivered}
    end.
|
|
|
|
%% Deliver to one subscriber entry and return the number of messages sent.
%% A pid receives `{deliver, Topic, Msg}' if the process is alive (1),
%% otherwise nothing is sent (0).
dispatch(SubPid, Topic, Msg) when is_pid(SubPid) ->
    case erlang:is_process_alive(SubPid) of
        false ->
            0;
        true ->
            SubPid ! {deliver, Topic, Msg},
            1
    end;

%% A `{shard, I}' entry stands for all pids stored under
%% {shard, Topic, I}; the counts of the individual deliveries are summed.
dispatch({shard, I}, Topic, Msg) ->
    lists:sum([dispatch(SubPid, Topic, Msg)
               || SubPid <- subscribers({shard, Topic, I})]).
|
|
|
|
-compile({inline, [inc_dropped_cnt/1]}).
%% Count a dropped message under 'messages.dropped' and
%% 'messages.dropped.no_subscribers'. System messages are not counted.
inc_dropped_cnt(Msg) ->
    case emqx_message:is_sys(Msg) of
        false ->
            ok = emqx_metrics:inc('messages.dropped'),
            emqx_metrics:inc('messages.dropped.no_subscribers');
        true ->
            ok
    end.
|
|
|
|
-compile({inline, [subscribers/1]}).
%% @doc Look up the entries stored in the subscriber table for a topic or
%% for one shard of a topic. Returns `[]' when nothing is stored.
-spec(subscribers(emqx_topic:topic() | {shard, emqx_topic:topic(), non_neg_integer()})
      -> [pid()]).
subscribers({shard, _Topic, _I} = ShardKey) ->
    lookup_value(?SUBSCRIBER, ShardKey, []);
subscribers(Topic) when is_binary(Topic) ->
    lookup_value(?SUBSCRIBER, Topic, []).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Subscriber is down
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% @doc Clean up everything recorded for a subscriber process that is gone.
%% For each topic stored for `SubPid' the options entry is removed and the
%% subscriber table is cleaned; topics without stored options are skipped.
%% Finally all SubPid -> Topic entries are deleted.
-spec(subscriber_down(pid()) -> true).
subscriber_down(SubPid) ->
    Cleanup =
        fun(Topic) ->
                OptKey = {SubPid, Topic},
                case lookup_value(?SUBOPTION, OptKey) of
                    undefined ->
                        ok;
                    SubOpts when is_map(SubOpts) ->
                        _ = emqx_broker_helper:reclaim_seq(Topic),
                        true = ets:delete(?SUBOPTION, OptKey),
                        clean_subscribe(SubOpts, Topic, SubPid)
                end
        end,
    Topics = lookup_value(?SUBSCRIPTION, SubPid, []),
    lists:foreach(Cleanup, Topics),
    ets:delete(?SUBSCRIPTION, SubPid).
|
|
|
|
%% Remove a subscriber pid from the subscriber table and tell a broker
%% worker (asynchronously) that the topic, or the topic shard, lost a
%% subscriber. The shard is taken from the stored options and defaults
%% to 0, i.e. "stored directly under the topic".
clean_subscribe(SubOpts, Topic, SubPid) ->
    Shard = maps:get(shard, SubOpts, 0),
    case Shard of
        0 ->
            true = ets:delete_object(?SUBSCRIBER, {Topic, SubPid}),
            ok = emqx_exclusive_subscription:unsubscribe(Topic, SubOpts),
            ok = cast(pick(Topic), {unsubscribed, Topic});
        _ ->
            true = ets:delete_object(?SUBSCRIBER, {{shard, Topic, Shard}, SubPid}),
            ok = cast(pick({Topic, Shard}), {unsubscribed, Topic, Shard})
    end.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Management APIs
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% @doc List the subscriptions of a subscriber as {Topic, SubOpts} pairs.
%% The subscriber is given by pid or by subscriber id; an id that does not
%% resolve to a pid yields `[]'. A topic without stored options is paired
%% with an empty map.
-spec(subscriptions(pid() | emqx_types:subid())
      -> [{emqx_topic:topic(), emqx_types:subopts()}]).
subscriptions(SubPid) when is_pid(SubPid) ->
    Topics = lookup_value(?SUBSCRIPTION, SubPid, []),
    lists:map(
      fun(Topic) ->
              {Topic, lookup_value(?SUBOPTION, {SubPid, Topic}, #{})}
      end, Topics);
subscriptions(SubId) ->
    case emqx_broker_helper:lookup_subpid(SubId) of
        undefined ->
            [];
        SubPid when is_pid(SubPid) ->
            subscriptions(SubPid)
    end.
|
|
|
|
%% @doc Tell whether a subscriber, given by pid or by subscriber id, has
%% options stored for `Topic'.
-spec(subscribed(pid() | emqx_types:subid(), emqx_topic:topic()) -> boolean()).
subscribed(SubPid, Topic) when is_pid(SubPid) ->
    ets:member(?SUBOPTION, {SubPid, Topic});
subscribed(SubId, Topic) when ?IS_SUBID(SubId) ->
    %% The lookup result is used as-is as the pid part of the key.
    OptKey = {emqx_broker_helper:lookup_subpid(SubId), Topic},
    ets:member(?SUBOPTION, OptKey).
|
|
|
|
%% @doc Fetch the subscription options stored for a subscriber and topic.
%% The subscriber may be given by pid or by subscriber id (binary or atom);
%% an id is first resolved to a pid through the broker helper. Returns
%% `undefined' when the id does not resolve to a pid.
%%
%% The spec covers both accepted forms of the first argument, in line with
%% subscribed/2.
-spec(get_subopts(pid() | emqx_types:subid(), emqx_topic:topic())
      -> maybe(emqx_types:subopts())).
get_subopts(SubPid, Topic) when is_pid(SubPid), is_binary(Topic) ->
    lookup_value(?SUBOPTION, {SubPid, Topic});
get_subopts(SubId, Topic) when ?IS_SUBID(SubId) ->
    case emqx_broker_helper:lookup_subpid(SubId) of
        SubPid when is_pid(SubPid) ->
            get_subopts(SubPid, Topic);
        undefined -> undefined
    end.
|
|
|
|
%% @doc Merge `NewOpts' into the options the calling process has stored
%% for `Topic'. Returns `false' when the caller has no entry for the topic.
-spec(set_subopts(emqx_topic:topic(), emqx_types:subopts()) -> boolean()).
set_subopts(Topic, NewOpts) when is_binary(Topic), is_map(NewOpts) ->
    Self = self(),
    set_subopts(Self, Topic, NewOpts).
|
|
|
|
%% @private
%% Merge `NewOpts' over the options stored for {SubPid, Topic}; keys in
%% `NewOpts' win. Returns `false' if nothing is stored, otherwise the
%% result of the ETS insert.
set_subopts(SubPid, Topic, NewOpts) ->
    OptKey = {SubPid, Topic},
    case ets:lookup(?SUBOPTION, OptKey) of
        [] ->
            false;
        [{_Key, OldOpts}] ->
            Merged = maps:merge(OldOpts, NewOpts),
            ets:insert(?SUBOPTION, {OptKey, Merged})
    end.
|
|
|
|
%% @doc Return the topics reported by emqx_router:topics/0.
-spec(topics() -> [emqx_topic:topic()]).
topics() ->
    emqx_router:topics().
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Stats fun
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% @doc Publish the current sizes of the three PubSub tables as stats.
%% The tables are processed in the order subscriber, subscription,
%% suboption; the result of the last update is returned.
stats_fun() ->
    Stats = [ {?SUBSCRIBER, 'subscribers.count', 'subscribers.max'}
            , {?SUBSCRIPTION, 'subscriptions.count', 'subscriptions.max'}
            , {?SUBOPTION, 'suboptions.count', 'suboptions.max'}
            ],
    lists:foldl(
      fun({Tab, Stat, MaxStat}, _Prev) ->
              safe_update_stats(Tab, Stat, MaxStat)
      end, ok, Stats).
|
|
|
|
%% Set `Stat'/`MaxStat' from the size of table `Tab'. A table that does
%% not exist (size reported as `undefined') is silently skipped.
safe_update_stats(Tab, Stat, MaxStat) ->
    TabSize = ets:info(Tab, size),
    case TabSize of
        undefined ->
            ok;
        _ ->
            emqx_stats:setstat(Stat, MaxStat, TabSize)
    end.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% call, cast, pick
|
|
%%--------------------------------------------------------------------
|
|
|
|
-compile({inline, [call/2, cast/2, pick/1]}).
|
|
|
|
%% Synchronous request to a broker worker; waits without a timeout.
call(Broker, Req) ->
    gen_server:call(Broker, Req, infinity).
|
|
|
|
%% Asynchronous message to a broker worker.
cast(Broker, Msg) ->
    gen_server:cast(Broker, Msg).
|
|
|
|
%% Pick a worker of the `broker_pool' gproc pool, using `Topic' (a topic
%% or a {Topic, Shard} pair) as the selection key.
pick(Topic) ->
    gproc_pool:pick_worker(broker_pool, Topic).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% gen_server callbacks
|
|
%%--------------------------------------------------------------------
|
|
|
|
%% gen_server init: connect this worker to its gproc pool and keep the
%% pool name and worker id as state.
init([Pool, Id]) ->
    true = gproc_pool:connect_worker(Pool, {Pool, Id}),
    State = #{id => Id, pool => Pool},
    {ok, State}.
|
|
|
|
%% {subscribe, Topic}: add the route and reply with the router's result.
handle_call({subscribe, Topic}, _From, State) ->
    Reply = emqx_router:do_add_route(Topic),
    {reply, Reply, State};

%% {subscribe, Topic, I}: the worker remembers in its process dictionary
%% which {Topic, I} shards it has already seen. The first time a shard
%% shows up, a `{shard, I}' marker is stored under the topic and another
%% worker is asked (asynchronously) to add the route.
handle_call({subscribe, Topic, I}, _From, State) ->
    Shard = {Topic, I},
    Reply =
        case get(Shard) of
            true ->
                ok;
            undefined ->
                _ = put(Shard, true),
                true = ets:insert(?SUBSCRIBER, {Topic, {shard, I}}),
                cast(pick(Topic), {subscribe, Topic})
        end,
    {reply, Reply, State};

%% Anything else is logged and answered with `ignored'.
handle_call(Req, _From, State) ->
    ?LOG(error, "Unexpected call: ~p", [Req]),
    {reply, ignored, State}.
|
|
|
|
%% {subscribe, Topic}: add the route; a failure is only logged.
handle_cast({subscribe, Topic}, State) ->
    case emqx_router:do_add_route(Topic) of
        ok ->
            ok;
        {error, Reason} ->
            ?LOG(error, "Failed to add route: ~p", [Reason])
    end,
    {noreply, State};

%% {unsubscribed, Topic}: delete the route once nothing is stored under
%% the topic any more.
handle_cast({unsubscribed, Topic}, State) ->
    _ = ets:member(?SUBSCRIBER, Topic) orelse emqx_router:do_delete_route(Topic),
    {noreply, State};

%% {unsubscribed, Topic, I}: once shard I of the topic is empty, forget it
%% in the process dictionary, remove its `{shard, I}' marker and pass a
%% plain {unsubscribed, Topic} on to the worker picked for the topic.
handle_cast({unsubscribed, Topic, I}, State) ->
    case ets:member(?SUBSCRIBER, {shard, Topic, I}) of
        true ->
            ok;
        false ->
            _ = erase({Topic, I}),
            true = ets:delete_object(?SUBSCRIBER, {Topic, {shard, I}}),
            cast(pick(Topic), {unsubscribed, Topic})
    end,
    {noreply, State};

%% Anything else is logged and ignored.
handle_cast(Msg, State) ->
    ?LOG(error, "Unexpected cast: ~p", [Msg]),
    {noreply, State}.
|
|
|
|
%% No plain messages are expected; log whatever arrives and carry on.
handle_info(Info, State) ->
    ?LOG(error, "Unexpected info: ~p", [Info]),
    {noreply, State}.
|
|
|
|
%% Disconnect this worker from its gproc pool on shutdown.
terminate(_Reason, #{pool := Pool, id := Id}) ->
    Worker = {Pool, Id},
    gproc_pool:disconnect_worker(Pool, Worker).
|
|
|
|
%% Hot code upgrade: the state is carried over unchanged.
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% Internal functions
|
|
%%--------------------------------------------------------------------
|