diff --git a/Makefile b/Makefile index d9d04c517..d4cd2009f 100644 --- a/Makefile +++ b/Makefile @@ -34,7 +34,6 @@ TEST_ERLC_OPTS += +debug_info TEST_ERLC_OPTS += +'{parse_transform, lager_transform}' EUNIT_OPTS = verbose -# EUNIT_ERL_OPTS = CT_SUITES = emqx emqx_mod emqx_lib emqx_topic emqx_trie emqx_mqueue emqx_inflight \ emqx_vm emqx_net emqx_protocol emqx_access emqx_config emqx_router diff --git a/src/emqttd.erl b/src/emqttd.erl deleted file mode 100644 index 65739952f..000000000 --- a/src/emqttd.erl +++ /dev/null @@ -1,181 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io) -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - -%% @doc EMQ Main Module. - --module(emqttd). - --author("Feng Lee "). - --include("emqttd.hrl"). - --include("emqttd_protocol.hrl"). - --export([start/0, env/1, env/2, is_running/1, stop/0]). - -%% PubSub API --export([subscribe/1, subscribe/2, subscribe/3, publish/1, - unsubscribe/1, unsubscribe/2]). - -%% PubSub Management API --export([setqos/3, topics/0, subscriptions/1, subscribers/1, subscribed/2]). - -%% Hooks API --export([hook/4, hook/3, unhook/2, run_hooks/2, run_hooks/3]). - -%% Debug API --export([dump/0]). - -%% Shutdown and reboot --export([shutdown/0, shutdown/1, reboot/0]). - --type(subid() :: binary()). - --type(subscriber() :: pid() | subid() | {subid(), pid()}). - --type(suboption() :: local | {qos, non_neg_integer()} | {share, {'$queue' | binary()}}). - --export_type([subscriber/0, suboption/0]). - --define(APP, ?MODULE). - -%%-------------------------------------------------------------------- -%% Bootstrap, environment, configuration, is_running... -%%-------------------------------------------------------------------- - -%% @doc Start emqttd application. --spec(start() -> ok | {error, term()}). -start() -> application:start(?APP). - -%% @doc Stop emqttd application. --spec(stop() -> ok | {error, term()}). -stop() -> application:stop(?APP). - -%% @doc Environment --spec(env(Key :: atom()) -> {ok, any()} | undefined). -env(Key) -> application:get_env(?APP, Key). - -%% @doc Get environment --spec(env(Key :: atom(), Default :: any()) -> undefined | any()). -env(Key, Default) -> application:get_env(?APP, Key, Default). - -%% @doc Is running? --spec(is_running(node()) -> boolean()). -is_running(Node) -> - case rpc:call(Node, erlang, whereis, [?APP]) of - {badrpc, _} -> false; - undefined -> false; - Pid when is_pid(Pid) -> true - end. - -%%-------------------------------------------------------------------- -%% PubSub APIs -%%-------------------------------------------------------------------- - -%% @doc Subscribe --spec(subscribe(iodata()) -> ok | {error, term()}). -subscribe(Topic) -> - emqttd_server:subscribe(iolist_to_binary(Topic)). - --spec(subscribe(iodata(), subscriber()) -> ok | {error, term()}). -subscribe(Topic, Subscriber) -> - emqttd_server:subscribe(iolist_to_binary(Topic), Subscriber). - --spec(subscribe(iodata(), subscriber(), [suboption()]) -> ok | {error, term()}). -subscribe(Topic, Subscriber, Options) -> - emqttd_server:subscribe(iolist_to_binary(Topic), Subscriber, Options). - -%% @doc Publish MQTT Message --spec(publish(mqtt_message()) -> {ok, mqtt_delivery()} | ignore). -publish(Msg) -> - emqttd_server:publish(Msg). - -%% @doc Unsubscribe --spec(unsubscribe(iodata()) -> ok | {error, term()}). -unsubscribe(Topic) -> - emqttd_server:unsubscribe(iolist_to_binary(Topic)). - --spec(unsubscribe(iodata(), subscriber()) -> ok | {error, term()}). -unsubscribe(Topic, Subscriber) -> - emqttd_server:unsubscribe(iolist_to_binary(Topic), Subscriber). - --spec(setqos(binary(), subscriber(), mqtt_qos()) -> ok). -setqos(Topic, Subscriber, Qos) -> - emqttd_server:setqos(iolist_to_binary(Topic), Subscriber, Qos). - --spec(topics() -> [binary()]). -topics() -> emqttd_router:topics(). - --spec(subscribers(iodata()) -> list(subscriber())). -subscribers(Topic) -> - emqttd_server:subscribers(iolist_to_binary(Topic)). - --spec(subscriptions(subscriber()) -> [{emqttd:subscriber(), binary(), list(emqttd:suboption())}]). -subscriptions(Subscriber) -> - emqttd_server:subscriptions(Subscriber). - --spec(subscribed(iodata(), subscriber()) -> boolean()). -subscribed(Topic, Subscriber) -> - emqttd_server:subscribed(iolist_to_binary(Topic), Subscriber). - -%%-------------------------------------------------------------------- -%% Hooks API -%%-------------------------------------------------------------------- - --spec(hook(atom(), function() | {emqttd_hooks:hooktag(), function()}, list(any())) - -> ok | {error, term()}). -hook(Hook, TagFunction, InitArgs) -> - emqttd_hooks:add(Hook, TagFunction, InitArgs). - --spec(hook(atom(), function() | {emqttd_hooks:hooktag(), function()}, list(any()), integer()) - -> ok | {error, term()}). -hook(Hook, TagFunction, InitArgs, Priority) -> - emqttd_hooks:add(Hook, TagFunction, InitArgs, Priority). - --spec(unhook(atom(), function() | {emqttd_hooks:hooktag(), function()}) - -> ok | {error, term()}). -unhook(Hook, TagFunction) -> - emqttd_hooks:delete(Hook, TagFunction). - --spec(run_hooks(atom(), list(any())) -> ok | stop). -run_hooks(Hook, Args) -> - emqttd_hooks:run(Hook, Args). - --spec(run_hooks(atom(), list(any()), any()) -> {ok | stop, any()}). -run_hooks(Hook, Args, Acc) -> - emqttd_hooks:run(Hook, Args, Acc). - -%%-------------------------------------------------------------------- -%% Shutdown and reboot -%%-------------------------------------------------------------------- - -shutdown() -> - shutdown(normal). - -shutdown(Reason) -> - lager:error("EMQ shutdown for ~s", [Reason]), - emqttd_plugins:unload(), - lists:foreach(fun application:stop/1, [emqttd, ekka, mochiweb, esockd, gproc]). - -reboot() -> - lists:foreach(fun application:start/1, [gproc, esockd, mochiweb, ekka, emqttd]). - -%%-------------------------------------------------------------------- -%% Debug -%%-------------------------------------------------------------------- - -dump() -> lists:append([emqttd_server:dump(), emqttd_router:dump()]). - diff --git a/src/emqx.app.src b/src/emqx.app.src index eece8df33..43c0afb21 100644 --- a/src/emqx.app.src +++ b/src/emqx.app.src @@ -1,11 +1,11 @@ {application,emqx, [{description,"EMQ X Broker"}, - {vsn,"2.4"}, + {vsn,"2.3.0"}, {modules,[]}, {registered,[emqx_sup]}, - {applications,[kernel,stdlib,gproc,gen_rpc,lager,esockd,mochiweb, lager_syslog,pbkdf2,bcrypt,clique,jsx]}, + {applications,[kernel,stdlib,gproc,gen_rpc,lager,esockd,mochiweb,lager_syslog,pbkdf2,bcrypt,clique,jsx]}, {env,[]}, {mod,{emqx_app,[]}}, {maintainers,["Feng Lee "]}, {licenses,["Apache-2.0"]}, - {links,[{"Github","https://github.com/emqtt/emqttd"}]}]}. + {links,[{"Github","https://github.com/emqx/emqx"}]}]}. diff --git a/src/emqx_metrics.erl b/src/emqx_metrics.erl index 166173d18..f8cb7b60b 100644 --- a/src/emqx_metrics.erl +++ b/src/emqx_metrics.erl @@ -249,8 +249,6 @@ init([]) -> ets:new(?METRIC_TAB, [set, public, named_table, {write_concurrency, true}]), % Init metrics [create_metric(Metric) || Metric <- Metrics], - % $SYS Topics for metrics - % [ok = emqttd:create(topic, metric_topic(Topic)) || {_, Topic} <- Metrics], % Tick to publish metrics {ok, #state{tick = emqx_broker:start_tick(tick)}, hibernate}. diff --git a/src/emqx_pubsub.erl b/src/emqx_pubsub.erl index 06f1ea3bc..7885dd811 100644 --- a/src/emqx_pubsub.erl +++ b/src/emqx_pubsub.erl @@ -55,7 +55,7 @@ start_link(Pool, Id, Env) -> %%-------------------------------------------------------------------- %% @doc Subscribe to a Topic --spec(subscribe(binary(), emqttd:subscriber(), [emqttd:suboption()]) -> ok). +-spec(subscribe(binary(), emqx:subscriber(), [emqx:suboption()]) -> ok). subscribe(Topic, Subscriber, Options) -> call(pick(Topic), {subscribe, Topic, Subscriber, Options}). diff --git a/src/emqx_router.erl b/src/emqx_router.erl index ac356f7de..d50654033 100644 --- a/src/emqx_router.erl +++ b/src/emqx_router.erl @@ -112,7 +112,7 @@ print(Topic) -> add_route(Topic) when is_binary(Topic) -> add_route(#mqtt_route{topic = Topic, node = node()}); add_route(Route = #mqtt_route{topic = Topic}) -> - case emqttd_topic:wildcard(Topic) of + case emqx_topic:wildcard(Topic) of true -> case mnesia:is_transaction() of true -> add_trie_route(Route); false -> trans(fun add_trie_route/1, [Route]) @@ -125,7 +125,7 @@ add_direct_route(Route) -> add_trie_route(Route = #mqtt_route{topic = Topic}) -> case mnesia:wread({mqtt_route, Topic}) of - [] -> emqttd_trie:insert(Topic); + [] -> emqx_trie:insert(Topic); _ -> ok end, mnesia:write(Route). @@ -135,7 +135,7 @@ add_trie_route(Route = #mqtt_route{topic = Topic}) -> del_route(Topic) when is_binary(Topic) -> del_route(#mqtt_route{topic = Topic, node = node()}); del_route(Route = #mqtt_route{topic = Topic}) -> - case emqttd_topic:wildcard(Topic) of + case emqx_topic:wildcard(Topic) of true -> case mnesia:is_transaction() of true -> del_trie_route(Route); false -> trans(fun del_trie_route/1, [Route]) @@ -150,7 +150,7 @@ del_trie_route(Route = #mqtt_route{topic = Topic}) -> case mnesia:wread({mqtt_route, Topic}) of [Route] -> %% Remove route and trie mnesia:delete_object(Route), - emqttd_trie:delete(Topic); + emqx_trie:delete(Topic); [_|_] -> %% Remove route only mnesia:delete_object(Route); [] -> ok diff --git a/src/emqx_server.erl b/src/emqx_server.erl index 12886656d..726750881 100644 --- a/src/emqx_server.erl +++ b/src/emqx_server.erl @@ -62,11 +62,11 @@ start_link(Pool, Id, Env) -> subscribe(Topic) when is_binary(Topic) -> subscribe(Topic, self()). --spec(subscribe(binary(), emqttd:subscriber()) -> ok | {error, term()}). +-spec(subscribe(binary(), emqx:subscriber()) -> ok | {error, term()}). subscribe(Topic, Subscriber) when is_binary(Topic) -> subscribe(Topic, Subscriber, []). --spec(subscribe(binary(), emqttd:subscriber(), [emqttd:suboption()]) -> +-spec(subscribe(binary(), emqx:subscriber(), [emqx:suboption()]) -> ok | {error, term()}). subscribe(Topic, Subscriber, Options) when is_binary(Topic) -> call(pick(Subscriber), {subscribe, Topic, with_subpid(Subscriber), Options}). @@ -158,7 +158,7 @@ with_subproperty(Subscriptions) when is_list(Subscriptions) -> subscribers(Topic) when is_binary(Topic) -> emqx_pubsub:subscribers(Topic). --spec(subscribed(binary(), emqttd:subscriber()) -> boolean()). +-spec(subscribed(binary(), emqx:subscriber()) -> boolean()). subscribed(Topic, SubPid) when is_binary(Topic), is_pid(SubPid) -> ets:member(mqtt_subproperty, {Topic, SubPid}); subscribed(Topic, SubId) when is_binary(Topic), is_binary(SubId) -> diff --git a/src/emqx_sm.erl b/src/emqx_sm.erl index 78e53da70..3322aed1f 100644 --- a/src/emqx_sm.erl +++ b/src/emqx_sm.erl @@ -107,7 +107,7 @@ unregister_session(ClientId) -> unregister_session(ClientId, Pid) -> case ets:lookup(mqtt_local_session, ClientId) of [LocalSess = {_, Pid, _, _}] -> - emqttd_stats:del_session_stats(ClientId), + emqx_stats:del_session_stats(ClientId), ets:delete_object(mqtt_local_session, LocalSess); _ -> false