emqx/src/emqx_broker.erl

506 lines
17 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2017-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_broker).
-behaviour(gen_server).
-include("emqx.hrl").
-include("logger.hrl").
-include("types.hrl").
-include("emqx_mqtt.hrl").
-logger_header("[Broker]").
-export([start_link/2]).
%% PubSub
-export([ subscribe/1
, subscribe/2
, subscribe/3
]).
-export([unsubscribe/1]).
-export([subscriber_down/1]).
-export([ publish/1
, safe_publish/1
]).
-export([dispatch/2]).
%% PubSub Infos
-export([ subscriptions/1
, subscribers/1
, subscribed/2
]).
-export([ get_subopts/2
, set_subopts/2
]).
-export([topics/0]).
%% Stats fun
-export([stats_fun/0]).
%% gen_server callbacks
-export([ init/1
, handle_call/3
, handle_cast/2
, handle_info/2
, terminate/2
, code_change/3
]).
-import(emqx_tables, [lookup_value/2, lookup_value/3]).
-ifdef(TEST).
-compile(export_all).
-compile(nowarn_export_all).
-endif.
-define(BROKER, ?MODULE).
%% ETS tables for PubSub
-define(SUBOPTION, emqx_suboption).
-define(SUBSCRIBER, emqx_subscriber).
-define(SUBSCRIPTION, emqx_subscription).
%% Guards
-define(IS_SUBID(Id), (is_binary(Id) orelse is_atom(Id))).
%% @doc Start one broker worker of `Pool'.
%% The PubSub tables are set up first (in the calling process), then the
%% worker is started and registered locally under a name derived from
%% this module and `Id'.
-spec(start_link(atom(), pos_integer()) -> startlink_ret()).
start_link(Pool, Id) ->
    ok = create_tabs(),
    RegName = emqx_misc:proc_name(?BROKER, Id),
    gen_server:start_link({local, RegName}, ?MODULE, [Pool, Id], []).
