diff --git a/src/emqttd_boot.erl b/src/emqttd_boot.erl
new file mode 100644
index 000000000..75302365e
--- /dev/null
+++ b/src/emqttd_boot.erl
@@ -0,0 +1,63 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2012-2016 Feng Lee .
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqttd_boot).
+
+-export([apply_module_attributes/1, all_module_attributes/1]).
+
+%% @doc Apply every {F, Args} entry stored under module attribute Name; returns [{Module, Results}]. Only {F, Args} entries are applied.
+apply_module_attributes(Name) ->
+    [{Module, [apply(Module, F, Args) || {F, Args} <- Attrs]} ||
+        {_App, Module, Attrs} <- all_module_attributes(Name)].
+
+%% @doc List {App, Module, Attrs} for each module of a loaded, non-library app that declares attribute Name (copied from rabbit_misc.erl).
+all_module_attributes(Name) ->
+    Targets =
+        lists:usort(
+          lists:append(
+            [[{App, Module} || Module <- Modules] ||
+                {App, _, _} <- ignore_lib_apps(application:loaded_applications()),
+                {ok, Modules} <- [application:get_key(App, modules)]])),
+    lists:foldl(
+      fun ({App, Module}, Acc) ->
+              case lists:append([Atts || {N, Atts} <- module_attributes(Module),
+                                         N =:= Name]) of
+                  []   -> Acc; % modules without the attribute are left out
+                  Atts -> [{App, Module, Atts} | Acc]
+              end
+      end, [], Targets).
+
+%% Read the attributes of Module; [] when Module:module_info/1 is undefined, any other failure exits (copied from rabbit_misc.erl).
+module_attributes(Module) ->
+    case catch Module:module_info(attributes) of
+        {'EXIT', {undef, [{Module, module_info, _} | _]}} ->
+            [];
+        {'EXIT', Reason} ->
+            exit(Reason);
+        V ->
+            V
+    end.
+
+ignore_lib_apps(Apps) -> % keep only the {Name, _, _} entries whose Name is not in LibApps
+    LibApps = [kernel, stdlib, sasl, appmon, eldap, erts,
+               syntax_tools, ssl, crypto, mnesia, os_mon,
+               inets, goldrush, lager, gproc, runtime_tools,
+               snmp, otp_mibs, public_key, asn1, ssh, hipe,
+               common_test, observer, webtool, xmerl, tools,
+               test_server, compiler, debugger, eunit, et,
+               gen_logger, wx],
+    [App || App = {Name, _, _} <- Apps, not lists:member(Name, LibApps)].
+
diff --git a/src/emqttd_cluster.erl b/src/emqttd_cluster.erl
new file mode 100644
index 000000000..3511792db
--- /dev/null
+++ b/src/emqttd_cluster.erl
@@ -0,0 +1,87 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2012-2016 Feng Lee .
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqttd_cluster).
+
+-include("emqttd.hrl").
+
+%% Cluster API
+-export([join/1, leave/0, status/0, remove/1]).
+
+%% RPC Call
+-export([prepare/0, reboot/0]).
+
+%% @doc Join the cluster of Node. Returns an error for the local node, for a node already clustered with us, or when emqttd:is_running(Node) is false.
+-spec join(node()) -> ok | {error, any()}.
+join(Node) when Node =:= node() ->
+    {error, {cannot_join_with_self, Node}};
+
+join(Node) when is_atom(Node) ->
+    case {is_clustered(Node), emqttd:is_running(Node)} of
+        {false, true} ->
+            prepare(), ok = emqttd_mnesia:join_cluster(Node), reboot(); % stop apps, join mnesia, start apps again
+        {false, false} ->
+            {error, {node_not_running, Node}};
+        {true, _} ->
+            {error, {already_clustered, Node}}
+    end.
+
+%% @doc Prepare to join or leave cluster.
+-spec prepare() -> ok.
+prepare() ->
+    emqttd_plugins:unload(),
+    lists:foreach(fun application:stop/1, [emqttd, mochiweb, esockd, gproc]).
+
+%% @doc Is Node one of the running mnesia db nodes?
+-spec is_clustered(node()) -> boolean().
+is_clustered(Node) ->
+    lists:member(Node, emqttd_mnesia:running_nodes()).
+
+%% @doc Start the applications that prepare/0 stops, in the reverse order.
+-spec reboot() -> ok.
+reboot() ->
+    lists:foreach(fun application:start/1, [gproc, esockd, mochiweb, emqttd]).
+
+%% @doc Leave the cluster; returns an error when no other running node exists.
+-spec leave() -> ok | {error, any()}.
+leave() ->
+    case emqttd_mnesia:running_nodes() -- [node()] of
+        [_|_] ->
+            prepare(), ok = emqttd_mnesia:leave_cluster(), reboot(); % stop apps, leave mnesia cluster, start apps again
+        [] ->
+            {error, node_not_in_cluster}
+    end.
+
+%% @doc Remove another node from the cluster; prepare/0 and reboot/0 run on Node via rpc.
+-spec remove(node()) -> ok | {error, any()}.
+remove(Node) when Node =:= node() ->
+    {error, {cannot_remove_self, Node}};
+
+remove(Node) ->
+    case rpc:call(Node, ?MODULE, prepare, []) of
+        ok ->
+            case emqttd_mnesia:remove_from_cluster(Node) of
+                ok    -> rpc:call(Node, ?MODULE, reboot, []);
+                Error -> Error
+            end;
+        Error ->
+            {error, Error} % anything but ok from the rpc, e.g. {badrpc, Reason}
+    end.
+
+%% @doc Cluster status as returned by emqttd_mnesia:cluster_status/0.
+status() ->
+    emqttd_mnesia:cluster_status().
+
diff --git a/src/emqttd_mnesia.erl b/src/emqttd_mnesia.erl
index e2813a508..9f62e6a74 100644
--- a/src/emqttd_mnesia.erl
+++ b/src/emqttd_mnesia.erl
@@ -14,134 +14,243 @@
 %% limitations under the License.
 %%--------------------------------------------------------------------
 
