Merge with emqx
This commit is contained in:
parent
845c5eddc1
commit
38c33e9c8b
1
Makefile
1
Makefile
|
@ -34,7 +34,6 @@ TEST_ERLC_OPTS += +debug_info
|
||||||
TEST_ERLC_OPTS += +'{parse_transform, lager_transform}'
|
TEST_ERLC_OPTS += +'{parse_transform, lager_transform}'
|
||||||
|
|
||||||
EUNIT_OPTS = verbose
|
EUNIT_OPTS = verbose
|
||||||
# EUNIT_ERL_OPTS =
|
|
||||||
|
|
||||||
CT_SUITES = emqx emqx_mod emqx_lib emqx_topic emqx_trie emqx_mqueue emqx_inflight \
|
CT_SUITES = emqx emqx_mod emqx_lib emqx_topic emqx_trie emqx_mqueue emqx_inflight \
|
||||||
emqx_vm emqx_net emqx_protocol emqx_access emqx_config emqx_router
|
emqx_vm emqx_net emqx_protocol emqx_access emqx_config emqx_router
|
||||||
|
|
181
src/emqttd.erl
181
src/emqttd.erl
|
@ -1,181 +0,0 @@
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io)
|
|
||||||
%%
|
|
||||||
%% Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
%% you may not use this file except in compliance with the License.
|
|
||||||
%% You may obtain a copy of the License at
|
|
||||||
%%
|
|
||||||
%% http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
%%
|
|
||||||
%% Unless required by applicable law or agreed to in writing, software
|
|
||||||
%% distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
%% See the License for the specific language governing permissions and
|
|
||||||
%% limitations under the License.
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
%% @doc EMQ Main Module.
|
|
||||||
|
|
||||||
-module(emqttd).
|
|
||||||
|
|
||||||
-author("Feng Lee <feng@emqtt.io>").
|
|
||||||
|
|
||||||
-include("emqttd.hrl").
|
|
||||||
|
|
||||||
-include("emqttd_protocol.hrl").
|
|
||||||
|
|
||||||
-export([start/0, env/1, env/2, is_running/1, stop/0]).
|
|
||||||
|
|
||||||
%% PubSub API
|
|
||||||
-export([subscribe/1, subscribe/2, subscribe/3, publish/1,
|
|
||||||
unsubscribe/1, unsubscribe/2]).
|
|
||||||
|
|
||||||
%% PubSub Management API
|
|
||||||
-export([setqos/3, topics/0, subscriptions/1, subscribers/1, subscribed/2]).
|
|
||||||
|
|
||||||
%% Hooks API
|
|
||||||
-export([hook/4, hook/3, unhook/2, run_hooks/2, run_hooks/3]).
|
|
||||||
|
|
||||||
%% Debug API
|
|
||||||
-export([dump/0]).
|
|
||||||
|
|
||||||
%% Shutdown and reboot
|
|
||||||
-export([shutdown/0, shutdown/1, reboot/0]).
|
|
||||||
|
|
||||||
-type(subid() :: binary()).
|
|
||||||
|
|
||||||
-type(subscriber() :: pid() | subid() | {subid(), pid()}).
|
|
||||||
|
|
||||||
-type(suboption() :: local | {qos, non_neg_integer()} | {share, {'$queue' | binary()}}).
|
|
||||||
|
|
||||||
-export_type([subscriber/0, suboption/0]).
|
|
||||||
|
|
||||||
-define(APP, ?MODULE).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Bootstrap, environment, configuration, is_running...
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
%% @doc Start emqttd application.
|
|
||||||
-spec(start() -> ok | {error, term()}).
|
|
||||||
start() -> application:start(?APP).
|
|
||||||
|
|
||||||
%% @doc Stop emqttd application.
|
|
||||||
-spec(stop() -> ok | {error, term()}).
|
|
||||||
stop() -> application:stop(?APP).
|
|
||||||
|
|
||||||
%% @doc Environment
|
|
||||||
-spec(env(Key :: atom()) -> {ok, any()} | undefined).
|
|
||||||
env(Key) -> application:get_env(?APP, Key).
|
|
||||||
|
|
||||||
%% @doc Get environment
|
|
||||||
-spec(env(Key :: atom(), Default :: any()) -> undefined | any()).
|
|
||||||
env(Key, Default) -> application:get_env(?APP, Key, Default).
|
|
||||||
|
|
||||||
%% @doc Is running?
|
|
||||||
-spec(is_running(node()) -> boolean()).
|
|
||||||
is_running(Node) ->
|
|
||||||
case rpc:call(Node, erlang, whereis, [?APP]) of
|
|
||||||
{badrpc, _} -> false;
|
|
||||||
undefined -> false;
|
|
||||||
Pid when is_pid(Pid) -> true
|
|
||||||
end.
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% PubSub APIs
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
%% @doc Subscribe
|
|
||||||
-spec(subscribe(iodata()) -> ok | {error, term()}).
|
|
||||||
subscribe(Topic) ->
|
|
||||||
emqttd_server:subscribe(iolist_to_binary(Topic)).
|
|
||||||
|
|
||||||
-spec(subscribe(iodata(), subscriber()) -> ok | {error, term()}).
|
|
||||||
subscribe(Topic, Subscriber) ->
|
|
||||||
emqttd_server:subscribe(iolist_to_binary(Topic), Subscriber).
|
|
||||||
|
|
||||||
-spec(subscribe(iodata(), subscriber(), [suboption()]) -> ok | {error, term()}).
|
|
||||||
subscribe(Topic, Subscriber, Options) ->
|
|
||||||
emqttd_server:subscribe(iolist_to_binary(Topic), Subscriber, Options).
|
|
||||||
|
|
||||||
%% @doc Publish MQTT Message
|
|
||||||
-spec(publish(mqtt_message()) -> {ok, mqtt_delivery()} | ignore).
|
|
||||||
publish(Msg) ->
|
|
||||||
emqttd_server:publish(Msg).
|
|
||||||
|
|
||||||
%% @doc Unsubscribe
|
|
||||||
-spec(unsubscribe(iodata()) -> ok | {error, term()}).
|
|
||||||
unsubscribe(Topic) ->
|
|
||||||
emqttd_server:unsubscribe(iolist_to_binary(Topic)).
|
|
||||||
|
|
||||||
-spec(unsubscribe(iodata(), subscriber()) -> ok | {error, term()}).
|
|
||||||
unsubscribe(Topic, Subscriber) ->
|
|
||||||
emqttd_server:unsubscribe(iolist_to_binary(Topic), Subscriber).
|
|
||||||
|
|
||||||
-spec(setqos(binary(), subscriber(), mqtt_qos()) -> ok).
|
|
||||||
setqos(Topic, Subscriber, Qos) ->
|
|
||||||
emqttd_server:setqos(iolist_to_binary(Topic), Subscriber, Qos).
|
|
||||||
|
|
||||||
-spec(topics() -> [binary()]).
|
|
||||||
topics() -> emqttd_router:topics().
|
|
||||||
|
|
||||||
-spec(subscribers(iodata()) -> list(subscriber())).
|
|
||||||
subscribers(Topic) ->
|
|
||||||
emqttd_server:subscribers(iolist_to_binary(Topic)).
|
|
||||||
|
|
||||||
-spec(subscriptions(subscriber()) -> [{emqttd:subscriber(), binary(), list(emqttd:suboption())}]).
|
|
||||||
subscriptions(Subscriber) ->
|
|
||||||
emqttd_server:subscriptions(Subscriber).
|
|
||||||
|
|
||||||
-spec(subscribed(iodata(), subscriber()) -> boolean()).
|
|
||||||
subscribed(Topic, Subscriber) ->
|
|
||||||
emqttd_server:subscribed(iolist_to_binary(Topic), Subscriber).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Hooks API
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
-spec(hook(atom(), function() | {emqttd_hooks:hooktag(), function()}, list(any()))
|
|
||||||
-> ok | {error, term()}).
|
|
||||||
hook(Hook, TagFunction, InitArgs) ->
|
|
||||||
emqttd_hooks:add(Hook, TagFunction, InitArgs).
|
|
||||||
|
|
||||||
-spec(hook(atom(), function() | {emqttd_hooks:hooktag(), function()}, list(any()), integer())
|
|
||||||
-> ok | {error, term()}).
|
|
||||||
hook(Hook, TagFunction, InitArgs, Priority) ->
|
|
||||||
emqttd_hooks:add(Hook, TagFunction, InitArgs, Priority).
|
|
||||||
|
|
||||||
-spec(unhook(atom(), function() | {emqttd_hooks:hooktag(), function()})
|
|
||||||
-> ok | {error, term()}).
|
|
||||||
unhook(Hook, TagFunction) ->
|
|
||||||
emqttd_hooks:delete(Hook, TagFunction).
|
|
||||||
|
|
||||||
-spec(run_hooks(atom(), list(any())) -> ok | stop).
|
|
||||||
run_hooks(Hook, Args) ->
|
|
||||||
emqttd_hooks:run(Hook, Args).
|
|
||||||
|
|
||||||
-spec(run_hooks(atom(), list(any()), any()) -> {ok | stop, any()}).
|
|
||||||
run_hooks(Hook, Args, Acc) ->
|
|
||||||
emqttd_hooks:run(Hook, Args, Acc).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Shutdown and reboot
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
shutdown() ->
|
|
||||||
shutdown(normal).
|
|
||||||
|
|
||||||
shutdown(Reason) ->
|
|
||||||
lager:error("EMQ shutdown for ~s", [Reason]),
|
|
||||||
emqttd_plugins:unload(),
|
|
||||||
lists:foreach(fun application:stop/1, [emqttd, ekka, mochiweb, esockd, gproc]).
|
|
||||||
|
|
||||||
reboot() ->
|
|
||||||
lists:foreach(fun application:start/1, [gproc, esockd, mochiweb, ekka, emqttd]).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Debug
|
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
|
|
||||||
dump() -> lists:append([emqttd_server:dump(), emqttd_router:dump()]).
|
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
{application,emqx,
|
{application,emqx,
|
||||||
[{description,"EMQ X Broker"},
|
[{description,"EMQ X Broker"},
|
||||||
{vsn,"2.4"},
|
{vsn,"2.3.0"},
|
||||||
{modules,[]},
|
{modules,[]},
|
||||||
{registered,[emqx_sup]},
|
{registered,[emqx_sup]},
|
||||||
{applications,[kernel,stdlib,gproc,gen_rpc,lager,esockd,mochiweb,lager_syslog,pbkdf2,bcrypt,clique,jsx]},
|
{applications,[kernel,stdlib,gproc,gen_rpc,lager,esockd,mochiweb,lager_syslog,pbkdf2,bcrypt,clique,jsx]},
|
||||||
|
@ -8,4 +8,4 @@
|
||||||
{mod,{emqx_app,[]}},
|
{mod,{emqx_app,[]}},
|
||||||
{maintainers,["Feng Lee <feng@emqtt.io>"]},
|
{maintainers,["Feng Lee <feng@emqtt.io>"]},
|
||||||
{licenses,["Apache-2.0"]},
|
{licenses,["Apache-2.0"]},
|
||||||
{links,[{"Github","https://github.com/emqtt/emqttd"}]}]}.
|
{links,[{"Github","https://github.com/emqx/emqx"}]}]}.
|
||||||
|
|
|
@ -249,8 +249,6 @@ init([]) ->
|
||||||
ets:new(?METRIC_TAB, [set, public, named_table, {write_concurrency, true}]),
|
ets:new(?METRIC_TAB, [set, public, named_table, {write_concurrency, true}]),
|
||||||
% Init metrics
|
% Init metrics
|
||||||
[create_metric(Metric) || Metric <- Metrics],
|
[create_metric(Metric) || Metric <- Metrics],
|
||||||
% $SYS Topics for metrics
|
|
||||||
% [ok = emqttd:create(topic, metric_topic(Topic)) || {_, Topic} <- Metrics],
|
|
||||||
% Tick to publish metrics
|
% Tick to publish metrics
|
||||||
{ok, #state{tick = emqx_broker:start_tick(tick)}, hibernate}.
|
{ok, #state{tick = emqx_broker:start_tick(tick)}, hibernate}.
|
||||||
|
|
||||||
|
|
|
@ -55,7 +55,7 @@ start_link(Pool, Id, Env) ->
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
%% @doc Subscribe to a Topic
|
%% @doc Subscribe to a Topic
|
||||||
-spec(subscribe(binary(), emqttd:subscriber(), [emqttd:suboption()]) -> ok).
|
-spec(subscribe(binary(), emqx:subscriber(), [emqx:suboption()]) -> ok).
|
||||||
subscribe(Topic, Subscriber, Options) ->
|
subscribe(Topic, Subscriber, Options) ->
|
||||||
call(pick(Topic), {subscribe, Topic, Subscriber, Options}).
|
call(pick(Topic), {subscribe, Topic, Subscriber, Options}).
|
||||||
|
|
||||||
|
|
|
@ -112,7 +112,7 @@ print(Topic) ->
|
||||||
add_route(Topic) when is_binary(Topic) ->
|
add_route(Topic) when is_binary(Topic) ->
|
||||||
add_route(#mqtt_route{topic = Topic, node = node()});
|
add_route(#mqtt_route{topic = Topic, node = node()});
|
||||||
add_route(Route = #mqtt_route{topic = Topic}) ->
|
add_route(Route = #mqtt_route{topic = Topic}) ->
|
||||||
case emqttd_topic:wildcard(Topic) of
|
case emqx_topic:wildcard(Topic) of
|
||||||
true -> case mnesia:is_transaction() of
|
true -> case mnesia:is_transaction() of
|
||||||
true -> add_trie_route(Route);
|
true -> add_trie_route(Route);
|
||||||
false -> trans(fun add_trie_route/1, [Route])
|
false -> trans(fun add_trie_route/1, [Route])
|
||||||
|
@ -125,7 +125,7 @@ add_direct_route(Route) ->
|
||||||
|
|
||||||
add_trie_route(Route = #mqtt_route{topic = Topic}) ->
|
add_trie_route(Route = #mqtt_route{topic = Topic}) ->
|
||||||
case mnesia:wread({mqtt_route, Topic}) of
|
case mnesia:wread({mqtt_route, Topic}) of
|
||||||
[] -> emqttd_trie:insert(Topic);
|
[] -> emqx_trie:insert(Topic);
|
||||||
_ -> ok
|
_ -> ok
|
||||||
end,
|
end,
|
||||||
mnesia:write(Route).
|
mnesia:write(Route).
|
||||||
|
@ -135,7 +135,7 @@ add_trie_route(Route = #mqtt_route{topic = Topic}) ->
|
||||||
del_route(Topic) when is_binary(Topic) ->
|
del_route(Topic) when is_binary(Topic) ->
|
||||||
del_route(#mqtt_route{topic = Topic, node = node()});
|
del_route(#mqtt_route{topic = Topic, node = node()});
|
||||||
del_route(Route = #mqtt_route{topic = Topic}) ->
|
del_route(Route = #mqtt_route{topic = Topic}) ->
|
||||||
case emqttd_topic:wildcard(Topic) of
|
case emqx_topic:wildcard(Topic) of
|
||||||
true -> case mnesia:is_transaction() of
|
true -> case mnesia:is_transaction() of
|
||||||
true -> del_trie_route(Route);
|
true -> del_trie_route(Route);
|
||||||
false -> trans(fun del_trie_route/1, [Route])
|
false -> trans(fun del_trie_route/1, [Route])
|
||||||
|
@ -150,7 +150,7 @@ del_trie_route(Route = #mqtt_route{topic = Topic}) ->
|
||||||
case mnesia:wread({mqtt_route, Topic}) of
|
case mnesia:wread({mqtt_route, Topic}) of
|
||||||
[Route] -> %% Remove route and trie
|
[Route] -> %% Remove route and trie
|
||||||
mnesia:delete_object(Route),
|
mnesia:delete_object(Route),
|
||||||
emqttd_trie:delete(Topic);
|
emqx_trie:delete(Topic);
|
||||||
[_|_] -> %% Remove route only
|
[_|_] -> %% Remove route only
|
||||||
mnesia:delete_object(Route);
|
mnesia:delete_object(Route);
|
||||||
[] -> ok
|
[] -> ok
|
||||||
|
|
|
@ -62,11 +62,11 @@ start_link(Pool, Id, Env) ->
|
||||||
subscribe(Topic) when is_binary(Topic) ->
|
subscribe(Topic) when is_binary(Topic) ->
|
||||||
subscribe(Topic, self()).
|
subscribe(Topic, self()).
|
||||||
|
|
||||||
-spec(subscribe(binary(), emqttd:subscriber()) -> ok | {error, term()}).
|
-spec(subscribe(binary(), emqx:subscriber()) -> ok | {error, term()}).
|
||||||
subscribe(Topic, Subscriber) when is_binary(Topic) ->
|
subscribe(Topic, Subscriber) when is_binary(Topic) ->
|
||||||
subscribe(Topic, Subscriber, []).
|
subscribe(Topic, Subscriber, []).
|
||||||
|
|
||||||
-spec(subscribe(binary(), emqttd:subscriber(), [emqttd:suboption()]) ->
|
-spec(subscribe(binary(), emqx:subscriber(), [emqx:suboption()]) ->
|
||||||
ok | {error, term()}).
|
ok | {error, term()}).
|
||||||
subscribe(Topic, Subscriber, Options) when is_binary(Topic) ->
|
subscribe(Topic, Subscriber, Options) when is_binary(Topic) ->
|
||||||
call(pick(Subscriber), {subscribe, Topic, with_subpid(Subscriber), Options}).
|
call(pick(Subscriber), {subscribe, Topic, with_subpid(Subscriber), Options}).
|
||||||
|
@ -158,7 +158,7 @@ with_subproperty(Subscriptions) when is_list(Subscriptions) ->
|
||||||
subscribers(Topic) when is_binary(Topic) ->
|
subscribers(Topic) when is_binary(Topic) ->
|
||||||
emqx_pubsub:subscribers(Topic).
|
emqx_pubsub:subscribers(Topic).
|
||||||
|
|
||||||
-spec(subscribed(binary(), emqttd:subscriber()) -> boolean()).
|
-spec(subscribed(binary(), emqx:subscriber()) -> boolean()).
|
||||||
subscribed(Topic, SubPid) when is_binary(Topic), is_pid(SubPid) ->
|
subscribed(Topic, SubPid) when is_binary(Topic), is_pid(SubPid) ->
|
||||||
ets:member(mqtt_subproperty, {Topic, SubPid});
|
ets:member(mqtt_subproperty, {Topic, SubPid});
|
||||||
subscribed(Topic, SubId) when is_binary(Topic), is_binary(SubId) ->
|
subscribed(Topic, SubId) when is_binary(Topic), is_binary(SubId) ->
|
||||||
|
|
|
@ -107,7 +107,7 @@ unregister_session(ClientId) ->
|
||||||
unregister_session(ClientId, Pid) ->
|
unregister_session(ClientId, Pid) ->
|
||||||
case ets:lookup(mqtt_local_session, ClientId) of
|
case ets:lookup(mqtt_local_session, ClientId) of
|
||||||
[LocalSess = {_, Pid, _, _}] ->
|
[LocalSess = {_, Pid, _, _}] ->
|
||||||
emqttd_stats:del_session_stats(ClientId),
|
emqx_stats:del_session_stats(ClientId),
|
||||||
ets:delete_object(mqtt_local_session, LocalSess);
|
ets:delete_object(mqtt_local_session, LocalSess);
|
||||||
_ ->
|
_ ->
|
||||||
false
|
false
|
||||||
|
|
Loading…
Reference in New Issue