%%------------------------------------------------------------------------------
%% Create tabs
%%------------------------------------------------------------------------------
%% Set up the three PubSub tables through emqx_tables:new/2. All of them
%% are public and tuned for concurrent reads and writes.
-spec(create_tabs() -> ok).
create_tabs() ->
    Shared = [public, {read_concurrency, true}, {write_concurrency, true}],
    Tables = [ %% SubOption: {SubPid, Topic} -> SubOption
               {?SUBOPTION, set}
               %% Subscription: SubPid -> Topic1, Topic2, Topic3, ...
               %% duplicate_bag: o(1) insert
             , {?SUBSCRIPTION, duplicate_bag}
               %% Subscriber: Topic -> SubPid1, SubPid2, SubPid3, ...
               %% bag: o(n) insert:(
             , {?SUBSCRIBER, bag}
             ],
    lists:foreach(
      fun({Tab, Type}) ->
              ok = emqx_tables:new(Tab, [Type | Shared])
      end, Tables).
%%------------------------------------------------------------------------------
%% Subscribe API
%%------------------------------------------------------------------------------
%% @doc Subscribe the calling process to `Topic' without a subscriber id
%% and with the default subscription options.
-spec(subscribe(emqx_topic:topic()) -> ok).
subscribe(Topic) when is_binary(Topic) ->
    subscribe(Topic, undefined, ?DEFAULT_SUBOPTS).
%% @doc Subscribe the calling process to `Topic'.
%% The second argument is either a map of subscription options (no
%% subscriber id is attached) or a subscriber id (the default options are
%% used).
-spec(subscribe(emqx_topic:topic(), emqx_types:subid() | emqx_types:subopts()) -> ok).
subscribe(Topic, Opts) when is_binary(Topic), is_map(Opts) ->
    subscribe(Topic, undefined, Opts);
subscribe(Topic, Id) when is_binary(Topic), ?IS_SUBID(Id) ->
    subscribe(Topic, Id, ?DEFAULT_SUBOPTS).
%% @doc Subscribe the calling process to `Topic' with subscriber id `SubId'.
%% The supplied options are laid over the defaults. When the caller is
%% already subscribed to `Topic', only the stored options are refreshed;
%% otherwise the subscriber is registered with the broker helper and a new
%% subscription is created.
-spec(subscribe(emqx_topic:topic(), emqx_types:subid(), emqx_types:subopts()) -> ok).
subscribe(Topic, SubId, SubOpts0) when is_binary(Topic), ?IS_SUBID(SubId), is_map(SubOpts0) ->
    Merged = maps:merge(?DEFAULT_SUBOPTS, SubOpts0),
    _ = emqx_trace:subscribe(Topic, SubId, Merged),
    Self = self(),
    Opts = with_subid(SubId, Merged),
    case ets:member(?SUBOPTION, {Self, Topic}) of
        true ->
            %% Existing subscription: merge the new options in. The boolean
            %% result is dropped so that the caller always gets 'ok'.
            _ = set_subopts(Self, Topic, Opts),
            ok;
        false ->
            %% New subscription
            ok = emqx_broker_helper:register_sub(Self, SubId),
            do_subscribe(Topic, Self, Opts)
    end.
-compile({inline, [with_subid/2]}).
%% Attach the subscriber id to the options map under the 'subid' key.
%% An 'undefined' id leaves the options untouched.
with_subid(undefined, Opts) ->
    Opts;
with_subid(Id, Opts) ->
    Opts#{subid => Id}.
%% @private
%% Record the topic against the subscriber pid, then continue with the
%% shared or non-shared path depending on the 'share' option.
do_subscribe(Topic, SubPid, SubOpts) ->
    true = ets:insert(?SUBSCRIPTION, {SubPid, Topic}),
    do_subscribe(maps:get(share, SubOpts, undefined), Topic, SubPid, SubOpts).
%% Non-shared subscription: the broker helper decides which shard the
%% subscriber belongs to. Shard 0 stores the pid directly under the topic;
%% any other shard stores it under {shard, Topic, Shard} and remembers the
%% shard number in the stored options. In both cases the picked broker
%% worker is then called synchronously.
do_subscribe(undefined, Topic, SubPid, SubOpts) ->
    SubKey = {SubPid, Topic},
    case emqx_broker_helper:get_sub_shard(SubPid, Topic) of
        0 ->
            true = ets:insert(?SUBSCRIBER, {Topic, SubPid}),
            true = ets:insert(?SUBOPTION, {SubKey, SubOpts}),
            call(pick(Topic), {subscribe, Topic});
        Shard ->
            true = ets:insert(?SUBSCRIBER, {{shard, Topic, Shard}, SubPid}),
            true = ets:insert(?SUBOPTION, {SubKey, SubOpts#{shard => Shard}}),
            call(pick({Topic, Shard}), {subscribe, Topic, Shard})
    end;
%% Shared subscription: store the options and hand over to emqx_shared_sub.
do_subscribe(Group, Topic, SubPid, SubOpts) ->
    true = ets:insert(?SUBOPTION, {{SubPid, Topic}, SubOpts}),
    emqx_shared_sub:subscribe(Group, Topic, SubPid).
%%--------------------------------------------------------------------
%% Unsubscribe API
%%--------------------------------------------------------------------
%% @doc Remove the calling process' subscription to `Topic'.
%% Does nothing (and returns `ok') when no such subscription is stored.
-spec(unsubscribe(emqx_topic:topic()) -> ok).
unsubscribe(Topic) when is_binary(Topic) ->
    Self = self(),
    case ets:lookup(?SUBOPTION, {Self, Topic}) of
        [] ->
            ok;
        [{_, SubOpts}] ->
            emqx_trace:unsubscribe(Topic, SubOpts),
            _ = emqx_broker_helper:reclaim_seq(Topic),
            do_unsubscribe(Topic, Self, SubOpts)
    end.
%% Drop the stored options and the pid->topic record, then continue with
%% the shared or non-shared path depending on the 'share' option.
do_unsubscribe(Topic, SubPid, SubOpts) ->
    SubKey = {SubPid, Topic},
    true = ets:delete(?SUBOPTION, SubKey),
    true = ets:delete_object(?SUBSCRIPTION, SubKey),
    do_unsubscribe(maps:get(share, SubOpts, undefined), Topic, SubPid, SubOpts).
%% No share group: clean up the subscriber table entry locally.
do_unsubscribe(undefined, Topic, SubPid, SubOpts) ->
    clean_subscribe(SubOpts, Topic, SubPid);
%% Share group present: delegate to emqx_shared_sub.
do_unsubscribe(ShareGroup, Topic, SubPid, _SubOpts) ->
    emqx_shared_sub:unsubscribe(ShareGroup, Topic, SubPid).
%%--------------------------------------------------------------------
%% Publish
%%--------------------------------------------------------------------
%% @doc Publish a message.
%% The message is traced, counted under 'messages.publish' unless
%% emqx_message:is_sys/1 reports it as a system message, and then passed
%% (with dup cleaned) through the 'message.publish' hook chain. If the
%% hooks return a message whose headers carry `allow_publish => false',
%% publishing stops and `[]' is returned; otherwise the message is routed
%% to every matching route.
-spec(publish(emqx_types:message()) -> emqx_types:publish_result()).
publish(Msg) when is_record(Msg, message) ->
    _ = emqx_trace:publish(Msg),
    emqx_message:is_sys(Msg) orelse emqx_metrics:inc('messages.publish'),
    Hooked = emqx_hooks:run_fold('message.publish', [], emqx_message:clean_dup(Msg)),
    case Hooked of
        #message{headers = #{allow_publish := false}} ->
            ?LOG(notice, "Stop publishing: ~s", [emqx_message:format(Msg)]),
            [];
        #message{topic = Topic} ->
            Routes = aggre(emqx_router:match_routes(Topic)),
            route(Routes, delivery(Hooked))
    end.
%% Called internally
%% @doc Like publish/1, but any exception raised while publishing is
%% logged together with the message and stacktrace, and `[]' is returned.
-spec(safe_publish(emqx_types:message()) -> emqx_types:publish_result()).
safe_publish(Msg) when is_record(Msg, message) ->
    try
        publish(Msg)
    catch
        _Class:Reason:Stacktrace ->
            ?LOG(error, "Publish error: ~0p~n~s~n~0p",
                 [Reason, emqx_message:format(Msg), Stacktrace]),
            []
    end.
-compile({inline, [delivery/1]}).
%% Wrap a message in a #delivery{} record with the calling process as sender.
delivery(Msg) ->
    #delivery{message = Msg, sender = self()}.
%%--------------------------------------------------------------------
%% Route
%%--------------------------------------------------------------------
%% Route a delivery to every aggregated route entry.
%% With no routes the message is dropped and `[]' is returned. Otherwise
%% the routes are processed in list order and the per-route results are
%% returned in reverse of that order.
-spec(route([emqx_types:route_entry()], emqx_types:delivery())
      -> emqx_types:publish_result()).
route([], #delivery{message = Msg}) ->
    drop_message(Msg),
    [];
route(Routes, Delivery) ->
    lists:reverse([do_route(Route, Delivery) || Route <- Routes]).
%% Run the 'message.dropped' hook with reason 'no_subscribers' and the
%% local node as context, then update the dropped-message counters.
drop_message(Msg) ->
    Context = #{node => node()},
    ok = emqx_hooks:run('message.dropped', [Msg, Context, no_subscribers]),
    ok = inc_dropped_cnt(Msg).
%% Deliver to a single route entry:
%%  - a route on the local node is dispatched directly;
%%  - a route on another node is forwarded, using the configured rpc_mode
%%    (default 'async');
%%  - a shared-subscription group goes through emqx_shared_sub:dispatch/3.
do_route({To, Node}, Delivery) when Node =:= node() ->
    Result = dispatch(To, Delivery),
    {Node, To, Result};
do_route({To, Node}, Delivery) when is_atom(Node) ->
    RpcMode = emqx:get_env(rpc_mode, async),
    {Node, To, forward(Node, To, Delivery, RpcMode)};
do_route({To, Group}, Delivery) when is_tuple(Group); is_binary(Group) ->
    Result = emqx_shared_sub:dispatch(Group, To, Delivery),
    {share, To, Result}.
%% Turn #route{} records into {Topic, Dest} pairs, where Dest is a node
%% (atom destinations) or a share group ({Group, Node} destinations, with
%% the node discarded). In the multi-route case the accumulated list is
%% sorted and de-duplicated every time a group route is added.
aggre([]) ->
    [];
aggre([#route{topic = To, dest = Node}]) when is_atom(Node) ->
    [{To, Node}];
aggre([#route{topic = To, dest = {Group, _Node}}]) ->
    [{To, Group}];
aggre(Routes) ->
    Collect =
        fun(#route{topic = To, dest = Node}, Acc) when is_atom(Node) ->
                [{To, Node} | Acc];
           (#route{topic = To, dest = {Group, _Node}}, Acc) ->
                lists:usort([{To, Group} | Acc])
        end,
    lists:foldl(Collect, [], Routes).
%% @doc Forward message to another node.
%% In `async' mode the dispatch is cast to `Node' through emqx_rpc; in
%% `sync' mode it is called and the remote dispatch result is returned.
%% The 'messages.forward' metric is incremented on success. A `badrpc'
%% outcome is logged and reported as `{error, badrpc}'.
-spec(forward(node(), emqx_types:topic(), emqx_types:delivery(), RpcMode::sync | async)
      -> emqx_types:deliver_result()).
forward(Node, To, Delivery, async) ->
    case emqx_rpc:cast(To, Node, ?BROKER, dispatch, [To, Delivery]) of
        true ->
            emqx_metrics:inc('messages.forward');
        {badrpc, Reason} ->
            %% Log text previously read "Ansync" (typo).
            ?LOG(error, "Async forward msg to ~s failed due to ~p", [Node, Reason]),
            {error, badrpc}
    end;
forward(Node, To, Delivery, sync) ->
    case emqx_rpc:call(To, Node, ?BROKER, dispatch, [To, Delivery]) of
        {badrpc, Reason} ->
            ?LOG(error, "Sync forward msg to ~s failed due to ~p", [Node, Reason]),
            {error, badrpc};
        Result ->
            emqx_metrics:inc('messages.forward'),
            Result
    end.
%% @doc Deliver the message of a delivery to every local subscriber of
%% `Topic'. Returns `{ok, N}' with the number of deliveries made, or drops
%% the message and returns `{error, no_subscribers}' when that number is 0.
-spec(dispatch(emqx_topic:topic(), emqx_types:delivery()) -> emqx_types:deliver_result()).
dispatch(Topic, #delivery{message = Msg}) ->
    Delivered = lists:sum([dispatch(Sub, Topic, Msg) || Sub <- subscribers(Topic)]),
    case Delivered of
        0 ->
            drop_message(Msg),
            {error, no_subscribers};
        _ ->
            {ok, Delivered}
    end.
%% Deliver to one subscriber entry and return the number of deliveries.
%% A pid receives {deliver, Topic, Msg} if it is alive (counting 1, else 0);
%% a {shard, I} entry fans out to every pid stored under that shard.
dispatch(SubPid, Topic, Msg) when is_pid(SubPid) ->
    case erlang:is_process_alive(SubPid) of
        false ->
            0;
        true ->
            _ = erlang:send(SubPid, {deliver, Topic, Msg}),
            1
    end;
dispatch({shard, I}, Topic, Msg) ->
    lists:sum([dispatch(SubPid, Topic, Msg)
               || SubPid <- subscribers({shard, Topic, I})]).
-compile({inline, [inc_dropped_cnt/1]}).
%% Count a dropped message under 'messages.dropped' and
%% 'messages.dropped.no_subscribers', unless emqx_message:is_sys/1 reports
%% it as a system message.
inc_dropped_cnt(Msg) ->
    case emqx_message:is_sys(Msg) of
        false ->
            ok = emqx_metrics:inc('messages.dropped'),
            emqx_metrics:inc('messages.dropped.no_subscribers');
        true ->
            ok
    end.
-compile({inline, [subscribers/1]}).
%% @doc Look up the subscriber entries stored for a topic or for one shard
%% of a topic; `[]' is the default passed to the lookup when nothing is
%% stored.
%% Entries under a plain topic are subscriber pids or `{shard, I}' markers
%% (the broker worker inserts `{Topic, {shard, I}}' rows for sharded
%% topics), so the return type covers both. Entries under a
%% `{shard, Topic, I}' key are inserted by this module as pids only.
-spec(subscribers(emqx_topic:topic() | {shard, emqx_topic:topic(), non_neg_integer()})
      -> [pid() | {shard, non_neg_integer()}]).
subscribers(Topic) when is_binary(Topic) ->
    lookup_value(?SUBSCRIBER, Topic, []);
subscribers(Shard = {shard, _Topic, _I}) ->
    lookup_value(?SUBSCRIBER, Shard, []).
%%--------------------------------------------------------------------
%% Subscriber is down
%%--------------------------------------------------------------------
%% @doc Clean up after a subscriber process that has gone down.
%% For every topic recorded for `SubPid' that still has stored options,
%% the sequence is reclaimed, the options are deleted and the subscriber
%% table entry is cleaned. Finally all pid->topic records are removed.
-spec(subscriber_down(pid()) -> true).
subscriber_down(SubPid) ->
    CleanTopic =
        fun(Topic) ->
                SubKey = {SubPid, Topic},
                case lookup_value(?SUBOPTION, SubKey) of
                    undefined ->
                        ok;
                    SubOpts when is_map(SubOpts) ->
                        _ = emqx_broker_helper:reclaim_seq(Topic),
                        true = ets:delete(?SUBOPTION, SubKey),
                        clean_subscribe(SubOpts, Topic, SubPid)
                end
        end,
    lists:foreach(CleanTopic, lookup_value(?SUBSCRIPTION, SubPid, [])),
    ets:delete(?SUBSCRIPTION, SubPid).
%% Remove a subscriber pid from the subscriber table and notify the picked
%% broker worker asynchronously. The 'shard' option (default 0) selects
%% whether the pid was stored under the topic itself or under a shard key;
%% the exclusive-subscription unsubscribe is only run for shard 0.
clean_subscribe(SubOpts, Topic, SubPid) ->
    Shard = maps:get(shard, SubOpts, 0),
    case Shard of
        0 ->
            true = ets:delete_object(?SUBSCRIBER, {Topic, SubPid}),
            ok = emqx_exclusive_subscription:unsubscribe(Topic, SubOpts),
            ok = cast(pick(Topic), {unsubscribed, Topic});
        _ ->
            true = ets:delete_object(?SUBSCRIBER, {{shard, Topic, Shard}, SubPid}),
            ok = cast(pick({Topic, Shard}), {unsubscribed, Topic, Shard})
    end.
%%--------------------------------------------------------------------
%% Management APIs
%%--------------------------------------------------------------------
%% @doc List the `{Topic, SubOpts}' pairs of a subscriber, given by pid or
%% by subscriber id. Topics without stored options are paired with `#{}'.
%% A subscriber id that resolves to no pid yields `[]'.
-spec(subscriptions(pid() | emqx_types:subid())
      -> [{emqx_topic:topic(), emqx_types:subopts()}]).
subscriptions(SubPid) when is_pid(SubPid) ->
    Topics = lookup_value(?SUBSCRIPTION, SubPid, []),
    [{Topic, lookup_value(?SUBOPTION, {SubPid, Topic}, #{})} || Topic <- Topics];
subscriptions(SubId) ->
    case emqx_broker_helper:lookup_subpid(SubId) of
        undefined ->
            [];
        SubPid when is_pid(SubPid) ->
            subscriptions(SubPid)
    end.
%% @doc Tell whether a subscriber (given by pid or by subscriber id) has
%% options stored for `Topic'. A subscriber id is first resolved through
%% emqx_broker_helper:lookup_subpid/1 and the result is used as the key.
-spec(subscribed(pid() | emqx_types:subid(), emqx_topic:topic()) -> boolean()).
subscribed(SubPid, Topic) when is_pid(SubPid) ->
    ets:member(?SUBOPTION, {SubPid, Topic});
subscribed(SubId, Topic) when ?IS_SUBID(SubId) ->
    ets:member(?SUBOPTION, {emqx_broker_helper:lookup_subpid(SubId), Topic}).
%% @doc Fetch the options stored for a subscription to `Topic'.
%% The subscriber may be given by pid or by subscriber id (binary or
%% atom); the spec is widened accordingly, since the second clause has
%% always accepted an id. Returns `undefined' when the id resolves to no
%% pid.
-spec(get_subopts(pid() | emqx_types:subid(), emqx_topic:topic())
      -> maybe(emqx_types:subopts())).
get_subopts(SubPid, Topic) when is_pid(SubPid), is_binary(Topic) ->
    lookup_value(?SUBOPTION, {SubPid, Topic});
get_subopts(SubId, Topic) when ?IS_SUBID(SubId) ->
    case emqx_broker_helper:lookup_subpid(SubId) of
        SubPid when is_pid(SubPid) ->
            get_subopts(SubPid, Topic);
        undefined ->
            undefined
    end.
%% @doc Merge `NewOpts' into the options stored for the calling process'
%% subscription to `Topic'. Returns `false' when no such subscription is
%% stored.
-spec(set_subopts(emqx_topic:topic(), emqx_types:subopts()) -> boolean()).
set_subopts(Topic, NewOpts) when is_binary(Topic), is_map(NewOpts) ->
    Self = self(),
    set_subopts(Self, Topic, NewOpts).
%% @private
%% Merge NewOpts over the options currently stored for {SubPid, Topic} and
%% write the result back. Returns 'false' when nothing is stored, otherwise
%% the result of ets:insert/2.
set_subopts(SubPid, Topic, NewOpts) ->
    SubKey = {SubPid, Topic},
    case ets:lookup(?SUBOPTION, SubKey) of
        [] ->
            false;
        [{_, Current}] ->
            Merged = maps:merge(Current, NewOpts),
            ets:insert(?SUBOPTION, {SubKey, Merged})
    end.
%% @doc Return the list of topics reported by emqx_router:topics/0.
-spec(topics() -> [emqx_topic:topic()]).
topics() ->
emqx_router:topics().
%%--------------------------------------------------------------------
%% Stats fun
%%--------------------------------------------------------------------
%% @doc Push the current sizes of the three PubSub tables into emqx_stats
%% as count/max statistics. The value of the last update is returned.
stats_fun() ->
    _ = safe_update_stats(?SUBSCRIBER, 'subscribers.count', 'subscribers.max'),
    _ = safe_update_stats(?SUBSCRIPTION, 'subscriptions.count', 'subscriptions.max'),
    safe_update_stats(?SUBOPTION, 'suboptions.count', 'suboptions.max').
%% Report the size of Tab under Stat/MaxStat. When ets:info/2 returns
%% 'undefined' for the table, nothing is reported and 'ok' is returned.
safe_update_stats(Tab, Stat, MaxStat) ->
    TabSize = ets:info(Tab, size),
    case TabSize of
        undefined ->
            ok;
        _ ->
            emqx_stats:setstat(Stat, MaxStat, TabSize)
    end.
%%--------------------------------------------------------------------
%% call, cast, pick
%%--------------------------------------------------------------------
-compile({inline, [call/2, cast/2, pick/1]}).

%% Synchronous request to a broker worker; waits without a timeout.
call(Worker, Request) ->
    gen_server:call(Worker, Request, infinity).

%% Asynchronous message to a broker worker.
cast(Worker, Message) ->
    gen_server:cast(Worker, Message).

%% Pick a broker worker from 'broker_pool', using Key as the pick value.
pick(Key) ->
    gproc_pool:pick_worker(broker_pool, Key).
%%--------------------------------------------------------------------
%% gen_server callbacks
%%--------------------------------------------------------------------
%% Connect this process to the gproc pool as worker {Pool, Id} and keep
%% both values in the state map for terminate/2.
init([Pool, Id]) ->
    Worker = {Pool, Id},
    true = gproc_pool:connect_worker(Pool, Worker),
    {ok, #{id => Id, pool => Pool}}.
%% Synchronous requests handled by a broker worker.
%%
%% {subscribe, Topic}: add a route for Topic and reply with the result of
%% emqx_router:do_add_route/1.
handle_call({subscribe, Topic}, _From, State) ->
Ok = emqx_router:do_add_route(Topic),
{reply, Ok, State};
%% {subscribe, Topic, I}: first subscriber seen by this worker for shard I
%% of Topic. The process dictionary key {Topic, I} records that the shard
%% marker has already been set up, so the work below runs once per shard
%% until the flag is erased again.
handle_call({subscribe, Topic, I}, _From, State) ->
Ok = case get(Shard = {Topic, I}) of
undefined ->
_ = put(Shard, true),
%% Publish the shard marker under the plain topic so that a lookup by
%% topic finds {shard, I} next to ordinary subscriber pids.
true = ets:insert(?SUBSCRIBER, {Topic, {shard, I}}),
%% The route is added asynchronously by the worker picked for Topic;
%% the reply is the return value of the cast.
cast(pick(Topic), {subscribe, Topic});
true -> ok
end,
{reply, Ok, State};
%% Anything else is logged and answered with 'ignored'.
handle_call(Req, _From, State) ->
?LOG(error, "Unexpected call: ~p", [Req]),
{reply, ignored, State}.
%% Asynchronous requests handled by a broker worker.
%%
%% {subscribe, Topic}: add a route for Topic; a failure is only logged.
handle_cast({subscribe, Topic}, State) ->
case emqx_router:do_add_route(Topic) of
ok -> ok;
{error, Reason} ->
?LOG(error, "Failed to add route: ~p", [Reason])
end,
{noreply, State};
%% {unsubscribed, Topic}: delete the route once no subscriber entry is
%% left under Topic. The result of the delete is ignored.
handle_cast({unsubscribed, Topic}, State) ->
case ets:member(?SUBSCRIBER, Topic) of
false ->
_ = emqx_router:do_delete_route(Topic),
ok;
true -> ok
end,
{noreply, State};
%% {unsubscribed, Topic, I}: once shard I of Topic has no subscriber entry
%% left, erase the process dictionary flag for the shard, remove the
%% {shard, I} marker stored under Topic and notify the worker picked for
%% Topic with {unsubscribed, Topic}.
handle_cast({unsubscribed, Topic, I}, State) ->
case ets:member(?SUBSCRIBER, {shard, Topic, I}) of
false ->
_ = erase({Topic, I}),
true = ets:delete_object(?SUBSCRIBER, {Topic, {shard, I}}),
cast(pick(Topic), {unsubscribed, Topic});
true -> ok
end,
{noreply, State};
%% Anything else is logged and ignored.
handle_cast(Msg, State) ->
?LOG(error, "Unexpected cast: ~p", [Msg]),
{noreply, State}.
%% No raw messages are expected by this worker: log whatever arrives and
%% keep the state unchanged.
handle_info(Unexpected, State) ->
    ?LOG(error, "Unexpected info: ~p", [Unexpected]),
    {noreply, State}.
%% Disconnect this worker from the gproc pool it joined in init/1.
terminate(_Reason, #{pool := Pool, id := Id}) ->
    Worker = {Pool, Id},
    gproc_pool:disconnect_worker(Pool, Worker).
%% No state migration is performed on code upgrade; the state is returned
%% unchanged.
code_change(_FromVsn, State, _Extra) ->
    {ok, State}.
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------