-%% TODO: refactor this module
-%% @doc emqttd mnesia
-%% @author Feng Lee
 -module(emqttd_mnesia).
 
 -include("emqttd.hrl").
 
--export([start/0, cluster/1]).
+-include("emqttd_internal.hrl").
 
--export([create_table/2, copy_table/1]).
+%% Start and stop mnesia
+-export([start/0, ensure_started/0, ensure_stopped/0, connect/1]).
 
--export([dump/3]).
+%% Cluster mnesia
+-export([join_cluster/1, cluster_status/0, leave_cluster/0,
+         remove_from_cluster/1, running_nodes/0]).
 
+%% Schema and tables
+-export([copy_schema/1, delete_schema/0, del_schema_copy/1,
+         create_table/2, copy_table/1, copy_table/2]).
+
+%%--------------------------------------------------------------------
+%% Start and init mnesia
+%%--------------------------------------------------------------------
+
+%% @doc Start mnesia: ensure the data dir and schema, start, init tables, then wait for the local tables.
+-spec start() -> ok.
 start() ->
-    case init_schema() of
-        ok ->
-            ok;
-        {error, {_Node, {already_exists, _Node}}} ->
-            ok;
-        {error, Reason} ->
-            lager:error("mnesia init_schema error: ~p", [Reason])
-    end,
+    ensure_ok(ensure_data_dir()),
+    ensure_ok(init_schema()),
     ok = mnesia:start(),
     init_tables(),
-    wait_for_tables().
+    wait_for(tables). % NOTE(review): wait_for(tables) can return {error, _} although the spec above says ok
 
 %% @private
