From 2e37f7514a4f200cfa2e461305729ceb1595123a Mon Sep 17 00:00:00 2001 From: JimMoen Date: Fri, 26 Jan 2024 15:53:52 +0800 Subject: [PATCH 01/18] fix(prom_stats): compatibility with previous api format in json mode --- apps/emqx_prometheus/src/emqx_prometheus.erl | 45 +++++++++++--------- 1 file changed, 24 insertions(+), 21 deletions(-) diff --git a/apps/emqx_prometheus/src/emqx_prometheus.erl b/apps/emqx_prometheus/src/emqx_prometheus.erl index 4b1f80597..59241bd02 100644 --- a/apps/emqx_prometheus/src/emqx_prometheus.erl +++ b/apps/emqx_prometheus/src/emqx_prometheus.erl @@ -991,12 +991,13 @@ catch_all(DataFun) -> collect_stats_json_data(StatsData, StatsClData) -> StatsDatas = collect_json_data_(StatsData), CLData = hd(collect_json_data_(StatsClData)), - lists:map( + Res = lists:map( fun(NodeData) -> maps:merge(NodeData, CLData) end, StatsDatas - ). + ), + json_obj_or_array(Res). %% always return json array collect_cert_json_data(Data) -> @@ -1004,32 +1005,34 @@ collect_cert_json_data(Data) -> collect_vm_json_data(Data) -> DataListPerNode = collect_json_data_(Data), - case {?GET_PROM_DATA_MODE(), DataListPerNode} of - {?PROM_DATA_MODE__NODE, [NData | _]} -> - NData; - {_, _} -> + case ?GET_PROM_DATA_MODE() of + ?PROM_DATA_MODE__NODE -> + hd(DataListPerNode); + _ -> DataListPerNode end. collect_json_data(Data0) -> DataListPerNode = collect_json_data_(Data0), - case {?GET_PROM_DATA_MODE(), DataListPerNode} of - %% all nodes results unaggregated, should be a list - {?PROM_DATA_MODE__ALL_NODES_UNAGGREGATED, _} -> - DataListPerNode; - %% only local node result [#{...}] - %% To guaranteed compatibility, return a json object, not array - {?PROM_DATA_MODE__NODE, [NData | _]} -> - NData; - %% All nodes results aggregated - %% return a json object, not array - {?PROM_DATA_MODE__ALL_NODES_AGGREGATED, [NData | _]} -> - NData; - %% olp maybe not enabled, with empty list to empty object - {_, []} -> - #{} + json_obj_or_array(DataListPerNode). 
+ +%% compatibility with previous api format in json mode +json_obj_or_array(DataL) -> + case ?GET_PROM_DATA_MODE() of + ?PROM_DATA_MODE__NODE -> + data_list_to_json_obj(DataL); + ?PROM_DATA_MODE__ALL_NODES_UNAGGREGATED -> + DataL; + ?PROM_DATA_MODE__ALL_NODES_AGGREGATED -> + data_list_to_json_obj(DataL) end. +data_list_to_json_obj([]) -> + %% olp maybe not enabled, with empty list to empty object + #{}; +data_list_to_json_obj(DataL) -> + hd(DataL). + collect_json_data_(Data) -> emqx_prometheus_cluster:collect_json_data(Data, fun zip_json_prom_stats_metrics/3). From 6410f5a717d8d7acd2d6e3ec81b685c4aaeaea38 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Fri, 26 Jan 2024 10:21:27 -0300 Subject: [PATCH 02/18] fix(resource_metrics): avoid detaching handler on crashes Fixes https://emqx.atlassian.net/browse/EMQX-11821 --- .../src/emqx_resource_metrics.erl | 62 ++++++++++++++++--- .../test/emqx_resource_SUITE.erl | 53 ++++++++++++++++ changes/ce/fix-12404.en.md | 1 + 3 files changed, 106 insertions(+), 10 deletions(-) create mode 100644 changes/ce/fix-12404.en.md diff --git a/apps/emqx_resource/src/emqx_resource_metrics.erl b/apps/emqx_resource/src/emqx_resource_metrics.erl index df28d893b..7480be0cc 100644 --- a/apps/emqx_resource/src/emqx_resource_metrics.erl +++ b/apps/emqx_resource/src/emqx_resource_metrics.erl @@ -16,6 +16,8 @@ -module(emqx_resource_metrics). +-include_lib("emqx/include/logger.hrl"). + -export([ events/0, install_telemetry_handler/1, @@ -118,6 +120,52 @@ handle_telemetry_event( _Metadata = #{resource_id := ID}, _HandlerConfig ) -> + try + handle_counter_telemetry_event(Event, ID, Val) + catch + Kind:Reason:Stacktrace -> + %% We catch errors to avoid detaching the telemetry handler function. + %% When restarting a resource while it's under load, there might be transient + %% failures while the metrics are not yet created. 
+ ?SLOG(warning, #{ + msg => "handle_resource_metrics_failed", + hint => "transient failures may occur when restarting a resource", + kind => Kind, + reason => Reason, + stacktrace => Stacktrace, + resource_id => ID, + event => Event + }), + ok + end; +handle_telemetry_event( + [?TELEMETRY_PREFIX, Event], + _Measurements = #{gauge_set := Val}, + _Metadata = #{resource_id := ID, worker_id := WorkerID}, + _HandlerConfig +) -> + try + handle_gauge_telemetry_event(Event, ID, WorkerID, Val) + catch + Kind:Reason:Stacktrace -> + %% We catch errors to avoid detaching the telemetry handler function. + %% When restarting a resource while it's under load, there might be transient + %% failures while the metrics are not yet created. + ?SLOG(warning, #{ + msg => "handle_resource_metrics_failed", + hint => "transient failures may occur when restarting a resource", + kind => Kind, + reason => Reason, + stacktrace => Stacktrace, + resource_id => ID, + event => Event + }), + ok + end; +handle_telemetry_event(_EventName, _Measurements, _Metadata, _HandlerConfig) -> + ok. + +handle_counter_telemetry_event(Event, ID, Val) -> case Event of dropped_other -> emqx_metrics_worker:inc(?RES_METRICS, ID, 'dropped', Val), @@ -154,13 +202,9 @@ handle_telemetry_event( emqx_metrics_worker:inc(?RES_METRICS, ID, 'success', Val); _ -> ok - end; -handle_telemetry_event( - [?TELEMETRY_PREFIX, Event], - _Measurements = #{gauge_set := Val}, - _Metadata = #{resource_id := ID, worker_id := WorkerID}, - _HandlerConfig -) -> + end. + +handle_gauge_telemetry_event(Event, ID, WorkerID, Val) -> case Event of inflight -> emqx_metrics_worker:set_gauge(?RES_METRICS, ID, WorkerID, 'inflight', Val); @@ -168,9 +212,7 @@ handle_telemetry_event( emqx_metrics_worker:set_gauge(?RES_METRICS, ID, WorkerID, 'queuing', Val); _ -> ok - end; -handle_telemetry_event(_EventName, _Measurements, _Metadata, _HandlerConfig) -> - ok. + end. 
%% Gauges (value can go both up and down): %% -------------------------------------- diff --git a/apps/emqx_resource/test/emqx_resource_SUITE.erl b/apps/emqx_resource/test/emqx_resource_SUITE.erl index 2462123d6..d46fc6323 100644 --- a/apps/emqx_resource/test/emqx_resource_SUITE.erl +++ b/apps/emqx_resource/test/emqx_resource_SUITE.erl @@ -28,6 +28,7 @@ -define(DEFAULT_RESOURCE_GROUP, <<"default">>). -define(RESOURCE_ERROR(REASON), {error, {resource_error, #{reason := REASON}}}). -define(TRACE_OPTS, #{timetrap => 10000, timeout => 1000}). +-define(TELEMETRY_PREFIX, emqx, resource). -import(emqx_common_test_helpers, [on_exit/1]). @@ -3006,6 +3007,36 @@ do_t_resource_activate_alarm_once(ResourceConfig, SubscribeEvent) -> end ). +t_telemetry_handler_crash(_Config) -> + %% Check that a crash while handling a telemetry event, such as when a busy resource + %% is restarted and its metrics are not recreated while handling an increment, does + %% not lead to the handler being uninstalled. + ?check_trace( + begin + NonExistentId = <<"I-dont-exist">>, + WorkerId = 1, + HandlersBefore = telemetry:list_handlers([?TELEMETRY_PREFIX]), + ?assertMatch([_ | _], HandlersBefore), + lists:foreach(fun(Fn) -> Fn(NonExistentId) end, counter_metric_inc_fns()), + emqx_common_test_helpers:with_mock( + emqx_metrics_worker, + set_gauge, + fun(_Name, _Id, _WorkerId, _Metric, _Val) -> + error(random_crash) + end, + fun() -> + lists:foreach( + fun(Fn) -> Fn(NonExistentId, WorkerId, 1) end, gauge_metric_set_fns() + ) + end + ), + ?assertEqual(HandlersBefore, telemetry:list_handlers([?TELEMETRY_PREFIX])), + ok + end, + [] + ), + ok. + %%------------------------------------------------------------------------------ %% Helpers %%------------------------------------------------------------------------------ @@ -3235,3 +3266,25 @@ do_wait_until_all_marked_as_retriable(NumExpected, Seen) -> }) end end. 
+ +counter_metric_inc_fns() -> + Mod = emqx_resource_metrics, + [ + fun Mod:Fn/1 + || {Fn, 1} <- Mod:module_info(functions), + case string:find(atom_to_list(Fn), "_inc", trailing) of + "_inc" -> true; + _ -> false + end + ]. + +gauge_metric_set_fns() -> + Mod = emqx_resource_metrics, + [ + fun Mod:Fn/3 + || {Fn, 3} <- Mod:module_info(functions), + case string:find(atom_to_list(Fn), "_set", trailing) of + "_set" -> true; + _ -> false + end + ]. diff --git a/changes/ce/fix-12404.en.md b/changes/ce/fix-12404.en.md new file mode 100644 index 000000000..8706416a8 --- /dev/null +++ b/changes/ce/fix-12404.en.md @@ -0,0 +1 @@ +Fixed an issue where restarting a busy data integration could lead to data integration metrics to stop being collected. From e65cfb836ce5f97b31cd0ce1d027bafcfb11c4f9 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Thu, 25 Jan 2024 20:28:19 +0800 Subject: [PATCH 03/18] feat(import_users): support user's password in plain text --- .../emqx_authn/emqx_authn_user_import_api.erl | 129 ++++++-- .../src/emqx_auth_mnesia.app.src | 2 +- .../src/emqx_authn_mnesia.erl | 311 ++++++++++++------ 3 files changed, 315 insertions(+), 127 deletions(-) diff --git a/apps/emqx_auth/src/emqx_authn/emqx_authn_user_import_api.erl b/apps/emqx_auth/src/emqx_authn/emqx_authn_user_import_api.erl index bac313195..6986c52c2 100644 --- a/apps/emqx_auth/src/emqx_authn/emqx_authn_user_import_api.erl +++ b/apps/emqx_auth/src/emqx_authn/emqx_authn_user_import_api.erl @@ -58,8 +58,8 @@ schema("/authentication/:id/import_users") -> post => #{ tags => ?API_TAGS_GLOBAL, description => ?DESC(authentication_id_import_users_post), - parameters => [emqx_authn_api:param_auth_id()], - 'requestBody' => emqx_dashboard_swagger:file_schema(filename), + parameters => [emqx_authn_api:param_auth_id(), param_password_type()], + 'requestBody' => request_body_schema(), responses => #{ 204 => <<"Users imported">>, 400 => error_codes([?BAD_REQUEST], <<"Bad Request">>), @@ -74,8 +74,12 @@ 
schema("/listeners/:listener_id/authentication/:id/import_users") -> tags => ?API_TAGS_SINGLE, deprecated => true, description => ?DESC(listeners_listener_id_authentication_id_import_users_post), - parameters => [emqx_authn_api:param_listener_id(), emqx_authn_api:param_auth_id()], - 'requestBody' => emqx_dashboard_swagger:file_schema(filename), + parameters => [ + emqx_authn_api:param_listener_id(), + emqx_authn_api:param_auth_id(), + param_password_type() + ], + 'requestBody' => request_body_schema(), responses => #{ 204 => <<"Users imported">>, 400 => error_codes([?BAD_REQUEST], <<"Bad Request">>), @@ -84,37 +88,114 @@ schema("/listeners/:listener_id/authentication/:id/import_users") -> } }. +request_body_schema() -> + #{content := Content} = emqx_dashboard_swagger:file_schema(filename), + Content1 = + Content#{ + <<"application/json">> => #{ + schema => #{ + type => object, + example => [ + #{<<"user_id">> => <<"user1">>, <<"password">> => <<"password1">>}, + #{<<"user_id">> => <<"user2">>, <<"password">> => <<"password2">>} + ] + } + } + }, + #{ + content => Content1, + description => <<"Import body">> + }. 
+ authenticator_import_users( post, - #{ + Req = #{ bindings := #{id := AuthenticatorID}, - body := #{<<"filename">> := #{type := _} = File} + headers := Headers, + body := Body } ) -> - [{FileName, FileData}] = maps:to_list(maps:without([type], File)), - case emqx_authn_chains:import_users(?GLOBAL, AuthenticatorID, {FileName, FileData}) of + PasswordType = password_type(Req), + Result = + case maps:get(<<"content-type">>, Headers, undefined) of + <<"application/json">> -> + emqx_authn_chains:import_users( + ?GLOBAL, AuthenticatorID, {PasswordType, prepared_user_list, Body} + ); + _ -> + case Body of + #{<<"filename">> := #{type := _} = File} -> + [{Name, Data}] = maps:to_list(maps:without([type], File)), + emqx_authn_chains:import_users( + ?GLOBAL, AuthenticatorID, {PasswordType, Name, Data} + ); + _ -> + {error, {missing_parameter, filename}} + end + end, + case Result of ok -> {204}; {error, Reason} -> emqx_authn_api:serialize_error(Reason) - end; -authenticator_import_users(post, #{bindings := #{id := _}, body := _}) -> - emqx_authn_api:serialize_error({missing_parameter, filename}). + end. 
listener_authenticator_import_users( post, - #{ + Req = #{ bindings := #{listener_id := ListenerID, id := AuthenticatorID}, - body := #{<<"filename">> := #{type := _} = File} + headers := Headers, + body := Body } ) -> - [{FileName, FileData}] = maps:to_list(maps:without([type], File)), - emqx_authn_api:with_chain( - ListenerID, - fun(ChainName) -> - case emqx_authn_chains:import_users(ChainName, AuthenticatorID, {FileName, FileData}) of - ok -> {204}; - {error, Reason} -> emqx_authn_api:serialize_error(Reason) + PasswordType = password_type(Req), + + DoImport = fun(FileName, FileData) -> + emqx_authn_api:with_chain( + ListenerID, + fun(ChainName) -> + case + emqx_authn_chains:import_users( + ChainName, AuthenticatorID, {PasswordType, FileName, FileData} + ) + of + ok -> {204}; + {error, Reason} -> emqx_authn_api:serialize_error(Reason) + end end - end - ); -listener_authenticator_import_users(post, #{bindings := #{listener_id := _, id := _}, body := _}) -> - emqx_authn_api:serialize_error({missing_parameter, filename}). + ) + end, + case maps:get(<<"content-type">>, Headers, undefined) of + <<"application/json">> -> + DoImport(prepared_user_list, Body); + _ -> + case Body of + #{<<"filename">> := #{type := _} = File} -> + [{Name, Data}] = maps:to_list(maps:without([type], File)), + DoImport(Name, Data); + _ -> + emqx_authn_api:serialize_error({missing_parameter, filename}) + end + end. + +%%-------------------------------------------------------------------- +%% helpers + +param_password_type() -> + {type, + hoconsc:mk( + binary(), + #{ + in => query, + enum => [<<"plain">>, <<"hash">>], + required => true, + desc => << + "The import file template type, enum with `plain`," + "`hash`" + >>, + example => <<"hash">> + } + )}. + +password_type(_Req = #{query_string := #{<<"type">> := Type}}) -> + binary_to_existing_atom(Type); +password_type(_) -> + hash. 
diff --git a/apps/emqx_auth_mnesia/src/emqx_auth_mnesia.app.src b/apps/emqx_auth_mnesia/src/emqx_auth_mnesia.app.src index 3dbdf6625..28b8d7535 100644 --- a/apps/emqx_auth_mnesia/src/emqx_auth_mnesia.app.src +++ b/apps/emqx_auth_mnesia/src/emqx_auth_mnesia.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_auth_mnesia, [ {description, "EMQX Buitl-in Database Authentication and Authorization"}, - {vsn, "0.1.2"}, + {vsn, "0.1.3"}, {registered, []}, {mod, {emqx_auth_mnesia_app, []}}, {applications, [ diff --git a/apps/emqx_auth_mnesia/src/emqx_authn_mnesia.erl b/apps/emqx_auth_mnesia/src/emqx_authn_mnesia.erl index bbbaeddb1..51ed8fce8 100644 --- a/apps/emqx_auth_mnesia/src/emqx_authn_mnesia.erl +++ b/apps/emqx_auth_mnesia/src/emqx_authn_mnesia.erl @@ -52,9 +52,7 @@ do_destroy/1, do_add_user/1, do_delete_user/2, - do_update_user/3, - import/2, - import_csv/3 + do_update_user/3 ]). -export([mnesia/1, init_tables/0]). @@ -173,20 +171,76 @@ do_destroy(UserGroup) -> mnesia:select(?TAB, group_match_spec(UserGroup), write) ). 
-import_users({Filename0, FileData}, State) -> - Filename = to_binary(Filename0), - case filename:extension(Filename) of - <<".json">> -> - import_users_from_json(FileData, State); - <<".csv">> -> - CSV = csv_data(FileData), - import_users_from_csv(CSV, State); - <<>> -> - {error, unknown_file_format}; - Extension -> - {error, {unsupported_file_format, Extension}} +import_users({PasswordType, Filename, FileData}, State) -> + Convertor = convertor(PasswordType, State), + try + {_NewUsersCnt, Users} = parse_import_users(Filename, FileData, Convertor), + try + case do_import_users(Users) of + ok -> ok; + {error, Reason} -> error(Reason) + end + catch + error:Reason1:Stk -> + ?SLOG( + warning, + #{ + msg => "import_users_failed", + type => PasswordType, + filename => Filename, + stacktrace => Stk + } + ), + _ = do_clean_imported_users(Users), + {error, Reason1} + end + catch + error:Reason2:Stk2 -> + ?SLOG( + warning, + #{ + msg => "import_users_failed", + type => PasswordType, + filename => Filename, + stacktrace => Stk2 + } + ), + {error, Reason2} end. +do_import_users(Users) -> + trans( + fun() -> + lists:foreach( + fun( + #{ + <<"user_group">> := UserGroup, + <<"user_id">> := UserID, + <<"password_hash">> := PasswordHash, + <<"salt">> := Salt, + <<"is_superuser">> := IsSuperuser + } + ) -> + insert_user(UserGroup, UserID, PasswordHash, Salt, IsSuperuser) + end, + Users + ) + end + ). + +do_clean_imported_users(Users) -> + lists:foreach( + fun( + #{ + <<"user_group">> := UserGroup, + <<"user_id">> := UserID + } + ) -> + mria:dirty_delete(?TAB, {UserGroup, UserID}) + end, + Users + ). 
+ add_user( UserInfo, State @@ -293,93 +347,6 @@ run_fuzzy_filter( %% Internal functions %%------------------------------------------------------------------------------ -%% Example: data/user-credentials.json -import_users_from_json(Bin, #{user_group := UserGroup}) -> - case emqx_utils_json:safe_decode(Bin, [return_maps]) of - {ok, List} -> - trans(fun ?MODULE:import/2, [UserGroup, List]); - {error, Reason} -> - {error, Reason} - end. - -%% Example: data/user-credentials.csv -import_users_from_csv(CSV, #{user_group := UserGroup}) -> - case get_csv_header(CSV) of - {ok, Seq, NewCSV} -> - trans(fun ?MODULE:import_csv/3, [UserGroup, NewCSV, Seq]); - {error, Reason} -> - {error, Reason} - end. - -import(_UserGroup, []) -> - ok; -import(UserGroup, [ - #{ - <<"user_id">> := UserID, - <<"password_hash">> := PasswordHash - } = UserInfo - | More -]) when - is_binary(UserID) andalso is_binary(PasswordHash) --> - Salt = maps:get(<<"salt">>, UserInfo, <<>>), - IsSuperuser = maps:get(<<"is_superuser">>, UserInfo, false), - insert_user(UserGroup, UserID, PasswordHash, Salt, IsSuperuser), - import(UserGroup, More); -import(_UserGroup, [_ | _More]) -> - {error, bad_format}. - -%% Importing 5w users needs 1.7 seconds -import_csv(UserGroup, CSV, Seq) -> - case csv_read_line(CSV) of - {ok, Line, NewCSV} -> - Fields = binary:split(Line, [<<",">>, <<" ">>, <<"\n">>], [global, trim_all]), - case get_user_info_by_seq(Fields, Seq) of - {ok, - #{ - user_id := UserID, - password_hash := PasswordHash - } = UserInfo} -> - Salt = maps:get(salt, UserInfo, <<>>), - IsSuperuser = maps:get(is_superuser, UserInfo, false), - insert_user(UserGroup, UserID, PasswordHash, Salt, IsSuperuser), - import_csv(UserGroup, NewCSV, Seq); - {error, Reason} -> - {error, Reason} - end; - eof -> - ok - end. 
- -get_csv_header(CSV) -> - case csv_read_line(CSV) of - {ok, Line, NewCSV} -> - Seq = binary:split(Line, [<<",">>, <<" ">>, <<"\n">>], [global, trim_all]), - {ok, Seq, NewCSV}; - eof -> - {error, empty_file} - end. - -get_user_info_by_seq(Fields, Seq) -> - get_user_info_by_seq(Fields, Seq, #{}). - -get_user_info_by_seq([], [], #{user_id := _, password_hash := _} = Acc) -> - {ok, Acc}; -get_user_info_by_seq(_, [], _) -> - {error, bad_format}; -get_user_info_by_seq([UserID | More1], [<<"user_id">> | More2], Acc) -> - get_user_info_by_seq(More1, More2, Acc#{user_id => UserID}); -get_user_info_by_seq([PasswordHash | More1], [<<"password_hash">> | More2], Acc) -> - get_user_info_by_seq(More1, More2, Acc#{password_hash => PasswordHash}); -get_user_info_by_seq([Salt | More1], [<<"salt">> | More2], Acc) -> - get_user_info_by_seq(More1, More2, Acc#{salt => Salt}); -get_user_info_by_seq([<<"true">> | More1], [<<"is_superuser">> | More2], Acc) -> - get_user_info_by_seq(More1, More2, Acc#{is_superuser => true}); -get_user_info_by_seq([<<"false">> | More1], [<<"is_superuser">> | More2], Acc) -> - get_user_info_by_seq(More1, More2, Acc#{is_superuser => false}); -get_user_info_by_seq(_, _, _) -> - {error, bad_format}. - insert_user(UserGroup, UserID, PasswordHash, Salt, IsSuperuser) -> UserInfoRecord = user_info_record(UserGroup, UserID, PasswordHash, Salt, IsSuperuser), insert_user(UserInfoRecord). @@ -449,6 +416,12 @@ trans(Fun, Args) -> {aborted, Reason} -> {error, Reason} end. +trans(Fun) -> + case mria:transaction(?AUTHN_SHARD, Fun) of + {atomic, Res} -> Res; + {aborted, Reason} -> {error, Reason} + end. + to_binary(B) when is_binary(B) -> B; to_binary(L) when is_list(L) -> @@ -482,10 +455,144 @@ group_match_spec(UserGroup, QString) -> end) end. 
+%%-------------------------------------------------------------------- +%% parse import file/data + +parse_import_users(Filename, FileData, Convertor) -> + Eval = fun _Eval(F) -> + case F() of + eof -> []; + {User, F1} -> [User | _Eval(F1)] + end + end, + ReaderFn = reader_fn(Filename, FileData, Convertor), + Users = lists:reverse(Eval(ReaderFn)), + NewUsersCount = + lists:foldl( + fun( + #{ + <<"user_group">> := UserGroup, + <<"user_id">> := UserID + }, + Acc + ) -> + case ets:member(?TAB, {UserGroup, UserID}) of + true -> + Acc; + false -> + Acc + 1 + end + end, + 0, + Users + ), + {NewUsersCount, Users}. + +reader_fn(prepared_user_list, Data, Convertor) when is_list(Data) -> + reader(prepared_user_list, Data, Convertor); +reader_fn(Filename0, Data, Convertor) -> + Filename = to_binary(Filename0), + case filename:extension(Filename) of + <<".json">> -> + reader(json, Data, Convertor); + <<".csv">> -> + reader(csv, Data, Convertor); + <<>> -> + error(unknown_file_format); + Extension -> + error({unsupported_file_format, Extension}) + end. + +%% Example: data/user-credentials.json +reader(json, Data, Convertor) when is_binary(Data) -> + case emqx_utils_json:safe_decode(Data, [return_maps]) of + {ok, List} -> + reader(prepared_user_list, List, Convertor); + {error, Reason} -> + error(Reason) + end; +%% Example: data/user-credentials.csv +reader(csv, Data, Convertor) when is_binary(Data) -> + CSVData = csv_data(Data), + case get_csv_header(CSVData) of + {ok, Headers, CSVLines} -> + Reader = + fun _Iter(Lines) -> + case csv_read_line(Lines) of + {ok, Line, Rest} -> + %% XXX: not support ' ' for a field? 
+ Fields = binary:split(Line, [<<",">>, <<" ">>, <<"\n">>], [ + global, trim_all + ]), + case length(Fields) == length(Headers) of + true -> + User = maps:from_list(lists:zip(Headers, Fields)), + {Convertor(User), fun() -> _Iter(Rest) end}; + false -> + error(bad_format) + end; + eof -> + eof + end + end, + fun() -> Reader(CSVLines) end; + {error, Reason} -> + error(Reason) + end; +%% Example: [#{<<"user_id">> => <<>>, ...}] +reader(prepared_user_list, Data, Convertor) when is_list(Data) -> + Reader = + fun + _Iter([]) -> eof; + _Iter([User | Rest]) -> {Convertor(User), fun() -> _Iter(Rest) end} + end, + fun() -> Reader(Data) end. + +convertor(PasswordType, State) -> + fun(User) -> + convert_user(User, PasswordType, State) + end. + +convert_user( + User = #{<<"user_id">> := UserId}, + PasswordType, + #{user_group := UserGroup, password_hash_algorithm := Algorithm} +) -> + {PasswordHash, Salt} = find_password_hash(PasswordType, User, Algorithm), + #{ + <<"user_id">> => UserId, + <<"password_hash">> => PasswordHash, + <<"salt">> => Salt, + <<"is_superuser">> => is_superuser(User), + <<"user_group">> => UserGroup + }; +convert_user(_, _, _) -> + error(bad_format). + +find_password_hash(hash, User = #{<<"password_hash">> := PasswordHash}, _) -> + {PasswordHash, maps:get(<<"salt">>, User, <<>>)}; +find_password_hash(plain, #{<<"password">> := Password}, Algorithm) -> + emqx_authn_password_hashing:hash(Algorithm, Password); +find_password_hash(_, _, _) -> + error(bad_format). + +is_superuser(#{<<"is_superuser">> := <<"true">>}) -> true; +is_superuser(#{<<"is_superuser">> := true}) -> true; +is_superuser(_) -> false. + csv_data(Data) -> Lines = binary:split(Data, [<<"\r">>, <<"\n">>], [global, trim_all]), {csv_data, Lines}. +get_csv_header(CSV) -> + case csv_read_line(CSV) of + {ok, Line, NewCSV} -> + Seq = binary:split(Line, [<<",">>, <<" ">>, <<"\n">>], [global, trim_all]), + {ok, Seq, NewCSV}; + eof -> + {error, empty_file} + end. 
+ csv_read_line({csv_data, [Line | Lines]}) -> {ok, Line, {csv_data, Lines}}; csv_read_line({csv_data, []}) -> From fed512689a3c411215ea1c26322c0ed0652bdffc Mon Sep 17 00:00:00 2001 From: JianBo He Date: Thu, 25 Jan 2024 20:53:10 +0800 Subject: [PATCH 04/18] chore: make elvis checking happy --- .../src/emqx_authn_mnesia.erl | 71 ++++++++----------- 1 file changed, 30 insertions(+), 41 deletions(-) diff --git a/apps/emqx_auth_mnesia/src/emqx_authn_mnesia.erl b/apps/emqx_auth_mnesia/src/emqx_authn_mnesia.erl index 51ed8fce8..b6a3588bf 100644 --- a/apps/emqx_auth_mnesia/src/emqx_authn_mnesia.erl +++ b/apps/emqx_auth_mnesia/src/emqx_authn_mnesia.erl @@ -78,6 +78,8 @@ {<<"is_superuser">>, atom} ]). +-elvis([{elvis_style, nesting_level, disable}]). + %%------------------------------------------------------------------------------ %% Mnesia bootstrap %%------------------------------------------------------------------------------ @@ -175,37 +177,25 @@ import_users({PasswordType, Filename, FileData}, State) -> Convertor = convertor(PasswordType, State), try {_NewUsersCnt, Users} = parse_import_users(Filename, FileData, Convertor), - try - case do_import_users(Users) of - ok -> ok; - {error, Reason} -> error(Reason) - end - catch - error:Reason1:Stk -> - ?SLOG( - warning, - #{ - msg => "import_users_failed", - type => PasswordType, - filename => Filename, - stacktrace => Stk - } - ), + case do_import_users(Users) of + ok -> + ok; + {error, Reason} -> _ = do_clean_imported_users(Users), - {error, Reason1} + error(Reason) end catch - error:Reason2:Stk2 -> + error:Reason1:Stk -> ?SLOG( warning, #{ msg => "import_users_failed", type => PasswordType, filename => Filename, - stacktrace => Stk2 + stacktrace => Stk } ), - {error, Reason2} + {error, Reason1} end. 
do_import_users(Users) -> @@ -514,28 +504,27 @@ reader(json, Data, Convertor) when is_binary(Data) -> %% Example: data/user-credentials.csv reader(csv, Data, Convertor) when is_binary(Data) -> CSVData = csv_data(Data), + Reader = fun _Iter(Headers, Lines) -> + case csv_read_line(Lines) of + {ok, Line, Rest} -> + %% XXX: not support ' ' for a field? + Fields = binary:split(Line, [<<",">>, <<" ">>, <<"\n">>], [ + global, trim_all + ]), + case length(Fields) == length(Headers) of + true -> + User = maps:from_list(lists:zip(Headers, Fields)), + {Convertor(User), fun() -> _Iter(Headers, Rest) end}; + false -> + error(bad_format) + end; + eof -> + eof + end + end, case get_csv_header(CSVData) of - {ok, Headers, CSVLines} -> - Reader = - fun _Iter(Lines) -> - case csv_read_line(Lines) of - {ok, Line, Rest} -> - %% XXX: not support ' ' for a field? - Fields = binary:split(Line, [<<",">>, <<" ">>, <<"\n">>], [ - global, trim_all - ]), - case length(Fields) == length(Headers) of - true -> - User = maps:from_list(lists:zip(Headers, Fields)), - {Convertor(User), fun() -> _Iter(Rest) end}; - false -> - error(bad_format) - end; - eof -> - eof - end - end, - fun() -> Reader(CSVLines) end; + {ok, CSVHeaders, CSVLines} -> + fun() -> Reader(CSVHeaders, CSVLines) end; {error, Reason} -> error(Reason) end; From 829887630dec2be042e89884a6407491a08ee436 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Fri, 26 Jan 2024 10:15:40 +0800 Subject: [PATCH 05/18] test: refine existed test cases --- apps/emqx_auth/src/emqx_authn/emqx_authn_chains.erl | 6 +++++- apps/emqx_auth/src/emqx_authn/emqx_authn_provider.erl | 7 +++++-- .../src/emqx_authn/emqx_authn_user_import_api.erl | 6 ++++-- apps/emqx_auth_mnesia/test/emqx_authn_mnesia_SUITE.erl | 6 +++--- .../src/emqx_gateway_api_authn_user_import.erl | 4 ++-- 5 files changed, 19 insertions(+), 10 deletions(-) diff --git a/apps/emqx_auth/src/emqx_authn/emqx_authn_chains.erl b/apps/emqx_auth/src/emqx_authn/emqx_authn_chains.erl index 
0d668bc20..f79afab29 100644 --- a/apps/emqx_auth/src/emqx_authn/emqx_authn_chains.erl +++ b/apps/emqx_auth/src/emqx_authn/emqx_authn_chains.erl @@ -310,7 +310,11 @@ reorder_authenticator(_ChainName, []) -> reorder_authenticator(ChainName, AuthenticatorIDs) -> call({reorder_authenticator, ChainName, AuthenticatorIDs}). --spec import_users(chain_name(), authenticator_id(), {binary(), binary()}) -> +-spec import_users( + chain_name(), + authenticator_id(), + {plain | hash, prepared_user_list | binary(), binary()} +) -> ok | {error, term()}. import_users(ChainName, AuthenticatorID, Filename) -> call({import_users, ChainName, AuthenticatorID, Filename}). diff --git a/apps/emqx_auth/src/emqx_authn/emqx_authn_provider.erl b/apps/emqx_auth/src/emqx_authn/emqx_authn_provider.erl index 3898d0bc4..897f6a288 100644 --- a/apps/emqx_auth/src/emqx_authn/emqx_authn_provider.erl +++ b/apps/emqx_auth/src/emqx_authn/emqx_authn_provider.erl @@ -53,11 +53,14 @@ when when State :: state(). --callback import_users({Filename, FileData}, State) -> +-callback import_users({PasswordType, Filename, FileData}, State) -> ok | {error, term()} when - Filename :: binary(), FileData :: binary(), State :: state(). + PasswordType :: plain | hash, + Filename :: prepared_user_list | binary(), + FileData :: binary(), + State :: state(). -callback add_user(UserInfo, State) -> {ok, User} diff --git a/apps/emqx_auth/src/emqx_authn/emqx_authn_user_import_api.erl b/apps/emqx_auth/src/emqx_authn/emqx_authn_user_import_api.erl index 6986c52c2..f45923756 100644 --- a/apps/emqx_auth/src/emqx_authn/emqx_authn_user_import_api.erl +++ b/apps/emqx_auth/src/emqx_authn/emqx_authn_user_import_api.erl @@ -195,7 +195,9 @@ param_password_type() -> } )}. 
-password_type(_Req = #{query_string := #{<<"type">> := Type}}) -> - binary_to_existing_atom(Type); +password_type(_Req = #{query_string := #{<<"type">> := <<"plain">>}}) -> + plain; +password_type(_Req = #{query_string := #{<<"type">> := <<"hash">>}}) -> + hash; password_type(_) -> hash. diff --git a/apps/emqx_auth_mnesia/test/emqx_authn_mnesia_SUITE.erl b/apps/emqx_auth_mnesia/test/emqx_authn_mnesia_SUITE.erl index 80e6789d9..43b20c4fa 100644 --- a/apps/emqx_auth_mnesia/test/emqx_authn_mnesia_SUITE.erl +++ b/apps/emqx_auth_mnesia/test/emqx_authn_mnesia_SUITE.erl @@ -216,7 +216,7 @@ t_import_users(_) -> ?assertMatch( {error, {unsupported_file_format, _}}, emqx_authn_mnesia:import_users( - {<<"/file/with/unknown.extension">>, <<>>}, + {hash, <<"/file/with/unknown.extension">>, <<>>}, State ) ), @@ -224,7 +224,7 @@ t_import_users(_) -> ?assertEqual( {error, unknown_file_format}, emqx_authn_mnesia:import_users( - {<<"/file/with/no/extension">>, <<>>}, + {hash, <<"/file/with/no/extension">>, <<>>}, State ) ), @@ -264,7 +264,7 @@ sample_filename(Name) -> sample_filename_and_data(Name) -> Filename = sample_filename(Name), {ok, Data} = file:read_file(Filename), - {Filename, Data}. + {hash, Filename, Data}. 
config() -> #{ diff --git a/apps/emqx_gateway/src/emqx_gateway_api_authn_user_import.erl b/apps/emqx_gateway/src/emqx_gateway_api_authn_user_import.erl index 321b145ac..6089b70db 100644 --- a/apps/emqx_gateway/src/emqx_gateway_api_authn_user_import.erl +++ b/apps/emqx_gateway/src/emqx_gateway_api_authn_user_import.erl @@ -81,7 +81,7 @@ import_users(post, #{ [{FileName, FileData}] = maps:to_list(maps:without([type], File)), case emqx_authn_chains:import_users( - ChainName, AuthId, {FileName, FileData} + ChainName, AuthId, {hash, FileName, FileData} ) of ok -> {204}; @@ -105,7 +105,7 @@ import_listener_users(post, #{ [{FileName, FileData}] = maps:to_list(maps:without([type], File)), case emqx_authn_chains:import_users( - ChainName, AuthId, {FileName, FileData} + ChainName, AuthId, {hash, FileName, FileData} ) of ok -> {204}; From 8fc8106819e8211f86e67f89427a76379d5fd5b1 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Fri, 26 Jan 2024 11:35:27 +0800 Subject: [PATCH 06/18] test: cover password_type and new data format --- .../test/data/user-credentials-plain.csv | 3 + .../test/data/user-credentials-plain.json | 12 ++++ .../test/emqx_authn_api_mnesia_SUITE.erl | 8 ++- .../test/emqx_authn_mnesia_SUITE.erl | 68 ++++++++++++++++++- 4 files changed, 89 insertions(+), 2 deletions(-) create mode 100644 apps/emqx_auth/test/data/user-credentials-plain.csv create mode 100644 apps/emqx_auth/test/data/user-credentials-plain.json diff --git a/apps/emqx_auth/test/data/user-credentials-plain.csv b/apps/emqx_auth/test/data/user-credentials-plain.csv new file mode 100644 index 000000000..223be0bf9 --- /dev/null +++ b/apps/emqx_auth/test/data/user-credentials-plain.csv @@ -0,0 +1,3 @@ +user_id,password,is_superuser +myuser3,password3,true +myuser4,password4,false diff --git a/apps/emqx_auth/test/data/user-credentials-plain.json b/apps/emqx_auth/test/data/user-credentials-plain.json new file mode 100644 index 000000000..ec5e1a82f --- /dev/null +++ 
b/apps/emqx_auth/test/data/user-credentials-plain.json @@ -0,0 +1,12 @@ +[ + { + "user_id":"myuser1", + "password":"password1", + "is_superuser": true + }, + { + "user_id":"myuser2", + "password":"password2", + "is_superuser": false + } +] diff --git a/apps/emqx_auth_mnesia/test/emqx_authn_api_mnesia_SUITE.erl b/apps/emqx_auth_mnesia/test/emqx_authn_api_mnesia_SUITE.erl index 2035cf2fa..71da75c1b 100644 --- a/apps/emqx_auth_mnesia/test/emqx_authn_api_mnesia_SUITE.erl +++ b/apps/emqx_auth_mnesia/test/emqx_authn_api_mnesia_SUITE.erl @@ -336,7 +336,13 @@ test_authenticator_import_users(PathPrefix) -> {ok, CSVData} = file:read_file(CSVFileName), {ok, 204, _} = multipart_formdata_request(ImportUri, [], [ {filename, "user-credentials.csv", CSVData} - ]). + ]), + + %% test application/json + {ok, 204, _} = request(post, ImportUri ++ "?type=hash", emqx_utils_json:decode(JSONData)), + {ok, JSONData1} = file:read_file(filename:join([Dir, <<"data/user-credentials-plain.json">>])), + {ok, 204, _} = request(post, ImportUri ++ "?type=plain", emqx_utils_json:decode(JSONData1)), + ok. %%------------------------------------------------------------------------------ %% Helpers diff --git a/apps/emqx_auth_mnesia/test/emqx_authn_mnesia_SUITE.erl b/apps/emqx_auth_mnesia/test/emqx_authn_mnesia_SUITE.erl index 43b20c4fa..26ee9c406 100644 --- a/apps/emqx_auth_mnesia/test/emqx_authn_mnesia_SUITE.erl +++ b/apps/emqx_auth_mnesia/test/emqx_authn_mnesia_SUITE.erl @@ -253,6 +253,69 @@ t_import_users(_) -> ) ). +t_import_users_plain(_) -> + Config0 = config(), + Config = Config0#{password_hash_algorithm => #{name => sha256, salt_position => suffix}}, + {ok, State} = emqx_authn_mnesia:create(?AUTHN_ID, Config), + + ?assertEqual( + ok, + emqx_authn_mnesia:import_users( + sample_filename_and_data(plain, <<"user-credentials-plain.json">>), + State + ) + ), + + ?assertEqual( + ok, + emqx_authn_mnesia:import_users( + sample_filename_and_data(plain, <<"user-credentials-plain.csv">>), + State + ) + ). 
+ +t_import_users_prepared_list(_) -> + Config0 = config(), + Config = Config0#{password_hash_algorithm => #{name => sha256, salt_position => suffix}}, + {ok, State} = emqx_authn_mnesia:create(?AUTHN_ID, Config), + + Users1 = [ + #{<<"user_id">> => <<"u1">>, <<"password">> => <<"p1">>, <<"is_superuser">> => true}, + #{<<"user_id">> => <<"u2">>, <<"password">> => <<"p2">>, <<"is_superuser">> => true} + ], + Users2 = [ + #{ + <<"user_id">> => <<"u3">>, + <<"password_hash">> => + <<"c5e46903df45e5dc096dc74657610dbee8deaacae656df88a1788f1847390242">>, + <<"salt">> => <<"e378187547bf2d6f0545a3f441aa4d8a">>, + <<"is_superuser">> => true + }, + #{ + <<"user_id">> => <<"u4">>, + <<"password_hash">> => + <<"f4d17f300b11e522fd33f497c11b126ef1ea5149c74d2220f9a16dc876d4567b">>, + <<"salt">> => <<"6d3f9bd5b54d94b98adbcfe10b6d181f">>, + <<"is_superuser">> => true + } + ], + + ?assertEqual( + ok, + emqx_authn_mnesia:import_users( + {plain, prepared_user_list, Users1}, + State + ) + ), + + ?assertEqual( + ok, + emqx_authn_mnesia:import_users( + {hash, prepared_user_list, Users2}, + State + ) + ). + %%------------------------------------------------------------------------------ %% Helpers %%------------------------------------------------------------------------------ @@ -262,9 +325,12 @@ sample_filename(Name) -> filename:join([Dir, <<"data">>, Name]). sample_filename_and_data(Name) -> + sample_filename_and_data(hash, Name). + +sample_filename_and_data(Type, Name) -> Filename = sample_filename(Name), {ok, Data} = file:read_file(Filename), - {hash, Filename, Data}. + {Type, Filename, Data}. 
config() -> #{ From 4c1f1d97cd435d81b4af8be7e56cb35807b79c48 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Fri, 26 Jan 2024 13:46:01 +0800 Subject: [PATCH 07/18] chore: update changes --- changes/ce/feat-12396.en.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 changes/ce/feat-12396.en.md diff --git a/changes/ce/feat-12396.en.md b/changes/ce/feat-12396.en.md new file mode 100644 index 000000000..d835ec7b4 --- /dev/null +++ b/changes/ce/feat-12396.en.md @@ -0,0 +1,5 @@ +Enhanced the `authentication/:id/import_users` interface for a more user-friendly user import feature: + +- Add new query parameter `type=plain` to support importing users with plaintext passwords, + not just hashed ciphertext passwords. +- Support `content-type: application/json` to accept HTTP Body in JSON format, not just file. From bcbd48ae58eb20af41fa302c22d4705094ca24ac Mon Sep 17 00:00:00 2001 From: JianBo He Date: Mon, 29 Jan 2024 09:21:52 +0800 Subject: [PATCH 08/18] chore: apply suggestions from code review Co-authored-by: Zaiming (Stone) Shi --- changes/ce/feat-12396.en.md | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/changes/ce/feat-12396.en.md b/changes/ce/feat-12396.en.md index d835ec7b4..f54f1dc30 100644 --- a/changes/ce/feat-12396.en.md +++ b/changes/ce/feat-12396.en.md @@ -1,5 +1,4 @@ Enhanced the `authentication/:id/import_users` interface for a more user-friendly user import feature: -- Add new query parameter `type=plain` to support importing users with plaintext passwords, - not just hashed ciphertext passwords. -- Support `content-type: application/json` to accept HTTP Body in JSON format, not just file. +- Add new parameter `?type=plain` to support importing users with plaintext passwords in extension to the current solution which only supports password hash. +- Support `content-type: application/json` to accept HTTP Body in JSON format in extension to the current solution which only supports `multipart/form-data` for csv format. 
From 9915c85b0be14832382b5a32aaa5f5f32ff61696 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Mon, 29 Jan 2024 10:20:35 +0800 Subject: [PATCH 09/18] chore(authn_mnesia): use emqx_utils_stream module to improve reusability --- .../src/emqx_authn_mnesia.erl | 92 ++++--------------- apps/emqx_utils/src/emqx_utils_stream.erl | 53 +++++++++++ 2 files changed, 73 insertions(+), 72 deletions(-) diff --git a/apps/emqx_auth_mnesia/src/emqx_authn_mnesia.erl b/apps/emqx_auth_mnesia/src/emqx_authn_mnesia.erl index b6a3588bf..282fda194 100644 --- a/apps/emqx_auth_mnesia/src/emqx_authn_mnesia.erl +++ b/apps/emqx_auth_mnesia/src/emqx_authn_mnesia.erl @@ -190,6 +190,7 @@ import_users({PasswordType, Filename, FileData}, State) -> warning, #{ msg => "import_users_failed", + reason => Reason1, type => PasswordType, filename => Filename, stacktrace => Stk @@ -451,11 +452,11 @@ group_match_spec(UserGroup, QString) -> parse_import_users(Filename, FileData, Convertor) -> Eval = fun _Eval(F) -> case F() of - eof -> []; - {User, F1} -> [User | _Eval(F1)] + [] -> []; + [User | F1] -> [Convertor(User) | _Eval(F1)] end end, - ReaderFn = reader_fn(Filename, FileData, Convertor), + ReaderFn = reader_fn(Filename, FileData), Users = lists:reverse(Eval(ReaderFn)), NewUsersCount = lists:foldl( fun( @@ -478,65 +479,30 @@ parse_import_users(Filename, FileData, Convertor) -> ), {NewUsersCount, Users}.
-reader_fn(prepared_user_list, Data, Convertor) when is_list(Data) -> - reader(prepared_user_list, Data, Convertor); -reader_fn(Filename0, Data, Convertor) -> - Filename = to_binary(Filename0), - case filename:extension(Filename) of +reader_fn(prepared_user_list, List) when is_list(List) -> + %% Example: [#{<<"user_id">> => <<>>, ...}] + emqx_utils_stream:list(List); +reader_fn(Filename0, Data) -> + case filename:extension(to_binary(Filename0)) of <<".json">> -> - reader(json, Data, Convertor); + %% Example: data/user-credentials.json + case emqx_utils_json:safe_decode(Data, [return_maps]) of + {ok, List} when is_list(List) -> + emqx_utils_stream:list(List); + {ok, _} -> + error(unknown_file_format); + {error, Reason} -> + error(Reason) + end; <<".csv">> -> - reader(csv, Data, Convertor); + %% Example: data/user-credentials.csv + emqx_utils_stream:csv(Data); <<>> -> error(unknown_file_format); Extension -> error({unsupported_file_format, Extension}) end. -%% Example: data/user-credentials.json -reader(json, Data, Convertor) when is_binary(Data) -> - case emqx_utils_json:safe_decode(Data, [return_maps]) of - {ok, List} -> - reader(prepared_user_list, List, Convertor); - {error, Reason} -> - error(Reason) - end; -%% Example: data/user-credentials.csv -reader(csv, Data, Convertor) when is_binary(Data) -> - CSVData = csv_data(Data), - Reader = fun _Iter(Headers, Lines) -> - case csv_read_line(Lines) of - {ok, Line, Rest} -> - %% XXX: not support ' ' for a field? 
- Fields = binary:split(Line, [<<",">>, <<" ">>, <<"\n">>], [ - global, trim_all - ]), - case length(Fields) == length(Headers) of - true -> - User = maps:from_list(lists:zip(Headers, Fields)), - {Convertor(User), fun() -> _Iter(Headers, Rest) end}; - false -> - error(bad_format) - end; - eof -> - eof - end - end, - case get_csv_header(CSVData) of - {ok, CSVHeaders, CSVLines} -> - fun() -> Reader(CSVHeaders, CSVLines) end; - {error, Reason} -> - error(Reason) - end; -%% Example: [#{<<"user_id">> => <<>>, ...}] -reader(prepared_user_list, Data, Convertor) when is_list(Data) -> - Reader = - fun - _Iter([]) -> eof; - _Iter([User | Rest]) -> {Convertor(User), fun() -> _Iter(Rest) end} - end, - fun() -> Reader(Data) end. - convertor(PasswordType, State) -> fun(User) -> convert_user(User, PasswordType, State) @@ -568,21 +534,3 @@ find_password_hash(_, _, _) -> is_superuser(#{<<"is_superuser">> := <<"true">>}) -> true; is_superuser(#{<<"is_superuser">> := true}) -> true; is_superuser(_) -> false. - -csv_data(Data) -> - Lines = binary:split(Data, [<<"\r">>, <<"\n">>], [global, trim_all]), - {csv_data, Lines}. - -get_csv_header(CSV) -> - case csv_read_line(CSV) of - {ok, Line, NewCSV} -> - Seq = binary:split(Line, [<<",">>, <<" ">>, <<"\n">>], [global, trim_all]), - {ok, Seq, NewCSV}; - eof -> - {error, empty_file} - end. - -csv_read_line({csv_data, [Line | Lines]}) -> - {ok, Line, {csv_data, Lines}}; -csv_read_line({csv_data, []}) -> - eof. diff --git a/apps/emqx_utils/src/emqx_utils_stream.erl b/apps/emqx_utils/src/emqx_utils_stream.erl index 21321400d..dbaf542bf 100644 --- a/apps/emqx_utils/src/emqx_utils_stream.erl +++ b/apps/emqx_utils/src/emqx_utils_stream.erl @@ -37,6 +37,11 @@ ets/1 ]). +%% Streams from .csv data +-export([ + csv/1 +]). + -export_type([stream/1]). %% @doc A stream is essentially a lazy list. @@ -157,3 +162,51 @@ ets(Cont, ContF) -> [] end end. + +%% @doc Make a stream out of a .csv binary, where the .csv binary is loaded in all at once. 
+%% The .csv binary is assumed to be in UTF-8 encoding and to have a header row. +-spec csv(binary()) -> stream(map()). +csv(Bin) when is_binary(Bin) -> + CSVData = csv_data(Bin), + Reader = fun _Iter(Headers, Lines) -> + case csv_read_line(Lines) of + {ok, Line, Rest} -> + %% XXX: not support ' ' for a field? + Fields = binary:split(Line, [<<",">>, <<" ">>, <<"\n">>], [ + global, trim_all + ]), + case length(Fields) == length(Headers) of + true -> + User = maps:from_list(lists:zip(Headers, Fields)), + [User | fun() -> _Iter(Headers, Rest) end]; + false -> + error(bad_format) + end; + eof -> + [] + end + end, + case get_csv_header(CSVData) of + {ok, CSVHeaders, CSVLines} -> + fun() -> Reader(CSVHeaders, CSVLines) end; + {error, Reason} -> + error(Reason) + end. + +csv_data(Data) -> + Lines = binary:split(Data, [<<"\r">>, <<"\n">>], [global, trim_all]), + {csv_data, Lines}. + +get_csv_header(CSV) -> + case csv_read_line(CSV) of + {ok, Line, NewCSV} -> + Seq = binary:split(Line, [<<",">>, <<" ">>, <<"\n">>], [global, trim_all]), + {ok, Seq, NewCSV}; + eof -> + {error, empty_file} + end. + +csv_read_line({csv_data, [Line | Lines]}) -> + {ok, Line, {csv_data, Lines}}; +csv_read_line({csv_data, []}) -> + eof. From 2e35024df16a8228a37815fee2d9709657a8f6dc Mon Sep 17 00:00:00 2001 From: JianBo He Date: Mon, 29 Jan 2024 10:48:22 +0800 Subject: [PATCH 10/18] test: update eunit tests --- .../src/emqx_authn_mnesia.erl | 6 ++--- .../test/emqx_authn_mnesia_SUITE.erl | 24 +++++++++++++++++++ apps/emqx_utils/src/emqx_utils_stream.erl | 8 ++++--- .../test/emqx_utils_stream_tests.erl | 22 +++++++++++++++++ 4 files changed, 54 insertions(+), 6 deletions(-) diff --git a/apps/emqx_auth_mnesia/src/emqx_authn_mnesia.erl b/apps/emqx_auth_mnesia/src/emqx_authn_mnesia.erl index 282fda194..bae7dc96b 100644 --- a/apps/emqx_auth_mnesia/src/emqx_authn_mnesia.erl +++ b/apps/emqx_auth_mnesia/src/emqx_authn_mnesia.erl @@ -78,8 +78,6 @@ {<<"is_superuser">>, atom} ]). 
--elvis([{elvis_style, nesting_level, disable}]). - %%------------------------------------------------------------------------------ %% Mnesia bootstrap %%------------------------------------------------------------------------------ @@ -177,7 +175,9 @@ import_users({PasswordType, Filename, FileData}, State) -> Convertor = convertor(PasswordType, State), try {_NewUsersCnt, Users} = parse_import_users(Filename, FileData, Convertor), - case do_import_users(Users) of + case length(Users) > 0 andalso do_import_users(Users) of + false -> + error(empty_users); ok -> ok; {error, Reason} -> diff --git a/apps/emqx_auth_mnesia/test/emqx_authn_mnesia_SUITE.erl b/apps/emqx_auth_mnesia/test/emqx_authn_mnesia_SUITE.erl index 26ee9c406..479b5cdde 100644 --- a/apps/emqx_auth_mnesia/test/emqx_authn_mnesia_SUITE.erl +++ b/apps/emqx_auth_mnesia/test/emqx_authn_mnesia_SUITE.erl @@ -251,6 +251,30 @@ t_import_users(_) -> sample_filename_and_data(<<"user-credentials-malformed.csv">>), State ) + ), + + ?assertEqual( + {error, empty_users}, + emqx_authn_mnesia:import_users( + {hash, <<"empty_users.json">>, <<"[]">>}, + State + ) + ), + + ?assertEqual( + {error, empty_users}, + emqx_authn_mnesia:import_users( + {hash, <<"empty_users.csv">>, <<>>}, + State + ) + ), + + ?assertEqual( + {error, empty_users}, + emqx_authn_mnesia:import_users( + {hash, prepared_user_list, []}, + State + ) ). t_import_users_plain(_) -> diff --git a/apps/emqx_utils/src/emqx_utils_stream.erl b/apps/emqx_utils/src/emqx_utils_stream.erl index dbaf542bf..70f4ea6f8 100644 --- a/apps/emqx_utils/src/emqx_utils_stream.erl +++ b/apps/emqx_utils/src/emqx_utils_stream.erl @@ -50,6 +50,8 @@ -dialyzer(no_improper_lists). +-elvis([{elvis_style, nesting_level, disable}]). + %% %% @doc Make a stream that produces no values. 
@@ -189,8 +191,8 @@ csv(Bin) when is_binary(Bin) -> case get_csv_header(CSVData) of {ok, CSVHeaders, CSVLines} -> fun() -> Reader(CSVHeaders, CSVLines) end; - {error, Reason} -> - error(Reason) + error -> + empty() end. csv_data(Data) -> @@ -203,7 +205,7 @@ get_csv_header(CSV) -> Seq = binary:split(Line, [<<",">>, <<" ">>, <<"\n">>], [global, trim_all]), {ok, Seq, NewCSV}; eof -> - {error, empty_file} + error end. csv_read_line({csv_data, [Line | Lines]}) -> diff --git a/apps/emqx_utils/test/emqx_utils_stream_tests.erl b/apps/emqx_utils/test/emqx_utils_stream_tests.erl index ef8185a94..904572375 100644 --- a/apps/emqx_utils/test/emqx_utils_stream_tests.erl +++ b/apps/emqx_utils/test/emqx_utils_stream_tests.erl @@ -82,3 +82,25 @@ mqueue_test() -> [1, 42, 2], emqx_utils_stream:consume(emqx_utils_stream:mqueue(400)) ). + +csv_test() -> + Data = <<"h1,h2,h3\r\nv1,v2,v3\r\nv4,v5,v6">>, + ?assertEqual( + [ + #{<<"h1">> => <<"v1">>, <<"h2">> => <<"v2">>, <<"h3">> => <<"v3">>}, + #{<<"h1">> => <<"v4">>, <<"h2">> => <<"v5">>, <<"h3">> => <<"v6">>} + ], + emqx_utils_stream:consume(emqx_utils_stream:csv(Data)) + ), + + ?assertEqual( + [], + emqx_utils_stream:consume(emqx_utils_stream:csv(<<"">>)) + ), + + BadData = <<"h1,h2,h3\r\nv1,v2,v3\r\nv4,v5">>, + ?assertException( + error, + bad_format, + emqx_utils_stream:consume(emqx_utils_stream:csv(BadData)) + ). 
From ab99a17c99d87bfcbcc0258430e450108c086b88 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Mon, 29 Jan 2024 11:10:50 +0800 Subject: [PATCH 11/18] chore(utils_stream): simplify the csv reader implementation --- apps/emqx_utils/src/emqx_utils_stream.erl | 35 +++++++---------------- 1 file changed, 10 insertions(+), 25 deletions(-) diff --git a/apps/emqx_utils/src/emqx_utils_stream.erl b/apps/emqx_utils/src/emqx_utils_stream.erl index 70f4ea6f8..5fd3515ad 100644 --- a/apps/emqx_utils/src/emqx_utils_stream.erl +++ b/apps/emqx_utils/src/emqx_utils_stream.erl @@ -169,14 +169,9 @@ ets(Cont, ContF) -> %% The .csv binary is assumed to be in UTF-8 encoding and to have a header row. -spec csv(binary()) -> stream(map()). csv(Bin) when is_binary(Bin) -> - CSVData = csv_data(Bin), Reader = fun _Iter(Headers, Lines) -> case csv_read_line(Lines) of - {ok, Line, Rest} -> - %% XXX: not support ' ' for a field? - Fields = binary:split(Line, [<<",">>, <<" ">>, <<"\n">>], [ - global, trim_all - ]), + {Fields, Rest} -> case length(Fields) == length(Headers) of true -> User = maps:from_list(lists:zip(Headers, Fields)), @@ -188,27 +183,17 @@ csv(Bin) when is_binary(Bin) -> [] end end, - case get_csv_header(CSVData) of - {ok, CSVHeaders, CSVLines} -> + HeadersAndLines = binary:split(Bin, [<<"\r">>, <<"\n">>], [global, trim_all]), + case csv_read_line(HeadersAndLines) of + {CSVHeaders, CSVLines} -> fun() -> Reader(CSVHeaders, CSVLines) end; - error -> + eof -> empty() end. -csv_data(Data) -> - Lines = binary:split(Data, [<<"\r">>, <<"\n">>], [global, trim_all]), - {csv_data, Lines}. - -get_csv_header(CSV) -> - case csv_read_line(CSV) of - {ok, Line, NewCSV} -> - Seq = binary:split(Line, [<<",">>, <<" ">>, <<"\n">>], [global, trim_all]), - {ok, Seq, NewCSV}; - eof -> - error - end.
- -csv_read_line({csv_data, [Line | Lines]}) -> - {ok, Line, {csv_data, Lines}}; -csv_read_line({csv_data, []}) -> +csv_read_line([Line | Lines]) -> + %% XXX: not support ' ' for the field value + Fields = binary:split(Line, [<<",">>, <<" ">>, <<"\n">>], [global, trim_all]), + {Fields, Lines}; +csv_read_line([]) -> eof. From e4b8d794442ff30e71c080ab2fe3be135bfd11ca Mon Sep 17 00:00:00 2001 From: Ivan Dyachkov Date: Tue, 23 Jan 2024 10:19:55 +0100 Subject: [PATCH 12/18] fix(retainer): add default delivery rate in the schema --- apps/emqx_retainer/src/emqx_retainer_schema.erl | 1 + 1 file changed, 1 insertion(+) diff --git a/apps/emqx_retainer/src/emqx_retainer_schema.erl b/apps/emqx_retainer/src/emqx_retainer_schema.erl index 1c5d8e55f..47540b0ec 100644 --- a/apps/emqx_retainer/src/emqx_retainer_schema.erl +++ b/apps/emqx_retainer/src/emqx_retainer_schema.erl @@ -81,6 +81,7 @@ fields("retainer") -> #{ required => false, desc => ?DESC(delivery_rate), + default => <<"1000/s">>, example => <<"1000/s">>, aliases => [deliver_rate] } From fa6d65887d6a64c7b69ed8583f15826865366d59 Mon Sep 17 00:00:00 2001 From: Ivan Dyachkov Date: Mon, 29 Jan 2024 08:42:36 +0100 Subject: [PATCH 13/18] test(retainer): fix test cases --- .../test/emqx_retainer_SUITE.erl | 107 ++++++++++-------- 1 file changed, 57 insertions(+), 50 deletions(-) diff --git a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl index 2818f6bfa..c76ba90c6 100644 --- a/apps/emqx_retainer/test/emqx_retainer_SUITE.erl +++ b/apps/emqx_retainer/test/emqx_retainer_SUITE.erl @@ -256,61 +256,67 @@ t_wildcard_subscription(_) -> ok = emqtt:disconnect(C1). 
t_message_expiry(_) -> - {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), - {ok, _} = emqtt:connect(C1), + ConfMod = fun(Conf) -> + Conf#{<<"delivery_rate">> := <<"infinity">>} + end, + Case = fun() -> + {ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]), + {ok, _} = emqtt:connect(C1), - emqtt:publish( - C1, - <<"retained/0">>, - #{'Message-Expiry-Interval' => 0}, - <<"don't expire">>, - [{qos, 0}, {retain, true}] - ), - emqtt:publish( - C1, - <<"retained/1">>, - #{'Message-Expiry-Interval' => 2}, - <<"expire">>, - [{qos, 0}, {retain, true}] - ), - emqtt:publish( - C1, - <<"retained/2">>, - #{'Message-Expiry-Interval' => 5}, - <<"don't expire">>, - [{qos, 0}, {retain, true}] - ), - emqtt:publish( - C1, - <<"retained/3">>, - <<"don't expire">>, - [{qos, 0}, {retain, true}] - ), - emqtt:publish( - C1, - <<"$SYS/retained/4">>, - <<"don't expire">>, - [{qos, 0}, {retain, true}] - ), + emqtt:publish( + C1, + <<"retained/0">>, + #{'Message-Expiry-Interval' => 0}, + <<"don't expire">>, + [{qos, 0}, {retain, true}] + ), + emqtt:publish( + C1, + <<"retained/1">>, + #{'Message-Expiry-Interval' => 2}, + <<"expire">>, + [{qos, 0}, {retain, true}] + ), + emqtt:publish( + C1, + <<"retained/2">>, + #{'Message-Expiry-Interval' => 5}, + <<"don't expire">>, + [{qos, 0}, {retain, true}] + ), + emqtt:publish( + C1, + <<"retained/3">>, + <<"don't expire">>, + [{qos, 0}, {retain, true}] + ), + emqtt:publish( + C1, + <<"$SYS/retained/4">>, + <<"don't expire">>, + [{qos, 0}, {retain, true}] + ), - {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/+">>, 0), - {ok, #{}, [0]} = emqtt:subscribe(C1, <<"$SYS/retained/+">>, 0), - ?assertEqual(5, length(receive_messages(5))), - {ok, #{}, [0]} = emqtt:unsubscribe(C1, <<"retained/+">>), - {ok, #{}, [0]} = emqtt:unsubscribe(C1, <<"$SYS/retained/+">>), + {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/+">>, 0), + {ok, #{}, [0]} = emqtt:subscribe(C1, <<"$SYS/retained/+">>, 0), + ?assertEqual(5, 
length(receive_messages(5))), + {ok, #{}, [0]} = emqtt:unsubscribe(C1, <<"retained/+">>), + {ok, #{}, [0]} = emqtt:unsubscribe(C1, <<"$SYS/retained/+">>), - timer:sleep(3000), - {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/+">>, 0), - {ok, #{}, [0]} = emqtt:subscribe(C1, <<"$SYS/retained/+">>, 0), - ?assertEqual(4, length(receive_messages(5))), + timer:sleep(3000), + {ok, #{}, [0]} = emqtt:subscribe(C1, <<"retained/+">>, 0), + {ok, #{}, [0]} = emqtt:subscribe(C1, <<"$SYS/retained/+">>, 0), + ?assertEqual(4, length(receive_messages(5))), - emqtt:publish(C1, <<"retained/0">>, <<"">>, [{qos, 0}, {retain, true}]), - emqtt:publish(C1, <<"retained/1">>, <<"">>, [{qos, 0}, {retain, true}]), - emqtt:publish(C1, <<"retained/2">>, <<"">>, [{qos, 0}, {retain, true}]), - emqtt:publish(C1, <<"retained/3">>, <<"">>, [{qos, 0}, {retain, true}]), - emqtt:publish(C1, <<"$SYS/retained/4">>, <<"">>, [{qos, 0}, {retain, true}]), + emqtt:publish(C1, <<"retained/0">>, <<"">>, [{qos, 0}, {retain, true}]), + emqtt:publish(C1, <<"retained/1">>, <<"">>, [{qos, 0}, {retain, true}]), + emqtt:publish(C1, <<"retained/2">>, <<"">>, [{qos, 0}, {retain, true}]), + emqtt:publish(C1, <<"retained/3">>, <<"">>, [{qos, 0}, {retain, true}]), + emqtt:publish(C1, <<"$SYS/retained/4">>, <<"">>, [{qos, 0}, {retain, true}]), - ok = emqtt:disconnect(C1). + ok = emqtt:disconnect(C1) + end, + with_conf(ConfMod, Case). 
t_message_expiry_2(_) -> ConfMod = fun(Conf) -> @@ -410,6 +416,7 @@ t_flow_control(_) -> JsonCfg = make_limiter_json(<<"1/1s">>), emqx_limiter_server:add_bucket(emqx_retainer, internal, LimiterCfg), emqx_retainer:update_config(#{ + <<"delivery_rate">> => <<"1/1s">>, <<"flow_control">> => #{ <<"batch_read_number">> => 1, From d12335c4c613880a4c5279cbfbfaa79de5205100 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Mon, 29 Jan 2024 18:10:42 +0800 Subject: [PATCH 14/18] chore: add tests --- .../emqx_utils/test/emqx_utils_stream_tests.erl | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/apps/emqx_utils/test/emqx_utils_stream_tests.erl b/apps/emqx_utils/test/emqx_utils_stream_tests.erl index 904572375..89ca92d20 100644 --- a/apps/emqx_utils/test/emqx_utils_stream_tests.erl +++ b/apps/emqx_utils/test/emqx_utils_stream_tests.erl @@ -84,13 +84,22 @@ mqueue_test() -> ). csv_test() -> - Data = <<"h1,h2,h3\r\nv1,v2,v3\r\nv4,v5,v6">>, + Data1 = <<"h1,h2,h3\r\nvv1,vv2,vv3\r\nvv4,vv5,vv6">>, ?assertEqual( [ - #{<<"h1">> => <<"v1">>, <<"h2">> => <<"v2">>, <<"h3">> => <<"v3">>}, - #{<<"h1">> => <<"v4">>, <<"h2">> => <<"v5">>, <<"h3">> => <<"v6">>} + #{<<"h1">> => <<"vv1">>, <<"h2">> => <<"vv2">>, <<"h3">> => <<"vv3">>}, + #{<<"h1">> => <<"vv4">>, <<"h2">> => <<"vv5">>, <<"h3">> => <<"vv6">>} ], - emqx_utils_stream:consume(emqx_utils_stream:csv(Data)) + emqx_utils_stream:consume(emqx_utils_stream:csv(Data1)) + ), + + Data2 = <<"h1, h2, h3\nvv1, vv2, vv3\nvv4,vv5,vv6\n">>, + ?assertEqual( + [ + #{<<"h1">> => <<"vv1">>, <<"h2">> => <<"vv2">>, <<"h3">> => <<"vv3">>}, + #{<<"h1">> => <<"vv4">>, <<"h2">> => <<"vv5">>, <<"h3">> => <<"vv6">>} + ], + emqx_utils_stream:consume(emqx_utils_stream:csv(Data2)) ), ?assertEqual( From ded1b4d8b3c1615c0d83ae0482caa63ae128021f Mon Sep 17 00:00:00 2001 From: JianBo He Date: Mon, 29 Jan 2024 18:24:29 +0800 Subject: [PATCH 15/18] chore: apply suggestions from code review Co-authored-by: ieQu1 
<99872536+ieQu1@users.noreply.github.com> --- changes/ce/feat-12396.en.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changes/ce/feat-12396.en.md b/changes/ce/feat-12396.en.md index f54f1dc30..c1a35d3f8 100644 --- a/changes/ce/feat-12396.en.md +++ b/changes/ce/feat-12396.en.md @@ -1,4 +1,4 @@ Enhanced the `authentication/:id/import_users` interface for a more user-friendly user import feature: -- Add new parameter `?type=plain` to support importing users with plaintext passwords in extension to the current solution which only supports password hash. +- Add new parameter `?type=plain` to support importing users with plaintext passwords in addition to the current solution which only supports password hash. - Support `content-type: application/json` to accept HTTP Body in JSON format in extension to the current solution which only supports `multipart/form-data` for csv format. From 5007650bd2f6ade5f2e6621abad2a08163bdcafd Mon Sep 17 00:00:00 2001 From: Serge Tupchii Date: Fri, 26 Jan 2024 18:26:34 +0200 Subject: [PATCH 16/18] perf(emqx_broker): pick broker pool worker by topic/shard pair to distribute the load more evenly. Fixes: EMQX-11812 --- apps/emqx/src/emqx_broker.erl | 114 +++++++++++++++++---------- apps/emqx/src/emqx_router_syncer.erl | 26 +++--- 2 files changed, 88 insertions(+), 52 deletions(-) diff --git a/apps/emqx/src/emqx_broker.erl b/apps/emqx/src/emqx_broker.erl index ac9116cbd..23679700e 100644 --- a/apps/emqx/src/emqx_broker.erl +++ b/apps/emqx/src/emqx_broker.erl @@ -85,6 +85,16 @@ %% Guards -define(IS_SUBID(Id), (is_binary(Id) orelse is_atom(Id))). +-define(cast_or_eval(Pid, Msg, Expr), + case Pid =:= self() of + true -> + _ = Expr, + ok; + false -> + cast(Pid, Msg) + end +). + -spec start_link(atom(), pos_integer()) -> startlink_ret(). 
start_link(Pool, Id) -> ok = create_tabs(), @@ -159,15 +169,7 @@ do_subscribe(Topic, SubPid, SubOpts) when is_binary(Topic) -> %% https://emqx.atlassian.net/browse/EMQX-10214 I = emqx_broker_helper:get_sub_shard(SubPid, Topic), true = ets:insert(?SUBOPTION, {{Topic, SubPid}, with_shard_idx(I, SubOpts)}), - %% NOTE - %% We are relying on the local state to minimize global routing state changes, - %% thus it's important that some operations on ETS tables on the same topic - %% should not be interleaved: `ets:member/2` + `ets:insert/2` that are part of - %% broker's `subscribe` codepath, and `ets:delete_object/2` that's part of - %% `unsubscribe` codepath. So we have to pick a worker according to the topic, - %% but not shard. If there are topics with high number of shards, then the - %% load across the pool will be unbalanced. - Sync = call(pick(Topic), {subscribe, Topic, SubPid, I}), + Sync = call(pick({Topic, I}), {subscribe, Topic, SubPid, I}), case Sync of ok -> ok; @@ -218,15 +220,7 @@ do_unsubscribe2(Topic, SubPid, SubOpts) when 0 -> emqx_exclusive_subscription:unsubscribe(Topic, SubOpts); _ -> ok end, - %% NOTE - %% We are relying on the local state to minimize global routing state changes, - %% thus it's important that some operations on ETS tables on the same topic - %% should not be interleaved: `ets:member/2` + `ets:insert/2` that are part of - %% broker's `subscribe` codepath, and `ets:delete_object/2` that's part of - %% `unsubscribe` codepath. So we have to pick a worker according to the topic, - %% but not shard. If there are topics with high number of shards, then the - %% load across the pool will be unbalanced. - cast(pick(Topic), {unsubscribed, Topic, SubPid, I}); + cast(pick({Topic, I}), {unsubscribed, Topic, SubPid, I}); do_unsubscribe2(#share{group = Group, topic = Topic}, SubPid, _SubOpts) when is_binary(Group), is_binary(Topic), is_pid(SubPid) -> @@ -501,8 +495,8 @@ cast(Broker, Req) -> gen_server:cast(Broker, Req). 
%% Pick a broker -pick(Topic) -> - gproc_pool:pick_worker(broker_pool, Topic). +pick(TopicShard) -> + gproc_pool:pick_worker(broker_pool, TopicShard). %%-------------------------------------------------------------------- %% gen_server callbacks @@ -514,36 +508,72 @@ init([Pool, Id]) -> handle_call({subscribe, Topic, SubPid, 0}, {From, _Tag}, State) -> Existed = ets:member(?SUBSCRIBER, Topic), + Result = maybe_add_route(Existed, Topic, From), + assert_ok_result(Result), true = ets:insert(?SUBSCRIBER, {Topic, SubPid}), - Result = maybe_add_route(Existed, Topic, From), - {reply, Result, State}; -handle_call({subscribe, Topic, SubPid, I}, {From, _Tag}, State) -> - Existed = ets:member(?SUBSCRIBER, Topic), - true = ets:insert(?SUBSCRIBER, [ - {Topic, {shard, I}}, - {{shard, Topic, I}, SubPid} - ]), - Result = maybe_add_route(Existed, Topic, From), {reply, Result, State}; +handle_call({subscribe, Topic, SubPid, I}, _From, State) -> + Existed = ets:member(?SUBSCRIBER, {shard, Topic, I}), + Recs = [{{shard, Topic, I}, SubPid}], + Recs1 = + case Existed of + false -> + %% This will attempt to add a route per each new shard. + %% The overhead must be negligible, but the consistency in general + %% and race conditions safety is expected to be stronger. + %% The main purpose is to solve the race when + %% `{shard, Topic, N}` (where N > 0) + %% is the first ever processed subscribe request per `Topic`. + %% It inserts `{Topic, {shard, I}}` to `?SUBSCRIBER` tab. + %% After that, another broker worker starts processing + %% `{shard, Topic, 0}` sub and already observes `{shard, Topic, N}`, + %% i.e. `ets:member(?SUBSCRIBER, Topic)` returns true, + %% so it doesn't add the route. + %% Even if this happens, this cast is expected to be processed eventually + %% and the route should be added (unless the worker restarts...)
+ ?cast_or_eval( + pick({Topic, 0}), + {subscribed, Topic, shard, I}, + sync_route(add, Topic, #{}) + ), + [{Topic, {shard, I}} | Recs]; + true -> + Recs + end, + true = ets:insert(?SUBSCRIBER, Recs1), + {reply, ok, State}; handle_call(Req, _From, State) -> ?SLOG(error, #{msg => "unexpected_call", call => Req}), {reply, ignored, State}. +handle_cast({subscribed, Topic, shard, _I}, State) -> + %% Do not need to 'maybe add' (i.e. to check if the route exists). + %% It was already checked that this shard is newly added. + _ = sync_route(add, Topic, #{}), + {noreply, State}; +handle_cast({unsubscribed, Topic, shard, _I}, State) -> + _ = maybe_delete_route(Topic), + {noreply, State}; handle_cast({unsubscribed, Topic, SubPid, 0}, State) -> true = ets:delete_object(?SUBSCRIBER, {Topic, SubPid}), - Exists = ets:member(?SUBSCRIBER, Topic), - _Result = maybe_delete_route(Exists, Topic), + _ = maybe_delete_route(Topic), {noreply, State}; handle_cast({unsubscribed, Topic, SubPid, I}, State) -> true = ets:delete_object(?SUBSCRIBER, {{shard, Topic, I}, SubPid}), case ets:member(?SUBSCRIBER, {shard, Topic, I}) of false -> - ets:delete_object(?SUBSCRIBER, {Topic, {shard, I}}); + ets:delete_object(?SUBSCRIBER, {Topic, {shard, I}}), + %% Do not attempt to delete any routes here, + %% let it be handled only by the same pool worker per topic (0 shard), + %% so that all route deletes are serialized. + ?cast_or_eval( + pick({Topic, 0}), + {unsubscribed, Topic, shard, I}, + maybe_delete_route(Topic) + ); true -> - true + ok end, - Exists = ets:member(?SUBSCRIBER, Topic), - _Result = maybe_delete_route(Exists, Topic), {noreply, State}; handle_cast(Msg, State) -> ?SLOG(error, #{msg => "unexpected_cast", cast => Msg}), @@ -582,7 +612,7 @@ do_dispatch(Topic, #delivery{message = Msg}) -> {ok, DispN} end. -%% Donot dispatch to share subscriber here. +%% Don't dispatch to share subscriber here.
%% we do it in `emqx_shared_sub.erl` with configured strategy do_dispatch(SubPid, Topic, Msg) when is_pid(SubPid) -> case erlang:is_process_alive(SubPid) of @@ -603,15 +633,19 @@ do_dispatch({shard, I}, Topic, Msg) -> %% +assert_ok_result(ok) -> ok; +assert_ok_result(Ref) when is_reference(Ref) -> ok. + maybe_add_route(_Existed = false, Topic, ReplyTo) -> sync_route(add, Topic, #{reply => ReplyTo}); maybe_add_route(_Existed = true, _Topic, _ReplyTo) -> ok. -maybe_delete_route(_Exists = false, Topic) -> - sync_route(delete, Topic, #{}); -maybe_delete_route(_Exists = true, _Topic) -> - ok. +maybe_delete_route(Topic) -> + case ets:member(?SUBSCRIBER, Topic) of + true -> ok; + false -> sync_route(delete, Topic, #{}) + end. sync_route(Action, Topic, ReplyTo) -> EnabledOn = emqx_config:get([broker, routing, batch_sync, enable_on]), diff --git a/apps/emqx/src/emqx_router_syncer.erl b/apps/emqx/src/emqx_router_syncer.erl index dccc681c8..fef5a1fc7 100644 --- a/apps/emqx/src/emqx_router_syncer.erl +++ b/apps/emqx/src/emqx_router_syncer.erl @@ -97,7 +97,7 @@ push(Action, Topic, Dest, Opts) -> Context = mk_push_context(Opts), _ = erlang:send(Worker, ?PUSH(Prio, {Action, Topic, Dest, Context})), case Context of - {MRef, _} -> + [{MRef, _}] -> MRef; [] -> ok @@ -128,7 +128,7 @@ designate_prio(delete, #{}) -> mk_push_context(#{reply := To}) -> MRef = erlang:make_ref(), - {MRef, To}; + [{MRef, To}]; mk_push_context(_) -> []. @@ -272,8 +272,8 @@ send_replies(Errors, Batch) -> replyctx_send(_Result, []) -> noreply; -replyctx_send(Result, {MRef, Pid}) -> - _ = erlang:send(Pid, {MRef, Result}), +replyctx_send(Result, RefsPids) -> + _ = lists:foreach(fun({MRef, Pid}) -> erlang:send(Pid, {MRef, Result}) end, RefsPids), ok. %% @@ -316,10 +316,11 @@ stash_add(Prio, ?OP(Action, Topic, Dest, Ctx), Stash) -> Stash#{Route := RouteOpMerged} end. -merge_route_op(?ROUTEOP(Action, _Prio1, Ctx1), DestOp = ?ROUTEOP(Action)) -> - %% NOTE: This should not happen anyway. 
- _ = replyctx_send(ignored, Ctx1), - DestOp; +merge_route_op(?ROUTEOP(Action, _Prio1, Ctx1), ?ROUTEOP(Action, Prio2, Ctx2)) -> + %% NOTE: This can happen as topic shards can be processed concurrently + %% by different broker workers, see emqx_broker for more details. + MergedCtx = Ctx1 ++ Ctx2, + ?ROUTEOP(Action, Prio2, MergedCtx); merge_route_op(?ROUTEOP(_Action1, _Prio1, Ctx1), DestOp = ?ROUTEOP(_Action2, _Prio2, _Ctx2)) -> %% NOTE: Latter cancel the former. %% Strictly speaking, in ideal conditions we could just cancel both, because they @@ -352,7 +353,7 @@ stash_stats(Stash) -> batch_test() -> Dest = node(), - Ctx = fun(N) -> {N, self()} end, + Ctx = fun(N) -> [{N, self()}] end, Stash = stash_add( [ ?PUSH(?PRIO_BG, ?OP(delete, <<"t/2">>, Dest, Ctx(1))), @@ -375,6 +376,7 @@ batch_test() -> stash_new() ), {Batch, StashLeft} = mk_batch(Stash, 5), + ?assertMatch( #{ {<<"t/1">>, Dest} := {add, ?PRIO_LO, _}, @@ -392,16 +394,16 @@ batch_test() -> }, StashLeft ), + + %% Replies are only sent to superseded ops: ?assertEqual( [ - {2, ignored}, {1, ok}, {5, ok}, - {7, ignored}, {4, ok}, {9, ok}, + {7, ok}, {8, ok}, - {13, ignored}, {11, ok} ], emqx_utils_stream:consume(emqx_utils_stream:mqueue(0)) From aedfc8e8c0f49b1b139b8d0294655f1cedde2d87 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Tue, 30 Jan 2024 14:14:20 +0800 Subject: [PATCH 17/18] fix(user_import): ensure the last record overwrites previous one --- .../emqx_authn/emqx_authn_user_import_api.erl | 12 ++++- .../test/data/user-credentials-plain-dup.csv | 3 ++ .../test/data/user-credentials-plain-dup.json | 12 +++++ .../src/emqx_authn_mnesia.erl | 2 +- .../test/emqx_authn_mnesia_SUITE.erl | 49 +++++++++++++++++++ 5 files changed, 75 insertions(+), 3 deletions(-) create mode 100644 apps/emqx_auth/test/data/user-credentials-plain-dup.csv create mode 100644 apps/emqx_auth/test/data/user-credentials-plain-dup.json diff --git a/apps/emqx_auth/src/emqx_authn/emqx_authn_user_import_api.erl 
b/apps/emqx_auth/src/emqx_authn/emqx_authn_user_import_api.erl index f45923756..2b2ccecac 100644 --- a/apps/emqx_auth/src/emqx_authn/emqx_authn_user_import_api.erl +++ b/apps/emqx_auth/src/emqx_authn/emqx_authn_user_import_api.erl @@ -96,8 +96,16 @@ request_body_schema() -> schema => #{ type => object, example => [ - #{<<"user_id">> => <<"user1">>, <<"password">> => <<"password1">>}, - #{<<"user_id">> => <<"user2">>, <<"password">> => <<"password2">>} + #{ + <<"user_id">> => <<"user1">>, + <<"password">> => <<"password1">>, + <<"is_superuser">> => true + }, + #{ + <<"user_id">> => <<"user2">>, + <<"password">> => <<"password2">>, + <<"is_superuser">> => false + } ] } } diff --git a/apps/emqx_auth/test/data/user-credentials-plain-dup.csv b/apps/emqx_auth/test/data/user-credentials-plain-dup.csv new file mode 100644 index 000000000..9c263c50b --- /dev/null +++ b/apps/emqx_auth/test/data/user-credentials-plain-dup.csv @@ -0,0 +1,3 @@ +user_id,password,is_superuser +myuser3,password3,true +myuser3,password4,false diff --git a/apps/emqx_auth/test/data/user-credentials-plain-dup.json b/apps/emqx_auth/test/data/user-credentials-plain-dup.json new file mode 100644 index 000000000..0bda977c6 --- /dev/null +++ b/apps/emqx_auth/test/data/user-credentials-plain-dup.json @@ -0,0 +1,12 @@ +[ + { + "user_id":"myuser1", + "password":"password1", + "is_superuser": true + }, + { + "user_id":"myuser1", + "password":"password2", + "is_superuser": false + } +] diff --git a/apps/emqx_auth_mnesia/src/emqx_authn_mnesia.erl b/apps/emqx_auth_mnesia/src/emqx_authn_mnesia.erl index bae7dc96b..8cbd8f35e 100644 --- a/apps/emqx_auth_mnesia/src/emqx_authn_mnesia.erl +++ b/apps/emqx_auth_mnesia/src/emqx_authn_mnesia.erl @@ -457,7 +457,7 @@ parse_import_users(Filename, FileData, Convertor) -> end end, ReaderFn = reader_fn(Filename, FileData), - Users = lists:reverse(Eval(ReaderFn)), + Users = Eval(ReaderFn), NewUsersCount = lists:foldl( fun( diff --git 
a/apps/emqx_auth_mnesia/test/emqx_authn_mnesia_SUITE.erl b/apps/emqx_auth_mnesia/test/emqx_authn_mnesia_SUITE.erl index 479b5cdde..a1cbf362a 100644 --- a/apps/emqx_auth_mnesia/test/emqx_authn_mnesia_SUITE.erl +++ b/apps/emqx_auth_mnesia/test/emqx_authn_mnesia_SUITE.erl @@ -340,6 +340,55 @@ t_import_users_prepared_list(_) -> ) ). +t_import_users_duplicated_records(_) -> + Config0 = config(), + Config = Config0#{password_hash_algorithm => #{name => plain, salt_position => disable}}, + {ok, State} = emqx_authn_mnesia:create(?AUTHN_ID, Config), + + ?assertEqual( + ok, + emqx_authn_mnesia:import_users( + sample_filename_and_data(plain, <<"user-credentials-plain-dup.json">>), + State + ) + ), + ?assertEqual( + ok, + emqx_authn_mnesia:import_users( + sample_filename_and_data(plain, <<"user-credentials-plain-dup.csv">>), + State + ) + ), + Users1 = [ + #{ + <<"user_id">> => <<"myuser5">>, + <<"password">> => <<"password5">>, + <<"is_superuser">> => true + }, + #{ + <<"user_id">> => <<"myuser5">>, + <<"password">> => <<"password6">>, + <<"is_superuser">> => false + } + ], + ?assertEqual( + ok, + emqx_authn_mnesia:import_users( + {plain, prepared_user_list, Users1}, + State + ) + ), + + %% assert: the last record overwrites the previous one + ?assertMatch( + [ + {user_info, {_, <<"myuser1">>}, <<"password2">>, _, false}, + {user_info, {_, <<"myuser3">>}, <<"password4">>, _, false}, + {user_info, {_, <<"myuser5">>}, <<"password6">>, _, false} + ], + ets:tab2list(emqx_authn_mnesia) + ). 
+ %%------------------------------------------------------------------------------ %% Helpers %%------------------------------------------------------------------------------ From 4190682a167e611c217a745e373407fcfecb1ea2 Mon Sep 17 00:00:00 2001 From: JianBo He Date: Tue, 30 Jan 2024 15:29:46 +0800 Subject: [PATCH 18/18] chore: fix the data type and example value for cluster invitation result --- apps/emqx_management/src/emqx_mgmt_api_cluster.erl | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/apps/emqx_management/src/emqx_mgmt_api_cluster.erl b/apps/emqx_management/src/emqx_mgmt_api_cluster.erl index 686a0be71..6a56ef454 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_cluster.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_cluster.erl @@ -206,11 +206,10 @@ fields(node_invitation_succeed) -> [ {finished_at, ?HOCON( - emqx_utils_calendar:epoch_millisecond(), + binary(), #{ - desc => - <<"The time of the async invitation result is received, millisecond precision epoch">>, - example => <<"1705044829915">> + desc => <<"The time of the async invitation result is received">>, + example => <<"2024-01-30T15:24:39.355+08:00">> } )} ]; @@ -223,11 +222,10 @@ fields(node_invitation_in_progress) -> )}, {started_at, ?HOCON( - emqx_utils_calendar:epoch_millisecond(), + binary(), #{ - desc => - <<"The start timestamp of the invitation, millisecond precision epoch">>, - example => <<"1705044829915">> + desc => <<"The time of the async invitation is started">>, + example => <<"2024-01-30T15:24:39.355+08:00">> } )} ].