diff --git a/apps/emqx/include/emqx.hrl b/apps/emqx/include/emqx.hrl index 60dccd9a3..58fdc5f98 100644 --- a/apps/emqx/include/emqx.hrl +++ b/apps/emqx/include/emqx.hrl @@ -26,6 +26,7 @@ -define(COMMON_SHARD, emqx_common_shard). -define(SHARED_SUB_SHARD, emqx_shared_sub_shard). -define(MOD_DELAYED_SHARD, emqx_delayed_shard). +-define(CM_SHARD, emqx_cm_shard). %%-------------------------------------------------------------------- %% Banner diff --git a/apps/emqx/rebar.config b/apps/emqx/rebar.config index f34173fae..19c91d6ac 100644 --- a/apps/emqx/rebar.config +++ b/apps/emqx/rebar.config @@ -13,7 +13,7 @@ , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}} , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.8.2"}}} , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.8.2"}}} - , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.10.4"}}} + , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.10.8"}}} , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}} , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.11.1"}}} , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}} diff --git a/apps/emqx/src/emqx_alarm.erl b/apps/emqx/src/emqx_alarm.erl index 7599f9569..8f3e1c568 100644 --- a/apps/emqx/src/emqx_alarm.erl +++ b/apps/emqx/src/emqx_alarm.erl @@ -85,9 +85,6 @@ -define(DEACTIVATED_ALARM, emqx_deactivated_alarm). --rlog_shard({?COMMON_SHARD, ?ACTIVATED_ALARM}). --rlog_shard({?COMMON_SHARD, ?DEACTIVATED_ALARM}). - -ifdef(TEST). -compile(export_all). -compile(nowarn_export_all). diff --git a/apps/emqx/src/emqx_banned.erl b/apps/emqx/src/emqx_banned.erl index f0991d967..c143a20a6 100644 --- a/apps/emqx/src/emqx_banned.erl +++ b/apps/emqx/src/emqx_banned.erl @@ -50,8 +50,6 @@ -define(BANNED_TAB, ?MODULE). --rlog_shard({?COMMON_SHARD, ?BANNED_TAB}). - %%-------------------------------------------------------------------- %% Mnesia bootstrap %%-------------------------------------------------------------------- @@ -59,6 +57,7 @@ mnesia(boot) -> ok = ekka_mnesia:create_table(?BANNED_TAB, [ {type, set}, + {rlog_shard, ?COMMON_SHARD}, {disc_copies, [node()]}, {record_name, banned}, {attributes, record_info(fields, banned)}, diff --git a/apps/emqx/src/emqx_cm_registry.erl b/apps/emqx/src/emqx_cm_registry.erl index 9326d2b8e..6fc34dee8 100644 --- a/apps/emqx/src/emqx_cm_registry.erl +++ b/apps/emqx/src/emqx_cm_registry.erl @@ -47,10 +47,6 @@ -define(TAB, emqx_channel_registry). -define(LOCK, {?MODULE, cleanup_down}). --define(CM_SHARD, emqx_cm_shard). - --rlog_shard({?CM_SHARD, ?TAB}). - -record(channel, {chid, pid}). %% @doc Start the global channel registry. @@ -106,6 +102,7 @@ record(ClientId, ChanPid) -> init([]) -> ok = ekka_mnesia:create_table(?TAB, [ {type, bag}, + {rlog_shard, ?CM_SHARD}, {ram_copies, [node()]}, {record_name, channel}, {attributes, record_info(fields, channel)}, diff --git a/apps/emqx/src/emqx_router.erl b/apps/emqx/src/emqx_router.erl index 8989c3b10..c39571d9f 100644 --- a/apps/emqx/src/emqx_router.erl +++ b/apps/emqx/src/emqx_router.erl @@ -68,7 +68,6 @@ -type(dest() :: node() | {group(), node()}). -define(ROUTE_TAB, emqx_route). --rlog_shard({?ROUTE_SHARD, ?ROUTE_TAB}). %%-------------------------------------------------------------------- %% Mnesia bootstrap @@ -77,6 +76,7 @@ mnesia(boot) -> ok = ekka_mnesia:create_table(?ROUTE_TAB, [ {type, bag}, + {rlog_shard, ?ROUTE_SHARD}, {ram_copies, [node()]}, {record_name, route}, {attributes, record_info(fields, route)}, diff --git a/apps/emqx/src/emqx_router_helper.erl b/apps/emqx/src/emqx_router_helper.erl index 5866e86b3..78d763cac 100644 --- a/apps/emqx/src/emqx_router_helper.erl +++ b/apps/emqx/src/emqx_router_helper.erl @@ -52,8 +52,6 @@ -define(ROUTING_NODE, emqx_routing_node). -define(LOCK, {?MODULE, cleanup_routes}). --rlog_shard({?ROUTE_SHARD, ?ROUTING_NODE}). - -dialyzer({nowarn_function, [cleanup_routes/1]}). %%-------------------------------------------------------------------- @@ -63,6 +61,7 @@ mnesia(boot) -> ok = ekka_mnesia:create_table(?ROUTING_NODE, [ {type, set}, + {rlog_shard, ?ROUTE_SHARD}, {ram_copies, [node()]}, {record_name, routing_node}, {attributes, record_info(fields, routing_node)}, diff --git a/apps/emqx/src/emqx_shared_sub.erl b/apps/emqx/src/emqx_shared_sub.erl index ccc050165..9e5dd726f 100644 --- a/apps/emqx/src/emqx_shared_sub.erl +++ b/apps/emqx/src/emqx_shared_sub.erl @@ -76,8 +76,6 @@ -define(NACK(Reason), {shared_sub_nack, Reason}). -define(NO_ACK, no_ack). --rlog_shard({?SHARED_SUB_SHARD, ?TAB}). - -record(state, {pmon}). -record(emqx_shared_subscription, {group, topic, subpid}). @@ -89,6 +87,7 @@ mnesia(boot) -> ok = ekka_mnesia:create_table(?TAB, [ {type, bag}, + {rlog_shard, ?SHARED_SUB_SHARD}, {ram_copies, [node()]}, {record_name, emqx_shared_subscription}, {attributes, record_info(fields, emqx_shared_subscription)}]); diff --git a/apps/emqx/src/emqx_trie.erl b/apps/emqx/src/emqx_trie.erl index ebfcfcbe3..ea70ff7f3 100644 --- a/apps/emqx/src/emqx_trie.erl +++ b/apps/emqx/src/emqx_trie.erl @@ -50,8 +50,6 @@ , count = 0 :: non_neg_integer() }). --rlog_shard({?ROUTE_SHARD, ?TRIE}). - %%-------------------------------------------------------------------- %% Mnesia bootstrap %%-------------------------------------------------------------------- @@ -64,6 +62,7 @@ mnesia(boot) -> {write_concurrency, true} ]}], ok = ekka_mnesia:create_table(?TRIE, [ + {rlog_shard, ?ROUTE_SHARD}, {ram_copies, [node()]}, {record_name, ?TRIE}, {attributes, record_info(fields, ?TRIE)}, diff --git a/apps/emqx_authn/src/emqx_authn.erl b/apps/emqx_authn/src/emqx_authn.erl index 571d76cc7..84629be78 100644 --- a/apps/emqx_authn/src/emqx_authn.erl +++ b/apps/emqx_authn/src/emqx_authn.erl @@ -350,7 +350,7 @@ handle_call({lookup_chain, ID}, _From, State) -> end; handle_call({create_authenticator, ChainID, #{name := Name} = Config}, _From, State) -> - UpdateFun = + UpdateFun = fun(#chain{authenticators = Authenticators} = Chain) -> case lists:keymember(Name, 2, Authenticators) of true -> @@ -374,7 +374,7 @@ handle_call({create_authenticator, ChainID, #{name := Name} = Config}, _From, St reply(Reply, State); handle_call({delete_authenticator, ChainID, AuthenticatorID}, _From, State) -> - UpdateFun = + UpdateFun = fun(#chain{authenticators = Authenticators} = Chain) -> case lists:keytake(AuthenticatorID, 1, Authenticators) of false -> @@ -397,7 +397,7 @@ handle_call({update_or_create_authenticator, ChainID, AuthenticatorID, Config}, reply(Reply, State); handle_call({move_authenticator, ChainID, AuthenticatorID, Position}, _From, State) -> - UpdateFun = + UpdateFun = fun(#chain{authenticators = Authenticators} = Chain) -> case do_move_authenticator(AuthenticatorID, Authenticators, Position) of {ok, NAuthenticators} -> @@ -524,7 +524,7 @@ do_delete_authenticator(#authenticator{provider = Provider, state = State}) -> ok. update_or_create_authenticator(ChainID, AuthenticatorID, #{name := NewName} = Config, CreateWhenNotFound) -> - UpdateFun = + UpdateFun = fun(#chain{authenticators = Authenticators} = Chain) -> case lists:keytake(AuthenticatorID, 1, Authenticators) of false -> @@ -586,7 +586,7 @@ update_or_create_authenticator(ChainID, AuthenticatorID, #{name := NewName} = Co end end, update_chain(ChainID, UpdateFun). - + replace_authenticator(ID, #authenticator{name = Name} = Authenticator, Authenticators) -> lists:keyreplace(ID, 1, Authenticators, {ID, Name, Authenticator}). diff --git a/apps/emqx_authn/src/enhanced_authn/emqx_enhanced_authn_scram_mnesia.erl b/apps/emqx_authn/src/enhanced_authn/emqx_enhanced_authn_scram_mnesia.erl index 2d433d408..f4fede9f5 100644 --- a/apps/emqx_authn/src/enhanced_authn/emqx_enhanced_authn_scram_mnesia.erl +++ b/apps/emqx_authn/src/enhanced_authn/emqx_enhanced_authn_scram_mnesia.erl @@ -45,8 +45,6 @@ -boot_mnesia({mnesia, [boot]}). -copy_mnesia({mnesia, [copy]}). --rlog_shard({?AUTH_SHARD, ?TAB}). - -record(user_info, { user_id , stored_key @@ -63,6 +61,7 @@ -spec(mnesia(boot | copy) -> ok). mnesia(boot) -> ok = ekka_mnesia:create_table(?TAB, [ + {rlog_shard, ?AUTH_SHARD}, {disc_copies, [node()]}, {record_name, user_info}, {attributes, record_info(fields, user_info)}, @@ -112,7 +111,7 @@ create(#{ algorithm := Algorithm update(Config, #{user_group := Unique}) -> create(Config#{'_unique' => Unique}). - + authenticate(#{auth_method := AuthMethod, auth_data := AuthData, auth_cache := AuthCache}, State) -> @@ -272,4 +271,4 @@ trans(Fun, Args) -> end. serialize_user_info(#user_info{user_id = {_, UserID}, superuser = Superuser}) -> - #{user_id => UserID, superuser => Superuser}. \ No newline at end of file + #{user_id => UserID, superuser => Superuser}. diff --git a/apps/emqx_authn/src/simple_authn/emqx_authn_mnesia.erl b/apps/emqx_authn/src/simple_authn/emqx_authn_mnesia.erl index 08c0ffad1..9bbf3239c 100644 --- a/apps/emqx_authn/src/simple_authn/emqx_authn_mnesia.erl +++ b/apps/emqx_authn/src/simple_authn/emqx_authn_mnesia.erl @@ -58,7 +58,6 @@ -define(TAB, ?MODULE). --rlog_shard({?AUTH_SHARD, ?TAB}). %%------------------------------------------------------------------------------ %% Mnesia bootstrap %%------------------------------------------------------------------------------ @@ -67,6 +66,7 @@ -spec(mnesia(boot | copy) -> ok). mnesia(boot) -> ok = ekka_mnesia:create_table(?TAB, [ + {rlog_shard, ?AUTH_SHARD}, {disc_copies, [node()]}, {record_name, user_info}, {attributes, record_info(fields, user_info)}, diff --git a/apps/emqx_dashboard/src/emqx_dashboard_admin.erl b/apps/emqx_dashboard/src/emqx_dashboard_admin.erl index 982756805..8a1306e94 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_admin.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_admin.erl @@ -20,8 +20,6 @@ -include("emqx_dashboard.hrl"). --rlog_shard({?DASHBOARD_SHARD, mqtt_admin}). - -boot_mnesia({mnesia, [boot]}). -copy_mnesia({mnesia, [copy]}). @@ -54,6 +52,7 @@ mnesia(boot) -> ok = ekka_mnesia:create_table(mqtt_admin, [ {type, set}, + {rlog_shard, ?DASHBOARD_SHARD}, {disc_copies, [node()]}, {record_name, mqtt_admin}, {attributes, record_info(fields, mqtt_admin)}, diff --git a/apps/emqx_dashboard/src/emqx_dashboard_token.erl b/apps/emqx_dashboard/src/emqx_dashboard_token.erl index 432a64621..9086b4c2e 100644 --- a/apps/emqx_dashboard/src/emqx_dashboard_token.erl +++ b/apps/emqx_dashboard/src/emqx_dashboard_token.erl @@ -26,8 +26,6 @@ , destroy_by_username/1 ]). --rlog_shard({?DASHBOARD_SHARD, mqtt_admin_jwt}). - -boot_mnesia({mnesia, [boot]}). -copy_mnesia({mnesia, [copy]}). @@ -80,6 +78,7 @@ destroy_by_username(Username) -> mnesia(boot) -> ok = ekka_mnesia:create_table(?TAB, [ {type, set}, + {rlog_shard, ?DASHBOARD_SHARD}, {disc_copies, [node()]}, {record_name, mqtt_admin_jwt}, {attributes, record_info(fields, mqtt_admin_jwt)}, diff --git a/apps/emqx_gateway/src/emqx_gateway_cm_registry.erl b/apps/emqx_gateway/src/emqx_gateway_cm_registry.erl index 2c449828e..1d9daa637 100644 --- a/apps/emqx_gateway/src/emqx_gateway_cm_registry.erl +++ b/apps/emqx_gateway/src/emqx_gateway_cm_registry.erl @@ -40,6 +40,8 @@ , code_change/3 ]). +-include_lib("emqx/include/emqx.hrl"). + -define(LOCK, {?MODULE, cleanup_down}). -record(channel, {chid, pid}). @@ -89,6 +91,7 @@ init([Type]) -> Tab = tabname(Type), ok = ekka_mnesia:create_table(Tab, [ {type, bag}, + {rlog_shard, ?CM_SHARD}, {ram_copies, [node()]}, {record_name, channel}, {attributes, record_info(fields, channel)}, diff --git a/apps/emqx_gateway/src/mqttsn/emqx_sn_registry.erl b/apps/emqx_gateway/src/mqttsn/emqx_sn_registry.erl index 1249831cc..2534eee26 100644 --- a/apps/emqx_gateway/src/mqttsn/emqx_sn_registry.erl +++ b/apps/emqx_gateway/src/mqttsn/emqx_sn_registry.erl @@ -60,8 +60,6 @@ %-boot_mnesia({mnesia, [boot]}). %-copy_mnesia({mnesia, [copy]}). -%-rlog_shard({?SN_SHARD, ?TAB}). - %%% @doc Create or replicate tables. %-spec(mnesia(boot | copy) -> ok). %mnesia(boot) -> @@ -149,9 +147,11 @@ init([InstaId, PredefTopics]) -> {ram_copies, [node()]}, {record_name, emqx_sn_registry}, {attributes, record_info(fields, emqx_sn_registry)}, - {storage_properties, [{ets, [{read_concurrency, true}]}]} + {storage_properties, [{ets, [{read_concurrency, true}]}]}, + {rlog_shard, ?SN_SHARD} ]), ok = ekka_mnesia:copy_table(Tab, ram_copies), + ok = ekka_rlog:wait_for_shards([?SN_SHARD], infinity), % FIXME: %ok = ekka_rlog:wait_for_shards([?CM_SHARD], infinity), MaxPredefId = lists:foldl( diff --git a/apps/emqx_machine/src/emqx_machine.erl b/apps/emqx_machine/src/emqx_machine.erl index bcfb1f501..3e5772b4a 100644 --- a/apps/emqx_machine/src/emqx_machine.erl +++ b/apps/emqx_machine/src/emqx_machine.erl @@ -120,7 +120,7 @@ start_one_app(App) -> ?SLOG(debug, #{msg => "started_apps", apps => Apps}); {error, Reason} -> ?SLOG(critical, #{msg => "failed_to_start_app", app => App, reason => Reason}), - error({faile_to_start_app, App, Reason}) + error({failed_to_start_app, App, Reason}) end. %% list of app names which should be rebooted when: @@ -131,7 +131,6 @@ reboot_apps() -> , esockd , ranch , cowboy - , ekka , emqx , emqx_prometheus , emqx_modules diff --git a/apps/emqx_management/src/emqx_mgmt_auth.erl b/apps/emqx_management/src/emqx_mgmt_auth.erl index 73ec37fc2..7c0eb8e82 100644 --- a/apps/emqx_management/src/emqx_mgmt_auth.erl +++ b/apps/emqx_management/src/emqx_mgmt_auth.erl @@ -48,14 +48,13 @@ -include("emqx_mgmt.hrl"). --rlog_shard({?MANAGEMENT_SHARD, mqtt_app}). - %%-------------------------------------------------------------------- %% Mnesia Bootstrap %%-------------------------------------------------------------------- mnesia(boot) -> ok = ekka_mnesia:create_table(mqtt_app, [ + {rlog_shard, ?MANAGEMENT_SHARD}, {disc_copies, [node()]}, {record_name, mqtt_app}, {attributes, record_info(fields, mqtt_app)}]); diff --git a/apps/emqx_modules/src/emqx_delayed.erl b/apps/emqx_modules/src/emqx_delayed.erl index df8387aaf..f7de5d69d 100644 --- a/apps/emqx_modules/src/emqx_delayed.erl +++ b/apps/emqx_modules/src/emqx_delayed.erl @@ -56,8 +56,6 @@ -define(SERVER, ?MODULE). -define(MAX_INTERVAL, 4294967). --rlog_shard({?MOD_DELAYED_SHARD, ?TAB}). - %%-------------------------------------------------------------------- %% Mnesia bootstrap %%-------------------------------------------------------------------- diff --git a/apps/emqx_modules/src/emqx_telemetry.erl b/apps/emqx_modules/src/emqx_telemetry.erl index aa207ac95..65852df93 100644 --- a/apps/emqx_modules/src/emqx_telemetry.erl +++ b/apps/emqx_modules/src/emqx_telemetry.erl @@ -86,8 +86,6 @@ -define(TELEMETRY, emqx_telemetry). --rlog_shard({?COMMON_SHARD, ?TELEMETRY}). - %%-------------------------------------------------------------------- %% Mnesia bootstrap %%-------------------------------------------------------------------- diff --git a/apps/emqx_retainer/src/emqx_retainer.erl b/apps/emqx_retainer/src/emqx_retainer.erl index 4c36cb541..cb4262451 100644 --- a/apps/emqx_retainer/src/emqx_retainer.erl +++ b/apps/emqx_retainer/src/emqx_retainer.erl @@ -57,8 +57,6 @@ , wait_quotas := list() }. --rlog_shard({?RETAINER_SHARD, ?TAB}). - -define(DEF_MAX_PAYLOAD_SIZE, (1024 * 1024)). -define(DEF_EXPIRY_INTERVAL, 0). diff --git a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl index 5f91a40c9..54d0f2c3a 100644 --- a/apps/emqx_retainer/src/emqx_retainer_mnesia.erl +++ b/apps/emqx_retainer/src/emqx_retainer_mnesia.erl @@ -34,8 +34,6 @@ -export([create_resource/1]). --rlog_shard({?RETAINER_SHARD, ?TAB}). - -record(retained, {topic, msg, expiry_time}). -type batch_read_result() :: @@ -56,6 +54,7 @@ create_resource(#{storage_type := StorageType}) -> {dets, [{auto_save, 1000}]}], ok = ekka_mnesia:create_table(?TAB, [ {type, set}, + {rlog_shard, ?RETAINER_SHARD}, {Copies, [node()]}, {record_name, retained}, {attributes, record_info(fields, retained)}, diff --git a/apps/emqx_rule_engine/src/emqx_rule_registry.erl b/apps/emqx_rule_engine/src/emqx_rule_registry.erl index d335a601b..c0bd5de7b 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_registry.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_registry.erl @@ -96,11 +96,6 @@ -define(T_CALL, 10000). --rlog_shard({?RULE_ENGINE_SHARD, ?RULE_TAB}). --rlog_shard({?RULE_ENGINE_SHARD, ?ACTION_TAB}). --rlog_shard({?RULE_ENGINE_SHARD, ?RES_TAB}). --rlog_shard({?RULE_ENGINE_SHARD, ?RES_TYPE_TAB}). - %%------------------------------------------------------------------------------ %% Mnesia bootstrap %%------------------------------------------------------------------------------ @@ -112,6 +107,7 @@ mnesia(boot) -> StoreProps = [{ets, [{read_concurrency, true}]}], %% Rule table ok = ekka_mnesia:create_table(?RULE_TAB, [ + {rlog_shard, ?RULE_ENGINE_SHARD}, {disc_copies, [node()]}, {record_name, rule}, {index, [#rule.for]}, @@ -119,6 +115,7 @@ mnesia(boot) -> {storage_properties, StoreProps}]), %% Rule action table ok = ekka_mnesia:create_table(?ACTION_TAB, [ + {rlog_shard, ?RULE_ENGINE_SHARD}, {ram_copies, [node()]}, {record_name, action}, {index, [#action.for, #action.app]}, @@ -126,6 +123,7 @@ mnesia(boot) -> {storage_properties, StoreProps}]), %% Resource table ok = ekka_mnesia:create_table(?RES_TAB, [ + {rlog_shard, ?RULE_ENGINE_SHARD}, {disc_copies, [node()]}, {record_name, resource}, {index, [#resource.type]}, @@ -133,6 +131,7 @@ mnesia(boot) -> {storage_properties, StoreProps}]), %% Resource type table ok = ekka_mnesia:create_table(?RES_TYPE_TAB, [ + {rlog_shard, ?RULE_ENGINE_SHARD}, {ram_copies, [node()]}, {record_name, resource_type}, {index, [#resource_type.provider]}, diff --git a/rebar.config b/rebar.config index 8cfc3cf7c..65384c9ab 100644 --- a/rebar.config +++ b/rebar.config @@ -49,7 +49,7 @@ , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}} , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.8.2"}}} , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.8.2"}}} - , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.10.4"}}} + , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.10.8"}}} , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}} , {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.1.7"}}} , {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.1"}}}