-%% @doc Init mnesia schema.
+ensure_data_dir() -> % make sure the mnesia directory exists on disk
+    Dir = mnesia:system_info(directory) ++ "/",
+    case filelib:ensure_dir(Dir) of
+        ok              -> ok;
+        {error, Reason} -> {error, {mnesia_dir_error, Dir, Reason}}
+    end.
+
+%% @doc Start mnesia, then wait until it reports that it is running.
+-spec ensure_started() -> ok | {error, any()}.
+ensure_started() ->
+    ok = mnesia:start(), wait_for(start).
+
+%% @doc Stop mnesia, then wait until it reports that it is no longer running.
+-spec ensure_stopped() -> ok | {error, any()}.
+ensure_stopped() ->
+    stopped = mnesia:stop(), wait_for(stop).
+
+%% @private
+%% @doc Create the schema on this node when no extra_db_nodes are configured.
 init_schema() ->
     case mnesia:system_info(extra_db_nodes) of
-        [] ->
-            %% create schema
-            mnesia:create_schema([node()]);
-        __ ->
-            ok
+        []    -> mnesia:create_schema([node()]);
+        [_|_] -> ok
     end.
 
 %% @private
 %% @doc Init mnesia tables.
 init_tables() ->
     case mnesia:system_info(extra_db_nodes) of
-        [] ->
-            create_tables();
-        _ ->
-            copy_tables()
+        []    -> create_tables();
+        [_|_] -> copy_tables()
     end.
 
-%% @private
-%% @doc create tables.
+%% @doc Create tables by applying the boot_mnesia attribute of every module that declares it.
 create_tables() ->
-    emqttd_util:apply_module_attributes(boot_mnesia).
+    emqttd_boot:apply_module_attributes(boot_mnesia).
 
-create_table(Table, Attrs) ->
-    case mnesia:create_table(Table, Attrs) of
-        {atomic, ok} -> ok;
-        {aborted, {already_exists, Table}} -> ok;
-        {aborted, {already_exists, Table, _}} -> ok;
-        Error -> Error
-    end.
-
-%% @private
-%% @doc copy tables.
+%% @doc Copy tables by applying the copy_mnesia attribute of every module that declares it.
 copy_tables() ->
-    emqttd_util:apply_module_attributes(copy_mnesia).
+    emqttd_boot:apply_module_attributes(copy_mnesia).
 
-copy_table(Table) ->
-    case mnesia:add_table_copy(Table, node(), ram_copies) of
-        {atomic, ok} -> ok;
-        {aborted, {already_exists, Table}} -> ok;
-        {aborted, {already_exists, Table, _Node}} -> ok;
-        {aborted, Error} -> Error
-    end.
+%% @doc Create a mnesia table; a table that already exists counts as ok.
+-spec create_table(Name:: atom(), TabDef :: list()) -> ok | {error, any()}.
+create_table(Name, TabDef) ->
+    ensure_tab(mnesia:create_table(Name, TabDef)).
 
-%% @private
-%% @doc wait for tables.
-wait_for_tables() ->
-    %% io:format("mnesia wait_for_tables: ~p~n", [mnesia:system_info(local_tables)]),
-    mnesia:wait_for_tables(mnesia:system_info(local_tables), infinity).
+%% @doc Copy a mnesia table to this node as ram_copies.
+-spec copy_table(Name :: atom()) -> ok.
+copy_table(Name) ->
+    copy_table(Name, ram_copies).
 
-%% TODO: should move to cluster.
-%% @private
-%% @doc Simple cluster with another nodes.
-cluster(Node) ->
-    %% stop mnesia
-    mnesia:stop(),
-    ok = wait_for_mnesia(stop),
-    %% delete mnesia
-    ok = mnesia:delete_schema([node()]),
-    %% start mnesia
-    ok = mnesia:start(),
-    %% connect with extra_db_nodes
-    case mnesia:change_config(extra_db_nodes, [Node]) of
-        {ok, []} ->
-            throw({error, failed_to_connect_extra_db_nodes});
-        {ok, Nodes} ->
-            case lists:member(Node, Nodes) of
-                true -> lager:info("mnesia connected to extra_db_node '~s' successfully!", [Node]);
-                false -> lager:error("mnesia failed to connect extra_db_node '~s'!", [Node])
-            end,
-            mnesia:change_table_copy_type(schema, node(), disc_copies)
-    end,
-    copy_tables(),
-    wait_for_tables().
-
-wait_for_mnesia(stop) ->
-    case mnesia:system_info(is_running) of
-        no ->
+-spec copy_table(Name:: atom(), ram_copies | disc_copies) -> ok.
+copy_table(Name, RamOrDisc) ->
+    ensure_tab(mnesia:add_table_copy(Name, node(), RamOrDisc)).
+
+%% @doc Change the schema copy on Node to disc_copies; ok when it already is disc_copies.
+copy_schema(Node) ->
+    case mnesia:change_table_copy_type(schema, Node, disc_copies) of
+        {atomic, ok} ->
             ok;
