diff --git a/Makefile b/Makefile index 0718648f8..42eaffc57 100644 --- a/Makefile +++ b/Makefile @@ -14,9 +14,9 @@ dep_jsx = git https://github.com/talentdeficit/jsx 2.9.0 dep_getopt = git https://github.com/jcomellas/getopt v0.8.2 dep_lager = git https://github.com/basho/lager master dep_lager_syslog = git https://github.com/basho/lager_syslog -dep_esockd = git https://github.com/emqtt/esockd v5.2.1 -dep_ekka = git https://github.com/emqtt/ekka v0.2.2 -dep_mochiweb = git https://github.com/emqtt/mochiweb v4.2.2 +dep_esockd = git https://github.com/emqtt/esockd emqx30 +dep_ekka = git https://github.com/emqtt/ekka develop +dep_mochiweb = git https://github.com/emqtt/mochiweb emqx30 dep_pbkdf2 = git https://github.com/emqtt/pbkdf2 2.0.1 dep_bcrypt = git https://github.com/smarkets/erlang-bcrypt master dep_clique = git https://github.com/emqtt/clique @@ -29,16 +29,19 @@ ERLC_OPTS += +'{parse_transform, lager_transform}' BUILD_DEPS = cuttlefish dep_cuttlefish = git https://github.com/emqtt/cuttlefish -TEST_DEPS = emqttc -dep_emqttc = git https://github.com/emqtt/emqttc +TEST_DEPS = emqx_ct_helplers +dep_emqx_ct_helplers = git git@github.com:emqx/emqx_ct_helpers TEST_ERLC_OPTS += +debug_info TEST_ERLC_OPTS += +'{parse_transform, lager_transform}' EUNIT_OPTS = verbose -CT_SUITES = emqx emqx_broker emqx_mod emqx_lib emqx_topic emqx_trie emqx_mqueue emqx_inflight \ - emqx_vm emqx_net emqx_protocol emqx_access emqx_router +CT_SUITES = emqx_inflight +## emqx_trie emqx_router emqx_frame emqx_mqtt_compat + +#CT_SUITES = emqx emqx_broker emqx_mod emqx_lib emqx_topic emqx_mqueue emqx_inflight \ +# emqx_vm emqx_net emqx_protocol emqx_access emqx_router CT_OPTS = -cover test/ct.cover.spec -erl_args -name emqxct@127.0.0.1 diff --git a/src/emqx_access_control.erl b/src/emqx_access_control.erl index aa51c0df0..869ef2649 100644 --- a/src/emqx_access_control.erl +++ b/src/emqx_access_control.erl @@ -42,7 +42,20 @@ %% @doc Start access control server. -spec(start_link() -> {ok, pid()} | ignore | {error, term()}). start_link() -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + case gen_server:start_link({local, ?SERVER}, ?MODULE, [], []) of + {ok, Pid} -> + ok = register_default_mod(), + {ok, Pid}; + {error, Reason} -> + {error, Reason} + end. + +register_default_mod() -> + case emqx_config:get_env(acl_file) of + {ok, File} -> + emqx_access_control:register_mod(acl, emqx_acl_internal, [File]); + undefined -> ok + end. %% @doc Authenticate Client. -spec(auth(Client :: client(), Password :: password()) diff --git a/src/emqx_app.erl b/src/emqx_app.erl index f7a51fc7f..54926f2a7 100644 --- a/src/emqx_app.erl +++ b/src/emqx_app.erl @@ -1,25 +1,23 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Inc. All rights reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- +%%%=================================================================== +%%% Copyright (c) 2013-2018 EMQ Inc. All rights reserved. +%%% +%%% Licensed under the Apache License, Version 2.0 (the "License"); +%%% you may not use this file except in compliance with the License. +%%% You may obtain a copy of the License at +%%% +%%% http://www.apache.org/licenses/LICENSE-2.0 +%%% +%%% Unless required by applicable law or agreed to in writing, software +%%% distributed under the License is distributed on an "AS IS" BASIS, +%%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%%% See the License for the specific language governing permissions and +%%% limitations under the License. +%%%=================================================================== -module(emqx_app). -behaviour(application). --include("emqx_mqtt.hrl"). - %% Application callbacks -export([start/2, stop/1]). @@ -33,9 +31,10 @@ start(_Type, _Args) -> print_banner(), ekka:start(), {ok, Sup} = emqx_sup:start_link(), - %%TODO: fixme later - ok = register_acl_mod(), emqx_modules:load(), + emqx_plugins:init(), + emqx_plugins:load(), + emqx_listeners:start(), start_autocluster(), register(emqx, self()), print_vsn(), @@ -43,8 +42,8 @@ start(_Type, _Args) -> -spec(stop(State :: term()) -> term()). stop(_State) -> - emqx_modules:unload(), - catch emqx_mqtt:shutdown(). + emqx_listeners:stop(), + emqx_modules:unload(). %%-------------------------------------------------------------------- %% Print Banner @@ -57,16 +56,6 @@ print_vsn() -> {ok, Vsn} = application:get_key(vsn), io:format("~s ~s is running now!~n", [?APP, Vsn]). -%%-------------------------------------------------------------------- -%% Register default ACL File -%%-------------------------------------------------------------------- - -register_acl_mod() -> - case emqx_config:get_env(acl_file) of - {ok, File} -> emqx_access_control:register_mod(acl, emqx_acl_internal, [File]); - undefined -> ok - end. - %%-------------------------------------------------------------------- %% Autocluster %%-------------------------------------------------------------------- @@ -74,10 +63,5 @@ register_acl_mod() -> start_autocluster() -> ekka:callback(prepare, fun emqx:shutdown/1), ekka:callback(reboot, fun emqx:reboot/0), - ekka:autocluster(?APP, fun after_autocluster/0). - -after_autocluster() -> - emqx_plugins:init(), - emqx_plugins:load(), - emqx_mqtt:bootstrap(). + ekka:autocluster(?APP). diff --git a/src/emqx_broker.erl b/src/emqx_broker.erl index aaacb086d..5a6c9d954 100644 --- a/src/emqx_broker.erl +++ b/src/emqx_broker.erl @@ -28,6 +28,7 @@ -export([dispatch/2, dispatch/3]). -export([subscriptions/1, subscribers/1, subscribed/2]). -export([get_subopts/2, set_subopts/3]). +-export([topics/0]). %% gen_server Function Exports -export([init/1, handle_call/3, handle_cast/2, handle_info/2, @@ -95,13 +96,17 @@ unsubscribe(Topic, Subscriber, Timeout) -> %% Publish %%-------------------------------------------------------------------- +-spec(publish(topic(), payload()) -> delivery() | stopped). +publish(Topic, Payload) when is_binary(Topic), is_binary(Payload) -> + publish(emqx_message:make(Topic, Payload)). + -spec(publish(message()) -> delivery() | stopped). publish(Msg = #message{from = From}) -> %% Hook to trace? trace(publish, From, Msg), case emqx_hooks:run('message.publish', [], Msg) of {ok, Msg1 = #message{topic = Topic}} -> - publish(Topic, Msg1); + route(aggre(emqx_router:match_routes(Topic)), delivery(Msg1)); {stop, Msg1} -> emqx_logger:warning("Stop publishing: ~s", [emqx_message:format(Msg1)]), stopped @@ -123,8 +128,9 @@ trace(public, From, #message{topic = Topic, payload = Payload}) emqx_logger:info([{client, From}, {topic, Topic}], "~s PUBLISH to ~s: ~p", [From, Topic, Payload]). -publish(Topic, Msg) -> - route(aggre(emqx_router:match_routes(Topic)), delivery(Msg)). +%%-------------------------------------------------------------------- +%% Route +%%-------------------------------------------------------------------- route([], Delivery = #delivery{message = Msg}) -> emqx_hooks:run('message.dropped', [undefined, Msg]), @@ -258,6 +264,9 @@ pick(SubId) when is_binary(SubId) -> pick({SubId, SubPid}) when is_binary(SubId), is_pid(SubPid) -> pick(SubId). +-spec(topics() -> [topic()]). +topics() -> emqx_router:topics(). + %%-------------------------------------------------------------------- %% gen_server callbacks %%-------------------------------------------------------------------- diff --git a/src/emqx_message.erl b/src/emqx_message.erl index 04d9c1d9c..79adf6ad1 100644 --- a/src/emqx_message.erl +++ b/src/emqx_message.erl @@ -19,15 +19,16 @@ -include("emqx.hrl"). -include("emqx_mqtt.hrl"). --export([make/3, make/4]). - +-export([make/2, make/3, make/4]). -export([get_flag/2, get_flag/3, set_flag/2, unset_flag/2]). - -export([get_header/2, get_header/3, set_header/3]). - -export([get_user_property/2, get_user_property/3, set_user_property/3]). -%% Create a default message +-spec(make(topic(), payload()) -> message()). +make(Topic, Payload) -> + make(undefined, Topic, Payload). + +%% Create a message -spec(make(atom() | client(), topic(), payload()) -> message()). make(From, Topic, Payload) when is_atom(From); is_record(From, client) -> make(From, ?QOS_0, Topic, Payload).