diff --git a/src/emqttd.erl b/src/emqttd.erl index 69cf92482..9a740e1cd 100644 --- a/src/emqttd.erl +++ b/src/emqttd.erl @@ -16,19 +16,20 @@ -module(emqttd). --export([start/0, env/1, env/2, start_listeners/0, stop_listeners/0, - load_all_mods/0, is_mod_enabled/1, is_running/1]). +-include("emqttd.hrl"). --define(MQTT_SOCKOPTS, [ - binary, - {packet, raw}, - {reuseaddr, true}, - {backlog, 512}, - {nodelay, true}]). +-include("emqttd_protocol.hrl"). + +-export([start/0, env/1, env/2, is_running/1]). + +-export([create/2, publish/1, subscribe/1, subscribe/3, + unsubscribe/1, unsubscribe/3]). -define(APP, ?MODULE). --type listener() :: {atom(), inet:port_number(), [esockd:option()]}. +%%-------------------------------------------------------------------- +%% Bootstrap, environment, is_running... +%%-------------------------------------------------------------------- %% @doc Start emqttd application. -spec start() -> ok | {error, any()}. @@ -42,55 +43,6 @@ env(Group) -> application:get_env(?APP, Group, []). -spec env(Group :: atom(), Name :: atom()) -> undefined | any(). env(Group, Name) -> proplists:get_value(Name, env(Group)). -%% @doc Start Listeners of the broker. --spec start_listeners() -> any(). -start_listeners() -> lists:foreach(fun start_listener/1, env(listeners)). - -%% Start mqtt listener --spec start_listener(listener()) -> any(). -start_listener({mqtt, Port, Opts}) -> start_listener(mqtt, Port, Opts); - -%% Start mqtt(SSL) listener -start_listener({mqtts, Port, Opts}) -> start_listener(mqtts, Port, Opts); - -%% Start http listener -start_listener({http, Port, Opts}) -> - mochiweb:start_http(Port, Opts, {emqttd_http, handle_request, []}); - -%% Start https listener -start_listener({https, Port, Opts}) -> - mochiweb:start_http(Port, Opts, {emqttd_http, handle_request, []}). - -start_listener(Protocol, Port, Opts) -> - MFArgs = {emqttd_client, start_link, [env(mqtt)]}, - esockd:open(Protocol, Port, merge_sockopts(Opts), MFArgs). - -merge_sockopts(Options) -> - SockOpts = emqttd_opts:merge(?MQTT_SOCKOPTS, - proplists:get_value(sockopts, Options, [])), - emqttd_opts:merge(Options, [{sockopts, SockOpts}]). - -%% @doc Stop Listeners -stop_listeners() -> lists:foreach(fun stop_listener/1, env(listeners)). - -stop_listener({Protocol, Port, _Opts}) -> esockd:close({Protocol, Port}). - -%% @doc load all modules -load_all_mods() -> - lists:foreach(fun load_mod/1, env(modules)). - -load_mod({Name, Opts}) -> - Mod = list_to_atom("emqttd_mod_" ++ atom_to_list(Name)), - case catch Mod:load(Opts) of - ok -> lager:info("Load module ~s successfully", [Name]); - {error, Error} -> lager:error("Load module ~s error: ~p", [Name, Error]); - {'EXIT', Reason} -> lager:error("Load module ~s error: ~p", [Name, Reason]) - end. - -%% @doc Is module enabled? --spec is_mod_enabled(Name :: atom()) -> boolean(). -is_mod_enabled(Name) -> env(modules, Name) =/= undefined. - %% @doc Is running? -spec is_running(node()) -> boolean(). is_running(Node) -> @@ -100,3 +52,34 @@ is_running(Node) -> Pid when is_pid(Pid) -> true end. +%%-------------------------------------------------------------------- +%% PubSub APIs that wrap emqttd_server, emqttd_pubsub +%%-------------------------------------------------------------------- + +%% @doc Create a Topic +create(topic, Topic) when is_binary(Topic) -> + emqttd_pubsub:create_topic(Topic). + +%% @doc Publish MQTT Message +-spec publish(mqtt_message()) -> ok. +publish(Msg) when is_record(Msg, mqtt_message) -> + emqttd_server:publish(Msg). + +%% @doc Subscribe +-spec subscribe(binary()) -> ok. +subscribe(Topic) when is_binary(Topic) -> + emqttd_server:subscribe(Topic). + +-spec subscribe(binary(), binary(), mqtt_qos()) -> {ok, mqtt_qos()}. +subscribe(ClientId, Topic, Qos) -> + emqttd_server:subscribe(ClientId, Topic, Qos). + +%% @doc Unsubscribe +-spec unsubscribe(binary()) -> ok. +unsubscribe(Topic) when is_binary(Topic) -> + emqttd_server:unsubscribe(Topic). + +-spec unsubscribe(binary(), binary(), mqtt_qos()) -> ok. +unsubscribe(ClientId, Topic, Qos) -> + emqttd_server:unsubscribe(ClientId, Topic, Qos). +