-        stopping ->
-            lager:info("Waiting for mnesia to stop..."),
-            timer:sleep(1000),
-            wait_for_mnesia(stop);
-        yes ->
-            {error, mnesia_unexpectedly_running};
-        starting ->
-            {error, mnesia_unexpectedly_starting}
+        {aborted, {already_exists, schema, Node, disc_copies}} ->
+            ok;
+        {aborted, Error} ->
+            {error, Error}
     end.
 
-dump(ets, Table, Fun) ->
-    dump(ets, Table, ets:first(Table), Fun).
+%% @doc Delete the mnesia schema of the local node.
+delete_schema() ->
+    mnesia:delete_schema([node()]).
 
-dump(ets, _Table, '$end_of_table', _Fun) ->
-    ok;
+%% @doc Delete the schema copy held by Node.
+del_schema_copy(Node) ->
+    case mnesia:del_table_copy(schema, Node) of
+        {atomic, ok} -> ok;
+        {aborted, Reason} -> {error, Reason}
+    end.
 
-dump(ets, Table, Key, Fun) ->
-    case ets:lookup(Table, Key) of
-        [Record] -> Fun(Record);
-        [] -> ignore
-    end,
-    dump(ets, Table, ets:next(Table, Key), Fun).
+%%--------------------------------------------------------------------
+%% Cluster mnesia
+%%--------------------------------------------------------------------
+
+%% @doc Join the mnesia cluster of Node; a failing step throws {error, Reason} via ensure_ok/1.
+-spec join_cluster(node()) -> ok.
+join_cluster(Node) when Node =/= node() ->
+    %% Stop mnesia and delete schema first
+    ensure_ok(ensure_stopped()),
+    ensure_ok(delete_schema()),
+    %% Start mnesia and cluster to node
+    ensure_ok(ensure_started()),
+    ensure_ok(connect(Node)),
+    ensure_ok(copy_schema(node())),
+    %% Copy tables
+    copy_tables(),
+    ensure_ok(wait_for(tables)).
+
+%% @doc Cluster status: the running db nodes, plus the stopped db nodes when there are any.
+-spec cluster_status() -> list().
+cluster_status() ->
+    Running = mnesia:system_info(running_db_nodes),
+    Stopped = mnesia:system_info(db_nodes) -- Running,
+    ?IF(Stopped =:= [], [{running_nodes, Running}],
+        [{running_nodes, Running}, {stopped_nodes, Stopped}]).
+
+%% @doc Make this node leave the cluster, trying each other running node until one of them succeeds.
+-spec leave_cluster() -> ok | {error, any()}.
+leave_cluster() ->
+    case running_nodes() -- [node()] of
+        [] ->
+            {error, node_not_in_cluster};
+        Nodes ->
+            case lists:any(fun(Node) ->
+                        case leave_cluster(Node) of
+                            ok               -> true;
+                            {error, _Reason} -> false
+                        end
+                    end, Nodes) of
+                true  -> ok;
+                false -> {error, {failed_to_leave, Nodes}}
+            end
+    end.
+
+-spec leave_cluster(node()) -> ok | {error, any()}.
+leave_cluster(Node) when Node =/= node() -> % leave through Node, which must be a running db node
+    case is_running_db_node(Node) of
+        true ->
+            ensure_ok(ensure_stopped()),
+            ensure_ok(rpc:call(Node, ?MODULE, del_schema_copy, [node()])),
+            ensure_ok(delete_schema());
+            %%ensure_ok(start()); %% restart?
+        false ->
+            {error, {node_not_running, Node}}
+    end.
+
+%% @doc Remove Node from the mnesia cluster; a failing step throws {error, Reason} via ensure_ok/1.
+-spec remove_from_cluster(node()) -> ok | {error, any()}.
+remove_from_cluster(Node) when Node =/= node() ->
+    case {is_node_in_cluster(Node), is_running_db_node(Node)} of
+        {true, true} ->
+            ensure_ok(rpc:call(Node, ?MODULE, ensure_stopped, [])),
+            ensure_ok(del_schema_copy(Node)),
+            ensure_ok(rpc:call(Node, ?MODULE, delete_schema, []));
+        {true, false} -> % mnesia is not running on Node, so there is nothing to stop first
+            ensure_ok(del_schema_copy(Node)), % local call, same as the branch above
+            ensure_ok(rpc:call(Node, ?MODULE, delete_schema, [])); % delete_schema/0 takes no arguments; throws {error, {badrpc, _}} if the rpc fails
+        {false, _} ->
+            {error, node_not_in_cluster}
+    end.
+
+%% @doc Is Node one of the db nodes known to mnesia (running or not)?
+is_node_in_cluster(Node) ->
+    lists:member(Node, mnesia:system_info(db_nodes)).
+
+%% @private
+%% @doc Is Node one of the running db nodes?
+is_running_db_node(Node) ->
+    lists:member(Node, running_nodes()).
+
+%% @doc Connect mnesia to Node by adding it to extra_db_nodes.
+-spec connect(node()) -> ok | {error, any()}.
+connect(Node) ->
+    case mnesia:change_config(extra_db_nodes, [Node]) of
+        {ok, [Node]} -> ok;
+        {ok, []}     -> {error, {failed_to_connect_node, Node}};
+        Error        -> Error
+    end.
+
+%% @doc The db nodes on which mnesia is currently running.
+-spec running_nodes() -> list(node()).
+running_nodes() ->
+    mnesia:system_info(running_db_nodes).
+
+%% @private Pass ok through, accept an "already exists" schema error, throw {error, Reason} for anything else.
+ensure_ok(ok) -> ok;
+ensure_ok({error, {_Node, {already_exists, _Node}}}) -> ok;
+ensure_ok({badrpc, Reason}) -> throw({error, {badrpc, Reason}});
+ensure_ok({error, Reason}) -> throw({error, Reason}).
+
+%% @private Map a mnesia table transaction result to ok | {error, Reason}; an existing table or copy counts as ok.
+ensure_tab({atomic, ok}) -> ok;
+ensure_tab({aborted, {already_exists, _Name}}) -> ok;
+ensure_tab({aborted, {already_exists, _Name, _Node}})-> ok;
+ensure_tab({aborted, Error}) -> {error, Error}.
+
+%% @doc Wait for mnesia to start, to stop, or to load the local tables (at most 600000 ms for tables).
+-spec wait_for(start | stop | tables) -> ok | {error, Reason :: any()}.
+wait_for(start) ->
+    case mnesia:system_info(is_running) of
+        yes      -> ok;
+        no       -> {error, mnesia_unexpectedly_stopped};
+        stopping -> {error, mnesia_unexpectedly_stopping};
+        starting -> timer:sleep(500), wait_for(start) % poll again in 500 ms
+    end;
+
+wait_for(stop) ->
+    case mnesia:system_info(is_running) of
+        no       -> ok;
+        yes      -> {error, mnesia_unexpectedly_running};
+        starting -> {error, mnesia_unexpectedly_starting};
+        stopping -> timer:sleep(500), wait_for(stop) % poll again in 500 ms
+    end;
+
+wait_for(tables) ->
+    Tables = mnesia:system_info(local_tables),
+    case mnesia:wait_for_tables(Tables, 600000) of
+        ok                   -> ok;
+        {error, Reason}      -> {error, Reason};
+        {timeout, BadTables} -> {error, {timeout, BadTables}}
+    end.
 
diff --git a/src/emqttd_dist.erl b/src/emqttd_node.erl
similarity index 100%
rename from src/emqttd_dist.erl
rename to src/emqttd_node.erl