move start_listeners/0, stop_listeners/0 to emqttd_app.erl, add publish/subscribe APIs

This commit is contained in:
Feng 2016-03-08 13:15:50 +08:00
parent 652f5f5767
commit 52adc66bda
1 changed files with 41 additions and 58 deletions

View File

@ -16,19 +16,20 @@
-module(emqttd).
-export([start/0, env/1, env/2, start_listeners/0, stop_listeners/0,
load_all_mods/0, is_mod_enabled/1, is_running/1]).
-include("emqttd.hrl").
-define(MQTT_SOCKOPTS, [
binary,
{packet, raw},
{reuseaddr, true},
{backlog, 512},
{nodelay, true}]).
-include("emqttd_protocol.hrl").
-export([start/0, env/1, env/2, is_running/1]).
-export([create/2, publish/1, subscribe/1, subscribe/3,
unsubscribe/1, unsubscribe/3]).
-define(APP, ?MODULE).
-type listener() :: {atom(), inet:port_number(), [esockd:option()]}.
%%--------------------------------------------------------------------
%% Bootstrap, environment, is_running...
%%--------------------------------------------------------------------
%% @doc Start emqttd application.
-spec start() -> ok | {error, any()}.
@ -42,55 +43,6 @@ env(Group) -> application:get_env(?APP, Group, []).
-spec env(Group :: atom(), Name :: atom()) -> undefined | any().
env(Group, Name) -> proplists:get_value(Name, env(Group)).
%% @doc Start Listeners of the broker.
-spec start_listeners() -> any().
start_listeners() -> lists:foreach(fun start_listener/1, env(listeners)).
%% Start mqtt listener
-spec start_listener(listener()) -> any().
start_listener({mqtt, Port, Opts}) -> start_listener(mqtt, Port, Opts);
%% Start mqtt(SSL) listener
start_listener({mqtts, Port, Opts}) -> start_listener(mqtts, Port, Opts);
%% Start http listener
start_listener({http, Port, Opts}) ->
mochiweb:start_http(Port, Opts, {emqttd_http, handle_request, []});
%% Start https listener
start_listener({https, Port, Opts}) ->
mochiweb:start_http(Port, Opts, {emqttd_http, handle_request, []}).
start_listener(Protocol, Port, Opts) ->
MFArgs = {emqttd_client, start_link, [env(mqtt)]},
esockd:open(Protocol, Port, merge_sockopts(Opts), MFArgs).
merge_sockopts(Options) ->
SockOpts = emqttd_opts:merge(?MQTT_SOCKOPTS,
proplists:get_value(sockopts, Options, [])),
emqttd_opts:merge(Options, [{sockopts, SockOpts}]).
%% @doc Stop Listeners
stop_listeners() -> lists:foreach(fun stop_listener/1, env(listeners)).
stop_listener({Protocol, Port, _Opts}) -> esockd:close({Protocol, Port}).
%% @doc load all modules
load_all_mods() ->
lists:foreach(fun load_mod/1, env(modules)).
load_mod({Name, Opts}) ->
Mod = list_to_atom("emqttd_mod_" ++ atom_to_list(Name)),
case catch Mod:load(Opts) of
ok -> lager:info("Load module ~s successfully", [Name]);
{error, Error} -> lager:error("Load module ~s error: ~p", [Name, Error]);
{'EXIT', Reason} -> lager:error("Load module ~s error: ~p", [Name, Reason])
end.
%% @doc Is module enabled?
-spec is_mod_enabled(Name :: atom()) -> boolean().
is_mod_enabled(Name) -> env(modules, Name) =/= undefined.
%% @doc Is running?
-spec is_running(node()) -> boolean().
is_running(Node) ->
@ -100,3 +52,34 @@ is_running(Node) ->
Pid when is_pid(Pid) -> true
end.
%%--------------------------------------------------------------------
%% PubSub APIs that wrap emqttd_server, emqttd_pubsub
%%--------------------------------------------------------------------
%% @doc Create a Topic
create(topic, Topic) when is_binary(Topic) ->
emqttd_pubsub:create_topic(Topic).
%% @doc Publish MQTT Message
-spec publish(mqtt_message()) -> ok.
publish(Msg) when is_record(Msg, mqtt_message) ->
emqttd_server:publish(Msg).
%% @doc Subscribe
-spec subscribe(binary()) -> ok.
subscribe(Topic) when is_binary(Topic) ->
emqttd_server:subscribe(Topic).
-spec subscribe(binary(), binary(), mqtt_qos()) -> {ok, mqtt_qos()}.
subscribe(ClientId, Topic, Qos) ->
emqttd_server:subscribe(ClientId, Topic, Qos).
%% @doc Unsubscribe
-spec unsubscribe(binary()) -> ok.
unsubscribe(Topic) when is_binary(Topic) ->
emqttd_server:unsubscribe(Topic).
-spec unsubscribe(binary(), binary(), mqtt_qos()) -> ok.
unsubscribe(ClientId, Topic, Qos) ->
emqttd_server:unsubscribe(ClientId, Topic, Qos).