fix issue #449 - Improve the design of cluster

This commit is contained in:
Feng 2016-02-11 01:42:09 +08:00
parent add9f0619e
commit 8aad88e825
4 changed files with 356 additions and 97 deletions

63
src/emqttd_boot.erl Normal file
View File

@ -0,0 +1,63 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2012-2016 Feng Lee <feng@emqtt.io>.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqttd_boot).
-export([apply_module_attributes/1, all_module_attributes/1]).
%% only {F, Args}...
apply_module_attributes(Name) ->
[{Module, [apply(Module, F, Args) || {F, Args} <- Attrs]} ||
{_App, Module, Attrs} <- all_module_attributes(Name)].
%% Copy from rabbit_misc.erl
all_module_attributes(Name) ->
Targets =
lists:usort(
lists:append(
[[{App, Module} || Module <- Modules] ||
{App, _, _} <- ignore_lib_apps(application:loaded_applications()),
{ok, Modules} <- [application:get_key(App, modules)]])),
lists:foldl(
fun ({App, Module}, Acc) ->
case lists:append([Atts || {N, Atts} <- module_attributes(Module),
N =:= Name]) of
[] -> Acc;
Atts -> [{App, Module, Atts} | Acc]
end
end, [], Targets).
%% Copy from rabbit_misc.erl
module_attributes(Module) ->
case catch Module:module_info(attributes) of
{'EXIT', {undef, [{Module, module_info, _} | _]}} ->
[];
{'EXIT', Reason} ->
exit(Reason);
V ->
V
end.
ignore_lib_apps(Apps) ->
LibApps = [kernel, stdlib, sasl, appmon, eldap, erts,
syntax_tools, ssl, crypto, mnesia, os_mon,
inets, goldrush, lager, gproc, runtime_tools,
snmp, otp_mibs, public_key, asn1, ssh, hipe,
common_test, observer, webtool, xmerl, tools,
test_server, compiler, debugger, eunit, et,
gen_logger, wx],
[App || App = {Name, _, _} <- Apps, not lists:member(Name, LibApps)].

87
src/emqttd_cluster.erl Normal file
View File

@ -0,0 +1,87 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2012-2016 Feng Lee <feng@emqtt.io>.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqttd_cluster).
-include("emqttd.hrl").
%% Cluster API
-export([join/1, leave/0, status/0, remove/1]).
%% RPC Call
-export([prepare/0, reboot/0]).
%% @doc Join cluster
-spec join(node()) -> ok | {error, any()}.
join(Node) when Node =:= node() ->
{error, {cannot_join_with_self, Node}};
join(Node) when is_atom(Node) ->
case {is_clustered(Node), emqttd:is_running(Node)} of
{false, true} ->
prepare(), ok = emqttd_mnesia:join_cluster(Node), reboot();
{false, false} ->
{error, {node_not_running, Node}};
{true, _} ->
{error, {already_clustered, Node}}
end.
%% @doc Prepare to join or leave cluster.
-spec prepare() -> ok.
prepare() ->
emqttd_plugins:unload(),
lists:foreach(fun application:stop/1, [emqttd, mochiweb, esockd, gproc]).
%% @doc Is node in cluster?
-spec is_clustered(node()) -> boolean().
is_clustered(Node) ->
lists:member(Node, emqttd_mnesia:running_nodes()).
%% @doc Reboot after join or leave cluster.
-spec reboot() -> ok.
reboot() ->
lists:foreach(fun application:start/1, [gproc, esockd, mochiweb, emqttd]).
%% @doc Leave from Cluster.
-spec leave() -> ok | {error, any()}.
leave() ->
case emqttd_mnesia:running_nodes() -- [node()] of
[_|_] ->
prepare(), ok = emqttd_mnesia:leave_cluster(), reboot();
[] ->
{error, node_not_in_cluster}
end.
%% @doc Remove a node from cluster.
-spec remove(node()) -> ok | {error, any()}.
remove(Node) when Node =:= node() ->
{error, {cannot_remove_self, Node}};
remove(Node) ->
case rpc:call(Node, ?MODULE, prepare, []) of
ok ->
case emqttd_mnesia:remove_from_cluster(Node) of
ok -> rpc:call(Node, ?MODULE, reboot, []);
Error -> Error
end;
Error ->
{error, Error}
end.
%% @doc Cluster status
status() ->
emqttd_mnesia:cluster_status().

