feat(emqx_management): mqtt_app shard

This commit is contained in:
k32 2021-06-21 17:18:50 +02:00
parent 58d4791dd2
commit 239255f251
7 changed files with 27 additions and 13 deletions

View File

@ -13,7 +13,7 @@
, {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}} , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.8.2"}}} , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.8.2"}}}
, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.8.0"}}} , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.8.0"}}}
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.10.1"}}} , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.10.2"}}}
, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}} , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}}
, {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v4.0.1"}}} %% todo delete when plugins use hocon , {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v4.0.1"}}} %% todo delete when plugins use hocon
, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.7.0"}}} , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.7.0"}}}

View File

@ -32,4 +32,6 @@
-define(ERROR14, 114). %% OldPassword error -define(ERROR14, 114). %% OldPassword error
-define(ERROR15, 115). %% bad topic -define(ERROR15, 115). %% bad topic
-define(VERSIONS, ["4.0", "4.1", "4.2", "4.3"]). -define(VERSIONS, ["4.0", "4.1", "4.2", "4.3"]).
-define(MANAGEMENT_SHARD, emqx_management_shard).

View File

@ -24,8 +24,11 @@
, stop/1 , stop/1
]). ]).
-include("emqx_mgmt.hrl").
start(_Type, _Args) -> start(_Type, _Args) ->
{ok, Sup} = emqx_mgmt_sup:start_link(), {ok, Sup} = emqx_mgmt_sup:start_link(),
ok = ekka_rlog:wait_for_shards([?MANAGEMENT_SHARD], infinity),
_ = emqx_mgmt_auth:add_default_app(), _ = emqx_mgmt_auth:add_default_app(),
emqx_mgmt_http:start_listeners(), emqx_mgmt_http:start_listeners(),
emqx_mgmt_cli:load(), emqx_mgmt_cli:load(),

View File

