Integrage with 'ekka' library

This commit is contained in:
Feng Lee 2017-06-16 12:30:09 +08:00
parent 44565a3de1
commit 9b646d8cbc
12 changed files with 53 additions and 429 deletions

View File

@ -2,13 +2,14 @@ PROJECT = emqttd
PROJECT_DESCRIPTION = Erlang MQTT Broker
PROJECT_VERSION = 2.3
DEPS = goldrush gproc lager esockd mochiweb pbkdf2 lager_syslog bcrypt
DEPS = goldrush gproc lager esockd ekka mochiweb pbkdf2 lager_syslog bcrypt
dep_goldrush = git https://github.com/basho/goldrush 0.1.9
dep_gproc = git https://github.com/uwiger/gproc
dep_getopt = git https://github.com/jcomellas/getopt v0.8.2
dep_lager = git https://github.com/basho/lager master
dep_esockd = git https://github.com/emqtt/esockd master
dep_ekka = git https://github.com/emqtt/ekka master
dep_mochiweb = git https://github.com/emqtt/mochiweb master
dep_pbkdf2 = git https://github.com/emqtt/pbkdf2 2.0.1
dep_lager_syslog = git https://github.com/basho/lager_syslog

View File

@ -1,4 +1,4 @@
{deps, [
{goldrush,".*",{git,"https://github.com/basho/goldrush","0.1.9"}},{gproc,".*",{git,"https://github.com/uwiger/gproc",""}},{lager,".*",{git,"https://github.com/basho/lager","master"}},{esockd,".*",{git,"https://github.com/emqtt/esockd","emq22"}},{mochiweb,".*",{git,"https://github.com/emqtt/mochiweb","emq22"}},{pbkdf2,".*",{git,"https://github.com/emqtt/pbkdf2","2.0.1"}},{lager_syslog,".*",{git,"https://github.com/basho/lager_syslog",""}},{bcrypt,".*",{git,"https://github.com/smarkets/erlang-bcrypt","master"}}
{goldrush,".*",{git,"https://github.com/basho/goldrush","0.1.9"}},{gproc,".*",{git,"https://github.com/uwiger/gproc",""}},{lager,".*",{git,"https://github.com/basho/lager","master"}},{esockd,".*",{git,"https://github.com/emqtt/esockd","master"}},{ekka,".*",{git,"https://github.com/emqtt/ekka","master"}},{mochiweb,".*",{git,"https://github.com/emqtt/mochiweb","master"}},{pbkdf2,".*",{git,"https://github.com/emqtt/pbkdf2","2.0.1"}},{lager_syslog,".*",{git,"https://github.com/basho/lager_syslog",""}},{bcrypt,".*",{git,"https://github.com/smarkets/erlang-bcrypt","master"}}
]}.
{erl_opts, [{parse_transform,lager_transform}]}.
{erl_opts, [debug_info,{parse_transform,lager_transform}]}.

View File

@ -40,6 +40,9 @@
%% Debug API
-export([dump/0]).
%% Shutdown and reboot
-export([shutdown/0, shutdown/1, reboot/0]).
-type(subscriber() :: pid() | binary()).
-type(suboption() :: local | {qos, non_neg_integer()} | {share, {'$queue' | binary()}}).
@ -161,6 +164,21 @@ run_hooks(Hook, Args) ->
run_hooks(Hook, Args, Acc) ->
emqttd_hooks:run(Hook, Args, Acc).
%%--------------------------------------------------------------------
%% Shutdown and reboot
%%--------------------------------------------------------------------
shutdown() ->
shutdown(normal).
shutdown(Reason) ->
lager:error("EMQ shutdown for ~s", [Reason]),
emqttd_plugins:unload(),
lists:foreach(fun application:stop/1, [emqttd, ekka, mochiweb, esockd, gproc]).
reboot() ->
lists:foreach(fun application:start/1, [gproc, esockd, mochiweb, ekka, emqttd]).
%%--------------------------------------------------------------------
%% Debug
%%--------------------------------------------------------------------

View File