View File

@ -14,134 +14,243 @@
%% limitations under the License.
%%--------------------------------------------------------------------
%% TODO: refactor this module
%% @doc emqttd mnesia
%% @author Feng Lee <feng@emqtt.io>
-module(emqttd_mnesia).
-include("emqttd.hrl").
-export([start/0, cluster/1]).
-include("emqttd_internal.hrl").
-export([create_table/2, copy_table/1]).
%% Start and stop mnesia
-export([start/0, ensure_started/0, ensure_stopped/0, connect/1]).
-export([dump/3]).
%% Cluster mnesia
-export([join_cluster/1, cluster_status/0, leave_cluster/0,
remove_from_cluster/1, running_nodes/0]).
%% Schema and tables
-export([copy_schema/1, delete_schema/0, del_schema_copy/1,
create_table/2, copy_table/1, copy_table/2]).
%%--------------------------------------------------------------------
%% Start and init mnesia
%%--------------------------------------------------------------------
%% @doc Start mnesia database.
-spec start() -> ok.
start() ->
case init_schema() of
ok ->
ok;
{error, {_Node, {already_exists, _Node}}} ->
ok;
{error, Reason} ->
lager:error("mnesia init_schema error: ~p", [Reason])
end,
ensure_ok(ensure_data_dir()),
ensure_ok(init_schema()),
ok = mnesia:start(),
init_tables(),
wait_for_tables().
wait_for(tables).
%% @private
%% @doc Init mnesia schema.
ensure_data_dir() ->
Dir = mnesia:system_info(directory) ++ "/",
case filelib:ensure_dir(Dir) of
ok -> ok;
{error, Reason} -> {error, {mnesia_dir_error, Dir, Reason}}
end.
%% @doc ensure mnesia started.
-spec ensure_started() -> ok | {error, any()}.
ensure_started() ->
ok = mnesia:start(), wait_for(start).
%% @doc ensure mnesia stopped.
-spec ensure_stopped() -> ok | {error, any()}.
ensure_stopped() ->
stopped = mnesia:stop(), wait_for(stop).
%% @private
%% @doc Init mnesia schema or tables.
init_schema() ->
case mnesia:system_info(extra_db_nodes) of
[] ->
%% create schema
mnesia:create_schema([node()]);
__ ->
ok
[] -> mnesia:create_schema([node()]);
[_|_] -> ok
end.
%% @private
%% @doc Init mnesia tables.
init_tables() ->
case mnesia:system_info(extra_db_nodes) of
[] ->
create_tables();
_ ->
copy_tables()
[] -> create_tables();
[_|_] -> copy_tables()
end.
%% @private
%% @doc create tables.
%% @doc Create mnesia tables.
create_tables() ->
emqttd_util:apply_module_attributes(boot_mnesia).
emqttd_boot:apply_module_attributes(boot_mnesia).
create_table(Table, Attrs) ->
case mnesia:create_table(Table, Attrs) of
{atomic, ok} -> ok;
{aborted, {already_exists, Table}} -> ok;
{aborted, {already_exists, Table, _}} -> ok;
Error -> Error
end.
%% @private
%% @doc copy tables.
%% @doc Copy mnesia tables.
copy_tables() ->
emqttd_util:apply_module_attributes(copy_mnesia).
emqttd_boot:apply_module_attributes(copy_mnesia).
copy_table(Table) ->
case mnesia:add_table_copy(Table, node(), ram_copies) of
{atomic, ok} -> ok;
{aborted, {already_exists, Table}} -> ok;
{aborted, {already_exists, Table, _Node}} -> ok;
{aborted, Error} -> Error
end.
%% @doc Create mnesia table.
-spec create_table(Name:: atom(), TabDef :: list()) -> ok | {error, any()}.
create_table(Name, TabDef) ->
ensure_tab(mnesia:create_table(Name, TabDef)).
%% @private
%% @doc wait for tables.
wait_for_tables() ->
%% io:format("mnesia wait_for_tables: ~p~n", [mnesia:system_info(local_tables)]),
mnesia:wait_for_tables(mnesia:system_info(local_tables), infinity).
%% @doc Copy mnesia table.
-spec copy_table(Name :: atom()) -> ok.
copy_table(Name) ->
copy_table(Name, ram_copies).
%% TODO: should move to cluster.
%% @private
%% @doc Simple cluster with another nodes.
cluster(Node) ->
%% stop mnesia
mnesia:stop(),
ok = wait_for_mnesia(stop),
%% delete mnesia
ok = mnesia:delete_schema([node()]),
%% start mnesia
ok = mnesia:start(),
%% connect with extra_db_nodes
case mnesia:change_config(extra_db_nodes, [Node]) of
{ok, []} ->
throw({error, failed_to_connect_extra_db_nodes});
{ok, Nodes} ->
case lists:member(Node, Nodes) of
true -> lager:info("mnesia connected to extra_db_node '~s' successfully!", [Node]);
false -> lager:error("mnesia failed to connect extra_db_node '~s'!", [Node])
end,
mnesia:change_table_copy_type(schema, node(), disc_copies)
end,
copy_tables(),
wait_for_tables().
wait_for_mnesia(stop) ->
case mnesia:system_info(is_running) of
no ->
-spec copy_table(Name:: atom(), ram_copies | disc_copies) -> ok.
copy_table(Name, RamOrDisc) ->
ensure_tab(mnesia:add_table_copy(Name, node(), RamOrDisc)).
%% @doc Copy schema.
copy_schema(Node) ->
case mnesia:change_table_copy_type(schema, Node, disc_copies) of
{atomic, ok} ->
ok;
stopping ->
lager:info("Waiting for mnesia to stop..."),
timer:sleep(1000),
wait_for_mnesia(stop);
yes ->
{error, mnesia_unexpectedly_running};
starting ->
{error, mnesia_unexpectedly_starting}
{aborted, {already_exists, schema, Node, disc_copies}} ->
ok;
{aborted, Error} ->
{error, Error}
end.
dump(ets, Table, Fun) ->
dump(ets, Table, ets:first(Table), Fun).
%% @doc Force to delete schema.
delete_schema() ->
mnesia:delete_schema([node()]).
dump(ets, _Table, '$end_of_table', _Fun) ->
ok;
%% @doc Delete schema copy
del_schema_copy(Node) ->
case mnesia:del_table_copy(schema, Node) of
{atomic, ok} -> ok;
{aborted, Reason} -> {error, Reason}
end.
dump(ets, Table, Key, Fun) ->
case ets:lookup(Table, Key) of
[Record] -> Fun(Record);
[] -> ignore
end,
dump(ets, Table, ets:next(Table, Key), Fun).
%%--------------------------------------------------------------------
%% Cluster mnesia
%%--------------------------------------------------------------------
%% @doc Join the mnesia cluster
-spec join_cluster(node()) -> ok.
join_cluster(Node) when Node =/= node() ->
%% Stop mnesia and delete schema first
ensure_ok(ensure_stopped()),
ensure_ok(delete_schema()),
%% Start mnesia and cluster to node
ensure_ok(ensure_started()),
ensure_ok(connect(Node)),
ensure_ok(copy_schema(node())),
%% Copy tables
copy_tables(),
ensure_ok(wait_for(tables)).
%% @doc Cluster status
-spec cluster_status() -> list().
cluster_status() ->
Running = mnesia:system_info(running_db_nodes),
Stopped = mnesia:system_info(db_nodes) -- Running,
?IF(Stopped =:= [], [{running_nodes, Running}],
[{running_nodes, Running}, {stopped_nodes, Stopped}]).
%% @doc This node try leave the cluster
-spec leave_cluster() -> ok | {error, any()}.
leave_cluster() ->
case running_nodes() -- [node()] of
[] ->
{error, node_not_in_cluster};
Nodes ->
case lists:any(fun(Node) ->
case leave_cluster(Node) of
ok -> true;
{error, _Reason} -> false
end
end, Nodes) of
true -> ok;
false -> {error, {failed_to_leave, Nodes}}
end
end.
-spec leave_cluster(node()) -> ok | {error, any()}.
leave_cluster(Node) when Node =/= node() ->
case is_running_db_node(Node) of
true ->
ensure_ok(ensure_stopped()),
ensure_ok(rpc:call(Node, ?MODULE, del_schema_copy, [node()])),
ensure_ok(delete_schema());
%%ensure_ok(start()); %% restart?
false ->
{error, {node_not_running, Node}}
end.
%% @doc Remove node from mnesia cluster.
-spec remove_from_cluster(node()) -> ok | {error, any()}.
remove_from_cluster(Node) when Node =/= node() ->
case {is_node_in_cluster(Node), is_running_db_node(Node)} of
{true, true} ->
ensure_ok(rpc:call(Node, ?MODULE, ensure_stopped, [])),
ensure_ok(del_schema_copy(Node)),
ensure_ok(rpc:call(Node, ?MODULE, delete_schema, []));
{true, false} ->
ensure_ok(zenmq_mnesia:del_schema_copy(Node)),
ensure_ok(rpc:call(Node, ?MODULE, delete_schema, [Node]));
{false, _} ->
{error, node_not_in_cluster}
end.
%% @doc Is node in mnesia cluster.
is_node_in_cluster(Node) ->
lists:member(Node, mnesia:system_info(db_nodes)).
%% @private
%% @doc Is running db node.
is_running_db_node(Node) ->
lists:member(Node, running_nodes()).
%% @doc Cluster with node.
-spec connect(node()) -> ok | {error, any()}.
connect(Node) ->
case mnesia:change_config(extra_db_nodes, [Node]) of
{ok, [Node]} -> ok;
{ok, []} -> {error, {failed_to_connect_node, Node}};
Error -> Error
end.
%% @doc Running nodes
-spec running_nodes() -> list(node()).
running_nodes() ->
mnesia:system_info(running_db_nodes).
%% @private
ensure_ok(ok) -> ok;
ensure_ok({error, {_Node, {already_exists, _Node}}}) -> ok;
ensure_ok({badrpc, Reason}) -> throw({error, {badrpc, Reason}});
ensure_ok({error, Reason}) -> throw({error, Reason}).
%% @private
ensure_tab({atomic, ok}) -> ok;
ensure_tab({aborted, {already_exists, _Name}}) -> ok;
ensure_tab({aborted, {already_exists, _Name, _Node}})-> ok;
ensure_tab({aborted, Error}) -> Error.
%% @doc Wait for mnesia to start, stop or copy tables.
-spec wait_for(start | stop | tables) -> ok | {error, Reason :: atom()}.
wait_for(start) ->
case mnesia:system_info(is_running) of
yes -> ok;
no -> {error, mnesia_unexpectedly_stopped};
stopping -> {error, mnesia_unexpectedly_stopping};
starting -> timer:sleep(500), wait_for(start)
end;
wait_for(stop) ->
case mnesia:system_info(is_running) of
no -> ok;
yes -> {error, mnesia_unexpectedly_running};
starting -> {error, mnesia_unexpectedly_starting};
stopping -> timer:sleep(500), wait_for(stop)
end;
wait_for(tables) ->
Tables = mnesia:system_info(local_tables),
case mnesia:wait_for_tables(Tables, 600000) of
ok -> ok;
{error, Reason} -> {error, Reason};
{timeout, BadTables} -> {error, {timetout, BadTables}}
end.