317 lines
11 KiB
Erlang
317 lines
11 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%
|
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
%% you may not use this file except in compliance with the License.
|
|
%% You may obtain a copy of the License at
|
|
%%
|
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
%%
|
|
%% Unless required by applicable law or agreed to in writing, software
|
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
%% See the License for the specific language governing permissions and
|
|
%% limitations under the License.
|
|
%%--------------------------------------------------------------------
|
|
|
|
-module(emqx_retainer_dispatcher).
|
|
|
|
-behaviour(gen_server).
|
|
|
|
-include("emqx_retainer.hrl").
|
|
-include_lib("emqx/include/logger.hrl").
|
|
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
|
|
|
%% API
|
|
-export([
|
|
start_link/2,
|
|
dispatch/2,
|
|
refresh_limiter/0,
|
|
refresh_limiter/1,
|
|
wait_dispatch_complete/1,
|
|
worker/0
|
|
]).
|
|
|
|
%% gen_server callbacks
|
|
-export([
|
|
init/1,
|
|
handle_call/3,
|
|
handle_cast/2,
|
|
handle_info/2,
|
|
terminate/2,
|
|
code_change/3,
|
|
format_status/2
|
|
]).
|
|
|
|
-type limiter() :: emqx_htb_limiter:limiter().
|
|
|
|
-define(POOL, ?MODULE).
|
|
|
|
%%%===================================================================
|
|
%%% API
|
|
%%%===================================================================
|
|
dispatch(Context, Topic) ->
|
|
cast({?FUNCTION_NAME, Context, self(), Topic}).
|
|
|
|
%% reset the client's limiter after updated the limiter's config
|
|
refresh_limiter() ->
|
|
Conf = emqx:get_config([retainer]),
|
|
refresh_limiter(Conf).
|
|
|
|
refresh_limiter(Conf) ->
|
|
Workers = gproc_pool:active_workers(?POOL),
|
|
lists:foreach(
|
|
fun({_, Pid}) ->
|
|
gen_server:cast(Pid, {?FUNCTION_NAME, Conf})
|
|
end,
|
|
Workers
|
|
).
|
|
|
|
wait_dispatch_complete(Timeout) ->
|
|
Workers = gproc_pool:active_workers(?POOL),
|
|
lists:foreach(
|
|
fun({_, Pid}) ->
|
|
ok = gen_server:call(Pid, ?FUNCTION_NAME, Timeout)
|
|
end,
|
|
Workers
|
|
).
|
|
|
|
worker() ->
|
|
gproc_pool:pick_worker(?POOL, self()).
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% @doc
|
|
%% Starts the server
|
|
%% @end
|
|
%%--------------------------------------------------------------------
|
|
-spec start_link(atom(), pos_integer()) ->
|
|
{ok, Pid :: pid()}
|
|
| {error, Error :: {already_started, pid()}}
|
|
| {error, Error :: term()}
|
|
| ignore.
|
|
start_link(Pool, Id) ->
|
|
gen_server:start_link(
|
|
{local, emqx_utils:proc_name(?MODULE, Id)},
|
|
?MODULE,
|
|
[Pool, Id],
|
|
[{hibernate_after, 1000}]
|
|
).
|
|
|
|
%%%===================================================================
|
|
%%% gen_server callbacks
|
|
%%%===================================================================
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% @private
|
|
%% @doc
|
|
%% Initializes the server
|
|
%% @end
|
|
%%--------------------------------------------------------------------
|
|
-spec init(Args :: term()) ->
|
|
{ok, State :: term()}
|
|
| {ok, State :: term(), Timeout :: timeout()}
|
|
| {ok, State :: term(), hibernate}
|
|
| {stop, Reason :: term()}
|
|
| ignore.
|
|
init([Pool, Id]) ->
|
|
erlang:process_flag(trap_exit, true),
|
|
true = gproc_pool:connect_worker(Pool, {Pool, Id}),
|
|
BucketCfg = emqx:get_config([retainer, flow_control, batch_deliver_limiter], undefined),
|
|
{ok, Limiter} = emqx_limiter_server:connect(?APP, internal, BucketCfg),
|
|
{ok, #{pool => Pool, id => Id, limiter => Limiter}}.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% @private
|
|
%% @doc
|
|
%% Handling call messages
|
|
%% @end
|
|
%%--------------------------------------------------------------------
|
|
-spec handle_call(Request :: term(), From :: {pid(), term()}, State :: term()) ->
|
|
{reply, Reply :: term(), NewState :: term()}
|
|
| {reply, Reply :: term(), NewState :: term(), Timeout :: timeout()}
|
|
| {reply, Reply :: term(), NewState :: term(), hibernate}
|
|
| {noreply, NewState :: term()}
|
|
| {noreply, NewState :: term(), Timeout :: timeout()}
|
|
| {noreply, NewState :: term(), hibernate}
|
|
| {stop, Reason :: term(), Reply :: term(), NewState :: term()}
|
|
| {stop, Reason :: term(), NewState :: term()}.
|
|
handle_call(wait_dispatch_complete, _From, State) ->
|
|
{reply, ok, State};
|
|
handle_call(Req, _From, State) ->
|
|
?SLOG(error, #{msg => "unexpected_call", call => Req}),
|
|
{reply, ignored, State}.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% @private
|
|
%% @doc
|
|
%% Handling cast messages
|
|
%% @end
|
|
%%--------------------------------------------------------------------
|
|
-spec handle_cast(Request :: term(), State :: term()) ->
|
|
{noreply, NewState :: term()}
|
|
| {noreply, NewState :: term(), Timeout :: timeout()}
|
|
| {noreply, NewState :: term(), hibernate}
|
|
| {stop, Reason :: term(), NewState :: term()}.
|
|
handle_cast({dispatch, Context, Pid, Topic}, #{limiter := Limiter} = State) ->
|
|
{ok, Limiter2} = dispatch(Context, Pid, Topic, undefined, Limiter),
|
|
{noreply, State#{limiter := Limiter2}};
|
|
handle_cast({refresh_limiter, Conf}, State) ->
|
|
BucketCfg = emqx_utils_maps:deep_get([flow_control, batch_deliver_limiter], Conf, undefined),
|
|
{ok, Limiter} = emqx_limiter_server:connect(?APP, internal, BucketCfg),
|
|
{noreply, State#{limiter := Limiter}};
|
|
handle_cast(Msg, State) ->
|
|
?SLOG(error, #{msg => "unexpected_cast", cast => Msg}),
|
|
{noreply, State}.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% @private
|
|
%% @doc
|
|
%% Handling all non call/cast messages
|
|
%% @end
|
|
%%--------------------------------------------------------------------
|
|
-spec handle_info(Info :: timeout() | term(), State :: term()) ->
|
|
{noreply, NewState :: term()}
|
|
| {noreply, NewState :: term(), Timeout :: timeout()}
|
|
| {noreply, NewState :: term(), hibernate}
|
|
| {stop, Reason :: normal | term(), NewState :: term()}.
|
|
handle_info(Info, State) ->
|
|
?SLOG(error, #{msg => "unexpected_info", info => Info}),
|
|
{noreply, State}.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% @private
|
|
%% @doc
|
|
%% This function is called by a gen_server when it is about to
|
|
%% terminate. It should be the opposite of Module:init/1 and do any
|
|
%% necessary cleaning up. When it returns, the gen_server terminates
|
|
%% with Reason. The return value is ignored.
|
|
%% @end
|
|
%%--------------------------------------------------------------------
|
|
-spec terminate(
|
|
Reason :: normal | shutdown | {shutdown, term()} | term(),
|
|
State :: term()
|
|
) -> any().
|
|
terminate(_Reason, #{pool := Pool, id := Id}) ->
|
|
gproc_pool:disconnect_worker(Pool, {Pool, Id}).
|
|
%%--------------------------------------------------------------------
|
|
%% @private
|
|
%% @doc
|
|
%% Convert process state when code is changed
|
|
%% @end
|
|
%%--------------------------------------------------------------------
|
|
-spec code_change(
|
|
OldVsn :: term() | {down, term()},
|
|
State :: term(),
|
|
Extra :: term()
|
|
) ->
|
|
{ok, NewState :: term()}
|
|
| {error, Reason :: term()}.
|
|
code_change(_OldVsn, State, _Extra) ->
|
|
{ok, State}.
|
|
|
|
%%--------------------------------------------------------------------
|
|
%% @private
|
|
%% @doc
|
|
%% This function is called for changing the form and appearance
|
|
%% of gen_server status when it is returned from sys:get_status/1,2
|
|
%% or when it appears in termination error logs.
|
|
%% @end
|
|
%%--------------------------------------------------------------------
|
|
-spec format_status(
|
|
Opt :: normal | terminate,
|
|
Status :: list()
|
|
) -> Status :: term().
|
|
format_status(_Opt, Status) ->
|
|
Status.
|
|
|
|
%%%===================================================================
|
|
%%% Internal functions
|
|
%%%===================================================================
|
|
%% @private
|
|
cast(Msg) ->
|
|
gen_server:cast(worker(), Msg).
|
|
|
|
-spec dispatch(context(), pid(), topic(), cursor(), limiter()) -> {ok, limiter()}.
|
|
dispatch(Context, Pid, Topic, Cursor, Limiter) ->
|
|
Mod = emqx_retainer:get_backend_module(),
|
|
case Cursor =/= undefined orelse emqx_topic:wildcard(Topic) of
|
|
false ->
|
|
{ok, Result} = erlang:apply(Mod, read_message, [Context, Topic]),
|
|
deliver(Result, Context, Pid, Topic, undefined, Limiter);
|
|
true ->
|
|
{ok, Result, NewCursor} = erlang:apply(Mod, match_messages, [Context, Topic, Cursor]),
|
|
deliver(Result, Context, Pid, Topic, NewCursor, Limiter)
|
|
end.
|
|
|
|
-spec deliver(list(emqx_types:message()), context(), pid(), topic(), cursor(), limiter()) ->
|
|
{ok, limiter()}.
|
|
deliver([], _Context, _Pid, _Topic, undefined, Limiter) ->
|
|
{ok, Limiter};
|
|
deliver([], Context, Pid, Topic, Cursor, Limiter) ->
|
|
dispatch(Context, Pid, Topic, Cursor, Limiter);
|
|
deliver(Result, Context, Pid, Topic, Cursor, Limiter) ->
|
|
case erlang:is_process_alive(Pid) of
|
|
false ->
|
|
{ok, Limiter};
|
|
_ ->
|
|
DeliverNum = emqx_conf:get([retainer, flow_control, batch_deliver_number], undefined),
|
|
case DeliverNum of
|
|
0 ->
|
|
do_deliver(Result, Pid, Topic),
|
|
{ok, Limiter};
|
|
_ ->
|
|
case do_deliver(Result, DeliverNum, Pid, Topic, Limiter) of
|
|
{ok, Limiter2} ->
|
|
deliver([], Context, Pid, Topic, Cursor, Limiter2);
|
|
{drop, Limiter2} ->
|
|
{ok, Limiter2}
|
|
end
|
|
end
|
|
end.
|
|
|
|
do_deliver([], _DeliverNum, _Pid, _Topic, Limiter) ->
|
|
{ok, Limiter};
|
|
do_deliver(Msgs, DeliverNum, Pid, Topic, Limiter) ->
|
|
{Num, ToDelivers, Msgs2} = safe_split(DeliverNum, Msgs),
|
|
case emqx_htb_limiter:consume(Num, Limiter) of
|
|
{ok, Limiter2} ->
|
|
do_deliver(ToDelivers, Pid, Topic),
|
|
do_deliver(Msgs2, DeliverNum, Pid, Topic, Limiter2);
|
|
{drop, _} = Drop ->
|
|
?SLOG(debug, #{
|
|
msg => "retained_message_dropped",
|
|
reason => "reached_ratelimit",
|
|
dropped_count => length(ToDelivers)
|
|
}),
|
|
Drop
|
|
end.
|
|
|
|
do_deliver([Msg | T], Pid, Topic) ->
|
|
case emqx_banned:look_up({clientid, Msg#message.from}) of
|
|
[] ->
|
|
Pid ! {deliver, Topic, Msg},
|
|
ok;
|
|
_ ->
|
|
?tp(
|
|
notice,
|
|
ignore_retained_message_deliver,
|
|
#{
|
|
reason => "client is banned",
|
|
clientid => Msg#message.from
|
|
}
|
|
)
|
|
end,
|
|
do_deliver(T, Pid, Topic);
|
|
do_deliver([], _, _) ->
|
|
ok.
|
|
|
|
safe_split(N, List) ->
|
|
safe_split(N, List, 0, []).
|
|
|
|
safe_split(0, List, Count, Acc) ->
|
|
{Count, lists:reverse(Acc), List};
|
|
safe_split(_N, [], Count, Acc) ->
|
|
{Count, lists:reverse(Acc), []};
|
|
safe_split(N, [H | T], Count, Acc) ->
|
|
safe_split(N - 1, T, Count + 1, [H | Acc]).
|