@ -34,18 +34,19 @@
-define(APP, emqttd).
%%--------------------------------------------------------------------
%% Application callbacks
%% Application Callbacks
%%--------------------------------------------------------------------
start(_Type, _Args) ->
print_banner(),
emqttd_mnesia:start(),
ekka:start(),
{ok, Sup} = emqttd_sup:start_link(),
start_servers(Sup),
emqttd_cli:load(),
register_acl_mod(),
emqttd_plugins:init(),
emqttd_plugins:load(),
init_cluster(),
start_listeners(),
register(emqttd, self()),
print_vsn(),
@ -146,6 +147,14 @@ register_acl_mod() ->
undefined -> ok
end.
%%--------------------------------------------------------------------
%% Init Cluster
%%--------------------------------------------------------------------
init_cluster() ->
ekka:callback(prepare, fun emqttd:shutdown/1),
ekka:callback(reboot, fun emqttd:reboot/1).
%%--------------------------------------------------------------------
%% Start Listeners
%%--------------------------------------------------------------------

View File

@ -163,7 +163,7 @@ code_change(_OldVsn, State, _Extra) ->
retain(brokers) ->
Payload = list_to_binary(string:join([atom_to_list(N) ||
N <- emqttd_mnesia:running_nodes()], ",")),
N <- ekka_mnesia:running_nodes()], ",")),
Msg = emqttd_message:make(broker, <<"$SYS/brokers">>, Payload),
emqttd:publish(emqttd_message:set_flag(sys, emqttd_message:set_flag(retain, Msg))).

View File

@ -111,7 +111,7 @@ broker(_) ->
%% @doc Cluster with other nodes
cluster(["join", SNode]) ->
case emqttd_cluster:join(emqttd_node:parse_name(SNode)) of
case ekka:join(ekka_node:parse_name(SNode)) of
ok ->
?PRINT_MSG("Join the cluster successfully.~n"),
cluster(["status"]);
@ -120,7 +120,7 @@ cluster(["join", SNode]) ->
end;
cluster(["leave"]) ->
case emqttd_cluster:leave() of
case ekka:leave() of
ok ->
?PRINT_MSG("Leave the cluster successfully.~n"),
cluster(["status"]);
@ -128,8 +128,8 @@ cluster(["leave"]) ->
?PRINT("Failed to leave the cluster: ~p~n", [Error])
end;
cluster(["remove", SNode]) ->
case emqttd_cluster:remove(emqttd_node:parse_name(SNode)) of
cluster(["force-leave", SNode]) ->
case ekka:force_leave(ekka_node:parse_name(SNode)) of
ok ->
?PRINT_MSG("Remove the node from cluster successfully.~n"),
cluster(["status"]);
@ -138,12 +138,12 @@ cluster(["remove", SNode]) ->
end;
cluster(["status"]) ->
?PRINT("Cluster status: ~p~n", [emqttd_cluster:status()]);
?PRINT("Cluster status: ~p~n", [ekka_cluster:status()]);
cluster(_) ->
?USAGE([{"cluster join <Node>", "Join the cluster"},
{"cluster leave", "Leave the cluster"},
{"cluster remove <Node>","Remove the node from cluster"},
{"cluster force-leave <Node>","Force the node leave from cluster"},
{"cluster status", "Cluster status"}]).
%%--------------------------------------------------------------------

View File

@ -1,92 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io)
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqttd_cluster).
-author("Feng Lee <feng@emqtt.io>").
-include("emqttd.hrl").
%% Cluster API
-export([join/1, leave/0, status/0, remove/1]).
%% RPC Call
-export([prepare/0, reboot/0]).
%% @doc Join cluster
-spec(join(node()) -> ok | {error, any()}).
join(Node) when Node =:= node() ->
{error, {cannot_join_with_self, Node}};
join(Node) when is_atom(Node) ->
case {is_clustered(Node), emqttd:is_running(Node)} of
{false, true} ->
prepare(), ok = emqttd_mnesia:join_cluster(Node), reboot();
{false, false} ->
{error, {node_not_running, Node}};
{true, _} ->
{error, {already_clustered, Node}}
end.
%% @doc Prepare to join or leave cluster.
-spec(prepare() -> ok).
prepare() ->
emqttd_plugins:unload(),
lists:foreach(fun application:stop/1, [emqttd, mochiweb, esockd, gproc]).
%% @doc Is node in cluster?
-spec(is_clustered(node()) -> boolean()).
is_clustered(Node) ->
lists:member(Node, emqttd_mnesia:cluster_nodes(all)).
%% @doc Reboot after join or leave cluster.
-spec(reboot() -> ok).
reboot() ->
lists:foreach(fun application:start/1, [gproc, esockd, mochiweb, emqttd]).
%% @doc Leave from Cluster.
-spec(leave() -> ok | {error, any()}).
leave() ->
case emqttd_mnesia:running_nodes() -- [node()] of
[_|_] ->
prepare(), ok = emqttd_mnesia:leave_cluster(), reboot();
[] ->
{error, node_not_in_cluster}
end.
%% @doc Remove a node from cluster.
-spec(remove(node()) -> ok | {error, any()}).
remove(Node) when Node =:= node() ->
{error, {cannot_remove_self, Node}};
remove(Node) ->
case is_clustered(Node) andalso rpc:call(Node, ?MODULE, prepare, []) of
ok ->
case emqttd_mnesia:remove_from_cluster(Node) of
ok -> rpc:call(Node, ?MODULE, reboot, []);
Error -> Error
end;
false ->
{error, node_not_in_cluster};
{badrpc, nodedown} ->
emqttd_mnesia:remove_from_cluster(Node);
{badrpc, Reason} ->
{error, Reason}
end.
%% @doc Cluster status
status() -> emqttd_mnesia:cluster_status().