@ -46,6 +46,10 @@
-type(appsecret() :: binary()). -type(appsecret() :: binary()).
-include("emqx_mgmt.hrl").
-rlog_shard({?MANAGEMENT_SHARD, mqtt_app}).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Mnesia Bootstrap %% Mnesia Bootstrap
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
@ -102,7 +106,7 @@ add_app(AppId, Name, Secret, Desc, Status, Expired) when is_binary(AppId) ->
_ -> mnesia:abort(alread_existed) _ -> mnesia:abort(alread_existed)
end end
end, end,
case mnesia:transaction(AddFun) of case ekka_mnesia:transaction(?MANAGEMENT_SHARD, AddFun) of
{atomic, ok} -> {ok, Secret1}; {atomic, ok} -> {ok, Secret1};
{aborted, Reason} -> {error, Reason} {aborted, Reason} -> {error, Reason}
end. end.
@ -116,7 +120,7 @@ force_add_app(AppId, Name, Secret, Desc, Status, Expired) ->
status = Status, status = Status,
expired = Expired}) expired = Expired})
end, end,
case mnesia:transaction(AddFun) of case ekka_mnesia:transaction(?MANAGEMENT_SHARD, AddFun) of
{atomic, ok} -> ok; {atomic, ok} -> ok;
{aborted, Reason} -> {error, Reason} {aborted, Reason} -> {error, Reason}
end. end.
@ -154,7 +158,8 @@ lookup_app(AppId) when is_binary(AppId) ->
update_app(AppId, Status) -> update_app(AppId, Status) ->
case mnesia:dirty_read(mqtt_app, AppId) of case mnesia:dirty_read(mqtt_app, AppId) of
[App = #mqtt_app{}] -> [App = #mqtt_app{}] ->
case mnesia:transaction(fun() -> mnesia:write(App#mqtt_app{status = Status}) end) of Fun = fun() -> mnesia:write(App#mqtt_app{status = Status}) end,
case ekka_mnesia:transaction(?MANAGEMENT_SHARD, Fun) of
{atomic, ok} -> ok; {atomic, ok} -> ok;
{aborted, Reason} -> {error, Reason} {aborted, Reason} -> {error, Reason}
end; end;
@ -166,10 +171,12 @@ update_app(AppId, Status) ->
update_app(AppId, Name, Desc, Status, Expired) -> update_app(AppId, Name, Desc, Status, Expired) ->
case mnesia:dirty_read(mqtt_app, AppId) of case mnesia:dirty_read(mqtt_app, AppId) of
[App = #mqtt_app{}] -> [App = #mqtt_app{}] ->
case mnesia:transaction(fun() -> mnesia:write(App#mqtt_app{name = Name, case ekka_mnesia:transaction(
desc = Desc, ?MANAGEMENT_SHARD,
status = Status, fun() -> mnesia:write(App#mqtt_app{name = Name,
expired = Expired}) end) of desc = Desc,
status = Status,
expired = Expired}) end) of
{atomic, ok} -> ok; {atomic, ok} -> ok;
{aborted, Reason} -> {error, Reason} {aborted, Reason} -> {error, Reason}
end; end;
@ -179,7 +186,7 @@ update_app(AppId, Name, Desc, Status, Expired) ->
-spec(del_app(appid()) -> ok | {error, term()}). -spec(del_app(appid()) -> ok | {error, term()}).
del_app(AppId) when is_binary(AppId) -> del_app(AppId) when is_binary(AppId) ->
case mnesia:transaction(fun mnesia:delete/1, [{mqtt_app, AppId}]) of case ekka_mnesia:transaction(?MANAGEMENT_SHARD, fun mnesia:delete/1, [{mqtt_app, AppId}]) of
{atomic, Ok} -> Ok; {atomic, Ok} -> Ok;
{aborted, Reason} -> {error, Reason} {aborted, Reason} -> {error, Reason}
end. end.
@ -207,4 +214,3 @@ is_authorized(AppId, AppSecret) ->
is_expired(undefined) -> true; is_expired(undefined) -> true;
is_expired(Expired) -> Expired >= erlang:system_time(second). is_expired(Expired) -> Expired >= erlang:system_time(second).

View File

@ -40,6 +40,7 @@ cases() ->
init_per_suite(Config) -> init_per_suite(Config) ->
emqx_ct_helpers:start_apps([emqx_management, emqx_dashboard, emqx_auth_mnesia]), emqx_ct_helpers:start_apps([emqx_management, emqx_dashboard, emqx_auth_mnesia]),
application:set_env(ekka, strict_mode, true),
ekka_mnesia:start(), ekka_mnesia:start(),
emqx_mgmt_auth:mnesia(boot), emqx_mgmt_auth:mnesia(boot),
Config. Config.
@ -172,4 +173,4 @@ test_import(clientid, {ClientID, Password}) ->
Req = #{clientid => ClientID, Req = #{clientid => ClientID,
password => Password}, password => Password},
?assertMatch({stop, #{auth_result := success}}, ?assertMatch({stop, #{auth_result := success}},
emqx_auth_mnesia:check(Req, #{}, #{hash_type => sha256})). emqx_auth_mnesia:check(Req, #{}, #{hash_type => sha256})).

View File

@ -56,6 +56,7 @@ apps() ->
[emqx_management, emqx_auth_mnesia, emqx_modules]. [emqx_management, emqx_auth_mnesia, emqx_modules].
init_per_suite(Config) -> init_per_suite(Config) ->
application:set_env(ekka, strict_mode, true),
ekka_mnesia:start(), ekka_mnesia:start(),
emqx_mgmt_auth:mnesia(boot), emqx_mgmt_auth:mnesia(boot),
emqx_ct_helpers:start_apps(apps()), emqx_ct_helpers:start_apps(apps()),

View File

@ -1,3 +1,4 @@
%% -*- mode:erlang -*-
%% This config file is the very basic config to compile emqx %% This config file is the very basic config to compile emqx
%% This allows emqx to be used as a dependency for other applications %% This allows emqx to be used as a dependency for other applications
%% such as emqx module/plugin develpments and tests. %% such as emqx module/plugin develpments and tests.
@ -39,7 +40,7 @@
, {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}} , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.8.2"}}} , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.8.2"}}}
, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.8.0"}}} , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.8.0"}}}
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.10.1"}}} , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.10.2"}}}
, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}} , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}}
, {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v4.0.1"}}} % TODO: delete when all apps moved to hocon , {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v4.0.1"}}} % TODO: delete when all apps moved to hocon
, {minirest, {git, "https://github.com/emqx/minirest", {tag, "0.3.5"}}} , {minirest, {git, "https://github.com/emqx/minirest", {tag, "0.3.5"}}}