Merge pull request #5509 from k32/rlog-fixes

Minor fixes to improve compatibility with RLOG feature
This commit is contained in:
k32 2021-08-23 16:10:33 +02:00 committed by GitHub
commit 397a04ec7c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 33 additions and 52 deletions

View File

@ -26,6 +26,7 @@
-define(COMMON_SHARD, emqx_common_shard).
-define(SHARED_SUB_SHARD, emqx_shared_sub_shard).
-define(MOD_DELAYED_SHARD, emqx_delayed_shard).
-define(CM_SHARD, emqx_cm_shard).
%%--------------------------------------------------------------------
%% Banner

View File

@ -13,7 +13,7 @@
, {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.8.2"}}}
, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.8.2"}}}
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.10.4"}}}
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.10.8"}}}
, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}}
, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.11.1"}}}
, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}

View File

@ -85,9 +85,6 @@
-define(DEACTIVATED_ALARM, emqx_deactivated_alarm).
-rlog_shard({?COMMON_SHARD, ?ACTIVATED_ALARM}).
-rlog_shard({?COMMON_SHARD, ?DEACTIVATED_ALARM}).
-ifdef(TEST).
-compile(export_all).
-compile(nowarn_export_all).

View File

@ -50,8 +50,6 @@
-define(BANNED_TAB, ?MODULE).
-rlog_shard({?COMMON_SHARD, ?BANNED_TAB}).
%%--------------------------------------------------------------------
%% Mnesia bootstrap
%%--------------------------------------------------------------------
@ -59,6 +57,7 @@
mnesia(boot) ->
ok = ekka_mnesia:create_table(?BANNED_TAB, [
{type, set},
{rlog_shard, ?COMMON_SHARD},
{disc_copies, [node()]},
{record_name, banned},
{attributes, record_info(fields, banned)},

View File

@ -47,10 +47,6 @@
-define(TAB, emqx_channel_registry).
-define(LOCK, {?MODULE, cleanup_down}).
-define(CM_SHARD, emqx_cm_shard).
-rlog_shard({?CM_SHARD, ?TAB}).
-record(channel, {chid, pid}).
%% @doc Start the global channel registry.
@ -106,6 +102,7 @@ record(ClientId, ChanPid) ->
init([]) ->
ok = ekka_mnesia:create_table(?TAB, [
{type, bag},
{rlog_shard, ?CM_SHARD},
{ram_copies, [node()]},
{record_name, channel},
{attributes, record_info(fields, channel)},

View File

@ -68,7 +68,6 @@
-type(dest() :: node() | {group(), node()}).
-define(ROUTE_TAB, emqx_route).
-rlog_shard({?ROUTE_SHARD, ?ROUTE_TAB}).
%%--------------------------------------------------------------------
%% Mnesia bootstrap
@ -77,6 +76,7 @@
mnesia(boot) ->
ok = ekka_mnesia:create_table(?ROUTE_TAB, [
{type, bag},
{rlog_shard, ?ROUTE_SHARD},
{ram_copies, [node()]},
{record_name, route},
{attributes, record_info(fields, route)},

View File

@ -52,8 +52,6 @@
-define(ROUTING_NODE, emqx_routing_node).
-define(LOCK, {?MODULE, cleanup_routes}).
-rlog_shard({?ROUTE_SHARD, ?ROUTING_NODE}).
-dialyzer({nowarn_function, [cleanup_routes/1]}).
%%--------------------------------------------------------------------
@ -63,6 +61,7 @@
mnesia(boot) ->
ok = ekka_mnesia:create_table(?ROUTING_NODE, [
{type, set},
{rlog_shard, ?ROUTE_SHARD},
{ram_copies, [node()]},
{record_name, routing_node},
{attributes, record_info(fields, routing_node)},

View File

@ -76,8 +76,6 @@
-define(NACK(Reason), {shared_sub_nack, Reason}).
-define(NO_ACK, no_ack).
-rlog_shard({?SHARED_SUB_SHARD, ?TAB}).
-record(state, {pmon}).
-record(emqx_shared_subscription, {group, topic, subpid}).
@ -89,6 +87,7 @@
mnesia(boot) ->
ok = ekka_mnesia:create_table(?TAB, [
{type, bag},
{rlog_shard, ?SHARED_SUB_SHARD},
{ram_copies, [node()]},
{record_name, emqx_shared_subscription},
{attributes, record_info(fields, emqx_shared_subscription)}]);

View File

@ -50,8 +50,6 @@
, count = 0 :: non_neg_integer()
}).
-rlog_shard({?ROUTE_SHARD, ?TRIE}).
%%--------------------------------------------------------------------
%% Mnesia bootstrap
%%--------------------------------------------------------------------
@ -64,6 +62,7 @@ mnesia(boot) ->
{write_concurrency, true}
]}],
ok = ekka_mnesia:create_table(?TRIE, [
{rlog_shard, ?ROUTE_SHARD},
{ram_copies, [node()]},
{record_name, ?TRIE},
{attributes, record_info(fields, ?TRIE)},

View File

@ -350,7 +350,7 @@ handle_call({lookup_chain, ID}, _From, State) ->
end;
handle_call({create_authenticator, ChainID, #{name := Name} = Config}, _From, State) ->
UpdateFun =
UpdateFun =
fun(#chain{authenticators = Authenticators} = Chain) ->
case lists:keymember(Name, 2, Authenticators) of
true ->
@ -374,7 +374,7 @@ handle_call({create_authenticator, ChainID, #{name := Name} = Config}, _From, St
reply(Reply, State);
handle_call({delete_authenticator, ChainID, AuthenticatorID}, _From, State) ->
UpdateFun =
UpdateFun =
fun(#chain{authenticators = Authenticators} = Chain) ->
case lists:keytake(AuthenticatorID, 1, Authenticators) of
false ->
@ -397,7 +397,7 @@ handle_call({update_or_create_authenticator, ChainID, AuthenticatorID, Config},
reply(Reply, State);
handle_call({move_authenticator, ChainID, AuthenticatorID, Position}, _From, State) ->
UpdateFun =
UpdateFun =
fun(#chain{authenticators = Authenticators} = Chain) ->
case do_move_authenticator(AuthenticatorID, Authenticators, Position) of
{ok, NAuthenticators} ->
@ -524,7 +524,7 @@ do_delete_authenticator(#authenticator{provider = Provider, state = State}) ->
ok.
update_or_create_authenticator(ChainID, AuthenticatorID, #{name := NewName} = Config, CreateWhenNotFound) ->
UpdateFun =
UpdateFun =
fun(#chain{authenticators = Authenticators} = Chain) ->
case lists:keytake(AuthenticatorID, 1, Authenticators) of
false ->
@ -586,7 +586,7 @@ update_or_create_authenticator(ChainID, AuthenticatorID, #{name := NewName} = Co
end
end,
update_chain(ChainID, UpdateFun).
replace_authenticator(ID, #authenticator{name = Name} = Authenticator, Authenticators) ->
lists:keyreplace(ID, 1, Authenticators, {ID, Name, Authenticator}).

View File

@ -45,8 +45,6 @@
-boot_mnesia({mnesia, [boot]}).
-copy_mnesia({mnesia, [copy]}).
-rlog_shard({?AUTH_SHARD, ?TAB}).
-record(user_info,
{ user_id
, stored_key
@ -63,6 +61,7 @@
-spec(mnesia(boot | copy) -> ok).
mnesia(boot) ->
ok = ekka_mnesia:create_table(?TAB, [
{rlog_shard, ?AUTH_SHARD},
{disc_copies, [node()]},
{record_name, user_info},
{attributes, record_info(fields, user_info)},
@ -112,7 +111,7 @@ create(#{ algorithm := Algorithm
update(Config, #{user_group := Unique}) ->
create(Config#{'_unique' => Unique}).
authenticate(#{auth_method := AuthMethod,
auth_data := AuthData,
auth_cache := AuthCache}, State) ->
@ -272,4 +271,4 @@ trans(Fun, Args) ->
end.
serialize_user_info(#user_info{user_id = {_, UserID}, superuser = Superuser}) ->
#{user_id => UserID, superuser => Superuser}.
#{user_id => UserID, superuser => Superuser}.

View File

@ -58,7 +58,6 @@
-define(TAB, ?MODULE).
-rlog_shard({?AUTH_SHARD, ?TAB}).
%%------------------------------------------------------------------------------
%% Mnesia bootstrap
%%------------------------------------------------------------------------------
@ -67,6 +66,7 @@
-spec(mnesia(boot | copy) -> ok).
mnesia(boot) ->
ok = ekka_mnesia:create_table(?TAB, [
{rlog_shard, ?AUTH_SHARD},
{disc_copies, [node()]},
{record_name, user_info},
{attributes, record_info(fields, user_info)},

View File

@ -20,8 +20,6 @@
-include("emqx_dashboard.hrl").
-rlog_shard({?DASHBOARD_SHARD, mqtt_admin}).
-boot_mnesia({mnesia, [boot]}).
-copy_mnesia({mnesia, [copy]}).
@ -54,6 +52,7 @@
mnesia(boot) ->
ok = ekka_mnesia:create_table(mqtt_admin, [
{type, set},
{rlog_shard, ?DASHBOARD_SHARD},
{disc_copies, [node()]},
{record_name, mqtt_admin},
{attributes, record_info(fields, mqtt_admin)},

View File

@ -26,8 +26,6 @@
, destroy_by_username/1
]).
-rlog_shard({?DASHBOARD_SHARD, mqtt_admin_jwt}).
-boot_mnesia({mnesia, [boot]}).
-copy_mnesia({mnesia, [copy]}).
@ -80,6 +78,7 @@ destroy_by_username(Username) ->
mnesia(boot) ->
ok = ekka_mnesia:create_table(?TAB, [
{type, set},
{rlog_shard, ?DASHBOARD_SHARD},
{disc_copies, [node()]},
{record_name, mqtt_admin_jwt},
{attributes, record_info(fields, mqtt_admin_jwt)},

View File

@ -40,6 +40,8 @@
, code_change/3
]).
-include_lib("emqx/include/emqx.hrl").
-define(LOCK, {?MODULE, cleanup_down}).
-record(channel, {chid, pid}).
@ -89,6 +91,7 @@ init([Type]) ->
Tab = tabname(Type),
ok = ekka_mnesia:create_table(Tab, [
{type, bag},
{rlog_shard, ?CM_SHARD},
{ram_copies, [node()]},
{record_name, channel},
{attributes, record_info(fields, channel)},

View File

@ -60,8 +60,6 @@
%-boot_mnesia({mnesia, [boot]}).
%-copy_mnesia({mnesia, [copy]}).
%-rlog_shard({?SN_SHARD, ?TAB}).
%%% @doc Create or replicate tables.
%-spec(mnesia(boot | copy) -> ok).
%mnesia(boot) ->
@ -149,9 +147,11 @@ init([InstaId, PredefTopics]) ->
{ram_copies, [node()]},
{record_name, emqx_sn_registry},
{attributes, record_info(fields, emqx_sn_registry)},
{storage_properties, [{ets, [{read_concurrency, true}]}]}
{storage_properties, [{ets, [{read_concurrency, true}]}]},
{rlog_shard, ?SN_SHARD}
]),
ok = ekka_mnesia:copy_table(Tab, ram_copies),
ok = ekka_rlog:wait_for_shards([?SN_SHARD], infinity),
% FIXME:
%ok = ekka_rlog:wait_for_shards([?CM_SHARD], infinity),
MaxPredefId = lists:foldl(

View File

@ -120,7 +120,7 @@ start_one_app(App) ->
?SLOG(debug, #{msg => "started_apps", apps => Apps});
{error, Reason} ->
?SLOG(critical, #{msg => "failed_to_start_app", app => App, reason => Reason}),
error({faile_to_start_app, App, Reason})
error({failed_to_start_app, App, Reason})
end.
%% list of app names which should be rebooted when:
@ -131,7 +131,6 @@ reboot_apps() ->
, esockd
, ranch
, cowboy
, ekka
, emqx
, emqx_prometheus
, emqx_modules

View File

@ -48,14 +48,13 @@
-include("emqx_mgmt.hrl").
-rlog_shard({?MANAGEMENT_SHARD, mqtt_app}).
%%--------------------------------------------------------------------
%% Mnesia Bootstrap
%%--------------------------------------------------------------------
mnesia(boot) ->
ok = ekka_mnesia:create_table(mqtt_app, [
{rlog_shard, ?MANAGEMENT_SHARD},
{disc_copies, [node()]},
{record_name, mqtt_app},
{attributes, record_info(fields, mqtt_app)}]);

View File

@ -56,8 +56,6 @@
-define(SERVER, ?MODULE).
-define(MAX_INTERVAL, 4294967).
-rlog_shard({?MOD_DELAYED_SHARD, ?TAB}).
%%--------------------------------------------------------------------
%% Mnesia bootstrap
%%--------------------------------------------------------------------

View File

@ -86,8 +86,6 @@
-define(TELEMETRY, emqx_telemetry).
-rlog_shard({?COMMON_SHARD, ?TELEMETRY}).
%%--------------------------------------------------------------------
%% Mnesia bootstrap
%%--------------------------------------------------------------------

View File

@ -57,8 +57,6 @@
, wait_quotas := list()
}.
-rlog_shard({?RETAINER_SHARD, ?TAB}).
-define(DEF_MAX_PAYLOAD_SIZE, (1024 * 1024)).
-define(DEF_EXPIRY_INTERVAL, 0).

View File

@ -34,8 +34,6 @@
-export([create_resource/1]).
-rlog_shard({?RETAINER_SHARD, ?TAB}).
-record(retained, {topic, msg, expiry_time}).
-type batch_read_result() ::
@ -56,6 +54,7 @@ create_resource(#{storage_type := StorageType}) ->
{dets, [{auto_save, 1000}]}],
ok = ekka_mnesia:create_table(?TAB, [
{type, set},
{rlog_shard, ?RETAINER_SHARD},
{Copies, [node()]},
{record_name, retained},
{attributes, record_info(fields, retained)},

View File

@ -96,11 +96,6 @@
-define(T_CALL, 10000).
-rlog_shard({?RULE_ENGINE_SHARD, ?RULE_TAB}).
-rlog_shard({?RULE_ENGINE_SHARD, ?ACTION_TAB}).
-rlog_shard({?RULE_ENGINE_SHARD, ?RES_TAB}).
-rlog_shard({?RULE_ENGINE_SHARD, ?RES_TYPE_TAB}).
%%------------------------------------------------------------------------------
%% Mnesia bootstrap
%%------------------------------------------------------------------------------
@ -112,6 +107,7 @@ mnesia(boot) ->
StoreProps = [{ets, [{read_concurrency, true}]}],
%% Rule table
ok = ekka_mnesia:create_table(?RULE_TAB, [
{rlog_shard, ?RULE_ENGINE_SHARD},
{disc_copies, [node()]},
{record_name, rule},
{index, [#rule.for]},
@ -119,6 +115,7 @@ mnesia(boot) ->
{storage_properties, StoreProps}]),
%% Rule action table
ok = ekka_mnesia:create_table(?ACTION_TAB, [
{rlog_shard, ?RULE_ENGINE_SHARD},
{ram_copies, [node()]},
{record_name, action},
{index, [#action.for, #action.app]},
@ -126,6 +123,7 @@ mnesia(boot) ->
{storage_properties, StoreProps}]),
%% Resource table
ok = ekka_mnesia:create_table(?RES_TAB, [
{rlog_shard, ?RULE_ENGINE_SHARD},
{disc_copies, [node()]},
{record_name, resource},
{index, [#resource.type]},
@ -133,6 +131,7 @@ mnesia(boot) ->
{storage_properties, StoreProps}]),
%% Resource type table
ok = ekka_mnesia:create_table(?RES_TYPE_TAB, [
{rlog_shard, ?RULE_ENGINE_SHARD},
{ram_copies, [node()]},
{record_name, resource_type},
{index, [#resource_type.provider]},

View File

@ -49,7 +49,7 @@
, {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.8.2"}}}
, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.8.2"}}}
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.10.4"}}}
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.10.8"}}}
, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}}
, {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.1.7"}}}
, {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.1"}}}