feat(banned): add a bootstrap file for banned
This commit is contained in:
parent
51a8d3b041
commit
913e0ce18b
|
@ -16,6 +16,8 @@
|
|||
|
||||
-module(emqx_banned).
|
||||
|
||||
-feature(maybe_expr, enable).
|
||||
|
||||
-behaviour(gen_server).
|
||||
-behaviour(emqx_db_backup).
|
||||
|
||||
|
@ -48,6 +50,7 @@
|
|||
handle_call/3,
|
||||
handle_cast/2,
|
||||
handle_info/2,
|
||||
handle_continue/2,
|
||||
terminate/2,
|
||||
code_change/3
|
||||
]).
|
||||
|
@ -132,7 +135,7 @@ format(#banned{
|
|||
until => to_rfc3339(Until)
|
||||
}.
|
||||
|
||||
-spec parse(map()) -> emqx_types:banned() | {error, term()}.
|
||||
-spec parse(map()) -> {ok, emqx_types:banned()} | {error, term()}.
|
||||
parse(Params) ->
|
||||
case parse_who(Params) of
|
||||
{error, Reason} ->
|
||||
|
@ -144,13 +147,13 @@ parse(Params) ->
|
|||
Until = maps:get(<<"until">>, Params, At + ?EXPIRATION_TIME),
|
||||
case Until > erlang:system_time(second) of
|
||||
true ->
|
||||
#banned{
|
||||
{ok, #banned{
|
||||
who = Who,
|
||||
by = By,
|
||||
reason = Reason,
|
||||
at = At,
|
||||
until = Until
|
||||
};
|
||||
}};
|
||||
false ->
|
||||
ErrorReason =
|
||||
io_lib:format("Cannot create expired banned, ~p to ~p", [At, Until]),
|
||||
|
@ -234,12 +237,137 @@ who(peerhost_net, CIDR) when is_tuple(CIDR) -> {peerhost_net, CIDR};
|
|||
who(peerhost_net, CIDR) when is_binary(CIDR) ->
|
||||
{peerhost_net, esockd_cidr:parse(binary_to_list(CIDR), true)}.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% Import From CSV
|
||||
%%--------------------------------------------------------------------
|
||||
init_from_csv(<<>>) ->
|
||||
ok;
|
||||
init_from_csv(File) ->
|
||||
maybe
|
||||
core ?= mria_rlog:role(),
|
||||
'$end_of_table' ?= mnesia:dirty_first(?BANNED_RULE_TAB),
|
||||
'$end_of_table' ?= mnesia:dirty_first(?BANNED_INDIVIDUAL_TAB),
|
||||
{ok, Bin} ?= file:read_file(File),
|
||||
Stream = emqx_utils_stream:csv(Bin, #{nullable => true, filter_null => true}),
|
||||
{ok, List} ?= parse_stream(Stream),
|
||||
import_from_stream(List),
|
||||
?SLOG(info, #{
|
||||
msg => "load_banned_bootstrap_file_succeeded",
|
||||
file => File
|
||||
})
|
||||
else
|
||||
replicant ->
|
||||
ok;
|
||||
{Name, _} when
|
||||
Name == peerhost;
|
||||
Name == peerhost_net;
|
||||
Name == clientid_re;
|
||||
Name == username_re;
|
||||
Name == clientid;
|
||||
Name == username
|
||||
->
|
||||
ok;
|
||||
{error, Reason} = Error ->
|
||||
?SLOG(error, #{
|
||||
msg => "load_banned_bootstrap_file_failed",
|
||||
reason => Reason,
|
||||
file => File
|
||||
}),
|
||||
Error
|
||||
end.
|
||||
|
||||
import_from_stream(Stream) ->
|
||||
Groups = maps:groups_from_list(
|
||||
fun(#banned{who = Who}) -> table(Who) end, Stream
|
||||
),
|
||||
maps:foreach(
|
||||
fun(Tab, Items) ->
|
||||
Trans = fun() ->
|
||||
lists:foreach(
|
||||
fun(Item) ->
|
||||
mnesia:write(Tab, Item, write)
|
||||
end,
|
||||
Items
|
||||
)
|
||||
end,
|
||||
|
||||
case trans(Trans) of
|
||||
{ok, _} ->
|
||||
?SLOG(info, #{
|
||||
msg => "import_banned_from_stream_succeeded",
|
||||
items => Items
|
||||
});
|
||||
{error, Reason} ->
|
||||
?SLOG(error, #{
|
||||
msg => "import_banned_from_stream_failed",
|
||||
reason => Reason,
|
||||
items => Items
|
||||
})
|
||||
end
|
||||
end,
|
||||
Groups
|
||||
).
|
||||
|
||||
parse_stream(Stream) ->
|
||||
try
|
||||
List = emqx_utils_stream:consume(Stream),
|
||||
parse_stream(List, [], [])
|
||||
catch
|
||||
error:Reason ->
|
||||
{error, Reason}
|
||||
end.
|
||||
|
||||
parse_stream([Item | List], Ok, Error) ->
|
||||
maybe
|
||||
{ok, Item1} ?= normalize_parse_item(Item),
|
||||
{ok, Banned} ?= parse(Item1),
|
||||
parse_stream(List, [Banned | Ok], Error)
|
||||
else
|
||||
{error, _} ->
|
||||
parse_stream(List, Ok, [Item | Error])
|
||||
end;
|
||||
parse_stream([], Ok, []) ->
|
||||
{ok, Ok};
|
||||
parse_stream([], Ok, Error) ->
|
||||
?SLOG(warning, #{
|
||||
msg => "invalid_banned_items",
|
||||
items => Error
|
||||
}),
|
||||
{ok, Ok}.
|
||||
|
||||
normalize_parse_item(#{<<"as">> := As} = Item) ->
|
||||
ParseTime = fun(Name, Input) ->
|
||||
maybe
|
||||
#{Name := Time} ?= Input,
|
||||
{ok, Epoch} ?= emqx_utils_calendar:to_epoch_second(emqx_utils_conv:str(Time)),
|
||||
{ok, Input#{Name := Epoch}}
|
||||
else
|
||||
{error, _} = Error ->
|
||||
Error;
|
||||
NoTime when is_map(NoTime) ->
|
||||
{ok, NoTime}
|
||||
end
|
||||
end,
|
||||
|
||||
maybe
|
||||
{ok, Type} ?= emqx_utils:safe_to_existing_atom(As),
|
||||
{ok, Item1} ?= ParseTime(<<"at">>, Item#{<<"as">> := Type}),
|
||||
ParseTime(<<"until">>, Item1)
|
||||
end;
|
||||
normalize_parse_item(_Item) ->
|
||||
{error, invalid_item}.
|
||||
|
||||
%%--------------------------------------------------------------------
|
||||
%% gen_server callbacks
|
||||
%%--------------------------------------------------------------------
|
||||
|
||||
init([]) ->
|
||||
{ok, ensure_expiry_timer(#{expiry_timer => undefined})}.
|
||||
{ok, ensure_expiry_timer(#{expiry_timer => undefined}), {continue, init_from_csv}}.
|
||||
|
||||
handle_continue(init_from_csv, State) ->
|
||||
File = emqx_schema:naive_env_interpolation(emqx:get_config([banned, bootstrap_file], <<>>)),
|
||||
_ = init_from_csv(File),
|
||||
{noreply, State}.
|
||||
|
||||
handle_call(Req, _From, State) ->
|
||||
?SLOG(error, #{msg => "unexpected_call", call => Req}),
|
||||
|
@ -250,7 +378,7 @@ handle_cast(Msg, State) ->
|
|||
{noreply, State}.
|
||||
|
||||
handle_info({timeout, TRef, expire}, State = #{expiry_timer := TRef}) ->
|
||||
_ = mria:transaction(?COMMON_SHARD, fun ?MODULE:expire_banned_items/1, [
|
||||
_ = trans(fun ?MODULE:expire_banned_items/1, [
|
||||
erlang:system_time(second)
|
||||
]),
|
||||
{noreply, ensure_expiry_timer(State), hibernate};
|
||||
|
@ -391,3 +519,15 @@ on_banned(_) ->
|
|||
|
||||
all_rules() ->
|
||||
ets:tab2list(?BANNED_RULE_TAB).
|
||||
|
||||
trans(Fun) ->
|
||||
case mria:transaction(?COMMON_SHARD, Fun) of
|
||||
{atomic, Res} -> {ok, Res};
|
||||
{aborted, Reason} -> {error, Reason}
|
||||
end.
|
||||
|
||||
trans(Fun, Args) ->
|
||||
case mria:transaction(?COMMON_SHARD, Fun, Args) of
|
||||
{atomic, Res} -> {ok, Res};
|
||||
{aborted, Reason} -> {error, Reason}
|
||||
end.
|
||||
|
|
|
@ -319,6 +319,11 @@ roots(low) ->
|
|||
sc(
|
||||
ref("crl_cache"),
|
||||
#{importance => ?IMPORTANCE_HIDDEN}
|
||||
)},
|
||||
{banned,
|
||||
sc(
|
||||
ref("banned"),
|
||||
#{importance => ?IMPORTANCE_HIDDEN}
|
||||
)}
|
||||
].
|
||||
|
||||
|
@ -1762,6 +1767,17 @@ fields("client_attrs_init") ->
|
|||
desc => ?DESC("client_attrs_init_set_as_attr"),
|
||||
validator => fun restricted_string/1
|
||||
})}
|
||||
];
|
||||
fields("banned") ->
|
||||
[
|
||||
{bootstrap_file,
|
||||
sc(
|
||||
binary(),
|
||||
#{
|
||||
desc => ?DESC("banned_bootstrap_file"),
|
||||
default => <<>>
|
||||
}
|
||||
)}
|
||||
].
|
||||
|
||||
compile_variform(undefined, _Opts) ->
|
||||
|
@ -2105,6 +2121,8 @@ desc(durable_storage) ->
|
|||
?DESC(durable_storage);
|
||||
desc("client_attrs_init") ->
|
||||
?DESC(client_attrs_init);
|
||||
desc("banned") ->
|
||||
"Banned .";
|
||||
desc(_) ->
|
||||
undefined.
|
||||
|
||||
|
|
|
@ -0,0 +1,4 @@
|
|||
as,who,reason,at,until,by
|
||||
clientid,c1,right,2021-10-25T21:53:47+08:00,2025-10-25T21:53:47+08:00,boot
|
||||
username,u1,reason 1,abc,2025-10-25T21:53:47+08:00,boot
|
||||
usernamx,u2,reason 2,2021-10-25T21:53:47+08:00,2025-10-25T21:53:47+08:00,boot
|
|
|
@ -0,0 +1,3 @@
|
|||
as,who,reason,at,until,by
|
||||
clientid,c1,reason 1,2021-10-25T21:53:47+08:00,2025-10-25T21:53:47+08:00,boot
|
||||
username,u1,reason 2,2021-10-25T21:53:47+08:00,2025-10-25T21:53:47+08:00,boot
|
|
|
@ -0,0 +1,3 @@
|
|||
as,who,reason,at,until,by
|
||||
clientid,c2,reason 1,2021-10-25T21:53:47+08:00,2025-10-25T21:53:47+08:00,boot
|
||||
username,u2,reason 2,2021-10-25T21:53:47+08:00,2025-10-25T21:53:47+08:00,boot
|
|
|
@ -0,0 +1,3 @@
|
|||
as,who,reason,at,until,by
|
||||
clientid,c1,,2021-10-25T21:53:47+08:00,2025-10-25T21:53:47+08:00,
|
||||
username,u1,,2021-10-25T21:53:47+08:00,2025-10-25T21:53:47+08:00,
|
|
|
@ -0,0 +1,3 @@
|
|||
as,who
|
||||
clientid,c1
|
||||
username,u1
|
|
|
@ -246,6 +246,45 @@ t_session_taken(_) ->
|
|||
{ok, #{}, [0]} = emqtt:unsubscribe(C3, Topic),
|
||||
ok = emqtt:disconnect(C3).
|
||||
|
||||
t_full_bootstrap_file(_) ->
|
||||
emqx_banned:clear(),
|
||||
?assertEqual(ok, emqx_banned:init_from_csv(mk_bootstrap_file(<<"full.csv">>))),
|
||||
FullDatas = lists:sort([
|
||||
{banned, {username, <<"u1">>}, <<"boot">>, <<"reason 2">>, 1635170027, 1761400427},
|
||||
{banned, {clientid, <<"c1">>}, <<"boot">>, <<"reason 1">>, 1635170027, 1761400427}
|
||||
]),
|
||||
?assertMatch(FullDatas, lists:sort(get_banned_list())),
|
||||
|
||||
?assertEqual(ok, emqx_banned:init_from_csv(mk_bootstrap_file(<<"full2.csv">>))),
|
||||
?assertMatch(FullDatas, lists:sort(get_banned_list())),
|
||||
ok.
|
||||
|
||||
t_optional_bootstrap_file(_) ->
|
||||
emqx_banned:clear(),
|
||||
?assertEqual(ok, emqx_banned:init_from_csv(mk_bootstrap_file(<<"optional.csv">>))),
|
||||
Keys = lists:sort([{username, <<"u1">>}, {clientid, <<"c1">>}]),
|
||||
?assertMatch(Keys, lists:sort([element(2, Data) || Data <- get_banned_list()])),
|
||||
ok.
|
||||
|
||||
t_omitted_bootstrap_file(_) ->
|
||||
emqx_banned:clear(),
|
||||
?assertEqual(ok, emqx_banned:init_from_csv(mk_bootstrap_file(<<"omitted.csv">>))),
|
||||
Keys = lists:sort([{username, <<"u1">>}, {clientid, <<"c1">>}]),
|
||||
?assertMatch(Keys, lists:sort([element(2, Data) || Data <- get_banned_list()])),
|
||||
ok.
|
||||
|
||||
t_error_bootstrap_file(_) ->
|
||||
emqx_banned:clear(),
|
||||
?assertEqual(
|
||||
{error, enoent}, emqx_banned:init_from_csv(mk_bootstrap_file(<<"not_exists.csv">>))
|
||||
),
|
||||
?assertEqual(
|
||||
ok, emqx_banned:init_from_csv(mk_bootstrap_file(<<"error.csv">>))
|
||||
),
|
||||
Keys = [{clientid, <<"c1">>}],
|
||||
?assertMatch(Keys, [element(2, Data) || Data <- get_banned_list()]),
|
||||
ok.
|
||||
|
||||
receive_messages(Count) ->
|
||||
receive_messages(Count, []).
|
||||
receive_messages(0, Msgs) ->
|
||||
|
@ -261,3 +300,17 @@ receive_messages(Count, Msgs) ->
|
|||
after 1200 ->
|
||||
Msgs
|
||||
end.
|
||||
|
||||
mk_bootstrap_file(File) ->
|
||||
Dir = code:lib_dir(emqx, test),
|
||||
filename:join([Dir, <<"data/banned">>, File]).
|
||||
|
||||
get_banned_list() ->
|
||||
Tabs = emqx_banned:tables(),
|
||||
lists:foldl(
|
||||
fun(Tab, Acc) ->
|
||||
Acc ++ ets:tab2list(Tab)
|
||||
end,
|
||||
[],
|
||||
Tabs
|
||||
).
|
||||
|
|
|
@ -171,7 +171,7 @@ banned(post, #{body := Body}) ->
|
|||
{error, Reason} ->
|
||||
ErrorReason = io_lib:format("~p", [Reason]),
|
||||
{400, 'BAD_REQUEST', list_to_binary(ErrorReason)};
|
||||
Ban ->
|
||||
{ok, Ban} ->
|
||||
case emqx_banned:create(Ban) of
|
||||
{ok, Banned} ->
|
||||
{200, format(Banned)};
|
||||
|
|
|
@ -45,15 +45,18 @@
|
|||
|
||||
%% Streams from .csv data
|
||||
-export([
|
||||
csv/1
|
||||
csv/1,
|
||||
csv/2
|
||||
]).
|
||||
|
||||
-export_type([stream/1]).
|
||||
-export_type([stream/1, csv_parse_opts/0]).
|
||||
|
||||
%% @doc A stream is essentially a lazy list.
|
||||
-type stream(T) :: fun(() -> next(T) | []).
|
||||
-type next(T) :: nonempty_improper_list(T, stream(T)).
|
||||
|
||||
-type csv_parse_opts() :: #{nullable => boolean(), filter_null => boolean()}.
|
||||
|
||||
-dialyzer(no_improper_lists).
|
||||
|
||||
-elvis([{elvis_style, nesting_level, disable}]).
|
||||
|
@ -261,13 +264,42 @@ ets(Cont, ContF) ->
|
|||
%% @doc Make a stream out of a .csv binary, where the .csv binary is loaded in all at once.
|
||||
%% The .csv binary is assumed to be in UTF-8 encoding and to have a header row.
|
||||
-spec csv(binary()) -> stream(map()).
|
||||
csv(Bin) when is_binary(Bin) ->
|
||||
csv(Bin) ->
|
||||
csv(Bin, #{}).
|
||||
|
||||
-spec csv(binary(), csv_parse_opts()) -> stream(map()).
|
||||
csv(Bin, Opts) when is_binary(Bin) ->
|
||||
Liner =
|
||||
case Opts of
|
||||
#{nullable := true} ->
|
||||
fun csv_read_nullable_line/1;
|
||||
_ ->
|
||||
fun csv_read_line/1
|
||||
end,
|
||||
Maper =
|
||||
case Opts of
|
||||
#{filter_null := true} ->
|
||||
fun(Headers, Fields) ->
|
||||
maps:from_list(
|
||||
lists:filter(
|
||||
fun({_, Value}) ->
|
||||
Value =/= undefined
|
||||
end,
|
||||
lists:zip(Headers, Fields)
|
||||
)
|
||||
)
|
||||
end;
|
||||
_ ->
|
||||
fun(Headers, Fields) ->
|
||||
maps:from_list(lists:zip(Headers, Fields))
|
||||
end
|
||||
end,
|
||||
Reader = fun _Iter(Headers, Lines) ->
|
||||
case csv_read_line(Lines) of
|
||||
case Liner(Lines) of
|
||||
{Fields, Rest} ->
|
||||
case length(Fields) == length(Headers) of
|
||||
true ->
|
||||
User = maps:from_list(lists:zip(Headers, Fields)),
|
||||
User = Maper(Headers, Fields),
|
||||
[User | fun() -> _Iter(Headers, Rest) end];
|
||||
false ->
|
||||
error(bad_format)
|
||||
|
@ -291,6 +323,23 @@ csv_read_line([Line | Lines]) ->
|
|||
csv_read_line([]) ->
|
||||
eof.
|
||||
|
||||
csv_read_nullable_line([Line | Lines]) ->
|
||||
%% XXX: not support ' ' for the field value
|
||||
Fields = lists:map(
|
||||
fun(Bin) ->
|
||||
case string:trim(Bin, both) of
|
||||
<<>> ->
|
||||
undefined;
|
||||
Any ->
|
||||
Any
|
||||
end
|
||||
end,
|
||||
binary:split(Line, [<<",">>], [global])
|
||||
),
|
||||
{Fields, Lines};
|
||||
csv_read_nullable_line([]) ->
|
||||
eof.
|
||||
|
||||
do_interleave(_Cont, _, [], []) ->
|
||||
[];
|
||||
do_interleave(Cont, N, [{N, S} | Rest], Rev) ->
|
||||
|
|
|
@ -1630,4 +1630,22 @@ client_attrs_init_set_as_attr {
|
|||
The extracted attribute will be stored in the `client_attrs` property with this name."""
|
||||
}
|
||||
|
||||
banned_bootstrap_file.desc:
|
||||
"""The bootstrap file is a CSV file used to batch loading banned data when initializing a single node or cluster, in other words, the import operation is performed only if there is no data in the database.
|
||||
|
||||
The delimiter for this file is `,`.
|
||||
|
||||
The first line of this file must be a header line. All valid headers are listed here:
|
||||
- as :: required
|
||||
- who :: required
|
||||
- by :: optional
|
||||
- reason :: optional
|
||||
- at :: optional
|
||||
- until :: optional
|
||||
|
||||
See the documentation for details on each field.
|
||||
|
||||
Each row in the rest of this file must contain the same number of columns as the header line,
|
||||
and column can be omitted then its value will be `undefined`."""
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue