diff --git a/apps/emqx/src/emqx_broker.erl b/apps/emqx/src/emqx_broker.erl index 6dc893043..b4fdd0e86 100644 --- a/apps/emqx/src/emqx_broker.erl +++ b/apps/emqx/src/emqx_broker.erl @@ -85,6 +85,16 @@ %% Guards -define(IS_SUBID(Id), (is_binary(Id) orelse is_atom(Id))). +-define(cast_or_eval(Pid, Msg, Expr), + case Pid =:= self() of + true -> + _ = Expr, + ok; + false -> + cast(Pid, Msg) + end +). + -spec start_link(atom(), pos_integer()) -> startlink_ret(). start_link(Pool, Id) -> ok = create_tabs(), @@ -159,15 +169,7 @@ do_subscribe(Topic, SubPid, SubOpts) when is_binary(Topic) -> %% https://emqx.atlassian.net/browse/EMQX-10214 I = emqx_broker_helper:get_sub_shard(SubPid, Topic), true = ets:insert(?SUBOPTION, {{Topic, SubPid}, with_shard_idx(I, SubOpts)}), - %% NOTE - %% We are relying on the local state to minimize global routing state changes, - %% thus it's important that some operations on ETS tables on the same topic - %% should not be interleaved: `ets:member/2` + `ets:insert/2` that are part of - %% broker's `subscribe` codepath, and `ets:delete_object/2` that's part of - %% `unsubscribe` codepath. So we have to pick a worker according to the topic, - %% but not shard. If there are topics with high number of shards, then the - %% load across the pool will be unbalanced. - Sync = call(pick(Topic), {subscribe, Topic, SubPid, I}), + Sync = call(pick({Topic, I}), {subscribe, Topic, SubPid, I}), case Sync of ok -> ok; @@ -218,15 +220,7 @@ do_unsubscribe2(Topic, SubPid, SubOpts) when 0 -> emqx_exclusive_subscription:unsubscribe(Topic, SubOpts); _ -> ok end, - %% NOTE - %% We are relying on the local state to minimize global routing state changes, - %% thus it's important that some operations on ETS tables on the same topic - %% should not be interleaved: `ets:member/2` + `ets:insert/2` that are part of - %% broker's `subscribe` codepath, and `ets:delete_object/2` that's part of - %% `unsubscribe` codepath. So we have to pick a worker according to the topic, - %% but not shard. If there are topics with high number of shards, then the - %% load across the pool will be unbalanced. - cast(pick(Topic), {unsubscribed, Topic, SubPid, I}); + cast(pick({Topic, I}), {unsubscribed, Topic, SubPid, I}); do_unsubscribe2(#share{group = Group, topic = Topic}, SubPid, _SubOpts) when is_binary(Group), is_binary(Topic), is_pid(SubPid) -> @@ -503,8 +497,8 @@ cast(Broker, Req) -> gen_server:cast(Broker, Req). %% Pick a broker -pick(Topic) -> - gproc_pool:pick_worker(broker_pool, Topic). +pick(TopicShard) -> + gproc_pool:pick_worker(broker_pool, TopicShard). %%-------------------------------------------------------------------- %% gen_server callbacks @@ -516,36 +510,72 @@ init([Pool, Id]) -> handle_call({subscribe, Topic, SubPid, 0}, {From, _Tag}, State) -> Existed = ets:member(?SUBSCRIBER, Topic), + Result = maybe_add_route(Existed, Topic, From), + assert_ok_result(Result), true = ets:insert(?SUBSCRIBER, {Topic, SubPid}), - Result = maybe_add_route(Existed, Topic, From), - {reply, Result, State}; -handle_call({subscribe, Topic, SubPid, I}, {From, _Tag}, State) -> - Existed = ets:member(?SUBSCRIBER, Topic), - true = ets:insert(?SUBSCRIBER, [ - {Topic, {shard, I}}, - {{shard, Topic, I}, SubPid} - ]), - Result = maybe_add_route(Existed, Topic, From), {reply, Result, State}; +handle_call({subscribe, Topic, SubPid, I}, _From, State) -> + Existed = ets:member(?SUBSCRIBER, {shard, Topic, I}), + Recs = [{{shard, Topic, I}, SubPid}], + Recs1 = + case Existed of + false -> + %% This will attempt to add a route per each new shard. + %% The overhead must be negligible, but the consistency in general + %% and race conditions safety is expected to be stronger. + %% The main purpose is to solve the race when + %% `{shard, Topic, N}` (where N > 0) + %% is the first ever processed subscribe request per `Topic`. + %% It inserts `{Topic, {shard, I}}` to `?SUBSCRIBER` tab. + %% After that, another broker worker starts processing + %% `{shard, Topic, 0}` sub and already observers `{shard, Topic, N}`, + %% i.e. `ets:member(?SUBSCRIBER, Topic)` returns false, + %% so it doesn't add the route. + %% Even if this happens, this cast is expected to be processed eventually + %% and the route should be added (unless the worker restarts...) + ?cast_or_eval( + pick({Topic, 0}), + {subscribed, Topic, shard, I}, + sync_route(add, Topic, #{}) + ), + [{Topic, {shard, I}} | Recs]; + true -> + Recs + end, + true = ets:insert(?SUBSCRIBER, Recs1), + {reply, ok, State}; handle_call(Req, _From, State) -> ?SLOG(error, #{msg => "unexpected_call", call => Req}), {reply, ignored, State}. +handle_cast({subscribed, Topic, shard, _I}, State) -> + %% Do not need to 'maybe add' (i.e. to check if the route exists). + %% It was already checked that this shard is newely added. + _ = sync_route(add, Topic, #{}), + {noreply, State}; +handle_cast({unsubscribed, Topic, shard, _I}, State) -> + _ = maybe_delete_route(Topic), + {noreply, State}; handle_cast({unsubscribed, Topic, SubPid, 0}, State) -> true = ets:delete_object(?SUBSCRIBER, {Topic, SubPid}), - Exists = ets:member(?SUBSCRIBER, Topic), - _Result = maybe_delete_route(Exists, Topic), + _ = maybe_delete_route(Topic), {noreply, State}; handle_cast({unsubscribed, Topic, SubPid, I}, State) -> true = ets:delete_object(?SUBSCRIBER, {{shard, Topic, I}, SubPid}), case ets:member(?SUBSCRIBER, {shard, Topic, I}) of false -> - ets:delete_object(?SUBSCRIBER, {Topic, {shard, I}}); + ets:delete_object(?SUBSCRIBER, {Topic, {shard, I}}), + %% Do not attempt to delete any routes here, + %% let it be handled only by the same pool worker per topic (0 shard), + %% so that all route deletes are serialized. + ?cast_or_eval( + pick({Topic, 0}), + {unsubscribed, Topic, shard, I}, + maybe_delete_route(Topic) + ); true -> - true + ok end, - Exists = ets:member(?SUBSCRIBER, Topic), - _Result = maybe_delete_route(Exists, Topic), {noreply, State}; handle_cast(Msg, State) -> ?SLOG(error, #{msg => "unexpected_cast", cast => Msg}), @@ -584,7 +614,7 @@ do_dispatch(Topic, #delivery{message = Msg}) -> {ok, DispN} end. -%% Donot dispatch to share subscriber here. +%% Don't dispatch to share subscriber here. %% we do it in `emqx_shared_sub.erl` with configured strategy do_dispatch(SubPid, Topic, Msg) when is_pid(SubPid) -> case erlang:is_process_alive(SubPid) of @@ -605,15 +635,19 @@ do_dispatch({shard, I}, Topic, Msg) -> %% +assert_ok_result(ok) -> ok; +assert_ok_result(Ref) when is_reference(Ref) -> ok. + maybe_add_route(_Existed = false, Topic, ReplyTo) -> sync_route(add, Topic, #{reply => ReplyTo}); maybe_add_route(_Existed = true, _Topic, _ReplyTo) -> ok. -maybe_delete_route(_Exists = false, Topic) -> - sync_route(delete, Topic, #{}); -maybe_delete_route(_Exists = true, _Topic) -> - ok. +maybe_delete_route(Topic) -> + case ets:member(?SUBSCRIBER, Topic) of + true -> ok; + false -> sync_route(delete, Topic, #{}) + end. sync_route(Action, Topic, ReplyTo) -> EnabledOn = emqx_config:get([broker, routing, batch_sync, enable_on]), diff --git a/apps/emqx/src/emqx_router_syncer.erl b/apps/emqx/src/emqx_router_syncer.erl index dccc681c8..fef5a1fc7 100644 --- a/apps/emqx/src/emqx_router_syncer.erl +++ b/apps/emqx/src/emqx_router_syncer.erl @@ -97,7 +97,7 @@ push(Action, Topic, Dest, Opts) -> Context = mk_push_context(Opts), _ = erlang:send(Worker, ?PUSH(Prio, {Action, Topic, Dest, Context})), case Context of - {MRef, _} -> + [{MRef, _}] -> MRef; [] -> ok @@ -128,7 +128,7 @@ designate_prio(delete, #{}) -> mk_push_context(#{reply := To}) -> MRef = erlang:make_ref(), - {MRef, To}; + [{MRef, To}]; mk_push_context(_) -> []. @@ -272,8 +272,8 @@ send_replies(Errors, Batch) -> replyctx_send(_Result, []) -> noreply; -replyctx_send(Result, {MRef, Pid}) -> - _ = erlang:send(Pid, {MRef, Result}), +replyctx_send(Result, RefsPids) -> + _ = lists:foreach(fun({MRef, Pid}) -> erlang:send(Pid, {MRef, Result}) end, RefsPids), ok. %% @@ -316,10 +316,11 @@ stash_add(Prio, ?OP(Action, Topic, Dest, Ctx), Stash) -> Stash#{Route := RouteOpMerged} end. -merge_route_op(?ROUTEOP(Action, _Prio1, Ctx1), DestOp = ?ROUTEOP(Action)) -> - %% NOTE: This should not happen anyway. - _ = replyctx_send(ignored, Ctx1), - DestOp; +merge_route_op(?ROUTEOP(Action, _Prio1, Ctx1), ?ROUTEOP(Action, Prio2, Ctx2)) -> + %% NOTE: This can happen as topic shard can be processed concurrently + %% by different broker worker, see emqx_broker for more details. + MergedCtx = Ctx1 ++ Ctx2, + ?ROUTEOP(Action, Prio2, MergedCtx); merge_route_op(?ROUTEOP(_Action1, _Prio1, Ctx1), DestOp = ?ROUTEOP(_Action2, _Prio2, _Ctx2)) -> %% NOTE: Latter cancel the former. %% Strictly speaking, in ideal conditions we could just cancel both, because they @@ -352,7 +353,7 @@ stash_stats(Stash) -> batch_test() -> Dest = node(), - Ctx = fun(N) -> {N, self()} end, + Ctx = fun(N) -> [{N, self()}] end, Stash = stash_add( [ ?PUSH(?PRIO_BG, ?OP(delete, <<"t/2">>, Dest, Ctx(1))), @@ -375,6 +376,7 @@ batch_test() -> stash_new() ), {Batch, StashLeft} = mk_batch(Stash, 5), + ?assertMatch( #{ {<<"t/1">>, Dest} := {add, ?PRIO_LO, _}, @@ -392,16 +394,16 @@ batch_test() -> }, StashLeft ), + + %% Replies are only sent to superseded ops: ?assertEqual( [ - {2, ignored}, {1, ok}, {5, ok}, - {7, ignored}, {4, ok}, {9, ok}, + {7, ok}, {8, ok}, - {13, ignored}, {11, ok} ], emqx_utils_stream:consume(emqx_utils_stream:mqueue(0)) diff --git a/apps/emqx_auth/src/emqx_authn/emqx_authn_chains.erl b/apps/emqx_auth/src/emqx_authn/emqx_authn_chains.erl index 0d668bc20..f79afab29 100644 --- a/apps/emqx_auth/src/emqx_authn/emqx_authn_chains.erl +++ b/apps/emqx_auth/src/emqx_authn/emqx_authn_chains.erl @@ -310,7 +310,11 @@ reorder_authenticator(_ChainName, []) -> reorder_authenticator(ChainName, AuthenticatorIDs) -> call({reorder_authenticator, ChainName, AuthenticatorIDs}). --spec import_users(chain_name(), authenticator_id(), {binary(), binary()}) -> +-spec import_users( + chain_name(), + authenticator_id(), + {plain | hash, prepared_user_list | binary(), binary()} +) -> ok | {error, term()}. import_users(ChainName, AuthenticatorID, Filename) -> call({import_users, ChainName, AuthenticatorID, Filename}). diff --git a/apps/emqx_auth/src/emqx_authn/emqx_authn_provider.erl b/apps/emqx_auth/src/emqx_authn/emqx_authn_provider.erl index 3898d0bc4..897f6a288 100644 --- a/apps/emqx_auth/src/emqx_authn/emqx_authn_provider.erl +++ b/apps/emqx_auth/src/emqx_authn/emqx_authn_provider.erl @@ -53,11 +53,14 @@ when when State :: state(). --callback import_users({Filename, FileData}, State) -> +-callback import_users({PasswordType, Filename, FileData}, State) -> ok | {error, term()} when - Filename :: binary(), FileData :: binary(), State :: state(). + PasswordType :: plain | hash, + Filename :: prepared_user_list | binary(), + FileData :: binary(), + State :: state(). -callback add_user(UserInfo, State) -> {ok, User} diff --git a/apps/emqx_auth/src/emqx_authn/emqx_authn_user_import_api.erl b/apps/emqx_auth/src/emqx_authn/emqx_authn_user_import_api.erl index bac313195..2b2ccecac 100644 --- a/apps/emqx_auth/src/emqx_authn/emqx_authn_user_import_api.erl +++ b/apps/emqx_auth/src/emqx_authn/emqx_authn_user_import_api.erl @@ -58,8 +58,8 @@ schema("/authentication/:id/import_users") -> post => #{ tags => ?API_TAGS_GLOBAL, description => ?DESC(authentication_id_import_users_post), - parameters => [emqx_authn_api:param_auth_id()], - 'requestBody' => emqx_dashboard_swagger:file_schema(filename), + parameters => [emqx_authn_api:param_auth_id(), param_password_type()], + 'requestBody' => request_body_schema(), responses => #{ 204 => <<"Users imported">>, 400 => error_codes([?BAD_REQUEST], <<"Bad Request">>), @@ -74,8 +74,12 @@ schema("/listeners/:listener_id/authentication/:id/import_users") -> tags => ?API_TAGS_SINGLE, deprecated => true, description => ?DESC(listeners_listener_id_authentication_id_import_users_post), - parameters => [emqx_authn_api:param_listener_id(), emqx_authn_api:param_auth_id()], - 'requestBody' => emqx_dashboard_swagger:file_schema(filename), + parameters => [ + emqx_authn_api:param_listener_id(), + emqx_authn_api:param_auth_id(), + param_password_type() + ], + 'requestBody' => request_body_schema(), responses => #{ 204 => <<"Users imported">>, 400 => error_codes([?BAD_REQUEST], <<"Bad Request">>), @@ -84,37 +88,124 @@ schema("/listeners/:listener_id/authentication/:id/import_users") -> } }. +request_body_schema() -> + #{content := Content} = emqx_dashboard_swagger:file_schema(filename), + Content1 = + Content#{ + <<"application/json">> => #{ + schema => #{ + type => object, + example => [ + #{ + <<"user_id">> => <<"user1">>, + <<"password">> => <<"password1">>, + <<"is_superuser">> => true + }, + #{ + <<"user_id">> => <<"user2">>, + <<"password">> => <<"password2">>, + <<"is_superuser">> => false + } + ] + } + } + }, + #{ + content => Content1, + description => <<"Import body">> + }. + authenticator_import_users( post, - #{ + Req = #{ bindings := #{id := AuthenticatorID}, - body := #{<<"filename">> := #{type := _} = File} + headers := Headers, + body := Body } ) -> - [{FileName, FileData}] = maps:to_list(maps:without([type], File)), - case emqx_authn_chains:import_users(?GLOBAL, AuthenticatorID, {FileName, FileData}) of + PasswordType = password_type(Req), + Result = + case maps:get(<<"content-type">>, Headers, undefined) of + <<"application/json">> -> + emqx_authn_chains:import_users( + ?GLOBAL, AuthenticatorID, {PasswordType, prepared_user_list, Body} + ); + _ -> + case Body of + #{<<"filename">> := #{type := _} = File} -> + [{Name, Data}] = maps:to_list(maps:without([type], File)), + emqx_authn_chains:import_users( + ?GLOBAL, AuthenticatorID, {PasswordType, Name, Data} + ); + _ -> + {error, {missing_parameter, filename}} + end + end, + case Result of ok -> {204}; {error, Reason} -> emqx_authn_api:serialize_error(Reason) - end; -authenticator_import_users(post, #{bindings := #{id := _}, body := _}) -> - emqx_authn_api:serialize_error({missing_parameter, filename}). + end. listener_authenticator_import_users( post, - #{ + Req = #{ bindings := #{listener_id := ListenerID, id := AuthenticatorID}, - body := #{<<"filename">> := #{type := _} = File} + headers := Headers, + body := Body } ) -> - [{FileName, FileData}] = maps:to_list(maps:without([type], File)), - emqx_authn_api:with_chain( - ListenerID, - fun(ChainName) -> - case emqx_authn_chains:import_users(ChainName, AuthenticatorID, {FileName, FileData}) of - ok -> {204}; - {error, Reason} -> emqx_authn_api:serialize_error(Reason) + PasswordType = password_type(Req), + + DoImport = fun(FileName, FileData) -> + emqx_authn_api:with_chain( + ListenerID, + fun(ChainName) -> + case + emqx_authn_chains:import_users( + ChainName, AuthenticatorID, {PasswordType, FileName, FileData} + ) + of + ok -> {204}; + {error, Reason} -> emqx_authn_api:serialize_error(Reason) + end end - end - ); -listener_authenticator_import_users(post, #{bindings := #{listener_id := _, id := _}, body := _}) -> - emqx_authn_api:serialize_error({missing_parameter, filename}). + ) + end, + case maps:get(<<"content-type">>, Headers, undefined) of + <<"application/json">> -> + DoImport(prepared_user_list, Body); + _ -> + case Body of + #{<<"filename">> := #{type := _} = File} -> + [{Name, Data}] = maps:to_list(maps:without([type], File)), + DoImport(Name, Data); + _ -> + emqx_authn_api:serialize_error({missing_parameter, filename}) + end + end. + +%%-------------------------------------------------------------------- +%% helpers + +param_password_type() -> + {type, + hoconsc:mk( + binary(), + #{ + in => query, + enum => [<<"plain">>, <<"hash">>], + required => true, + desc => << + "The import file template type, enum with `plain`," + "`hash`" + >>, + example => <<"hash">> + } + )}. + +password_type(_Req = #{query_string := #{<<"type">> := <<"plain">>}}) -> + plain; +password_type(_Req = #{query_string := #{<<"type">> := <<"hash">>}}) -> + hash; +password_type(_) -> + hash. diff --git a/apps/emqx_auth/test/data/user-credentials-plain-dup.csv b/apps/emqx_auth/test/data/user-credentials-plain-dup.csv new file mode 100644 index 000000000..9c263c50b --- /dev/null +++ b/apps/emqx_auth/test/data/user-credentials-plain-dup.csv @@ -0,0 +1,3 @@ +user_id,password,is_superuser +myuser3,password3,true +myuser3,password4,false diff --git a/apps/emqx_auth/test/data/user-credentials-plain-dup.json b/apps/emqx_auth/test/data/user-credentials-plain-dup.json new file mode 100644 index 000000000..0bda977c6 --- /dev/null +++ b/apps/emqx_auth/test/data/user-credentials-plain-dup.json @@ -0,0 +1,12 @@ +[ + { + "user_id":"myuser1", + "password":"password1", + "is_superuser": true + }, + { + "user_id":"myuser1", + "password":"password2", + "is_superuser": false + } +] diff --git a/apps/emqx_auth/test/data/user-credentials-plain.csv b/apps/emqx_auth/test/data/user-credentials-plain.csv new file mode 100644 index 000000000..223be0bf9 --- /dev/null +++ b/apps/emqx_auth/test/data/user-credentials-plain.csv @@ -0,0 +1,3 @@ +user_id,password,is_superuser +myuser3,password3,true +myuser4,password4,false diff --git a/apps/emqx_auth/test/data/user-credentials-plain.json b/apps/emqx_auth/test/data/user-credentials-plain.json new file mode 100644 index 000000000..ec5e1a82f --- /dev/null +++ b/apps/emqx_auth/test/data/user-credentials-plain.json @@ -0,0 +1,12 @@ +[ + { + "user_id":"myuser1", + "password":"password1", + "is_superuser": true + }, + { + "user_id":"myuser2", + "password":"password2", + "is_superuser": false + } +] diff --git a/apps/emqx_auth_mnesia/src/emqx_auth_mnesia.app.src b/apps/emqx_auth_mnesia/src/emqx_auth_mnesia.app.src index 3dbdf6625..28b8d7535 100644 --- a/apps/emqx_auth_mnesia/src/emqx_auth_mnesia.app.src +++ b/apps/emqx_auth_mnesia/src/emqx_auth_mnesia.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_auth_mnesia, [ {description, "EMQX Buitl-in Database Authentication and Authorization"}, - {vsn, "0.1.2"}, + {vsn, "0.1.3"}, {registered, []}, {mod, {emqx_auth_mnesia_app, []}}, {applications, [ diff --git a/apps/emqx_auth_mnesia/src/emqx_authn_mnesia.erl b/apps/emqx_auth_mnesia/src/emqx_authn_mnesia.erl index bbbaeddb1..8cbd8f35e 100644 --- a/apps/emqx_auth_mnesia/src/emqx_authn_mnesia.erl +++ b/apps/emqx_auth_mnesia/src/emqx_authn_mnesia.erl @@ -52,9 +52,7 @@ do_destroy/1, do_add_user/1, do_delete_user/2, - do_update_user/3, - import/2, - import_csv/3 + do_update_user/3 ]). -export([mnesia/1, init_tables/0]). @@ -173,20 +171,67 @@ do_destroy(UserGroup) -> mnesia:select(?TAB, group_match_spec(UserGroup), write) ). -import_users({Filename0, FileData}, State) -> - Filename = to_binary(Filename0), - case filename:extension(Filename) of - <<".json">> -> - import_users_from_json(FileData, State); - <<".csv">> -> - CSV = csv_data(FileData), - import_users_from_csv(CSV, State); - <<>> -> - {error, unknown_file_format}; - Extension -> - {error, {unsupported_file_format, Extension}} +import_users({PasswordType, Filename, FileData}, State) -> + Convertor = convertor(PasswordType, State), + try + {_NewUsersCnt, Users} = parse_import_users(Filename, FileData, Convertor), + case length(Users) > 0 andalso do_import_users(Users) of + false -> + error(empty_users); + ok -> + ok; + {error, Reason} -> + _ = do_clean_imported_users(Users), + error(Reason) + end + catch + error:Reason1:Stk -> + ?SLOG( + warning, + #{ + msg => "import_users_failed", + reason => Reason1, + type => PasswordType, + filename => Filename, + stacktrace => Stk + } + ), + {error, Reason1} end. +do_import_users(Users) -> + trans( + fun() -> + lists:foreach( + fun( + #{ + <<"user_group">> := UserGroup, + <<"user_id">> := UserID, + <<"password_hash">> := PasswordHash, + <<"salt">> := Salt, + <<"is_superuser">> := IsSuperuser + } + ) -> + insert_user(UserGroup, UserID, PasswordHash, Salt, IsSuperuser) + end, + Users + ) + end + ). + +do_clean_imported_users(Users) -> + lists:foreach( + fun( + #{ + <<"user_group">> := UserGroup, + <<"user_id">> := UserID + } + ) -> + mria:dirty_delete(?TAB, {UserGroup, UserID}) + end, + Users + ). + add_user( UserInfo, State @@ -293,93 +338,6 @@ run_fuzzy_filter( %% Internal functions %%------------------------------------------------------------------------------ -%% Example: data/user-credentials.json -import_users_from_json(Bin, #{user_group := UserGroup}) -> - case emqx_utils_json:safe_decode(Bin, [return_maps]) of - {ok, List} -> - trans(fun ?MODULE:import/2, [UserGroup, List]); - {error, Reason} -> - {error, Reason} - end. - -%% Example: data/user-credentials.csv -import_users_from_csv(CSV, #{user_group := UserGroup}) -> - case get_csv_header(CSV) of - {ok, Seq, NewCSV} -> - trans(fun ?MODULE:import_csv/3, [UserGroup, NewCSV, Seq]); - {error, Reason} -> - {error, Reason} - end. - -import(_UserGroup, []) -> - ok; -import(UserGroup, [ - #{ - <<"user_id">> := UserID, - <<"password_hash">> := PasswordHash - } = UserInfo - | More -]) when - is_binary(UserID) andalso is_binary(PasswordHash) --> - Salt = maps:get(<<"salt">>, UserInfo, <<>>), - IsSuperuser = maps:get(<<"is_superuser">>, UserInfo, false), - insert_user(UserGroup, UserID, PasswordHash, Salt, IsSuperuser), - import(UserGroup, More); -import(_UserGroup, [_ | _More]) -> - {error, bad_format}. - -%% Importing 5w users needs 1.7 seconds -import_csv(UserGroup, CSV, Seq) -> - case csv_read_line(CSV) of - {ok, Line, NewCSV} -> - Fields = binary:split(Line, [<<",">>, <<" ">>, <<"\n">>], [global, trim_all]), - case get_user_info_by_seq(Fields, Seq) of - {ok, - #{ - user_id := UserID, - password_hash := PasswordHash - } = UserInfo} -> - Salt = maps:get(salt, UserInfo, <<>>), - IsSuperuser = maps:get(is_superuser, UserInfo, false), - insert_user(UserGroup, UserID, PasswordHash, Salt, IsSuperuser), - import_csv(UserGroup, NewCSV, Seq); - {error, Reason} -> - {error, Reason} - end; - eof -> - ok - end. - -get_csv_header(CSV) -> - case csv_read_line(CSV) of - {ok, Line, NewCSV} -> - Seq = binary:split(Line, [<<",">>, <<" ">>, <<"\n">>], [global, trim_all]), - {ok, Seq, NewCSV}; - eof -> - {error, empty_file} - end. - -get_user_info_by_seq(Fields, Seq) -> - get_user_info_by_seq(Fields, Seq, #{}). - -get_user_info_by_seq([], [], #{user_id := _, password_hash := _} = Acc) -> - {ok, Acc}; -get_user_info_by_seq(_, [], _) -> - {error, bad_format}; -get_user_info_by_seq([UserID | More1], [<<"user_id">> | More2], Acc) -> - get_user_info_by_seq(More1, More2, Acc#{user_id => UserID}); -get_user_info_by_seq([PasswordHash | More1], [<<"password_hash">> | More2], Acc) -> - get_user_info_by_seq(More1, More2, Acc#{password_hash => PasswordHash}); -get_user_info_by_seq([Salt | More1], [<<"salt">> | More2], Acc) -> - get_user_info_by_seq(More1, More2, Acc#{salt => Salt}); -get_user_info_by_seq([<<"true">> | More1], [<<"is_superuser">> | More2], Acc) -> - get_user_info_by_seq(More1, More2, Acc#{is_superuser => true}); -get_user_info_by_seq([<<"false">> | More1], [<<"is_superuser">> | More2], Acc) -> - get_user_info_by_seq(More1, More2, Acc#{is_superuser => false}); -get_user_info_by_seq(_, _, _) -> - {error, bad_format}. - insert_user(UserGroup, UserID, PasswordHash, Salt, IsSuperuser) -> UserInfoRecord = user_info_record(UserGroup, UserID, PasswordHash, Salt, IsSuperuser), insert_user(UserInfoRecord). @@ -449,6 +407,12 @@ trans(Fun, Args) -> {aborted, Reason} -> {error, Reason} end. +trans(Fun) -> + case mria:transaction(?AUTHN_SHARD, Fun) of + {atomic, Res} -> Res; + {aborted, Reason} -> {error, Reason} + end. + to_binary(B) when is_binary(B) -> B; to_binary(L) when is_list(L) -> @@ -482,11 +446,91 @@ group_match_spec(UserGroup, QString) -> end) end. -csv_data(Data) -> - Lines = binary:split(Data, [<<"\r">>, <<"\n">>], [global, trim_all]), - {csv_data, Lines}. +%%-------------------------------------------------------------------- +%% parse import file/data -csv_read_line({csv_data, [Line | Lines]}) -> - {ok, Line, {csv_data, Lines}}; -csv_read_line({csv_data, []}) -> - eof. +parse_import_users(Filename, FileData, Convertor) -> + Eval = fun _Eval(F) -> + case F() of + [] -> []; + [User | F1] -> [Convertor(User) | _Eval(F1)] + end + end, + ReaderFn = reader_fn(Filename, FileData), + Users = Eval(ReaderFn), + NewUsersCount = + lists:foldl( + fun( + #{ + <<"user_group">> := UserGroup, + <<"user_id">> := UserID + }, + Acc + ) -> + case ets:member(?TAB, {UserGroup, UserID}) of + true -> + Acc; + false -> + Acc + 1 + end + end, + 0, + Users + ), + {NewUsersCount, Users}. + +reader_fn(prepared_user_list, List) when is_list(List) -> + %% Example: [#{<<"user_id">> => <<>>, ...}] + emqx_utils_stream:list(List); +reader_fn(Filename0, Data) -> + case filename:extension(to_binary(Filename0)) of + <<".json">> -> + %% Example: data/user-credentials.json + case emqx_utils_json:safe_decode(Data, [return_maps]) of + {ok, List} when is_list(List) -> + emqx_utils_stream:list(List); + {ok, _} -> + error(unknown_file_format); + {error, Reason} -> + error(Reason) + end; + <<".csv">> -> + %% Example: data/user-credentials.csv + emqx_utils_stream:csv(Data); + <<>> -> + error(unknown_file_format); + Extension -> + error({unsupported_file_format, Extension}) + end. + +convertor(PasswordType, State) -> + fun(User) -> + convert_user(User, PasswordType, State) + end. + +convert_user( + User = #{<<"user_id">> := UserId}, + PasswordType, + #{user_group := UserGroup, password_hash_algorithm := Algorithm} +) -> + {PasswordHash, Salt} = find_password_hash(PasswordType, User, Algorithm), + #{ + <<"user_id">> => UserId, + <<"password_hash">> => PasswordHash, + <<"salt">> => Salt, + <<"is_superuser">> => is_superuser(User), + <<"user_group">> => UserGroup + }; +convert_user(_, _, _) -> + error(bad_format). + +find_password_hash(hash, User = #{<<"password_hash">> := PasswordHash}, _) -> + {PasswordHash, maps:get(<<"salt">>, User, <<>>)}; +find_password_hash(plain, #{<<"password">> := Password}, Algorithm) -> + emqx_authn_password_hashing:hash(Algorithm, Password); +find_password_hash(_, _, _) -> + error(bad_format). + +is_superuser(#{<<"is_superuser">> := <<"true">>}) -> true; +is_superuser(#{<<"is_superuser">> := true}) -> true; +is_superuser(_) -> false. diff --git a/apps/emqx_auth_mnesia/test/emqx_authn_api_mnesia_SUITE.erl b/apps/emqx_auth_mnesia/test/emqx_authn_api_mnesia_SUITE.erl index 2035cf2fa..71da75c1b 100644 --- a/apps/emqx_auth_mnesia/test/emqx_authn_api_mnesia_SUITE.erl +++ b/apps/emqx_auth_mnesia/test/emqx_authn_api_mnesia_SUITE.erl @@ -336,7 +336,13 @@ test_authenticator_import_users(PathPrefix) -> {ok, CSVData} = file:read_file(CSVFileName), {ok, 204, _} = multipart_formdata_request(ImportUri, [], [ {filename, "user-credentials.csv", CSVData} - ]). + ]), + + %% test application/json + {ok, 204, _} = request(post, ImportUri ++ "?type=hash", emqx_utils_json:decode(JSONData)), + {ok, JSONData1} = file:read_file(filename:join([Dir, <<"data/user-credentials-plain.json">>])), + {ok, 204, _} = request(post, ImportUri ++ "?type=plain", emqx_utils_json:decode(JSONData1)), + ok. %%------------------------------------------------------------------------------ %% Helpers diff --git a/apps/emqx_auth_mnesia/test/emqx_authn_mnesia_SUITE.erl b/apps/emqx_auth_mnesia/test/emqx_authn_mnesia_SUITE.erl index 80e6789d9..a1cbf362a 100644 --- a/apps/emqx_auth_mnesia/test/emqx_authn_mnesia_SUITE.erl +++ b/apps/emqx_auth_mnesia/test/emqx_authn_mnesia_SUITE.erl @@ -216,7 +216,7 @@ t_import_users(_) -> ?assertMatch( {error, {unsupported_file_format, _}}, emqx_authn_mnesia:import_users( - {<<"/file/with/unknown.extension">>, <<>>}, + {hash, <<"/file/with/unknown.extension">>, <<>>}, State ) ), @@ -224,7 +224,7 @@ t_import_users(_) -> ?assertEqual( {error, unknown_file_format}, emqx_authn_mnesia:import_users( - {<<"/file/with/no/extension">>, <<>>}, + {hash, <<"/file/with/no/extension">>, <<>>}, State ) ), @@ -251,6 +251,142 @@ t_import_users(_) -> sample_filename_and_data(<<"user-credentials-malformed.csv">>), State ) + ), + + ?assertEqual( + {error, empty_users}, + emqx_authn_mnesia:import_users( + {hash, <<"empty_users.json">>, <<"[]">>}, + State + ) + ), + + ?assertEqual( + {error, empty_users}, + emqx_authn_mnesia:import_users( + {hash, <<"empty_users.csv">>, <<>>}, + State + ) + ), + + ?assertEqual( + {error, empty_users}, + emqx_authn_mnesia:import_users( + {hash, prepared_user_list, []}, + State + ) + ). + +t_import_users_plain(_) -> + Config0 = config(), + Config = Config0#{password_hash_algorithm => #{name => sha256, salt_position => suffix}}, + {ok, State} = emqx_authn_mnesia:create(?AUTHN_ID, Config), + + ?assertEqual( + ok, + emqx_authn_mnesia:import_users( + sample_filename_and_data(plain, <<"user-credentials-plain.json">>), + State + ) + ), + + ?assertEqual( + ok, + emqx_authn_mnesia:import_users( + sample_filename_and_data(plain, <<"user-credentials-plain.csv">>), + State + ) + ). + +t_import_users_prepared_list(_) -> + Config0 = config(), + Config = Config0#{password_hash_algorithm => #{name => sha256, salt_position => suffix}}, + {ok, State} = emqx_authn_mnesia:create(?AUTHN_ID, Config), + + Users1 = [ + #{<<"user_id">> => <<"u1">>, <<"password">> => <<"p1">>, <<"is_superuser">> => true}, + #{<<"user_id">> => <<"u2">>, <<"password">> => <<"p2">>, <<"is_superuser">> => true} + ], + Users2 = [ + #{ + <<"user_id">> => <<"u3">>, + <<"password_hash">> => + <<"c5e46903df45e5dc096dc74657610dbee8deaacae656df88a1788f1847390242">>, + <<"salt">> => <<"e378187547bf2d6f0545a3f441aa4d8a">>, + <<"is_superuser">> => true + }, + #{ + <<"user_id">> => <<"u4">>, + <<"password_hash">> => + <<"f4d17f300b11e522fd33f497c11b126ef1ea5149c74d2220f9a16dc876d4567b">>, + <<"salt">> => <<"6d3f9bd5b54d94b98adbcfe10b6d181f">>, + <<"is_superuser">> => true + } + ], + + ?assertEqual( + ok, + emqx_authn_mnesia:import_users( + {plain, prepared_user_list, Users1}, + State + ) + ), + + ?assertEqual( + ok, + emqx_authn_mnesia:import_users( + {hash, prepared_user_list, Users2}, + State + ) + ). + +t_import_users_duplicated_records(_) -> + Config0 = config(), + Config = Config0#{password_hash_algorithm => #{name => plain, salt_position => disable}}, + {ok, State} = emqx_authn_mnesia:create(?AUTHN_ID, Config), + + ?assertEqual( + ok, + emqx_authn_mnesia:import_users( + sample_filename_and_data(plain, <<"user-credentials-plain-dup.json">>), + State + ) + ), + ?assertEqual( + ok, + emqx_authn_mnesia:import_users( + sample_filename_and_data(plain, <<"user-credentials-plain-dup.csv">>), + State + ) + ), + Users1 = [ + #{ + <<"user_id">> => <<"myuser5">>, + <<"password">> => <<"password5">>, + <<"is_superuser">> => true + }, + #{ + <<"user_id">> => <<"myuser5">>, + <<"password">> => <<"password6">>, + <<"is_superuser">> => false + } + ], + ?assertEqual( + ok, + emqx_authn_mnesia:import_users( + {plain, prepared_user_list, Users1}, + State + ) + ), + + %% assert: the last record overwrites the previous one + ?assertMatch( + [ + {user_info, {_, <<"myuser1">>}, <<"password2">>, _, false}, + {user_info, {_, <<"myuser3">>}, <<"password4">>, _, false}, + {user_info, {_, <<"myuser5">>}, <<"password6">>, _, false} + ], + ets:tab2list(emqx_authn_mnesia) ). %%------------------------------------------------------------------------------ @@ -262,9 +398,12 @@ sample_filename(Name) -> filename:join([Dir, <<"data">>, Name]). sample_filename_and_data(Name) -> + sample_filename_and_data(hash, Name). + +sample_filename_and_data(Type, Name) -> Filename = sample_filename(Name), {ok, Data} = file:read_file(Filename), - {Filename, Data}. + {Type, Filename, Data}. config() -> #{ diff --git a/apps/emqx_gateway/src/emqx_gateway_api_authn_user_import.erl b/apps/emqx_gateway/src/emqx_gateway_api_authn_user_import.erl index 321b145ac..6089b70db 100644 --- a/apps/emqx_gateway/src/emqx_gateway_api_authn_user_import.erl +++ b/apps/emqx_gateway/src/emqx_gateway_api_authn_user_import.erl @@ -81,7 +81,7 @@ import_users(post, #{ [{FileName, FileData}] = maps:to_list(maps:without([type], File)), case emqx_authn_chains:import_users( - ChainName, AuthId, {FileName, FileData} + ChainName, AuthId, {hash, FileName, FileData} ) of ok -> {204}; @@ -105,7 +105,7 @@ import_listener_users(post, #{ [{FileName, FileData}] = maps:to_list(maps:without([type], File)), case emqx_authn_chains:import_users( - ChainName, AuthId, {FileName, FileData} + ChainName, AuthId, {hash, FileName, FileData} ) of ok -> {204}; diff --git a/apps/emqx_management/src/emqx_mgmt_api_cluster.erl b/apps/emqx_management/src/emqx_mgmt_api_cluster.erl index 686a0be71..6a56ef454 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_cluster.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_cluster.erl @@ -206,11 +206,10 @@ fields(node_invitation_succeed) -> [ {finished_at, ?HOCON( - emqx_utils_calendar:epoch_millisecond(), + binary(), #{ - desc => - <<"The time of the async invitation result is received, millisecond precision epoch">>, - example => <<"1705044829915">> + desc => <<"The time of the async invitation result is received">>, + example => <<"2024-01-30T15:24:39.355+08:00">> } )} ]; @@ -223,11 +222,10 @@ fields(node_invitation_in_progress) -> )}, {started_at, ?HOCON( - emqx_utils_calendar:epoch_millisecond(), + binary(), #{ - desc => - <<"The start timestamp of the invitation, millisecond precision epoch">>, - example => <<"1705044829915">> + desc => <<"The time of the async invitation is started">>, + example => <<"2024-01-30T15:24:39.355+08:00">> } )} ]. diff --git a/apps/emqx_prometheus/src/emqx_prometheus.erl b/apps/emqx_prometheus/src/emqx_prometheus.erl index 4b1f80597..59241bd02 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus.erl @@ -991,12 +991,13 @@ catch_all(DataFun) -> collect_stats_json_data(StatsData, StatsClData) -> StatsDatas = collect_json_data_(StatsData), CLData = hd(collect_json_data_(StatsClData)), - lists:map( + Res = lists:map( fun(NodeData) -> maps:merge(NodeData, CLData) end, StatsDatas - ). + ), + json_obj_or_array(Res). %% always return json array collect_cert_json_data(Data) -> @@ -1004,32 +1005,34 @@ collect_cert_json_data(Data) -> collect_vm_json_data(Data) -> DataListPerNode = collect_json_data_(Data), - case {?GET_PROM_DATA_MODE(), DataListPerNode} of - {?PROM_DATA_MODE__NODE, [NData | _]} -> - NData; - {_, _} -> + case ?GET_PROM_DATA_MODE() of + ?PROM_DATA_MODE__NODE -> + hd(DataListPerNode); + _ -> DataListPerNode end. collect_json_data(Data0) -> DataListPerNode = collect_json_data_(Data0), - case {?GET_PROM_DATA_MODE(), DataListPerNode} of - %% all nodes results unaggregated, should be a list - {?PROM_DATA_MODE__ALL_NODES_UNAGGREGATED, _} -> - DataListPerNode; - %% only local node result [#{...}] - %% To guaranteed compatibility, return a json object, not array - {?PROM_DATA_MODE__NODE, [NData | _]} -> - NData; - %% All nodes results aggregated - %% return a json object, not array - {?PROM_DATA_MODE__ALL_NODES_AGGREGATED, [NData | _]} -> - NData; - %% olp maybe not enabled, with empty list to empty object - {_, []} -> - #{} + json_obj_or_array(DataListPerNode). + +%% compatibility with previous api format in json mode +json_obj_or_array(DataL) -> + case ?GET_PROM_DATA_MODE() of + ?PROM_DATA_MODE__NODE -> + data_list_to_json_obj(DataL); + ?PROM_DATA_MODE__ALL_NODES_UNAGGREGATED -> + DataL; + ?PROM_DATA_MODE__ALL_NODES_AGGREGATED -> + data_list_to_json_obj(DataL) end. +data_list_to_json_obj([]) -> + %% olp maybe not enabled, with empty list to empty object + #{}; +data_list_to_json_obj(DataL) -> + hd(DataL). + collect_json_data_(Data) -> emqx_prometheus_cluster:collect_json_data(Data, fun zip_json_prom_stats_metrics/3). diff --git a/apps/emqx_resource/src/emqx_resource_metrics.erl b/apps/emqx_resource/src/emqx_resource_metrics.erl index df28d893b..7480be0cc 100644 --- a/apps/emqx_resource/src/emqx_resource_metrics.erl +++ b/apps/emqx_resource/src/emqx_resource_metrics.erl @@ -16,6 +16,8 @@ -module(emqx_resource_metrics). +-include_lib("emqx/include/logger.hrl"). + -export([ events/0, install_telemetry_handler/1, @@ -118,6 +120,52 @@ handle_telemetry_event( _Metadata = #{resource_id := ID}, _HandlerConfig ) -> + try + handle_counter_telemetry_event(Event, ID, Val) + catch + Kind:Reason:Stacktrace -> + %% We catch errors to avoid detaching the telemetry handler function. + %% When restarting a resource while it's under load, there might be transient + %% failures while the metrics are not yet created. + ?SLOG(warning, #{ + msg => "handle_resource_metrics_failed", + hint => "transient failures may occur when restarting a resource", + kind => Kind, + reason => Reason, + stacktrace => Stacktrace, + resource_id => ID, + event => Event + }), + ok + end; +handle_telemetry_event( + [?TELEMETRY_PREFIX, Event], + _Measurements = #{gauge_set := Val}, + _Metadata = #{resource_id := ID, worker_id := WorkerID}, + _HandlerConfig +) -> + try + handle_gauge_telemetry_event(Event, ID, WorkerID, Val) + catch + Kind:Reason:Stacktrace -> + %% We catch errors to avoid detaching the telemetry handler function. + %% When restarting a resource while it's under load, there might be transient + %% failures while the metrics are not yet created. + ?SLOG(warning, #{ + msg => "handle_resource_metrics_failed", + hint => "transient failures may occur when restarting a resource", + kind => Kind, + reason => Reason, + stacktrace => Stacktrace, + resource_id => ID, + event => Event + }), + ok + end; +handle_telemetry_event(_EventName, _Measurements, _Metadata, _HandlerConfig) -> + ok. + +handle_counter_telemetry_event(Event, ID, Val) -> case Event of dropped_other -> emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped', Val), @@ -154,13 +202,9 @@ handle_telemetry_event( emqx_metrics_worker:inc(?RES_METRICS, ID, 'success', Val); _ -> ok - end; -handle_telemetry_event( - [?TELEMETRY_PREFIX, Event], - _Measurements = #{gauge_set := Val}, - _Metadata = #{resource_id := ID, worker_id := WorkerID}, - _HandlerConfig -) -> + end. + +handle_gauge_telemetry_event(Event, ID, WorkerID, Val) -> case Event of inflight -> emqx_metrics_worker:set_gauge(?RES_METRICS, ID, WorkerID, 'inflight', Val); @@ -168,9 +212,7 @@ handle_telemetry_event( emqx_metrics_worker:set_gauge(?RES_METRICS, ID, WorkerID, 'queuing', Val); _ -> ok - end; -handle_telemetry_event(_EventName, _Measurements, _Metadata, _HandlerConfig) -> - ok. + end. %% Gauges (value can go both up and down): %% -------------------------------------- diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 2462123d6..d46fc6323 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -28,6 +28,7 @@ -define(DEFAULT_RESOURCE_GROUP, <<"default">>). -define(RESOURCE_ERROR(REASON), {error, {resource_error, #{reason := REASON}}}). -define(TRACE_OPTS, #{timetrap => 10000, timeout => 1000}). +-define(TELEMETRY_PREFIX, emqx, resource). -import(emqx_common_test_helpers, [on_exit/1]). @@ -3006,6 +3007,36 @@ do_t_resource_activate_alarm_once(ResourceConfig, SubscribeEvent) -> end ). +t_telemetry_handler_crash(_Config) -> + %% Check that a crash while handling a telemetry event, such as when a busy resource + %% is restarted and its metrics are not recreated while handling an increment, does + %% not lead to the handler being uninstalled. + ?check_trace( + begin + NonExistentId = <<"I-dont-exist">>, + WorkerId = 1, + HandlersBefore = telemetry:list_handlers([?TELEMETRY_PREFIX]), + ?assertMatch([_ | _], HandlersBefore), + lists:foreach(fun(Fn) -> Fn(NonExistentId) end, counter_metric_inc_fns()), + emqx_common_test_helpers:with_mock( + emqx_metrics_worker, + set_gauge, + fun(_Name, _Id, _WorkerId, _Metric, _Val) -> + error(random_crash) + end, + fun() -> + lists:foreach( + fun(Fn) -> Fn(NonExistentId, WorkerId, 1) end, gauge_metric_set_fns() + ) + end + ), + ?assertEqual(HandlersBefore, telemetry:list_handlers([?TELEMETRY_PREFIX])), + ok + end, + [] + ), + ok. + %%------------------------------------------------------------------------------ %% Helpers %%------------------------------------------------------------------------------ @@ -3235,3 +3266,25 @@ do_wait_until_all_marked_as_retriable(NumExpected, Seen) -> }) end end. + +counter_metric_inc_fns() -> + Mod = emqx_resource_metrics, + [ + fun Mod:Fn/1 + || {Fn, 1} <- Mod:module_info(functions), + case string:find(atom_to_list(Fn), "_inc", trailing) of + "_inc" -> true; + _ -> false + end + ]. + +gauge_metric_set_fns() -> + Mod = emqx_resource_metrics, + [ + fun Mod:Fn/3 + || {Fn, 3} <- Mod:module_info(functions), + case string:find(atom_to_list(Fn), "_set", trailing) of + "_set" -> true; + _ -> false + end + ]. diff --git a/apps/emqx_retainer/src/emqx_retainer_schema.erl b/apps/emqx_retainer/src/emqx_retainer_schema.erl index 1c5d8e55f..47540b0ec 100644 --- a/apps/emqx_retainer/src/emqx_retainer_schema.erl +++ b/apps/emqx_retainer/src/emqx_retainer_schema.erl @@ -81,6 +81,7 @@ fields("retainer") -> #{ required => false, desc => ?DESC(delivery_rate), + default => <<"1000/s">>, example => <<"1000/s">>, aliases => [deliver_rate] } diff --git a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl index 2818f6bfa..c76ba90c6 100644 --- a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl +++ b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl @@ -256,61 +256,67 @@ t_wildcard_subscription(_) -> ok = emqtt:disconnect(C1). t_message_expiry(_) -> - {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), - {ok, _} = emqtt:connect(C1), + ConfMod = fun(Conf) -> + Conf#{<<"delivery_rate">> := <<"infinity">>} + end, + Case = fun() -> + {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), + {ok, _} = emqtt:connect(C1), - emqtt:publish( - C1, - <<"retained/0">>, - #{'Message-Expiry-Interval' => 0}, - <<"don't expire">>, - [{qos, 0}, {retain, true}] - ), - emqtt:publish( - C1, - <<"retained/1">>, - #{'Message-Expiry-Interval' => 2}, - <<"expire">>, - [{qos, 0}, {retain, true}] - ), - emqtt:publish( - C1, - <<"retained/2">>, - #{'Message-Expiry-Interval' => 5}, - <<"don't expire">>, - [{qos, 0}, {retain, true}] - ), - emqtt:publish( - C1, - <<"retained/3">>, - <<"don't expire">>, - [{qos, 0}, {retain, true}] - ), - emqtt:publish( - C1, - <<"$SYS/retained/4">>, - <<"don't expire">>, - [{qos, 0}, {retain, true}] - ), + emqtt:publish( + C1, + <<"retained/0">>, + #{'Message-Expiry-Interval' => 0}, + <<"don't expire">>, + [{qos, 0}, {retain, true}] + ), + emqtt:publish( + C1, + <<"retained/1">>, + #{'Message-Expiry-Interval' => 2}, + <<"expire">>, + [{qos, 0}, {retain, true}] + ), + emqtt:publish( + C1, + <<"retained/2">>, + #{'Message-Expiry-Interval' => 5}, + <<"don't expire">>, + [{qos, 0}, {retain, true}] + ), + emqtt:publish( + C1, + <<"retained/3">>, + <<"don't expire">>, + [{qos, 0}, {retain, true}] + ), + emqtt:publish( + C1, + <<"$SYS/retained/4">>, + <<"don't expire">>, + [{qos, 0}, {retain, true}] + ), - {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/+">>, 0), - {ok, #{}, [0]} = emqtt:subscribe(C1, <<"$SYS/retained/+">>, 0), - ?assertEqual(5, length(receive_messages(5))), - {ok, #{}, [0]} = emqtt:unsubscribe(C1, <<"retained/+">>), - {ok, #{}, [0]} = emqtt:unsubscribe(C1, <<"$SYS/retained/+">>), + {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/+">>, 0), + {ok, #{}, [0]} = emqtt:subscribe(C1, <<"$SYS/retained/+">>, 0), + ?assertEqual(5, length(receive_messages(5))), + {ok, #{}, [0]} = emqtt:unsubscribe(C1, <<"retained/+">>), + {ok, #{}, [0]} = emqtt:unsubscribe(C1, <<"$SYS/retained/+">>), - timer:sleep(3000), - {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/+">>, 0), - {ok, #{}, [0]} = emqtt:subscribe(C1, <<"$SYS/retained/+">>, 0), - ?assertEqual(4, length(receive_messages(5))), + timer:sleep(3000), + {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/+">>, 0), + {ok, #{}, [0]} = emqtt:subscribe(C1, <<"$SYS/retained/+">>, 0), + ?assertEqual(4, length(receive_messages(5))), - emqtt:publish(C1, <<"retained/0">>, <<"">>, [{qos, 0}, {retain, true}]), - emqtt:publish(C1, <<"retained/1">>, <<"">>, [{qos, 0}, {retain, true}]), - emqtt:publish(C1, <<"retained/2">>, <<"">>, [{qos, 0}, {retain, true}]), - emqtt:publish(C1, <<"retained/3">>, <<"">>, [{qos, 0}, {retain, true}]), - emqtt:publish(C1, <<"$SYS/retained/4">>, <<"">>, [{qos, 0}, {retain, true}]), + emqtt:publish(C1, <<"retained/0">>, <<"">>, [{qos, 0}, {retain, true}]), + emqtt:publish(C1, <<"retained/1">>, <<"">>, [{qos, 0}, {retain, true}]), + emqtt:publish(C1, <<"retained/2">>, <<"">>, [{qos, 0}, {retain, true}]), + emqtt:publish(C1, <<"retained/3">>, <<"">>, [{qos, 0}, {retain, true}]), + emqtt:publish(C1, <<"$SYS/retained/4">>, <<"">>, [{qos, 0}, {retain, true}]), - ok = emqtt:disconnect(C1). + ok = emqtt:disconnect(C1) + end, + with_conf(ConfMod, Case). t_message_expiry_2(_) -> ConfMod = fun(Conf) -> @@ -410,6 +416,7 @@ t_flow_control(_) -> JsonCfg = make_limiter_json(<<"1/1s">>), emqx_limiter_server:add_bucket(emqx_retainer, internal, LimiterCfg), emqx_retainer:update_config(#{ + <<"delivery_rate">> => <<"1/1s">>, <<"flow_control">> => #{ <<"batch_read_number">> => 1, diff --git a/apps/emqx_utils/src/emqx_utils_stream.erl b/apps/emqx_utils/src/emqx_utils_stream.erl index 21321400d..5fd3515ad 100644 --- a/apps/emqx_utils/src/emqx_utils_stream.erl +++ b/apps/emqx_utils/src/emqx_utils_stream.erl @@ -37,6 +37,11 @@ ets/1 ]). +%% Streams from .csv data +-export([ + csv/1 +]). + -export_type([stream/1]). %% @doc A stream is essentially a lazy list. @@ -45,6 +50,8 @@ -dialyzer(no_improper_lists). +-elvis([{elvis_style, nesting_level, disable}]). + %% %% @doc Make a stream that produces no values. @@ -157,3 +164,36 @@ ets(Cont, ContF) -> [] end end. + +%% @doc Make a stream out of a .csv binary, where the .csv binary is loaded in all at once. +%% The .csv binary is assumed to be in UTF-8 encoding and to have a header row. +-spec csv(binary()) -> stream(map()). +csv(Bin) when is_binary(Bin) -> + Reader = fun _Iter(Headers, Lines) -> + case csv_read_line(Lines) of + {Fields, Rest} -> + case length(Fields) == length(Headers) of + true -> + User = maps:from_list(lists:zip(Headers, Fields)), + [User | fun() -> _Iter(Headers, Rest) end]; + false -> + error(bad_format) + end; + eof -> + [] + end + end, + HeadersAndLines = binary:split(Bin, [<<"\r">>, <<"\n">>], [global, trim_all]), + case csv_read_line(HeadersAndLines) of + {CSVHeaders, CSVLines} -> + fun() -> Reader(CSVHeaders, CSVLines) end; + eof -> + empty() + end. + +csv_read_line([Line | Lines]) -> + %% XXX: not support ' ' for the field value + Fields = binary:split(Line, [<<",">>, <<" ">>, <<"\n">>], [global, trim_all]), + {Fields, Lines}; +csv_read_line([]) -> + eof. diff --git a/apps/emqx_utils/test/emqx_utils_stream_tests.erl b/apps/emqx_utils/test/emqx_utils_stream_tests.erl index ef8185a94..89ca92d20 100644 --- a/apps/emqx_utils/test/emqx_utils_stream_tests.erl +++ b/apps/emqx_utils/test/emqx_utils_stream_tests.erl @@ -82,3 +82,34 @@ mqueue_test() -> [1, 42, 2], emqx_utils_stream:consume(emqx_utils_stream:mqueue(400)) ). + +csv_test() -> + Data1 = <<"h1,h2,h3\r\nvv1,vv2,vv3\r\nvv4,vv5,vv6">>, + ?assertEqual( + [ + #{<<"h1">> => <<"vv1">>, <<"h2">> => <<"vv2">>, <<"h3">> => <<"vv3">>}, + #{<<"h1">> => <<"vv4">>, <<"h2">> => <<"vv5">>, <<"h3">> => <<"vv6">>} + ], + emqx_utils_stream:consume(emqx_utils_stream:csv(Data1)) + ), + + Data2 = <<"h1, h2, h3\nvv1, vv2, vv3\nvv4,vv5,vv6\n">>, + ?assertEqual( + [ + #{<<"h1">> => <<"vv1">>, <<"h2">> => <<"vv2">>, <<"h3">> => <<"vv3">>}, + #{<<"h1">> => <<"vv4">>, <<"h2">> => <<"vv5">>, <<"h3">> => <<"vv6">>} + ], + emqx_utils_stream:consume(emqx_utils_stream:csv(Data2)) + ), + + ?assertEqual( + [], + emqx_utils_stream:consume(emqx_utils_stream:csv(<<"">>)) + ), + + BadData = <<"h1,h2,h3\r\nv1,v2,v3\r\nv4,v5">>, + ?assertException( + error, + bad_format, + emqx_utils_stream:consume(emqx_utils_stream:csv(BadData)) + ). diff --git a/changes/ce/feat-12396.en.md b/changes/ce/feat-12396.en.md new file mode 100644 index 000000000..c1a35d3f8 --- /dev/null +++ b/changes/ce/feat-12396.en.md @@ -0,0 +1,4 @@ +Enhanced the `authentication/:id/import_users` interface for a more user-friendly user import feature: + +- Add new parameter `?type=plain` to support importing users with plaintext passwords in addition to the current solution which only supports password hash. +- Support `content-type: application/json` to accept HTTP Body in JSON format in extension to the current solution which only supports `multipart/form-data` for csv format. diff --git a/changes/ce/fix-12404.en.md b/changes/ce/fix-12404.en.md new file mode 100644 index 000000000..8706416a8 --- /dev/null +++ b/changes/ce/fix-12404.en.md @@ -0,0 +1 @@ +Fixed an issue where restarting a busy data integration could lead to data integration metrics to stop being collected.