View File

@ -1,268 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io)
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqttd_mnesia).
-author("Feng Lee <feng@emqtt.io>").
-include("emqttd.hrl").
-include("emqttd_internal.hrl").
%% Start and stop mnesia
-export([start/0, ensure_started/0, ensure_stopped/0, connect/1]).
%% Cluster mnesia
-export([join_cluster/1, cluster_status/0, leave_cluster/0,
remove_from_cluster/1, cluster_nodes/1, running_nodes/0]).
%% Schema and tables
-export([copy_schema/1, delete_schema/0, del_schema_copy/1,
create_table/2, copy_table/1, copy_table/2]).
%%--------------------------------------------------------------------
%% Start and init mnesia
%%--------------------------------------------------------------------
%% @doc Start mnesia database.
-spec(start() -> ok).
start() ->
ensure_ok(ensure_data_dir()),
ensure_ok(init_schema()),
ok = mnesia:start(),
init_tables(),
wait_for(tables).
%% @private
ensure_data_dir() ->
Dir = mnesia:system_info(directory) ++ "/",
case filelib:ensure_dir(Dir) of
ok -> ok;
{error, Reason} -> {error, {mnesia_dir_error, Dir, Reason}}
end.
%% @doc ensure mnesia started.
-spec(ensure_started() -> ok | {error, any()}).
ensure_started() ->
ok = mnesia:start(), wait_for(start).
%% @doc ensure mnesia stopped.
-spec(ensure_stopped() -> ok | {error, any()}).
ensure_stopped() ->
stopped = mnesia:stop(), wait_for(stop).
%% @private
%% @doc Init mnesia schema or tables.
init_schema() ->
case mnesia:system_info(extra_db_nodes) of
[] -> mnesia:create_schema([node()]);
[_|_] -> ok
end.
%% @private
%% @doc Init mnesia tables.
init_tables() ->
case mnesia:system_info(extra_db_nodes) of
[] -> create_tables();
[_|_] -> copy_tables()
end.
%% @doc Create mnesia tables.
create_tables() ->
emqttd_boot:apply_module_attributes(boot_mnesia).
%% @doc Copy mnesia tables.
copy_tables() ->
emqttd_boot:apply_module_attributes(copy_mnesia).
%% @doc Create mnesia table.
-spec(create_table(Name:: atom(), TabDef :: list()) -> ok | {error, any()}).
create_table(Name, TabDef) ->
ensure_tab(mnesia:create_table(Name, TabDef)).
%% @doc Copy mnesia table.
-spec(copy_table(Name :: atom()) -> ok).
copy_table(Name) ->
copy_table(Name, ram_copies).
-spec(copy_table(Name:: atom(), ram_copies | disc_copies) -> ok).
copy_table(Name, RamOrDisc) ->
ensure_tab(mnesia:add_table_copy(Name, node(), RamOrDisc)).
%% @doc Copy schema.
copy_schema(Node) ->
case mnesia:change_table_copy_type(schema, Node, disc_copies) of
{atomic, ok} ->
ok;
{aborted, {already_exists, schema, Node, disc_copies}} ->
ok;
{aborted, Error} ->
{error, Error}
end.
%% @doc Force to delete schema.
delete_schema() ->
mnesia:delete_schema([node()]).
%% @doc Delete schema copy
del_schema_copy(Node) ->
case mnesia:del_table_copy(schema, Node) of
{atomic, ok} -> ok;
{aborted, Reason} -> {error, Reason}
end.
%%--------------------------------------------------------------------
%% Cluster mnesia
%%--------------------------------------------------------------------
%% @doc Join the mnesia cluster
-spec(join_cluster(node()) -> ok).
join_cluster(Node) when Node =/= node() ->
%% Stop mnesia and delete schema first
ensure_ok(ensure_stopped()),
ensure_ok(delete_schema()),
%% Start mnesia and cluster to node
ensure_ok(ensure_started()),
ensure_ok(connect(Node)),
ensure_ok(copy_schema(node())),
%% Copy tables
copy_tables(),
ensure_ok(wait_for(tables)).
%% @doc Cluster status
-spec(cluster_status() -> list()).
cluster_status() ->
Running = mnesia:system_info(running_db_nodes),
Stopped = mnesia:system_info(db_nodes) -- Running,
?IF(Stopped =:= [], [{running_nodes, Running}],
[{running_nodes, Running}, {stopped_nodes, Stopped}]).
%% @doc This node try leave the cluster
-spec(leave_cluster() -> ok | {error, any()}).
leave_cluster() ->
case running_nodes() -- [node()] of
[] ->
{error, node_not_in_cluster};
Nodes ->
case lists:any(fun(Node) ->
case leave_cluster(Node) of
ok -> true;
{error, _Reason} -> false
end
end, Nodes) of
true -> ok;
false -> {error, {failed_to_leave, Nodes}}
end
end.
-spec(leave_cluster(node()) -> ok | {error, any()}).
leave_cluster(Node) when Node =/= node() ->
case is_running_db_node(Node) of
true ->
ensure_ok(ensure_stopped()),
ensure_ok(rpc:call(Node, ?MODULE, del_schema_copy, [node()])),
ensure_ok(delete_schema());
%%ensure_ok(start()); %% restart?
false ->
{error, {node_not_running, Node}}
end.
%% @doc Remove node from mnesia cluster.
-spec(remove_from_cluster(node()) -> ok | {error, any()}).
remove_from_cluster(Node) when Node =/= node() ->
case {is_node_in_cluster(Node), is_running_db_node(Node)} of
{true, true} ->
ensure_ok(rpc:call(Node, ?MODULE, ensure_stopped, [])),
mnesia_lib:del(extra_db_nodes, Node),
ensure_ok(del_schema_copy(Node)),
ensure_ok(rpc:call(Node, ?MODULE, delete_schema, []));
{true, false} ->
mnesia_lib:del(extra_db_nodes, Node),
ensure_ok(del_schema_copy(Node));
%ensure_ok(rpc:call(Node, ?MODULE, delete_schema, []));
{false, _} ->
{error, node_not_in_cluster}
end.
%% @doc Is node in mnesia cluster.
is_node_in_cluster(Node) ->
lists:member(Node, mnesia:system_info(db_nodes)).
%% @private
%% @doc Is running db node.
is_running_db_node(Node) ->
lists:member(Node, running_nodes()).
%% @doc Cluster with node.
-spec(connect(node()) -> ok | {error, any()}).
connect(Node) ->
case mnesia:change_config(extra_db_nodes, [Node]) of
{ok, [Node]} -> ok;
{ok, []} -> {error, {failed_to_connect_node, Node}};
Error -> Error
end.
%% @doc Running nodes.
-spec(running_nodes() -> list(node())).
running_nodes() -> cluster_nodes(running).
%% @doc Cluster nodes.
-spec(cluster_nodes(all | running | stopped) -> [node()]).
cluster_nodes(all) ->
mnesia:system_info(db_nodes);
cluster_nodes(running) ->
mnesia:system_info(running_db_nodes);
cluster_nodes(stopped) ->
cluster_nodes(all) -- cluster_nodes(running).
%% @private
ensure_ok(ok) -> ok;
ensure_ok({error, {_Node, {already_exists, _Node}}}) -> ok;
ensure_ok({badrpc, Reason}) -> throw({error, {badrpc, Reason}});
ensure_ok({error, Reason}) -> throw({error, Reason}).
%% @private
ensure_tab({atomic, ok}) -> ok;
ensure_tab({aborted, {already_exists, _Name}}) -> ok;
ensure_tab({aborted, {already_exists, _Name, _Node}})-> ok;
ensure_tab({aborted, Error}) -> Error.
%% @doc Wait for mnesia to start, stop or tables ready.
-spec(wait_for(start | stop | tables) -> ok | {error, Reason :: atom()}).
wait_for(start) ->
case mnesia:system_info(is_running) of
yes -> ok;
no -> {error, mnesia_unexpectedly_stopped};
stopping -> {error, mnesia_unexpectedly_stopping};
starting -> timer:sleep(1000), wait_for(start)
end;
wait_for(stop) ->
case mnesia:system_info(is_running) of
no -> ok;
yes -> {error, mnesia_unexpectedly_running};
starting -> {error, mnesia_unexpectedly_starting};
stopping -> timer:sleep(1000), wait_for(stop)
end;
wait_for(tables) ->
Tables = mnesia:system_info(local_tables),
case mnesia:wait_for_tables(Tables, 600000) of
ok -> ok;
{error, Reason} -> {error, Reason};
{timeout, BadTables} -> {error, {timetout, BadTables}}
end.

