From 9976327c8d0c40597e68b7031bf9f6ea9eb7ed42 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Fri, 23 Mar 2018 18:13:09 +0800 Subject: [PATCH] Add emqx_mqtt module --- include/emqx.hrl | 9 ++- src/emqx.erl | 88 ---------------------------- src/emqx_app.erl | 8 ++- src/emqx_bridge.erl | 8 +-- src/emqx_connection.erl | 4 +- src/emqx_message.erl | 112 ++++++++++++++++++------------------ src/emqx_metrics.erl | 12 ++-- src/emqx_mod_rewrite.erl | 4 +- src/emqx_mqtt.erl | 115 +++++++++++++++++++++++++++++++++++++ src/emqx_mqtt_app.erl | 29 ---------- src/emqx_mqueue.erl | 8 +-- src/emqx_protocol.erl | 22 +++---- src/emqx_session.erl | 42 +++++++------- src/emqx_stats.erl | 5 +- src/emqx_sup.erl | 5 +- src/emqx_ws_connection.erl | 4 +- 16 files changed, 244 insertions(+), 231 deletions(-) create mode 100644 src/emqx_mqtt.erl delete mode 100644 src/emqx_mqtt_app.erl diff --git a/include/emqx.hrl b/include/emqx.hrl index 5c67236d0..d584f2cd3 100644 --- a/include/emqx.hrl +++ b/include/emqx.hrl @@ -76,7 +76,6 @@ -type(message_from() :: #{zone := atom(), node := atom(), clientid := binary(), - protocol := protocol(), connector => atom(), peername => {inet:ip_address(), inet:port_number()}, username => binary(), @@ -99,8 +98,14 @@ { id :: message_id(), %% Global unique id from :: message_from(), %% Message from sender :: pid(), %% The pid of the sender/publisher - flags :: message_flags(), %% Message flags + packet_id, + dup :: boolean(), %% Dup flag + qos :: 0 | 1 | 2, %% QoS + sys :: boolean(), %% $SYS flag + retain :: boolean(), %% Retain flag + flags = [], %% :: message_flags(), %% Message flags headers :: message_headers(), %% Message headers + protocol :: protocol(), topic :: binary(), %% Message topic properties :: map(), %% Message user properties payload :: binary(), %% Message payload diff --git a/src/emqx.erl b/src/emqx.erl index 967478a8c..13fe0db90 100644 --- a/src/emqx.erl +++ b/src/emqx.erl @@ -18,16 +18,9 @@ -include("emqx.hrl"). --include("emqx_mqtt.hrl"). - %% Start/Stop Application -export([start/0, env/1, env/2, is_running/1, stop/0]). -%% Start/Stop Listeners --export([start_listeners/0, start_listener/1, listeners/0, - stop_listeners/0, stop_listener/1, - restart_listeners/0, restart_listener/1]). - %% PubSub API -export([subscribe/1, subscribe/2, subscribe/3, publish/1, unsubscribe/1, unsubscribe/2]). @@ -47,8 +40,6 @@ %% Shutdown and reboot -export([shutdown/0, shutdown/1, reboot/0]). --type(listener() :: {atom(), esockd:listen_on(), [esockd:option()]}). - -define(APP, ?MODULE). %%-------------------------------------------------------------------- @@ -80,85 +71,6 @@ is_running(Node) -> Pid when is_pid(Pid) -> true end. -%%-------------------------------------------------------------------- -%% Start/Stop Listeners -%%-------------------------------------------------------------------- - -%% @doc Start Listeners. --spec(start_listeners() -> ok). -start_listeners() -> lists:foreach(fun start_listener/1, env(listeners, [])). - -%% Start mqtt listener --spec(start_listener(listener()) -> {ok, pid()} | {error, any()}). -start_listener({tcp, ListenOn, Opts}) -> - start_listener('mqtt:tcp', ListenOn, Opts); - -%% Start mqtt(SSL) listener -start_listener({ssl, ListenOn, Opts}) -> - start_listener('mqtt:ssl', ListenOn, Opts); - -%% Start http listener -start_listener({Proto, ListenOn, Opts}) when Proto == http; Proto == ws -> - {ok, _} = mochiweb:start_http('mqtt:ws', ListenOn, Opts, {emqx_ws, handle_request, []}); - -%% Start https listener -start_listener({Proto, ListenOn, Opts}) when Proto == https; Proto == wss -> - {ok, _} = mochiweb:start_http('mqtt:wss', ListenOn, Opts, {emqx_ws, handle_request, []}). - -start_listener(Proto, ListenOn, Opts) -> - Env = lists:append(emqx:env(client, []), emqx:env(protocol, [])), - MFArgs = {emqx_connection, start_link, [Env]}, - {ok, _} = esockd:open(Proto, ListenOn, merge_sockopts(Opts), MFArgs). - -listeners() -> - [Listener || Listener = {{Proto, _}, _Pid} <- esockd:listeners(), is_mqtt(Proto)]. - -is_mqtt('mqtt:tcp') -> true; -is_mqtt('mqtt:ssl') -> true; -is_mqtt('mqtt:ws') -> true; -is_mqtt('mqtt:wss') -> true; -is_mqtt(_Proto) -> false. - -%% @doc Stop Listeners --spec(stop_listeners() -> ok). -stop_listeners() -> lists:foreach(fun stop_listener/1, env(listeners, [])). - --spec(stop_listener(listener()) -> ok | {error, any()}). -stop_listener({tcp, ListenOn, _Opts}) -> - esockd:close('mqtt:tcp', ListenOn); -stop_listener({ssl, ListenOn, _Opts}) -> - esockd:close('mqtt:ssl', ListenOn); -stop_listener({Proto, ListenOn, _Opts}) when Proto == http; Proto == ws -> - mochiweb:stop_http('mqtt:ws', ListenOn); -stop_listener({Proto, ListenOn, _Opts}) when Proto == https; Proto == wss -> - mochiweb:stop_http('mqtt:wss', ListenOn); -% stop_listener({Proto, ListenOn, _Opts}) when Proto == api -> -% mochiweb:stop_http('mqtt:api', ListenOn); -stop_listener({Proto, ListenOn, _Opts}) -> - esockd:close(Proto, ListenOn). - -%% @doc Restart Listeners --spec(restart_listeners() -> ok). -restart_listeners() -> lists:foreach(fun restart_listener/1, env(listeners, [])). - --spec(restart_listener(listener()) -> any()). -restart_listener({tcp, ListenOn, _Opts}) -> - esockd:reopen('mqtt:tcp', ListenOn); -restart_listener({ssl, ListenOn, _Opts}) -> - esockd:reopen('mqtt:ssl', ListenOn); -restart_listener({Proto, ListenOn, _Opts}) when Proto == http; Proto == ws -> - mochiweb:restart_http('mqtt:ws', ListenOn); -restart_listener({Proto, ListenOn, _Opts}) when Proto == https; Proto == wss -> - mochiweb:restart_http('mqtt:wss', ListenOn); -restart_listener({Proto, ListenOn, _Opts}) when Proto == api -> - mochiweb:restart_http('mqtt:api', ListenOn); -restart_listener({Proto, ListenOn, _Opts}) -> - esockd:reopen(Proto, ListenOn). - -merge_sockopts(Options) -> - SockOpts = emqx_misc:merge_opts( - ?MQTT_SOCKOPTS, proplists:get_value(sockopts, Options, [])), - emqx_misc:merge_opts(Options, [{sockopts, SockOpts}]). %%-------------------------------------------------------------------- %% PubSub API diff --git a/src/emqx_app.erl b/src/emqx_app.erl index 00acf0d76..50c9810c6 100644 --- a/src/emqx_app.erl +++ b/src/emqx_app.erl @@ -26,13 +26,15 @@ -define(APP, emqx). %%-------------------------------------------------------------------- -%% Application Callbacks +%% Application callbacks %%-------------------------------------------------------------------- start(_Type, _Args) -> print_banner(), ekka:start(), {ok, Sup} = emqx_sup:start_link(), + %%TODO: fixme later + emqx_mqtt_metrics:init(), ok = register_acl_mod(), emqx_modules:load(), start_autocluster(), @@ -43,7 +45,7 @@ start(_Type, _Args) -> -spec(stop(State :: term()) -> term()). stop(_State) -> emqx_modules:unload(), - catch emqx:stop_listeners(). + catch emqx_mqtt:shutdown(). %%-------------------------------------------------------------------- %% Print Banner @@ -78,5 +80,5 @@ start_autocluster() -> after_autocluster() -> emqx_plugins:init(), emqx_plugins:load(), - emqx:start_listeners(). + emqx_mqtt:bootstrap(). diff --git a/src/emqx_bridge.erl b/src/emqx_bridge.erl index 098b8a41a..5c20538f3 100644 --- a/src/emqx_bridge.erl +++ b/src/emqx_bridge.erl @@ -167,11 +167,11 @@ dequeue(State = #state{mqueue = MQ}) -> {empty, MQ1} -> State#state{mqueue = MQ1}; {{value, Msg}, MQ1} -> - handle_info({dispatch, Msg#mqtt_message.topic, Msg}, State), + handle_info({dispatch, Msg#message.topic, Msg}, State), dequeue(State#state{mqueue = MQ1}) end. -transform(Msg = #mqtt_message{topic = Topic}, #state{topic_prefix = Prefix, - topic_suffix = Suffix}) -> - Msg#mqtt_message{topic = <>}. +transform(Msg = #message{topic = Topic}, #state{topic_prefix = Prefix, + topic_suffix = Suffix}) -> + Msg#message{topic = <>}. diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index 2b2f36dee..1555f29cb 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -203,7 +203,7 @@ handle_info({suback, PacketId, GrantedQos}, State) -> %% Fastlane handle_info({dispatch, _Topic, Message}, State) -> - handle_info({deliver, Message#mqtt_message{qos = ?QOS_0}}, State); + handle_info({deliver, Message#message{qos = ?QOS_0}}, State); handle_info({deliver, Message}, State) -> with_proto( @@ -316,7 +316,7 @@ received(Bytes, State = #state{parser = Parser, {more, NewParser} -> {noreply, run_socket(State#state{parser = NewParser}), IdleTimeout}; {ok, Packet, Rest} -> - emqx_metrics:received(Packet), + emqx_mqtt_metrics:received(Packet), case emqx_protocol:received(Packet, ProtoState) of {ok, ProtoState1} -> received(Rest, State#state{parser = emqx_parser:initial_state(PacketSize), diff --git a/src/emqx_message.erl b/src/emqx_message.erl index acbee1ce7..5ef4f8f8f 100644 --- a/src/emqx_message.erl +++ b/src/emqx_message.erl @@ -30,21 +30,21 @@ -type(msg_from() :: atom() | {binary(), undefined | binary()}). %% @doc Make a message --spec(make(msg_from(), binary(), binary()) -> mqtt_message()). +-spec(make(msg_from(), binary(), binary()) -> message()). make(From, Topic, Payload) -> make(From, ?QOS_0, Topic, Payload). --spec(make(msg_from(), mqtt_qos(), binary(), binary()) -> mqtt_message()). +-spec(make(msg_from(), mqtt_qos(), binary(), binary()) -> message()). make(From, Qos, Topic, Payload) -> - #mqtt_message{id = msgid(), - from = From, - qos = ?QOS_I(Qos), - topic = Topic, - payload = Payload, - timestamp = os:timestamp()}. + #message{id = msgid(), + from = From, + qos = ?QOS_I(Qos), + topic = Topic, + payload = Payload, + timestamp = os:timestamp()}. %% @doc Message from Packet --spec(from_packet(mqtt_packet()) -> mqtt_message()). +-spec(from_packet(mqtt_packet()) -> message()). from_packet(#mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, retain = Retain, qos = Qos, @@ -52,14 +52,14 @@ from_packet(#mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, variable = #mqtt_packet_publish{topic_name = Topic, packet_id = PacketId}, payload = Payload}) -> - #mqtt_message{id = msgid(), - packet_id = PacketId, - qos = Qos, - retain = Retain, - dup = Dup, - topic = Topic, - payload = Payload, - timestamp = os:timestamp()}; + #message{id = msgid(), + packet_id = PacketId, + qos = Qos, + retain = Retain, + dup = Dup, + topic = Topic, + payload = Payload, + timestamp = os:timestamp()}; from_packet(#mqtt_packet_connect{will_flag = false}) -> undefined; @@ -70,33 +70,33 @@ from_packet(#mqtt_packet_connect{client_id = ClientId, will_qos = Qos, will_topic = Topic, will_msg = Msg}) -> - #mqtt_message{id = msgid(), - topic = Topic, - from = {ClientId, Username}, - retain = Retain, - qos = Qos, - dup = false, - payload = Msg, - timestamp = os:timestamp()}. + #message{id = msgid(), + topic = Topic, + from = {ClientId, Username}, + retain = Retain, + qos = Qos, + dup = false, + payload = Msg, + timestamp = os:timestamp()}. from_packet(ClientId, Packet) -> Msg = from_packet(Packet), - Msg#mqtt_message{from = ClientId}. + Msg#message{from = ClientId}. from_packet(Username, ClientId, Packet) -> Msg = from_packet(Packet), - Msg#mqtt_message{from = {ClientId, Username}}. + Msg#message{from = {ClientId, Username}}. msgid() -> emqx_guid:gen(). %% @doc Message to Packet --spec(to_packet(mqtt_message()) -> mqtt_packet()). -to_packet(#mqtt_message{packet_id = PkgId, - qos = Qos, - retain = Retain, - dup = Dup, - topic = Topic, - payload = Payload}) -> +-spec(to_packet(message()) -> mqtt_packet()). +to_packet(#message{packet_id = PkgId, + qos = Qos, + retain = Retain, + dup = Dup, + topic = Topic, + payload = Payload}) -> #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, qos = Qos, @@ -111,40 +111,42 @@ to_packet(#mqtt_message{packet_id = PkgId, payload = Payload}. %% @doc set dup, retain flag --spec(set_flag(mqtt_message()) -> mqtt_message()). +-spec(set_flag(message()) -> message()). set_flag(Msg) -> - Msg#mqtt_message{dup = true, retain = true}. + Msg#message{dup = true, retain = true}. --spec(set_flag(atom(), mqtt_message()) -> mqtt_message()). -set_flag(dup, Msg = #mqtt_message{dup = false}) -> - Msg#mqtt_message{dup = true}; -set_flag(sys, Msg = #mqtt_message{sys = false}) -> - Msg#mqtt_message{sys = true}; -set_flag(retain, Msg = #mqtt_message{retain = false}) -> - Msg#mqtt_message{retain = true}; -set_flag(Flag, Msg) when Flag =:= dup orelse Flag =:= retain -> Msg. +-spec(set_flag(atom(), message()) -> message()). +set_flag(dup, Msg = #message{dup = false}) -> + Msg#message{dup = true}; +set_flag(sys, Msg = #message{sys = false}) -> + Msg#message{sys = true}; +set_flag(retain, Msg = #message{retain = false}) -> + Msg#message{retain = true}; +set_flag(Flag, Msg) when Flag =:= dup; + Flag =:= retain; + Flag =:= sys -> Msg. %% @doc Unset dup, retain flag --spec(unset_flag(mqtt_message()) -> mqtt_message()). +-spec(unset_flag(message()) -> message()). unset_flag(Msg) -> - Msg#mqtt_message{dup = false, retain = false}. + Msg#message{dup = false, retain = false}. --spec(unset_flag(dup | retain | atom(), mqtt_message()) -> mqtt_message()). -unset_flag(dup, Msg = #mqtt_message{dup = true}) -> - Msg#mqtt_message{dup = false}; -unset_flag(retain, Msg = #mqtt_message{retain = true}) -> - Msg#mqtt_message{retain = false}; +-spec(unset_flag(dup | retain | atom(), message()) -> message()). +unset_flag(dup, Msg = #message{dup = true}) -> + Msg#message{dup = false}; +unset_flag(retain, Msg = #message{retain = true}) -> + Msg#message{retain = false}; unset_flag(Flag, Msg) when Flag =:= dup orelse Flag =:= retain -> Msg. %% @doc Format MQTT Message -format(#mqtt_message{id = MsgId, packet_id = PktId, from = {ClientId, Username}, - qos = Qos, retain = Retain, dup = Dup, topic =Topic}) -> +format(#message{id = MsgId, packet_id = PktId, from = {ClientId, Username}, + qos = Qos, retain = Retain, dup = Dup, topic =Topic}) -> io_lib:format("Message(Q~p, R~p, D~p, MsgId=~p, PktId=~p, From=~s/~s, Topic=~s)", [i(Qos), i(Retain), i(Dup), MsgId, PktId, Username, ClientId, Topic]); %% TODO:... -format(#mqtt_message{id = MsgId, packet_id = PktId, from = From, - qos = Qos, retain = Retain, dup = Dup, topic =Topic}) -> +format(#message{id = MsgId, packet_id = PktId, from = From, + qos = Qos, retain = Retain, dup = Dup, topic =Topic}) -> io_lib:format("Message(Q~p, R~p, D~p, MsgId=~p, PktId=~p, From=~s, Topic=~s)", [i(Qos), i(Retain), i(Dup), MsgId, PktId, From, Topic]). diff --git a/src/emqx_metrics.erl b/src/emqx_metrics.erl index f74537218..cf41669ed 100644 --- a/src/emqx_metrics.erl +++ b/src/emqx_metrics.erl @@ -29,7 +29,7 @@ -record(state, {tick}). --define(TAB, ?MODULE). +-define(TAB, metrics). -define(SERVER, ?MODULE). @@ -117,9 +117,10 @@ key(counter, Metric) -> init([]) -> emqx_time:seed(), % Create metrics table - ets:new(?TAB, [set, public, named_table, {write_concurrency, true}]), + _ = ets:new(?TAB, [set, public, named_table, {write_concurrency, true}]), % Tick to publish metrics - {ok, #state{tick = emqx_broker:start_tick(tick)}, hibernate}. + {ok, TRef} = timer:send_after(emqx_sys:sys_interval(), tick), + {ok, #state{tick = TRef}, hibernate}. handle_call(_Req, _From, State) -> {reply, error, State}. @@ -136,7 +137,8 @@ handle_info(_Info, State) -> {noreply, State}. terminate(_Reason, #state{tick = TRef}) -> - emqx_broker:stop_tick(TRef). + %%TODO: + timer:cancel(TRef). code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -145,6 +147,8 @@ code_change(_OldVsn, State, _Extra) -> %% Internal functions %%-------------------------------------------------------------------- +%% TODO: the depencies are not right + publish(Metric, Val) -> Msg = emqx_message:make(metrics, metric_topic(Metric), bin(Val)), emqx_broker:publish(emqx_message:set_flag(sys, Msg)). diff --git a/src/emqx_mod_rewrite.erl b/src/emqx_mod_rewrite.erl index 87e70021e..9a344aed5 100644 --- a/src/emqx_mod_rewrite.erl +++ b/src/emqx_mod_rewrite.erl @@ -42,7 +42,7 @@ rewrite_unsubscribe(_ClientId, _Username, TopicTable, Rules) -> lager:info("Rewrite unsubscribe: ~p", [TopicTable]), {ok, [{match_rule(Topic, Rules), Opts} || {Topic, Opts} <- TopicTable]}. -rewrite_publish(Message=#mqtt_message{topic = Topic}, Rules) -> +rewrite_publish(Message = #message{topic = Topic}, Rules) -> %%TODO: this will not work if the client is always online. RewriteTopic = case get({rewrite, Topic}) of @@ -52,7 +52,7 @@ rewrite_publish(Message=#mqtt_message{topic = Topic}, Rules) -> DestTopic -> DestTopic end, - {ok, Message#mqtt_message{topic = RewriteTopic}}. + {ok, Message#message{topic = RewriteTopic}}. unload(_) -> emqx:unhook('client.subscribe', fun ?MODULE:rewrite_subscribe/4), diff --git a/src/emqx_mqtt.erl b/src/emqx_mqtt.erl new file mode 100644 index 000000000..8bb1a53bd --- /dev/null +++ b/src/emqx_mqtt.erl @@ -0,0 +1,115 @@ +%%-------------------------------------------------------------------- +%% Copyright © 2013-2018 EMQ Inc. All rights reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_mqtt). + +-include("emqx_mqtt.hrl"). + +-export([bootstrap/0, shutdown/0]). + +-export([start_listeners/0, start_listener/1]). +-export([stop_listeners/0, stop_listener/1]). +-export([restart_listeners/0, restart_listener/1]). +-export([listeners/0]). + +-type(listener() :: {atom(), esockd:listen_on(), [esockd:option()]}). + +bootstrap() -> + start_listeners(). + +shutdown() -> + stop_listeners(). + +%%-------------------------------------------------------------------- +%% Start/Stop Listeners +%%-------------------------------------------------------------------- + +%% @doc Start Listeners. +-spec(start_listeners() -> ok). +start_listeners() -> + lists:foreach(fun start_listener/1, emqx:env(listeners, [])). + +%% Start mqtt listener +-spec(start_listener(listener()) -> {ok, pid()} | {error, any()}). +start_listener({tcp, ListenOn, Opts}) -> + start_listener('mqtt:tcp', ListenOn, Opts); + +%% Start mqtt(SSL) listener +start_listener({ssl, ListenOn, Opts}) -> + start_listener('mqtt:ssl', ListenOn, Opts); + +%% Start http listener +start_listener({Proto, ListenOn, Opts}) when Proto == http; Proto == ws -> + {ok, _} = mochiweb:start_http('mqtt:ws', ListenOn, Opts, {emqx_ws, handle_request, []}); + +%% Start https listener +start_listener({Proto, ListenOn, Opts}) when Proto == https; Proto == wss -> + {ok, _} = mochiweb:start_http('mqtt:wss', ListenOn, Opts, {emqx_ws, handle_request, []}). + +start_listener(Proto, ListenOn, Opts) -> + Env = lists:append(emqx:env(client, []), emqx:env(protocol, [])), + MFArgs = {emqx_connection, start_link, [Env]}, + {ok, _} = esockd:open(Proto, ListenOn, merge_sockopts(Opts), MFArgs). + +listeners() -> + [Listener || Listener = {{Proto, _}, _Pid} <- esockd:listeners(), is_mqtt(Proto)]. + +is_mqtt('mqtt:tcp') -> true; +is_mqtt('mqtt:ssl') -> true; +is_mqtt('mqtt:ws') -> true; +is_mqtt('mqtt:wss') -> true; +is_mqtt(_Proto) -> false. + +%% @doc Stop Listeners +-spec(stop_listeners() -> ok). +stop_listeners() -> lists:foreach(fun stop_listener/1, emqx:env(listeners, [])). + +-spec(stop_listener(listener()) -> ok | {error, any()}). +stop_listener({tcp, ListenOn, _Opts}) -> + esockd:close('mqtt:tcp', ListenOn); +stop_listener({ssl, ListenOn, _Opts}) -> + esockd:close('mqtt:ssl', ListenOn); +stop_listener({Proto, ListenOn, _Opts}) when Proto == http; Proto == ws -> + mochiweb:stop_http('mqtt:ws', ListenOn); +stop_listener({Proto, ListenOn, _Opts}) when Proto == https; Proto == wss -> + mochiweb:stop_http('mqtt:wss', ListenOn); +% stop_listener({Proto, ListenOn, _Opts}) when Proto == api -> +% mochiweb:stop_http('mqtt:api', ListenOn); +stop_listener({Proto, ListenOn, _Opts}) -> + esockd:close(Proto, ListenOn). + +%% @doc Restart Listeners +-spec(restart_listeners() -> ok). +restart_listeners() -> lists:foreach(fun restart_listener/1, emqx:env(listeners, [])). + +-spec(restart_listener(listener()) -> any()). +restart_listener({tcp, ListenOn, _Opts}) -> + esockd:reopen('mqtt:tcp', ListenOn); +restart_listener({ssl, ListenOn, _Opts}) -> + esockd:reopen('mqtt:ssl', ListenOn); +restart_listener({Proto, ListenOn, _Opts}) when Proto == http; Proto == ws -> + mochiweb:restart_http('mqtt:ws', ListenOn); +restart_listener({Proto, ListenOn, _Opts}) when Proto == https; Proto == wss -> + mochiweb:restart_http('mqtt:wss', ListenOn); +restart_listener({Proto, ListenOn, _Opts}) when Proto == api -> + mochiweb:restart_http('mqtt:api', ListenOn); +restart_listener({Proto, ListenOn, _Opts}) -> + esockd:reopen(Proto, ListenOn). + +merge_sockopts(Options) -> + SockOpts = emqx_misc:merge_opts( + ?MQTT_SOCKOPTS, proplists:get_value(sockopts, Options, [])), + emqx_misc:merge_opts(Options, [{sockopts, SockOpts}]). diff --git a/src/emqx_mqtt_app.erl b/src/emqx_mqtt_app.erl deleted file mode 100644 index 47e1eaba0..000000000 --- a/src/emqx_mqtt_app.erl +++ /dev/null @@ -1,29 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright © 2013-2018 EMQ Inc. All rights reserved. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_mqtt_app). - --behaviour(application). - --export([start/2, stop/1]). - -start(_Type, _Args) -> - emqx_mqtt_metrics:init(), - emqx_mqtt_sup:start_link(). - -stop(_State) -> - ok. - diff --git a/src/emqx_mqueue.erl b/src/emqx_mqueue.erl index eb334ed22..acdb5ee8c 100644 --- a/src/emqx_mqueue.erl +++ b/src/emqx_mqueue.erl @@ -154,8 +154,8 @@ stats(#mqueue{type = Type, q = Q, max_len = MaxLen, len = Len, dropped = Dropped end} | [{max_len, MaxLen}, {dropped, Dropped}]]. %% @doc Enqueue a message. --spec(in(mqtt_message(), mqueue()) -> mqueue()). -in(#mqtt_message{qos = ?QOS_0}, MQ = #mqueue{qos0 = false}) -> +-spec(in(message(), mqueue()) -> mqueue()). +in(#message{qos = ?QOS_0}, MQ = #mqueue{qos0 = false}) -> MQ; in(Msg, MQ = #mqueue{type = simple, q = Q, len = Len, max_len = 0}) -> MQ#mqueue{q = queue:in(Msg, Q), len = Len + 1}; @@ -166,7 +166,7 @@ in(Msg, MQ = #mqueue{type = simple, q = Q, len = Len, max_len = MaxLen, dropped in(Msg, MQ = #mqueue{type = simple, q = Q, len = Len}) -> maybe_set_alarm(MQ#mqueue{q = queue:in(Msg, Q), len = Len + 1}); -in(Msg = #mqtt_message{topic = Topic}, MQ = #mqueue{type = priority, q = Q, +in(Msg = #message{topic = Topic}, MQ = #mqueue{type = priority, q = Q, priorities = Priorities, max_len = 0}) -> case lists:keysearch(Topic, 1, Priorities) of @@ -176,7 +176,7 @@ in(Msg = #mqtt_message{topic = Topic}, MQ = #mqueue{type = priority, q = Q, {Pri, MQ1} = insert_p(Topic, 0, MQ), MQ1#mqueue{q = ?PQUEUE:in(Msg, Pri, Q)} end; -in(Msg = #mqtt_message{topic = Topic}, MQ = #mqueue{type = priority, q = Q, +in(Msg = #message{topic = Topic}, MQ = #mqueue{type = priority, q = Q, priorities = Priorities, max_len = MaxLen}) -> case lists:keysearch(Topic, 1, Priorities) of diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 7f58dee51..b61040443 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -120,7 +120,7 @@ client(#proto_state{client_id = ClientId, connected_at = Time}) -> WillTopic = if WillMsg =:= undefined -> undefined; - true -> WillMsg#mqtt_message.topic + true -> WillMsg#message.topic end, #mqtt_client{client_id = ClientId, client_pid = ClientPid, @@ -352,18 +352,18 @@ with_puback(Type, Packet = ?PUBLISH_PACKET(_Qos, PacketId), ?LOG(error, "PUBLISH ~p error: ~p", [PacketId, Error], State) end. --spec(send(mqtt_message() | mqtt_packet(), proto_state()) -> {ok, proto_state()}). +-spec(send(message() | mqtt_packet(), proto_state()) -> {ok, proto_state()}). send(Msg, State = #proto_state{client_id = ClientId, username = Username, mountpoint = MountPoint, is_bridge = IsBridge}) - when is_record(Msg, mqtt_message) -> + when is_record(Msg, message) -> emqx_hooks:run('message.delivered', [ClientId, Username], Msg), send(emqx_message:to_packet(unmount(MountPoint, clean_retain(IsBridge, Msg))), State); send(Packet = ?PACKET(Type), State = #proto_state{sendfun = SendFun, stats_data = Stats}) -> trace(send, Packet, State), - emqx_metrics:sent(Packet), + emqx_mqtt_metrics:sent(Packet), SendFun(Packet), {ok, State#proto_state{stats_data = inc_stats(send, Type, Stats)}}. @@ -439,7 +439,7 @@ maybe_set_clientid(State) -> send_willmsg(_Client, undefined) -> ignore; send_willmsg(#mqtt_client{client_id = ClientId, username = Username}, WillMsg) -> - emqx_broker:publish(WillMsg#mqtt_message{from = {ClientId, Username}}). + emqx_broker:publish(WillMsg#message{from = {ClientId, Username}}). start_keepalive(0, _State) -> ignore; @@ -570,10 +570,10 @@ sp(false) -> 0. %% The retained flag should be propagated for bridge. %%-------------------------------------------------------------------- -clean_retain(false, Msg = #mqtt_message{retain = true, headers = Headers}) -> +clean_retain(false, Msg = #message{retain = true, headers = Headers}) -> case lists:member(retained, Headers) of true -> Msg; - false -> Msg#mqtt_message{retain = false} + false -> Msg#message{retain = false} end; clean_retain(_IsBridge, Msg) -> Msg. @@ -596,16 +596,16 @@ feed_var({<<"%u">>, Username}, MountPoint) -> mount(undefined, Any) -> Any; -mount(MountPoint, Msg = #mqtt_message{topic = Topic}) -> - Msg#mqtt_message{topic = <>}; +mount(MountPoint, Msg = #message{topic = Topic}) -> + Msg#message{topic = <>}; mount(MountPoint, TopicTable) when is_list(TopicTable) -> [{<>, Opts} || {Topic, Opts} <- TopicTable]. unmount(undefined, Any) -> Any; -unmount(MountPoint, Msg = #mqtt_message{topic = Topic}) -> +unmount(MountPoint, Msg = #message{topic = Topic}) -> case catch split_binary(Topic, byte_size(MountPoint)) of - {MountPoint, Topic0} -> Msg#mqtt_message{topic = Topic0}; + {MountPoint, Topic0} -> Msg#message{topic = Topic0}; _ -> Msg end. diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 611e967be..1c18907bc 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -185,15 +185,15 @@ subscribe(Session, PacketId, TopicTable) -> %%TODO: the ack function??... %% @doc Publish Message -spec(publish(pid(), message()) -> ok | {error, term()}). -publish(_Session, Msg = #mqtt_message{qos = ?QOS_0}) -> +publish(_Session, Msg = #message{qos = ?QOS_0}) -> %% Publish QoS0 Directly emqx_broker:publish(Msg), ok; -publish(_Session, Msg = #mqtt_message{qos = ?QOS_1}) -> +publish(_Session, Msg = #message{qos = ?QOS_1}) -> %% Publish QoS1 message directly for client will PubAck automatically emqx_broker:publish(Msg), ok; -publish(Session, Msg = #mqtt_message{qos = ?QOS_2}) -> +publish(Session, Msg = #message{qos = ?QOS_2}) -> %% Publish QoS2 to Session gen_server:call(Session, {publish, Msg}, ?TIMEOUT). @@ -313,7 +313,7 @@ binding(ClientPid) -> handle_pre_hibernate(State) -> {hibernate, emqx_gc:reset_conn_gc_count(#state.force_gc_count, emit_stats(State))}. -handle_call({publish, Msg = #mqtt_message{qos = ?QOS_2, packet_id = PacketId}}, _From, +handle_call({publish, Msg = #message{qos = ?QOS_2, packet_id = PacketId}}, _From, State = #state{awaiting_rel = AwaitingRel, await_rel_timer = Timer, await_rel_timeout = Timeout}) -> @@ -510,12 +510,12 @@ handle_cast(Msg, State) -> {noreply, State}. %% Ignore Messages delivered by self -handle_info({dispatch, _Topic, #mqtt_message{from = {ClientId, _}}}, +handle_info({dispatch, _Topic, #message{from = {ClientId, _}}}, State = #state{client_id = ClientId, ignore_loop_deliver = true}) -> {noreply, State}; %% Dispatch Message -handle_info({dispatch, Topic, Msg}, State) when is_record(Msg, mqtt_message) -> +handle_info({dispatch, Topic, Msg}, State) when is_record(Msg, message) -> {noreply, gc(dispatch(tune_qos(Topic, reset_dup(Msg), State), State))}; %% Do nothing if the client has been disconnected. @@ -604,7 +604,7 @@ retry_delivery(Force, [{Type, Msg, Ts} | Msgs], Now, if Force orelse (Diff >= Interval) -> case {Type, Msg} of - {publish, Msg = #mqtt_message{packet_id = PacketId}} -> + {publish, Msg = #message{packet_id = PacketId}} -> redeliver(Msg, State), Inflight1 = Inflight:update(PacketId, {publish, Msg, Now}), retry_delivery(Force, Msgs, Now, State#state{inflight = Inflight1}); @@ -631,7 +631,7 @@ expire_awaiting_rel(State = #state{awaiting_rel = AwaitingRel}) -> expire_awaiting_rel([], _Now, State) -> State#state{await_rel_timer = undefined}; -expire_awaiting_rel([{PacketId, Msg = #mqtt_message{timestamp = TS}} | Msgs], +expire_awaiting_rel([{PacketId, Msg = #message{timestamp = TS}} | Msgs], Now, State = #state{awaiting_rel = AwaitingRel, await_rel_timeout = Timeout}) -> case (timer:now_diff(Now, TS) div 1000) of @@ -651,8 +651,8 @@ sortfun(inflight) -> fun({_, _, Ts1}, {_, _, Ts2}) -> Ts1 < Ts2 end; sortfun(awaiting_rel) -> - fun({_, #mqtt_message{timestamp = Ts1}}, - {_, #mqtt_message{timestamp = Ts2}}) -> + fun({_, #message{timestamp = Ts1}}, + {_, #message{timestamp = Ts2}}) -> Ts1 < Ts2 end. @@ -677,17 +677,17 @@ dispatch(Msg, State = #state{client_id = ClientId, client_pid = undefined}) -> end; %% Deliver qos0 message directly to client -dispatch(Msg = #mqtt_message{qos = ?QOS0}, State) -> +dispatch(Msg = #message{qos = ?QOS0}, State) -> deliver(Msg, State), State; -dispatch(Msg = #mqtt_message{qos = QoS}, +dispatch(Msg = #message{qos = QoS}, State = #state{next_msg_id = MsgId, inflight = Inflight}) when QoS =:= ?QOS1 orelse QoS =:= ?QOS2 -> case Inflight:is_full() of true -> enqueue_msg(Msg, State); false -> - Msg1 = Msg#mqtt_message{packet_id = MsgId}, + Msg1 = Msg#message{packet_id = MsgId}, deliver(Msg1, State), await(Msg1, next_msg_id(State)) end. @@ -700,8 +700,8 @@ enqueue_msg(Msg, State = #state{mqueue = Q}) -> %% Deliver %%-------------------------------------------------------------------- -redeliver(Msg = #mqtt_message{qos = QoS}, State) -> - deliver(Msg#mqtt_message{dup = if QoS =:= ?QOS2 -> false; true -> true end}, State); +redeliver(Msg = #message{qos = QoS}, State) -> + deliver(Msg#message{dup = if QoS =:= ?QOS2 -> false; true -> true end}, State); redeliver({pubrel, PacketId}, #state{client_pid = Pid}) -> Pid ! {redeliver, {?PUBREL, PacketId}}. @@ -715,7 +715,7 @@ deliver(Msg, #state{client_pid = Pid, binding = remote}) -> %% Awaiting ACK for QoS1/QoS2 Messages %%-------------------------------------------------------------------- -await(Msg = #mqtt_message{packet_id = PacketId}, +await(Msg = #message{packet_id = PacketId}, State = #state{inflight = Inflight, retry_timer = RetryTimer, retry_interval = Interval}) -> @@ -780,13 +780,13 @@ dequeue2(State = #state{mqueue = Q}) -> %% Tune QoS %%-------------------------------------------------------------------- -tune_qos(Topic, Msg = #mqtt_message{qos = PubQoS}, +tune_qos(Topic, Msg = #message{qos = PubQoS}, #state{subscriptions = SubMap, upgrade_qos = UpgradeQoS}) -> case maps:find(Topic, SubMap) of {ok, SubQoS} when UpgradeQoS andalso (SubQoS > PubQoS) -> - Msg#mqtt_message{qos = SubQoS}; + Msg#message{qos = SubQoS}; {ok, SubQoS} when (not UpgradeQoS) andalso (SubQoS < PubQoS) -> - Msg#mqtt_message{qos = SubQoS}; + Msg#message{qos = SubQoS}; {ok, _} -> Msg; error -> @@ -797,8 +797,8 @@ tune_qos(Topic, Msg = #mqtt_message{qos = PubQoS}, %% Reset Dup %%-------------------------------------------------------------------- -reset_dup(Msg = #mqtt_message{dup = true}) -> - Msg#mqtt_message{dup = false}; +reset_dup(Msg = #message{dup = true}) -> + Msg#message{dup = false}; reset_dup(Msg) -> Msg. %%-------------------------------------------------------------------- diff --git a/src/emqx_stats.erl b/src/emqx_stats.erl index 219914997..d93c1b969 100644 --- a/src/emqx_stats.erl +++ b/src/emqx_stats.erl @@ -163,7 +163,8 @@ init([]) -> Topics = ?SYSTOP_CLIENTS ++ ?SYSTOP_SESSIONS ++ ?SYSTOP_PUBSUB ++ ?SYSTOP_RETAINED, ets:insert(?STATS_TAB, [{Topic, 0} || Topic <- Topics]), % Tick to publish stats - {ok, #state{tick = emqx_broker:start_tick(tick)}, hibernate}. + {ok, TRef} = timer:send_after(emqx_sys:sys_interval(), tick), + {ok, #state{tick = TRef}, hibernate}. handle_call(stop, _From, State) -> {stop, normal, ok, State}; @@ -194,7 +195,7 @@ handle_info(_Info, State) -> {noreply, State}. terminate(_Reason, #state{tick = TRef}) -> - emqx_broker:stop_tick(TRef). + timer:cancel(TRef). code_change(_OldVsn, State, _Extra) -> {ok, State}. diff --git a/src/emqx_sup.erl b/src/emqx_sup.erl index ff632d4e7..d06a8df79 100644 --- a/src/emqx_sup.erl +++ b/src/emqx_sup.erl @@ -54,16 +54,17 @@ stop_child(ChildId) -> %%-------------------------------------------------------------------- init([]) -> - {ok, {{one_for_all, 0, 1}, + {ok, {{one_for_all, 10, 3600}, [?CHILD(emqx_ctl, worker), ?CHILD(emqx_hooks, worker), ?CHILD(emqx_locker, worker), ?CHILD(emqx_stats, worker), ?CHILD(emqx_metrics, worker), + ?CHILD(emqx_sys, worker), ?CHILD(emqx_router_sup, supervisor), ?CHILD(emqx_broker_sup, supervisor), ?CHILD(emqx_pooler, supervisor), - ?CHILD(emqx_trace_sup, supervisor), + ?CHILD(emqx_tracer_sup, supervisor), ?CHILD(emqx_cm_sup, supervisor), ?CHILD(emqx_sm_sup, supervisor), ?CHILD(emqx_session_sup, supervisor), diff --git a/src/emqx_ws_connection.erl b/src/emqx_ws_connection.erl index bee19841a..fc632cb6a 100644 --- a/src/emqx_ws_connection.erl +++ b/src/emqx_ws_connection.erl @@ -140,7 +140,7 @@ handle_call(Req, _From, State) -> reply({error, unexpected_request}, State). handle_cast({received, Packet}, State = #wsclient_state{proto_state = ProtoState}) -> - emqx_metrics:received(Packet), + emqx_mqtt_metrics:received(Packet), case emqx_protocol:received(Packet, ProtoState) of {ok, ProtoState1} -> {noreply, gc(State#wsclient_state{proto_state = ProtoState1}), hibernate}; @@ -178,7 +178,7 @@ handle_info({suback, PacketId, GrantedQos}, State) -> %% Fastlane handle_info({dispatch, _Topic, Message}, State) -> - handle_info({deliver, Message#mqtt_message{qos = ?QOS_0}}, State); + handle_info({deliver, Message#message{qos = ?QOS_0}}, State); handle_info({deliver, Message}, State) -> with_proto(