From 913e0ce18b8ca6a2a75057c85fd0afaef7e9d25f Mon Sep 17 00:00:00 2001 From: firest Date: Mon, 1 Jul 2024 18:36:33 +0800 Subject: [PATCH 1/2] feat(banned): add a bootstrap file for banned --- apps/emqx/src/emqx_banned.erl | 150 +++++++++++++++++- apps/emqx/src/emqx_schema.erl | 18 +++ apps/emqx/test/data/banned/error.csv | 4 + apps/emqx/test/data/banned/full.csv | 3 + apps/emqx/test/data/banned/full2.csv | 3 + apps/emqx/test/data/banned/omitted.csv | 3 + apps/emqx/test/data/banned/optional.csv | 3 + apps/emqx/test/emqx_banned_SUITE.erl | 53 +++++++ .../src/emqx_mgmt_api_banned.erl | 2 +- apps/emqx_utils/src/emqx_utils_stream.erl | 59 ++++++- rel/i18n/emqx_schema.hocon | 18 +++ 11 files changed, 305 insertions(+), 11 deletions(-) create mode 100644 apps/emqx/test/data/banned/error.csv create mode 100644 apps/emqx/test/data/banned/full.csv create mode 100644 apps/emqx/test/data/banned/full2.csv create mode 100644 apps/emqx/test/data/banned/omitted.csv create mode 100644 apps/emqx/test/data/banned/optional.csv diff --git a/apps/emqx/src/emqx_banned.erl b/apps/emqx/src/emqx_banned.erl index f6e9a908d..5e3545b93 100644 --- a/apps/emqx/src/emqx_banned.erl +++ b/apps/emqx/src/emqx_banned.erl @@ -16,6 +16,8 @@ -module(emqx_banned). +-feature(maybe_expr, enable). + -behaviour(gen_server). -behaviour(emqx_db_backup). @@ -48,6 +50,7 @@ handle_call/3, handle_cast/2, handle_info/2, + handle_continue/2, terminate/2, code_change/3 ]). @@ -132,7 +135,7 @@ format(#banned{ until => to_rfc3339(Until) }. --spec parse(map()) -> emqx_types:banned() | {error, term()}. +-spec parse(map()) -> {ok, emqx_types:banned()} | {error, term()}. 
parse(Params) -> case parse_who(Params) of {error, Reason} -> @@ -144,13 +147,13 @@ parse(Params) -> Until = maps:get(<<"until">>, Params, At + ?EXPIRATION_TIME), case Until > erlang:system_time(second) of true -> - #banned{ + {ok, #banned{ who = Who, by = By, reason = Reason, at = At, until = Until - }; + }}; false -> ErrorReason = io_lib:format("Cannot create expired banned, ~p to ~p", [At, Until]), @@ -234,12 +237,137 @@ who(peerhost_net, CIDR) when is_tuple(CIDR) -> {peerhost_net, CIDR}; who(peerhost_net, CIDR) when is_binary(CIDR) -> {peerhost_net, esockd_cidr:parse(binary_to_list(CIDR), true)}. +%%-------------------------------------------------------------------- +%% Import From CSV +%%-------------------------------------------------------------------- +init_from_csv(<<>>) -> + ok; +init_from_csv(File) -> + maybe + core ?= mria_rlog:role(), + '$end_of_table' ?= mnesia:dirty_first(?BANNED_RULE_TAB), + '$end_of_table' ?= mnesia:dirty_first(?BANNED_INDIVIDUAL_TAB), + {ok, Bin} ?= file:read_file(File), + Stream = emqx_utils_stream:csv(Bin, #{nullable => true, filter_null => true}), + {ok, List} ?= parse_stream(Stream), + import_from_stream(List), + ?SLOG(info, #{ + msg => "load_banned_bootstrap_file_succeeded", + file => File + }) + else + replicant -> + ok; + {Name, _} when + Name == peerhost; + Name == peerhost_net; + Name == clientid_re; + Name == username_re; + Name == clientid; + Name == username + -> + ok; + {error, Reason} = Error -> + ?SLOG(error, #{ + msg => "load_banned_bootstrap_file_failed", + reason => Reason, + file => File + }), + Error + end. 
+ +import_from_stream(Stream) -> + Groups = maps:groups_from_list( + fun(#banned{who = Who}) -> table(Who) end, Stream + ), + maps:foreach( + fun(Tab, Items) -> + Trans = fun() -> + lists:foreach( + fun(Item) -> + mnesia:write(Tab, Item, write) + end, + Items + ) + end, + + case trans(Trans) of + {ok, _} -> + ?SLOG(info, #{ + msg => "import_banned_from_stream_succeeded", + items => Items + }); + {error, Reason} -> + ?SLOG(error, #{ + msg => "import_banned_from_stream_failed", + reason => Reason, + items => Items + }) + end + end, + Groups + ). + +parse_stream(Stream) -> + try + List = emqx_utils_stream:consume(Stream), + parse_stream(List, [], []) + catch + error:Reason -> + {error, Reason} + end. + +parse_stream([Item | List], Ok, Error) -> + maybe + {ok, Item1} ?= normalize_parse_item(Item), + {ok, Banned} ?= parse(Item1), + parse_stream(List, [Banned | Ok], Error) + else + {error, _} -> + parse_stream(List, Ok, [Item | Error]) + end; +parse_stream([], Ok, []) -> + {ok, Ok}; +parse_stream([], Ok, Error) -> + ?SLOG(warning, #{ + msg => "invalid_banned_items", + items => Error + }), + {ok, Ok}. + +normalize_parse_item(#{<<"as">> := As} = Item) -> + ParseTime = fun(Name, Input) -> + maybe + #{Name := Time} ?= Input, + {ok, Epoch} ?= emqx_utils_calendar:to_epoch_second(emqx_utils_conv:str(Time)), + {ok, Input#{Name := Epoch}} + else + {error, _} = Error -> + Error; + NoTime when is_map(NoTime) -> + {ok, NoTime} + end + end, + + maybe + {ok, Type} ?= emqx_utils:safe_to_existing_atom(As), + {ok, Item1} ?= ParseTime(<<"at">>, Item#{<<"as">> := Type}), + ParseTime(<<"until">>, Item1) + end; +normalize_parse_item(_Item) -> + {error, invalid_item}. + %%-------------------------------------------------------------------- %% gen_server callbacks %%-------------------------------------------------------------------- init([]) -> - {ok, ensure_expiry_timer(#{expiry_timer => undefined})}. + {ok, ensure_expiry_timer(#{expiry_timer => undefined}), {continue, init_from_csv}}. 
+ +handle_continue(init_from_csv, State) -> + File = emqx_schema:naive_env_interpolation(emqx:get_config([banned, bootstrap_file], <<>>)), + _ = init_from_csv(File), + {noreply, State}. handle_call(Req, _From, State) -> ?SLOG(error, #{msg => "unexpected_call", call => Req}), @@ -250,7 +378,7 @@ handle_cast(Msg, State) -> {noreply, State}. handle_info({timeout, TRef, expire}, State = #{expiry_timer := TRef}) -> - _ = mria:transaction(?COMMON_SHARD, fun ?MODULE:expire_banned_items/1, [ + _ = trans(fun ?MODULE:expire_banned_items/1, [ erlang:system_time(second) ]), {noreply, ensure_expiry_timer(State), hibernate}; @@ -391,3 +519,15 @@ on_banned(_) -> all_rules() -> ets:tab2list(?BANNED_RULE_TAB). + +trans(Fun) -> + case mria:transaction(?COMMON_SHARD, Fun) of + {atomic, Res} -> {ok, Res}; + {aborted, Reason} -> {error, Reason} + end. + +trans(Fun, Args) -> + case mria:transaction(?COMMON_SHARD, Fun, Args) of + {atomic, Res} -> {ok, Res}; + {aborted, Reason} -> {error, Reason} + end. diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index 6b02a4d4b..1ba71fb00 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -319,6 +319,11 @@ roots(low) -> sc( ref("crl_cache"), #{importance => ?IMPORTANCE_HIDDEN} + )}, + {banned, + sc( + ref("banned"), + #{importance => ?IMPORTANCE_HIDDEN} )} ]. @@ -1762,6 +1767,17 @@ fields("client_attrs_init") -> desc => ?DESC("client_attrs_init_set_as_attr"), validator => fun restricted_string/1 })} + ]; +fields("banned") -> + [ + {bootstrap_file, + sc( + binary(), + #{ + desc => ?DESC("banned_bootstrap_file"), + default => <<>> + } + )} ]. compile_variform(undefined, _Opts) -> @@ -2105,6 +2121,8 @@ desc(durable_storage) -> ?DESC(durable_storage); desc("client_attrs_init") -> ?DESC(client_attrs_init); +desc("banned") -> + "Banned ."; desc(_) -> undefined. 
diff --git a/apps/emqx/test/data/banned/error.csv b/apps/emqx/test/data/banned/error.csv new file mode 100644 index 000000000..49046a5a3 --- /dev/null +++ b/apps/emqx/test/data/banned/error.csv @@ -0,0 +1,4 @@ +as,who,reason,at,until,by +clientid,c1,right,2021-10-25T21:53:47+08:00,2025-10-25T21:53:47+08:00,boot +username,u1,reason 1,abc,2025-10-25T21:53:47+08:00,boot +usernamx,u2,reason 2,2021-10-25T21:53:47+08:00,2025-10-25T21:53:47+08:00,boot diff --git a/apps/emqx/test/data/banned/full.csv b/apps/emqx/test/data/banned/full.csv new file mode 100644 index 000000000..f6abf9de2 --- /dev/null +++ b/apps/emqx/test/data/banned/full.csv @@ -0,0 +1,3 @@ +as,who,reason,at,until,by +clientid,c1,reason 1,2021-10-25T21:53:47+08:00,2025-10-25T21:53:47+08:00,boot +username,u1,reason 2,2021-10-25T21:53:47+08:00,2025-10-25T21:53:47+08:00,boot diff --git a/apps/emqx/test/data/banned/full2.csv b/apps/emqx/test/data/banned/full2.csv new file mode 100644 index 000000000..e033d2e60 --- /dev/null +++ b/apps/emqx/test/data/banned/full2.csv @@ -0,0 +1,3 @@ +as,who,reason,at,until,by +clientid,c2,reason 1,2021-10-25T21:53:47+08:00,2025-10-25T21:53:47+08:00,boot +username,u2,reason 2,2021-10-25T21:53:47+08:00,2025-10-25T21:53:47+08:00,boot diff --git a/apps/emqx/test/data/banned/omitted.csv b/apps/emqx/test/data/banned/omitted.csv new file mode 100644 index 000000000..5db11d7cf --- /dev/null +++ b/apps/emqx/test/data/banned/omitted.csv @@ -0,0 +1,3 @@ +as,who,reason,at,until,by +clientid,c1,,2021-10-25T21:53:47+08:00,2025-10-25T21:53:47+08:00, +username,u1,,2021-10-25T21:53:47+08:00,2025-10-25T21:53:47+08:00, diff --git a/apps/emqx/test/data/banned/optional.csv b/apps/emqx/test/data/banned/optional.csv new file mode 100644 index 000000000..4e752e29a --- /dev/null +++ b/apps/emqx/test/data/banned/optional.csv @@ -0,0 +1,3 @@ +as,who +clientid,c1 +username,u1 diff --git a/apps/emqx/test/emqx_banned_SUITE.erl b/apps/emqx/test/emqx_banned_SUITE.erl index d73907c57..d2d031124 100644 --- 
a/apps/emqx/test/emqx_banned_SUITE.erl +++ b/apps/emqx/test/emqx_banned_SUITE.erl @@ -246,6 +246,45 @@ t_session_taken(_) -> {ok, #{}, [0]} = emqtt:unsubscribe(C3, Topic), ok = emqtt:disconnect(C3). +t_full_bootstrap_file(_) -> + emqx_banned:clear(), + ?assertEqual(ok, emqx_banned:init_from_csv(mk_bootstrap_file(<<"full.csv">>))), + FullDatas = lists:sort([ + {banned, {username, <<"u1">>}, <<"boot">>, <<"reason 2">>, 1635170027, 1761400427}, + {banned, {clientid, <<"c1">>}, <<"boot">>, <<"reason 1">>, 1635170027, 1761400427} + ]), + ?assertMatch(FullDatas, lists:sort(get_banned_list())), + + ?assertEqual(ok, emqx_banned:init_from_csv(mk_bootstrap_file(<<"full2.csv">>))), + ?assertMatch(FullDatas, lists:sort(get_banned_list())), + ok. + +t_optional_bootstrap_file(_) -> + emqx_banned:clear(), + ?assertEqual(ok, emqx_banned:init_from_csv(mk_bootstrap_file(<<"optional.csv">>))), + Keys = lists:sort([{username, <<"u1">>}, {clientid, <<"c1">>}]), + ?assertMatch(Keys, lists:sort([element(2, Data) || Data <- get_banned_list()])), + ok. + +t_omitted_bootstrap_file(_) -> + emqx_banned:clear(), + ?assertEqual(ok, emqx_banned:init_from_csv(mk_bootstrap_file(<<"omitted.csv">>))), + Keys = lists:sort([{username, <<"u1">>}, {clientid, <<"c1">>}]), + ?assertMatch(Keys, lists:sort([element(2, Data) || Data <- get_banned_list()])), + ok. + +t_error_bootstrap_file(_) -> + emqx_banned:clear(), + ?assertEqual( + {error, enoent}, emqx_banned:init_from_csv(mk_bootstrap_file(<<"not_exists.csv">>)) + ), + ?assertEqual( + ok, emqx_banned:init_from_csv(mk_bootstrap_file(<<"error.csv">>)) + ), + Keys = [{clientid, <<"c1">>}], + ?assertMatch(Keys, [element(2, Data) || Data <- get_banned_list()]), + ok. + receive_messages(Count) -> receive_messages(Count, []). receive_messages(0, Msgs) -> @@ -261,3 +300,17 @@ receive_messages(Count, Msgs) -> after 1200 -> Msgs end. + +mk_bootstrap_file(File) -> + Dir = code:lib_dir(emqx, test), + filename:join([Dir, <<"data/banned">>, File]). 
+ +get_banned_list() -> + Tabs = emqx_banned:tables(), + lists:foldl( + fun(Tab, Acc) -> + Acc ++ ets:tab2list(Tab) + end, + [], + Tabs + ). diff --git a/apps/emqx_management/src/emqx_mgmt_api_banned.erl b/apps/emqx_management/src/emqx_mgmt_api_banned.erl index 0a24dab15..659eed48a 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_banned.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_banned.erl @@ -171,7 +171,7 @@ banned(post, #{body := Body}) -> {error, Reason} -> ErrorReason = io_lib:format("~p", [Reason]), {400, 'BAD_REQUEST', list_to_binary(ErrorReason)}; - Ban -> + {ok, Ban} -> case emqx_banned:create(Ban) of {ok, Banned} -> {200, format(Banned)}; diff --git a/apps/emqx_utils/src/emqx_utils_stream.erl b/apps/emqx_utils/src/emqx_utils_stream.erl index 8b6db8292..6e3ff079b 100644 --- a/apps/emqx_utils/src/emqx_utils_stream.erl +++ b/apps/emqx_utils/src/emqx_utils_stream.erl @@ -45,15 +45,18 @@ %% Streams from .csv data -export([ - csv/1 + csv/1, + csv/2 ]). --export_type([stream/1]). +-export_type([stream/1, csv_parse_opts/0]). %% @doc A stream is essentially a lazy list. -type stream(T) :: fun(() -> next(T) | []). -type next(T) :: nonempty_improper_list(T, stream(T)). +-type csv_parse_opts() :: #{nullable => boolean(), filter_null => boolean()}. + -dialyzer(no_improper_lists). -elvis([{elvis_style, nesting_level, disable}]). @@ -261,13 +264,42 @@ ets(Cont, ContF) -> %% @doc Make a stream out of a .csv binary, where the .csv binary is loaded in all at once. %% The .csv binary is assumed to be in UTF-8 encoding and to have a header row. -spec csv(binary()) -> stream(map()). -csv(Bin) when is_binary(Bin) -> +csv(Bin) -> + csv(Bin, #{}). + +-spec csv(binary(), csv_parse_opts()) -> stream(map()). 
+csv(Bin, Opts) when is_binary(Bin) -> + Liner = + case Opts of + #{nullable := true} -> + fun csv_read_nullable_line/1; + _ -> + fun csv_read_line/1 + end, + Maper = + case Opts of + #{filter_null := true} -> + fun(Headers, Fields) -> + maps:from_list( + lists:filter( + fun({_, Value}) -> + Value =/= undefined + end, + lists:zip(Headers, Fields) + ) + ) + end; + _ -> + fun(Headers, Fields) -> + maps:from_list(lists:zip(Headers, Fields)) + end + end, Reader = fun _Iter(Headers, Lines) -> - case csv_read_line(Lines) of + case Liner(Lines) of {Fields, Rest} -> case length(Fields) == length(Headers) of true -> - User = maps:from_list(lists:zip(Headers, Fields)), + User = Maper(Headers, Fields), [User | fun() -> _Iter(Headers, Rest) end]; false -> error(bad_format) @@ -291,6 +323,23 @@ csv_read_line([Line | Lines]) -> csv_read_line([]) -> eof. +csv_read_nullable_line([Line | Lines]) -> + %% XXX: not support ' ' for the field value + Fields = lists:map( + fun(Bin) -> + case string:trim(Bin, both) of + <<>> -> + undefined; + Any -> + Any + end + end, + binary:split(Line, [<<",">>], [global]) + ), + {Fields, Lines}; +csv_read_nullable_line([]) -> + eof. + do_interleave(_Cont, _, [], []) -> []; do_interleave(Cont, N, [{N, S} | Rest], Rev) -> diff --git a/rel/i18n/emqx_schema.hocon b/rel/i18n/emqx_schema.hocon index f9978fe6f..cea151a87 100644 --- a/rel/i18n/emqx_schema.hocon +++ b/rel/i18n/emqx_schema.hocon @@ -1630,4 +1630,22 @@ client_attrs_init_set_as_attr { The extracted attribute will be stored in the `client_attrs` property with this name.""" } +banned_bootstrap_file.desc: +"""The bootstrap file is a CSV file used to batch loading banned data when initializing a single node or cluster, in other words, the import operation is performed only if there is no data in the database. + +The delimiter for this file is `,`. + +The first line of this file must be a header line. 
All valid headers are listed here: +- as :: required +- who :: required +- by :: optional +- reason :: optional +- at :: optional +- until :: optional + +See the documentation for details on each field. + +Each row in the rest of this file must contain the same number of columns as the header line, +and a column may be left empty, in which case its value will be `undefined`.""" + } From a912751458d6a3e7f53a551bf3f8186a1c90c3f5 Mon Sep 17 00:00:00 2001 From: firest Date: Tue, 2 Jul 2024 13:47:33 +0800 Subject: [PATCH 2/2] chore: update changes --- changes/ee/feat-13386.en.md | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) create mode 100644 changes/ee/feat-13386.en.md diff --git a/changes/ee/feat-13386.en.md b/changes/ee/feat-13386.en.md new file mode 100644 index 000000000..e038e0144 --- /dev/null +++ b/changes/ee/feat-13386.en.md @@ -0,0 +1,17 @@ +Added support for a bootstrap file that batch-loads banned data when initializing a single node or cluster; in other words, the import operation is performed only if there is no data in the database. + + +This file is a CSV file with `,` as its delimiter. + +The first line of this file must be a header line. All valid headers are listed here: +- as :: required +- who :: required +- by :: optional +- reason :: optional +- at :: optional +- until :: optional + +See the documentation for details on each field. + +Each row in the rest of this file must contain the same number of columns as the header line, +and a column may be left empty, in which case its value will be `undefined`.