View File

@ -1,44 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io)
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqttd_node).
-author("Feng Lee <feng@emqtt.io>").
-import(lists, [concat/1]).
-export([is_aliving/1, parse_name/1]).
%% @doc Is node aliving
-spec(is_aliving(node()) -> boolean()).
is_aliving(Node) ->
case net_adm:ping(Node) of
pong -> true;
pang -> false
end.
%% @doc Parse node name
-spec(parse_name(string()) -> atom()).
parse_name(Name) when is_list(Name) ->
case string:tokens(Name, "@") of
[_Node, _Host] -> list_to_atom(Name);
_ -> with_host(Name)
end.
with_host(Name) ->
[_, Host] = string:tokens(atom_to_list(node()), "@"),
list_to_atom(concat([Name, "@", Host])).

View File

@ -53,19 +53,19 @@
%%--------------------------------------------------------------------
mnesia(boot) ->
ok = emqttd_mnesia:create_table(mqtt_topic, [
ok = ekka_mnesia:create_table(mqtt_topic, [
{ram_copies, [node()]},
{record_name, mqtt_topic},
{attributes, record_info(fields, mqtt_topic)}]),
ok = emqttd_mnesia:create_table(mqtt_route, [
ok = ekka_mnesia:create_table(mqtt_route, [
{type, bag},
{ram_copies, [node()]},
{record_name, mqtt_route},
{attributes, record_info(fields, mqtt_route)}]);
mnesia(copy) ->
ok = emqttd_mnesia:copy_table(mqtt_topic),
ok = emqttd_mnesia:copy_table(mqtt_route, ram_copies).
ok = ekka_mnesia:copy_table(mqtt_topic),
ok = ekka_mnesia:copy_table(mqtt_route, ram_copies).
%%--------------------------------------------------------------------
%% Start the Router

View File

@ -62,14 +62,14 @@
mnesia(boot) ->
%% Global Session Table
ok = emqttd_mnesia:create_table(mqtt_session, [
ok = ekka_mnesia:create_table(mqtt_session, [
{type, set},
{ram_copies, [node()]},
{record_name, mqtt_session},
{attributes, record_info(fields, mqtt_session)}]);
mnesia(copy) ->
ok = emqttd_mnesia:copy_table(mqtt_session).
ok = ekka_mnesia:copy_table(mqtt_session).
%%--------------------------------------------------------------------
%% API

View File

@ -41,21 +41,21 @@
-spec(mnesia(boot | copy) -> ok).
mnesia(boot) ->
%% Trie Table
ok = emqttd_mnesia:create_table(mqtt_trie, [
ok = ekka_mnesia:create_table(mqtt_trie, [
{ram_copies, [node()]},
{record_name, trie},
{attributes, record_info(fields, trie)}]),
%% Trie Node Table
ok = emqttd_mnesia:create_table(mqtt_trie_node, [
ok = ekka_mnesia:create_table(mqtt_trie_node, [
{ram_copies, [node()]},
{record_name, trie_node},
{attributes, record_info(fields, trie_node)}]);
mnesia(copy) ->
%% Copy Trie Table
ok = emqttd_mnesia:copy_table(mqtt_trie),
ok = ekka_mnesia:copy_table(mqtt_trie),
%% Copy Trie Node Table
ok = emqttd_mnesia:copy_table(mqtt_trie_node).
ok = ekka_mnesia:copy_table(mqtt_trie_node).
%%--------------------------------------------------------------------
%% Trie API