From 9449e3cb32a67b0112907fd5defecc120bcb1c77 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Thu, 18 Aug 2022 13:52:07 +0200 Subject: [PATCH 1/7] refactor(auth_mnesia): Export transaction funs --- .../emqx_enhanced_authn_scram_mnesia.erl | 132 ++++++++--------- .../src/simple_authn/emqx_authn_mnesia.erl | 137 +++++++++--------- 2 files changed, 137 insertions(+), 132 deletions(-) diff --git a/apps/emqx_authn/src/enhanced_authn/emqx_enhanced_authn_scram_mnesia.erl b/apps/emqx_authn/src/enhanced_authn/emqx_enhanced_authn_scram_mnesia.erl index bc26140a6..cb3fad7dc 100644 --- a/apps/emqx_authn/src/enhanced_authn/emqx_enhanced_authn_scram_mnesia.erl +++ b/apps/emqx_authn/src/enhanced_authn/emqx_enhanced_authn_scram_mnesia.erl @@ -52,6 +52,14 @@ group_match_spec/1 ]). +%% Internal exports (RPC) +-export([ + do_destroy/1, + do_add_user/2, + do_delete_user/2, + do_update_user/3 +]). + -define(TAB, ?MODULE). -define(AUTHN_QSCHEMA, [ {<<"like_user_id">>, binary}, @@ -170,83 +178,79 @@ authenticate(_Credential, _State) -> ignore. destroy(#{user_group := UserGroup}) -> + trans(fun ?MODULE:do_destroy/1, [UserGroup]). + +do_destroy(UserGroup) -> MatchSpec = group_match_spec(UserGroup), - trans( - fun() -> - ok = lists:foreach( - fun(UserInfo) -> - mnesia:delete_object(?TAB, UserInfo, write) - end, - mnesia:select(?TAB, MatchSpec, write) - ) - end + ok = lists:foreach( + fun(UserInfo) -> + mnesia:delete_object(?TAB, UserInfo, write) + end, + mnesia:select(?TAB, MatchSpec, write) ). -add_user( +add_user(UserInfo, State) -> + trans(fun ?MODULE:do_add_user/2, [UserInfo, State]). + +do_add_user( #{ user_id := UserID, password := Password } = UserInfo, #{user_group := UserGroup} = State ) -> - trans( - fun() -> - case mnesia:read(?TAB, {UserGroup, UserID}, write) of - [] -> - IsSuperuser = maps:get(is_superuser, UserInfo, false), - add_user(UserGroup, UserID, Password, IsSuperuser, State), - {ok, #{user_id => UserID, is_superuser => IsSuperuser}}; - [_] -> - {error, already_exist} - end - end - ). + case mnesia:read(?TAB, {UserGroup, UserID}, write) of + [] -> + IsSuperuser = maps:get(is_superuser, UserInfo, false), + add_user(UserGroup, UserID, Password, IsSuperuser, State), + {ok, #{user_id => UserID, is_superuser => IsSuperuser}}; + [_] -> + {error, already_exist} + end. -delete_user(UserID, #{user_group := UserGroup}) -> - trans( - fun() -> - case mnesia:read(?TAB, {UserGroup, UserID}, write) of - [] -> - {error, not_found}; - [_] -> - mnesia:delete(?TAB, {UserGroup, UserID}, write) - end - end - ). +delete_user(UserID, State) -> + trans(fun ?MODULE:do_delete_user/2, [UserID, State]). -update_user( +do_delete_user(UserID, #{user_group := UserGroup}) -> + case mnesia:read(?TAB, {UserGroup, UserID}, write) of + [] -> + {error, not_found}; + [_] -> + mnesia:delete(?TAB, {UserGroup, UserID}, write) + end. + +update_user(UserID, User, State) -> + trans(fun ?MODULE:do_update_user/3, [UserID, User, State]). + +do_update_user( UserID, User, #{user_group := UserGroup} = State ) -> - trans( - fun() -> - case mnesia:read(?TAB, {UserGroup, UserID}, write) of - [] -> - {error, not_found}; - [#user_info{is_superuser = IsSuperuser} = UserInfo] -> - UserInfo1 = UserInfo#user_info{ - is_superuser = maps:get(is_superuser, User, IsSuperuser) - }, - UserInfo2 = - case maps:get(password, User, undefined) of - undefined -> - UserInfo1; - Password -> - {StoredKey, ServerKey, Salt} = esasl_scram:generate_authentication_info( - Password, State - ), - UserInfo1#user_info{ - stored_key = StoredKey, - server_key = ServerKey, - salt = Salt - } - end, - mnesia:write(?TAB, UserInfo2, write), - {ok, format_user_info(UserInfo2)} - end - end - ). + case mnesia:read(?TAB, {UserGroup, UserID}, write) of + [] -> + {error, not_found}; + [#user_info{is_superuser = IsSuperuser} = UserInfo] -> + UserInfo1 = UserInfo#user_info{ + is_superuser = maps:get(is_superuser, User, IsSuperuser) + }, + UserInfo2 = + case maps:get(password, User, undefined) of + undefined -> + UserInfo1; + Password -> + {StoredKey, ServerKey, Salt} = esasl_scram:generate_authentication_info( + Password, State + ), + UserInfo1#user_info{ + stored_key = StoredKey, + server_key = ServerKey, + salt = Salt + } + end, + mnesia:write(?TAB, UserInfo2, write), + {ok, format_user_info(UserInfo2)} + end. lookup_user(UserID, #{user_group := UserGroup}) -> case mnesia:dirty_read(?TAB, {UserGroup, UserID}) of @@ -386,12 +390,10 @@ retrieve(UserID, #{user_group := UserGroup}) -> end. %% TODO: Move to emqx_authn_utils.erl -trans(Fun) -> - trans(Fun, []). - trans(Fun, Args) -> case mria:transaction(?AUTH_SHARD, Fun, Args) of {atomic, Res} -> Res; + {aborted, {function_clause, Stack}} -> erlang:raise(error, function_clause, Stack); {aborted, Reason} -> {error, Reason} end. diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_mnesia.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_mnesia.erl index c3380b91f..7276ad428 100644 --- a/apps/emqx_authn/src/simple_authn/emqx_authn_mnesia.erl +++ b/apps/emqx_authn/src/simple_authn/emqx_authn_mnesia.erl @@ -54,6 +54,16 @@ group_match_spec/1 ]). +%% Internal exports (RPC) +-export([ + do_destroy/1, + do_add_user/2, + do_delete_user/2, + do_update_user/3, + import/2, + import_csv/3 +]). + -type user_group() :: binary(). -type user_id() :: binary(). @@ -175,15 +185,14 @@ authenticate( end. destroy(#{user_group := UserGroup}) -> - trans( - fun() -> - ok = lists:foreach( - fun(User) -> - mnesia:delete_object(?TAB, User, write) - end, - mnesia:select(?TAB, group_match_spec(UserGroup), write) - ) - end + trans(fun ?MODULE:do_destroy/1, [UserGroup]). + +do_destroy(UserGroup) -> + ok = lists:foreach( + fun(User) -> + mnesia:delete_object(?TAB, User, write) + end, + mnesia:select(?TAB, group_match_spec(UserGroup), write) ). import_users({Filename0, FileData}, State) -> @@ -200,7 +209,10 @@ import_users({Filename0, FileData}, State) -> {error, {unsupported_file_format, Extension}} end. -add_user( +add_user(UserInfo, State) -> + trans(fun ?MODULE:do_add_user/2, [UserInfo, State]). + +do_add_user( #{ user_id := UserID, password := Password @@ -210,33 +222,31 @@ add_user( password_hash_algorithm := Algorithm } ) -> - trans( - fun() -> - case mnesia:read(?TAB, {UserGroup, UserID}, write) of - [] -> - {PasswordHash, Salt} = emqx_authn_password_hashing:hash(Algorithm, Password), - IsSuperuser = maps:get(is_superuser, UserInfo, false), - insert_user(UserGroup, UserID, PasswordHash, Salt, IsSuperuser), - {ok, #{user_id => UserID, is_superuser => IsSuperuser}}; - [_] -> - {error, already_exist} - end - end - ). + case mnesia:read(?TAB, {UserGroup, UserID}, write) of + [] -> + {PasswordHash, Salt} = emqx_authn_password_hashing:hash(Algorithm, Password), + IsSuperuser = maps:get(is_superuser, UserInfo, false), + insert_user(UserGroup, UserID, PasswordHash, Salt, IsSuperuser), + {ok, #{user_id => UserID, is_superuser => IsSuperuser}}; + [_] -> + {error, already_exist} + end. -delete_user(UserID, #{user_group := UserGroup}) -> - trans( - fun() -> - case mnesia:read(?TAB, {UserGroup, UserID}, write) of - [] -> - {error, not_found}; - [_] -> - mnesia:delete(?TAB, {UserGroup, UserID}, write) - end - end - ). +delete_user(UserID, State) -> + trans(fun ?MODULE:do_delete_user/2, [UserID, State]). -update_user( +do_delete_user(UserID, #{user_group := UserGroup}) -> + case mnesia:read(?TAB, {UserGroup, UserID}, write) of + [] -> + {error, not_found}; + [_] -> + mnesia:delete(?TAB, {UserGroup, UserID}, write) + end. + +update_user(UserID, UserInfo, State) -> + trans(fun ?MODULE:do_update_user/3, [UserID, UserInfo, State]). + +do_update_user( UserID, UserInfo, #{ @@ -244,33 +254,29 @@ update_user( password_hash_algorithm := Algorithm } ) -> - trans( - fun() -> - case mnesia:read(?TAB, {UserGroup, UserID}, write) of - [] -> - {error, not_found}; - [ - #user_info{ - password_hash = PasswordHash, - salt = Salt, - is_superuser = IsSuperuser - } - ] -> - NSuperuser = maps:get(is_superuser, UserInfo, IsSuperuser), - {NPasswordHash, NSalt} = - case UserInfo of - #{password := Password} -> - emqx_authn_password_hashing:hash( - Algorithm, Password - ); - #{} -> - {PasswordHash, Salt} - end, - insert_user(UserGroup, UserID, NPasswordHash, NSalt, NSuperuser), - {ok, #{user_id => UserID, is_superuser => NSuperuser}} - end - end - ). + case mnesia:read(?TAB, {UserGroup, UserID}, write) of + [] -> + {error, not_found}; + [ + #user_info{ + password_hash = PasswordHash, + salt = Salt, + is_superuser = IsSuperuser + } + ] -> + NSuperuser = maps:get(is_superuser, UserInfo, IsSuperuser), + {NPasswordHash, NSalt} = + case UserInfo of + #{password := Password} -> + emqx_authn_password_hashing:hash( + Algorithm, Password + ); + #{} -> + {PasswordHash, Salt} + end, + insert_user(UserGroup, UserID, NPasswordHash, NSalt, NSuperuser), + {ok, #{user_id => UserID, is_superuser => NSuperuser}} + end. lookup_user(UserID, #{user_group := UserGroup}) -> case mnesia:dirty_read(?TAB, {UserGroup, UserID}) of @@ -335,7 +341,7 @@ run_fuzzy_filter( import_users_from_json(Bin, #{user_group := UserGroup}) -> case emqx_json:safe_decode(Bin, [return_maps]) of {ok, List} -> - trans(fun import/2, [UserGroup, List]); + trans(fun ?MODULE:import/2, [UserGroup, List]); {error, Reason} -> {error, Reason} end. @@ -344,7 +350,7 @@ import_users_from_json(Bin, #{user_group := UserGroup}) -> import_users_from_csv(CSV, #{user_group := UserGroup}) -> case get_csv_header(CSV) of {ok, Seq, NewCSV} -> - trans(fun import_csv/3, [UserGroup, NewCSV, Seq]); + trans(fun ?MODULE:import_csv/3, [UserGroup, NewCSV, Seq]); {error, Reason} -> {error, Reason} end. @@ -435,9 +441,6 @@ get_user_identity(#{clientid := ClientID}, clientid) -> get_user_identity(_, Type) -> {error, {bad_user_identity_type, Type}}. -trans(Fun) -> - trans(Fun, []). - trans(Fun, Args) -> case mria:transaction(?AUTH_SHARD, Fun, Args) of {atomic, Res} -> Res; From d906715e0a0182f49df02e237cea09fdf7455b54 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Thu, 18 Aug 2022 14:19:31 +0200 Subject: [PATCH 2/7] refactor(telemetry): Export transactions --- apps/emqx_modules/src/emqx_telemetry.erl | 95 +++++++++++++----------- 1 file changed, 51 insertions(+), 44 deletions(-) diff --git a/apps/emqx_modules/src/emqx_telemetry.erl b/apps/emqx_modules/src/emqx_telemetry.erl index 8c7f01c23..f55457e9d 100644 --- a/apps/emqx_modules/src/emqx_telemetry.erl +++ b/apps/emqx_modules/src/emqx_telemetry.erl @@ -54,6 +54,11 @@ -export([official_version/1]). +%% Internal exports (RPC) +-export([ + do_ensure_uuids/0 +]). + %% internal export -export([read_raw_build_info/0]). @@ -530,54 +535,56 @@ bin(B) when is_binary(B) -> B. ensure_uuids() -> - Txn = fun() -> - NodeUUID = - case mnesia:wread({?TELEMETRY, node()}) of - [] -> - NodeUUID0 = - case get_uuid_from_file(node) of - {ok, NUUID} -> NUUID; - undefined -> generate_uuid() - end, - mnesia:write( - ?TELEMETRY, - #telemetry{ - id = node(), - uuid = NodeUUID0 - }, - write - ), - NodeUUID0; - [#telemetry{uuid = NodeUUID0}] -> - NodeUUID0 - end, - ClusterUUID = - case mnesia:wread({?TELEMETRY, ?CLUSTER_UUID_KEY}) of - [] -> - ClusterUUID0 = - case get_uuid_from_file(cluster) of - {ok, CUUID} -> CUUID; - undefined -> generate_uuid() - end, - mnesia:write( - ?TELEMETRY, - #telemetry{ - id = ?CLUSTER_UUID_KEY, - uuid = ClusterUUID0 - }, - write - ), - ClusterUUID0; - [#telemetry{uuid = ClusterUUID0}] -> - ClusterUUID0 - end, - {NodeUUID, ClusterUUID} - end, - {atomic, {NodeUUID, ClusterUUID}} = mria:transaction(?TELEMETRY_SHARD, Txn), + {atomic, {NodeUUID, ClusterUUID}} = mria:transaction( + ?TELEMETRY_SHARD, fun ?MODULE:do_ensure_uuids/0 + ), save_uuid_to_file(NodeUUID, node), save_uuid_to_file(ClusterUUID, cluster), {NodeUUID, ClusterUUID}. +do_ensure_uuids() -> + NodeUUID = + case mnesia:wread({?TELEMETRY, node()}) of + [] -> + NodeUUID0 = + case get_uuid_from_file(node) of + {ok, NUUID} -> NUUID; + undefined -> generate_uuid() + end, + mnesia:write( + ?TELEMETRY, + #telemetry{ + id = node(), + uuid = NodeUUID0 + }, + write + ), + NodeUUID0; + [#telemetry{uuid = NodeUUID0}] -> + NodeUUID0 + end, + ClusterUUID = + case mnesia:wread({?TELEMETRY, ?CLUSTER_UUID_KEY}) of + [] -> + ClusterUUID0 = + case get_uuid_from_file(cluster) of + {ok, CUUID} -> CUUID; + undefined -> generate_uuid() + end, + mnesia:write( + ?TELEMETRY, + #telemetry{ + id = ?CLUSTER_UUID_KEY, + uuid = ClusterUUID0 + }, + write + ), + ClusterUUID0; + [#telemetry{uuid = ClusterUUID0}] -> + ClusterUUID0 + end, + {NodeUUID, ClusterUUID}. + get_uuid_from_file(Type) -> Path = uuid_file_path(Type), case file:read_file(Path) of From fa12c66ad91b8eb4c07e200936d283cd68a740cd Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Thu, 18 Aug 2022 14:19:50 +0200 Subject: [PATCH 3/7] refactor(psk): Export transactions --- apps/emqx_psk/src/emqx_psk.app.src | 2 +- apps/emqx_psk/src/emqx_psk.erl | 9 +++++++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/apps/emqx_psk/src/emqx_psk.app.src b/apps/emqx_psk/src/emqx_psk.app.src index 4b405dd08..c3786bcc0 100644 --- a/apps/emqx_psk/src/emqx_psk.app.src +++ b/apps/emqx_psk/src/emqx_psk.app.src @@ -2,7 +2,7 @@ {application, emqx_psk, [ {description, "EMQX PSK"}, % strict semver, bump manually! - {vsn, "5.0.0"}, + {vsn, "5.0.1"}, {modules, []}, {registered, [emqx_psk_sup]}, {applications, [kernel, stdlib]}, diff --git a/apps/emqx_psk/src/emqx_psk.erl b/apps/emqx_psk/src/emqx_psk.erl index 99354d230..3a9406fdb 100644 --- a/apps/emqx_psk/src/emqx_psk.erl +++ b/apps/emqx_psk/src/emqx_psk.erl @@ -43,6 +43,11 @@ code_change/3 ]). +%% Internal exports (RPC) +-export([ + insert_psks/1 +]). + -record(psk_entry, { psk_id :: binary(), shared_secret :: binary(), @@ -199,10 +204,10 @@ import_psks(SrcFile) -> import_psks(Io, Delimiter, ChunkSize, NChunk) -> case get_psks(Io, Delimiter, ChunkSize) of {ok, Entries} -> - _ = trans(fun insert_psks/1, [Entries]), + _ = trans(fun ?MODULE:insert_psks/1, [Entries]), import_psks(Io, Delimiter, ChunkSize, NChunk + 1); {eof, Entries} -> - _ = trans(fun insert_psks/1, [Entries]), + _ = trans(fun ?MODULE:insert_psks/1, [Entries]), ok; {error, {bad_format, {line, N}}} -> {error, {bad_format, {line, NChunk * ChunkSize + N}}}; From 6f4d0e2ed5a3a49d2b2f05e38b668e7a440e8599 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Mon, 22 Aug 2022 10:56:26 +0200 Subject: [PATCH 4/7] refactor(retainer_mnesia): Export transactions --- .../src/emqx_retainer_mnesia.erl | 192 +++++++++--------- .../src/emqx_retainer_mnesia_cli.erl | 4 +- 2 files changed, 97 insertions(+), 99 deletions(-) diff --git a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl index c861d27e4..c236b9c28 100644 --- a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl +++ b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl @@ -36,6 +36,15 @@ size/1 ]). +%% Internal exports (RPC) +-export([ + do_store_retained/1, + do_clear_expired/0, + do_delete_message/1, + do_populate_index_meta/1, + do_reindex_batch/2 +]). + %% Management API: -export([topics/0]). @@ -126,26 +135,8 @@ create_table(Table, RecordName, Attributes, Type, StorageType) -> ok end. -store_retained(_, #message{topic = Topic} = Msg) -> - ExpiryTime = emqx_retainer:get_expiry_time(Msg), - Tokens = topic_to_tokens(Topic), - Fun = - case is_table_full() of - false -> - fun() -> - store_retained(db_indices(write), Msg, Tokens, ExpiryTime) - end; - _ -> - fun() -> - case mnesia:read(?TAB_MESSAGE, Tokens, write) of - [_] -> - store_retained(db_indices(write), Msg, Tokens, ExpiryTime); - [] -> - mnesia:abort(table_is_full) - end - end - end, - case mria:transaction(?RETAINER_SHARD, Fun) of +store_retained(_, Msg = #message{topic = Topic}) -> + case mria:transaction(?RETAINER_SHARD, fun ?MODULE:do_store_retained/1, [Msg]) of {atomic, ok} -> ?tp(debug, message_retained, #{topic => Topic}), ok; @@ -157,7 +148,26 @@ store_retained(_, #message{topic = Topic} = Msg) -> }) end. +do_store_retained(#message{topic = Topic} = Msg) -> + ExpiryTime = emqx_retainer:get_expiry_time(Msg), + Tokens = topic_to_tokens(Topic), + case is_table_full() of + false -> + store_retained(db_indices(write), Msg, Tokens, ExpiryTime); + _ -> + case mnesia:read(?TAB_MESSAGE, Tokens, write) of + [_] -> + store_retained(db_indices(write), Msg, Tokens, ExpiryTime); + [] -> + mnesia:abort(table_is_full) + end + end. + clear_expired(_) -> + {atomic, _} = mria:transaction(?RETAINER_SHARD, fun ?MODULE:do_clear_expired/0), + ok. + +do_clear_expired() -> NowMs = erlang:system_time(millisecond), QH = qlc:q([ TopicTokens @@ -167,36 +177,29 @@ clear_expired(_) -> } <- mnesia:table(?TAB_MESSAGE, [{lock, write}]), (ExpiryTime =/= 0) and (ExpiryTime < NowMs) ]), - Fun = fun() -> - QC = qlc:cursor(QH), - clear_batch(db_indices(write), QC) - end, - {atomic, _} = mria:transaction(?RETAINER_SHARD, Fun), - ok. + QC = qlc:cursor(QH), + clear_batch(db_indices(write), QC). delete_message(_, Topic) -> - Tokens = topic_to_tokens(Topic), - DeleteFun = - case emqx_topic:wildcard(Topic) of - false -> - fun() -> - ok = delete_message_by_topic(Tokens, db_indices(write)) - end; - true -> - fun() -> - QH = topic_search_table(Tokens), - qlc:fold( - fun(TopicTokens, _) -> - ok = delete_message_by_topic(TopicTokens, db_indices(write)) - end, - undefined, - QH - ) - end - end, - {atomic, _} = mria:transaction(?RETAINER_SHARD, DeleteFun), + {atomic, _} = mria:transaction(?RETAINER_SHARD, fun ?MODULE:do_delete_message/1, [Topic]), ok. +do_delete_message(Topic) -> + Tokens = topic_to_tokens(Topic), + case emqx_topic:wildcard(Topic) of + false -> + ok = delete_message_by_topic(Tokens, db_indices(write)); + true -> + QH = topic_search_table(Tokens), + qlc:fold( + fun(TopicTokens, _) -> + ok = delete_message_by_topic(TopicTokens, db_indices(write)) + end, + undefined, + QH + ) + end. + read_message(_, Topic) -> {ok, read_messages(Topic)}. @@ -267,16 +270,11 @@ reindex(Force, StatusFun) -> reindex(config_indices(), Force, StatusFun). reindex_status() -> - Fun = fun() -> - mnesia:read(?TAB_INDEX_META, ?META_KEY) - end, - case mria:transaction(?RETAINER_SHARD, Fun) of - {atomic, [#retained_index_meta{reindexing = true}]} -> + case mnesia:dirty_read(?TAB_INDEX_META, ?META_KEY) of + [#retained_index_meta{reindexing = true}] -> true; - {atomic, _} -> - false; - {aborted, Reason} -> - {error, Reason} + _ -> + false end. %%-------------------------------------------------------------------- @@ -439,37 +437,7 @@ config_indices() -> populate_index_meta() -> ConfigIndices = config_indices(), - Fun = fun() -> - case mnesia:read(?TAB_INDEX_META, ?META_KEY, write) of - [ - #retained_index_meta{ - read_indices = ReadIndices, - write_indices = WriteIndices, - reindexing = Reindexing - } - ] -> - case {ReadIndices, WriteIndices, Reindexing} of - {_, _, true} -> - ok; - {ConfigIndices, ConfigIndices, false} -> - ok; - {DBWriteIndices, DBReadIndices, false} -> - {error, DBWriteIndices, DBReadIndices} - end; - [] -> - mnesia:write( - ?TAB_INDEX_META, - #retained_index_meta{ - key = ?META_KEY, - read_indices = ConfigIndices, - write_indices = ConfigIndices, - reindexing = false - }, - write - ) - end - end, - case mria:transaction(?RETAINER_SHARD, Fun) of + case mria:transaction(?RETAINER_SHARD, fun ?MODULE:do_populate_index_meta/1, [ConfigIndices]) of {atomic, ok} -> ok; {atomic, {error, DBWriteIndices, DBReadIndices}} -> @@ -488,6 +456,36 @@ populate_index_meta() -> {error, Reason} end. +do_populate_index_meta(ConfigIndices) -> + case mnesia:read(?TAB_INDEX_META, ?META_KEY, write) of + [ + #retained_index_meta{ + read_indices = ReadIndices, + write_indices = WriteIndices, + reindexing = Reindexing + } + ] -> + case {ReadIndices, WriteIndices, Reindexing} of + {_, _, true} -> + ok; + {ConfigIndices, ConfigIndices, false} -> + ok; + {DBWriteIndices, DBReadIndices, false} -> + {error, DBWriteIndices, DBReadIndices} + end; + [] -> + mnesia:write( + ?TAB_INDEX_META, + #retained_index_meta{ + key = ?META_KEY, + read_indices = ConfigIndices, + write_indices = ConfigIndices, + reindexing = false + }, + write + ) + end. + db_indices(Type) -> case mnesia:read(?TAB_INDEX_META, ?META_KEY) of [#retained_index_meta{read_indices = ReadIndices, write_indices = WriteIndices}] -> @@ -533,6 +531,7 @@ reindex(NewIndices, Force, StatusFun) when end. try_start_reindex(NewIndices, true) -> + %% Note: we don't expect reindexing during upgrade, so this function is internal mria:transaction( ?RETAINER_SHARD, fun() -> start_reindex(NewIndices) end @@ -566,6 +565,7 @@ start_reindex(NewIndices) -> ). finalize_reindex() -> + %% Note: we don't expect reindexing during upgrade, so this function is internal {atomic, ok} = mria:transaction( ?RETAINER_SHARD, fun() -> @@ -601,16 +601,7 @@ reindex_topic(Indices, Topic) -> end. reindex_batch(QC, Done, StatusFun) -> - Fun = fun() -> - Indices = db_indices(write), - {Status, Topics} = qlc_next_answers(QC, ?REINDEX_BATCH_SIZE), - ok = lists:foreach( - fun(Topic) -> reindex_topic(Indices, Topic) end, - Topics - ), - {Status, Done + length(Topics)} - end, - case mria:transaction(?RETAINER_SHARD, Fun) of + case mria:transaction(?RETAINER_SHARD, fun ?MODULE:do_reindex_batch/2, [QC, Done]) of {atomic, {more, NewDone}} -> _ = StatusFun(NewDone), reindex_batch(QC, NewDone, StatusFun); @@ -625,6 +616,15 @@ reindex_batch(QC, Done, StatusFun) -> {error, Reason} end. +do_reindex_batch(QC, Done) -> + Indices = db_indices(write), + {Status, Topics} = qlc_next_answers(QC, ?REINDEX_BATCH_SIZE), + ok = lists:foreach( + fun(Topic) -> reindex_topic(Indices, Topic) end, + Topics + ), + {Status, Done + length(Topics)}. + wait_dispatch_complete(Timeout) -> Nodes = mria_mnesia:running_nodes(), {Results, []} = emqx_retainer_proto_v1:wait_dispatch_complete(Nodes, Timeout), diff --git a/apps/emqx_retainer/src/emqx_retainer_mnesia_cli.erl b/apps/emqx_retainer/src/emqx_retainer_mnesia_cli.erl index 22eeafe08..a576b953d 100644 --- a/apps/emqx_retainer/src/emqx_retainer_mnesia_cli.erl +++ b/apps/emqx_retainer/src/emqx_retainer_mnesia_cli.erl @@ -45,9 +45,7 @@ retainer(["reindex", "status"]) -> true -> ?PRINT_MSG("Reindexing is in progress~n"); false -> - ?PRINT_MSG("Reindexing is not running~n"); - {error, Reason} -> - ?PRINT("Can't get reindex status: ~p~n", [Reason]) + ?PRINT_MSG("Reindexing is not running~n") end; retainer(["reindex", "start"]) -> retainer(["reindex", "start", "false"]); From f323e3cb79c4cdec7daa398ed8e0ed8adfa04051 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Mon, 22 Aug 2022 13:41:09 +0200 Subject: [PATCH 5/7] refactor(emqx_conf): Extract transactions --- apps/emqx_conf/src/emqx_cluster_rpc.erl | 34 +++++++++++-------- .../src/emqx_cluster_rpc_handler.erl | 7 +++- apps/emqx_conf/src/emqx_conf.app.src | 2 +- 3 files changed, 27 insertions(+), 16 deletions(-) diff --git a/apps/emqx_conf/src/emqx_cluster_rpc.erl b/apps/emqx_conf/src/emqx_cluster_rpc.erl index 6353a4efa..ddc4eccc5 100644 --- a/apps/emqx_conf/src/emqx_cluster_rpc.erl +++ b/apps/emqx_conf/src/emqx_cluster_rpc.erl @@ -31,10 +31,16 @@ fast_forward_to_commit/2 ]). -export([ - get_node_tnx_id/1, + commit/2, + commit_status_trans/2, get_cluster_tnx_id/0, + get_node_tnx_id/1, + init_mfa/2, latest_tnx_id/0, - make_initiate_call_req/3 + make_initiate_call_req/3, + read_next_mfa/1, + trans_query/1, + trans_status/0 ]). -export([ @@ -194,18 +200,18 @@ do_multicall(M, F, A, RequiredSyncs, Timeout) -> -spec query(pos_integer()) -> {'atomic', map()} | {'aborted', Reason :: term()}. query(TnxId) -> - transaction(fun trans_query/1, [TnxId]). + transaction(fun ?MODULE:trans_query/1, [TnxId]). -spec reset() -> reset. reset() -> gen_server:call(?MODULE, reset). -spec status() -> {'atomic', [map()]} | {'aborted', Reason :: term()}. status() -> - transaction(fun trans_status/0, []). + transaction(fun ?MODULE:trans_status/0, []). -spec latest_tnx_id() -> pos_integer(). latest_tnx_id() -> - {atomic, TnxId} = transaction(fun get_cluster_tnx_id/0, []), + {atomic, TnxId} = transaction(fun ?MODULE:get_cluster_tnx_id/0, []), TnxId. -spec make_initiate_call_req(module(), atom(), list()) -> init_call_req(). @@ -280,7 +286,7 @@ handle_call(reset, _From, State) -> _ = mria:clear_table(?CLUSTER_MFA), {reply, ok, State, {continue, ?CATCH_UP}}; handle_call(?INITIATE(MFA), _From, State = #{node := Node}) -> - case transaction(fun init_mfa/2, [Node, MFA]) of + case transaction(fun ?MODULE:init_mfa/2, [Node, MFA]) of {atomic, {ok, TnxId, Result}} -> {reply, {ok, TnxId, Result}, State, {continue, ?CATCH_UP}}; {aborted, Error} -> @@ -288,7 +294,7 @@ handle_call(?INITIATE(MFA), _From, State = #{node := Node}) -> end; handle_call(skip_failed_commit, _From, State = #{node := Node}) -> Timeout = catch_up(State, true), - {atomic, LatestId} = transaction(fun get_node_tnx_id/1, [Node]), + {atomic, LatestId} = transaction(fun ?MODULE:get_node_tnx_id/1, [Node]), {reply, LatestId, State, Timeout}; handle_call({fast_forward_to_commit, ToTnxId}, _From, State) -> NodeId = do_fast_forward_to_commit(ToTnxId, State), @@ -316,14 +322,14 @@ code_change(_OldVsn, State, _Extra) -> catch_up(State) -> catch_up(State, false). catch_up(#{node := Node, retry_interval := RetryMs} = State, SkipResult) -> - case transaction(fun read_next_mfa/1, [Node]) of + case transaction(fun ?MODULE:read_next_mfa/1, [Node]) of {atomic, caught_up} -> ?TIMEOUT; {atomic, {still_lagging, NextId, MFA}} -> {Succeed, _} = apply_mfa(NextId, MFA, ?APPLY_KIND_REPLICATE), case Succeed orelse SkipResult of true -> - case transaction(fun commit/2, [Node, NextId]) of + case transaction(fun ?MODULE:commit/2, [Node, NextId]) of {atomic, ok} -> catch_up(State, false); Error -> @@ -367,12 +373,12 @@ commit(Node, TnxId) -> ok = mnesia:write(?CLUSTER_COMMIT, #cluster_rpc_commit{node = Node, tnx_id = TnxId}, write). do_fast_forward_to_commit(ToTnxId, State = #{node := Node}) -> - {atomic, NodeId} = transaction(fun get_node_tnx_id/1, [Node]), + {atomic, NodeId} = transaction(fun ?MODULE:get_node_tnx_id/1, [Node]), case NodeId >= ToTnxId of true -> NodeId; false -> - {atomic, LatestId} = transaction(fun get_cluster_tnx_id/0, []), + {atomic, LatestId} = transaction(fun ?MODULE:get_cluster_tnx_id/0, []), case LatestId =< NodeId of true -> NodeId; @@ -529,11 +535,11 @@ wait_for_nodes_commit(RequiredSyncs, TnxId, Delay, Remain) -> end. lagging_node(TnxId) -> - {atomic, Nodes} = transaction(fun commit_status_trans/2, ['<', TnxId]), + {atomic, Nodes} = transaction(fun ?MODULE:commit_status_trans/2, ['<', TnxId]), Nodes. synced_nodes(TnxId) -> - {atomic, Nodes} = transaction(fun commit_status_trans/2, ['>=', TnxId]), + {atomic, Nodes} = transaction(fun ?MODULE:commit_status_trans/2, ['>=', TnxId]), Nodes. commit_status_trans(Operator, TnxId) -> @@ -547,5 +553,5 @@ get_retry_ms() -> maybe_init_tnx_id(_Node, TnxId) when TnxId < 0 -> ok; maybe_init_tnx_id(Node, TnxId) -> - {atomic, _} = transaction(fun commit/2, [Node, TnxId]), + {atomic, _} = transaction(fun ?MODULE:commit/2, [Node, TnxId]), ok. diff --git a/apps/emqx_conf/src/emqx_cluster_rpc_handler.erl b/apps/emqx_conf/src/emqx_cluster_rpc_handler.erl index 40df5a02c..7f7c7f77f 100644 --- a/apps/emqx_conf/src/emqx_cluster_rpc_handler.erl +++ b/apps/emqx_conf/src/emqx_cluster_rpc_handler.erl @@ -30,6 +30,11 @@ code_change/3 ]). +%% Internal exports (RPC) +-export([ + del_stale_mfa/1 +]). + start_link() -> MaxHistory = emqx_conf:get(["node", "cluster_call", "max_history"], 100), CleanupMs = emqx_conf:get(["node", "cluster_call", "cleanup_interval"], 5 * 60 * 1000), @@ -56,7 +61,7 @@ handle_cast(Msg, State) -> {noreply, State}. handle_info({timeout, TRef, del_stale_mfa}, State = #{timer := TRef, max_history := MaxHistory}) -> - case mria:transaction(?CLUSTER_RPC_SHARD, fun del_stale_mfa/1, [MaxHistory]) of + case mria:transaction(?CLUSTER_RPC_SHARD, fun ?MODULE:del_stale_mfa/1, [MaxHistory]) of {atomic, ok} -> ok; Error -> ?SLOG(error, #{msg => "del_stale_cluster_rpc_mfa_error", error => Error}) end, diff --git a/apps/emqx_conf/src/emqx_conf.app.src b/apps/emqx_conf/src/emqx_conf.app.src index 1441a4180..854fdac07 100644 --- a/apps/emqx_conf/src/emqx_conf.app.src +++ b/apps/emqx_conf/src/emqx_conf.app.src @@ -1,6 +1,6 @@ {application, emqx_conf, [ {description, "EMQX configuration management"}, - {vsn, "0.1.2"}, + {vsn, "0.1.3"}, {registered, []}, {mod, {emqx_conf_app, []}}, {applications, [kernel, stdlib]}, From 5aceeff7b483213451bf28057d66c80a84813b89 Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Mon, 22 Aug 2022 15:14:30 +0200 Subject: [PATCH 6/7] refactor(cm_registry): Export transactions --- apps/emqx/src/emqx_cm_registry.erl | 7 ++++++- apps/emqx_gateway/src/emqx_gateway_cm_registry.erl | 7 ++++++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/apps/emqx/src/emqx_cm_registry.erl b/apps/emqx/src/emqx_cm_registry.erl index 7049d31d5..ebd4b2977 100644 --- a/apps/emqx/src/emqx_cm_registry.erl +++ b/apps/emqx/src/emqx_cm_registry.erl @@ -44,6 +44,11 @@ code_change/3 ]). +%% Internal exports (RPC) +-export([ + do_cleanup_channels/1 +]). + -define(REGISTRY, ?MODULE). -define(TAB, emqx_channel_registry). -define(LOCK, {?MODULE, cleanup_down}). @@ -155,7 +160,7 @@ cleanup_channels(Node) -> global:trans( {?LOCK, self()}, fun() -> - mria:transaction(?CM_SHARD, fun do_cleanup_channels/1, [Node]) + mria:transaction(?CM_SHARD, fun ?MODULE:do_cleanup_channels/1, [Node]) end ). diff --git a/apps/emqx_gateway/src/emqx_gateway_cm_registry.erl b/apps/emqx_gateway/src/emqx_gateway_cm_registry.erl index 65532deaa..5c3e8bb45 100644 --- a/apps/emqx_gateway/src/emqx_gateway_cm_registry.erl +++ b/apps/emqx_gateway/src/emqx_gateway_cm_registry.erl @@ -42,6 +42,11 @@ code_change/3 ]). +%% Internal exports (RPC) +-export([ + do_cleanup_channels/2 +]). + -define(CM_SHARD, emqx_gateway_cm_shard). -define(LOCK, {?MODULE, cleanup_down}). @@ -148,7 +153,7 @@ cleanup_channels(Node, Name) -> global:trans( {?LOCK, self()}, fun() -> - mria:transaction(?CM_SHARD, fun do_cleanup_channels/2, [Node, Tab]) + mria:transaction(?CM_SHARD, fun ?MODULE:do_cleanup_channels/2, [Node, Tab]) end ). From 5978e4c8be3ac98fa161119d4703868c7a0e714d Mon Sep 17 00:00:00 2001 From: ieQu1 <99872536+ieQu1@users.noreply.github.com> Date: Mon, 22 Aug 2022 15:14:47 +0200 Subject: [PATCH 7/7] refactor(sn_registry): Export transactions --- .../src/mqttsn/emqx_sn_registry.erl | 63 +++++++++++-------- 1 file changed, 36 insertions(+), 27 deletions(-) diff --git a/apps/emqx_gateway/src/mqttsn/emqx_sn_registry.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_registry.erl index 6086ec7d6..448aa8ad5 100644 --- a/apps/emqx_gateway/src/mqttsn/emqx_sn_registry.erl +++ b/apps/emqx_gateway/src/mqttsn/emqx_sn_registry.erl @@ -46,6 +46,11 @@ code_change/3 ]). +%% Internal exports (RPC) +-export([ + do_register/4 +]). + -export([lookup_name/1]). -define(SN_SHARD, emqx_sn_shard). @@ -173,33 +178,11 @@ handle_call( TopicId when TopicId >= 16#FFFF -> {reply, {error, too_large}, State}; TopicId -> - Fun = fun() -> - mnesia:write( - Tab, - #emqx_sn_registry{ - key = {ClientId, next_topic_id}, - value = TopicId + 1 - }, - write - ), - mnesia:write( - Tab, - #emqx_sn_registry{ - key = {ClientId, TopicName}, - value = TopicId - }, - write - ), - mnesia:write( - Tab, - #emqx_sn_registry{ - key = {ClientId, TopicId}, - value = TopicName - }, - write - ) - end, - case mria:transaction(?SN_SHARD, Fun) of + case + mria:transaction(?SN_SHARD, fun ?MODULE:do_register/4, [ + Tab, ClientId, TopicId, TopicName + ]) + of {atomic, ok} -> {reply, TopicId, State}; {aborted, Error} -> @@ -248,6 +231,32 @@ terminate(_Reason, _State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. +do_register(Tab, ClientId, TopicId, TopicName) -> + mnesia:write( + Tab, + #emqx_sn_registry{ + key = {ClientId, next_topic_id}, + value = TopicId + 1 + }, + write + ), + mnesia:write( + Tab, + #emqx_sn_registry{ + key = {ClientId, TopicName}, + value = TopicId + }, + write + ), + mnesia:write( + Tab, + #emqx_sn_registry{ + key = {ClientId, TopicId}, + value = TopicName + }, + write + ). + %%----------------------------------------------------------------------------- next_topic_id(Tab, PredefId, ClientId) ->