Add emqx_mqtt module

This commit is contained in:
Feng Lee 2018-03-23 18:13:09 +08:00
parent f007f69abe
commit 9976327c8d
16 changed files with 244 additions and 231 deletions

View File

@ -76,7 +76,6 @@
-type(message_from() :: #{zone := atom(),
node := atom(),
clientid := binary(),
protocol := protocol(),
connector => atom(),
peername => {inet:ip_address(), inet:port_number()},
username => binary(),
@ -99,8 +98,14 @@
{ id :: message_id(), %% Global unique id
from :: message_from(), %% Message from
sender :: pid(), %% The pid of the sender/publisher
flags :: message_flags(), %% Message flags
packet_id,
dup :: boolean(), %% Dup flag
qos :: 0 | 1 | 2, %% QoS
sys :: boolean(), %% $SYS flag
retain :: boolean(), %% Retain flag
flags = [], %% :: message_flags(), %% Message flags
headers :: message_headers(), %% Message headers
protocol :: protocol(),
topic :: binary(), %% Message topic
properties :: map(), %% Message user properties
payload :: binary(), %% Message payload

View File

@ -18,16 +18,9 @@
-include("emqx.hrl").
-include("emqx_mqtt.hrl").
%% Start/Stop Application
-export([start/0, env/1, env/2, is_running/1, stop/0]).
%% Start/Stop Listeners
-export([start_listeners/0, start_listener/1, listeners/0,
stop_listeners/0, stop_listener/1,
restart_listeners/0, restart_listener/1]).
%% PubSub API
-export([subscribe/1, subscribe/2, subscribe/3, publish/1,
unsubscribe/1, unsubscribe/2]).
@ -47,8 +40,6 @@
%% Shutdown and reboot
-export([shutdown/0, shutdown/1, reboot/0]).
-type(listener() :: {atom(), esockd:listen_on(), [esockd:option()]}).
-define(APP, ?MODULE).
%%--------------------------------------------------------------------
@ -80,85 +71,6 @@ is_running(Node) ->
Pid when is_pid(Pid) -> true
end.
%%--------------------------------------------------------------------
%% Start/Stop Listeners
%%--------------------------------------------------------------------
%% @doc Start Listeners.
-spec(start_listeners() -> ok).
start_listeners() -> lists:foreach(fun start_listener/1, env(listeners, [])).
%% Start mqtt listener
-spec(start_listener(listener()) -> {ok, pid()} | {error, any()}).
start_listener({tcp, ListenOn, Opts}) ->
start_listener('mqtt:tcp', ListenOn, Opts);
%% Start mqtt(SSL) listener
start_listener({ssl, ListenOn, Opts}) ->
start_listener('mqtt:ssl', ListenOn, Opts);
%% Start http listener
start_listener({Proto, ListenOn, Opts}) when Proto == http; Proto == ws ->
{ok, _} = mochiweb:start_http('mqtt:ws', ListenOn, Opts, {emqx_ws, handle_request, []});
%% Start https listener
start_listener({Proto, ListenOn, Opts}) when Proto == https; Proto == wss ->
{ok, _} = mochiweb:start_http('mqtt:wss', ListenOn, Opts, {emqx_ws, handle_request, []}).
start_listener(Proto, ListenOn, Opts) ->
Env = lists:append(emqx:env(client, []), emqx:env(protocol, [])),
MFArgs = {emqx_connection, start_link, [Env]},
{ok, _} = esockd:open(Proto, ListenOn, merge_sockopts(Opts), MFArgs).
listeners() ->
[Listener || Listener = {{Proto, _}, _Pid} <- esockd:listeners(), is_mqtt(Proto)].
is_mqtt('mqtt:tcp') -> true;
is_mqtt('mqtt:ssl') -> true;
is_mqtt('mqtt:ws') -> true;
is_mqtt('mqtt:wss') -> true;
is_mqtt(_Proto) -> false.
%% @doc Stop Listeners
-spec(stop_listeners() -> ok).
stop_listeners() -> lists:foreach(fun stop_listener/1, env(listeners, [])).
-spec(stop_listener(listener()) -> ok | {error, any()}).
stop_listener({tcp, ListenOn, _Opts}) ->
esockd:close('mqtt:tcp', ListenOn);
stop_listener({ssl, ListenOn, _Opts}) ->
esockd:close('mqtt:ssl', ListenOn);
stop_listener({Proto, ListenOn, _Opts}) when Proto == http; Proto == ws ->
mochiweb:stop_http('mqtt:ws', ListenOn);
stop_listener({Proto, ListenOn, _Opts}) when Proto == https; Proto == wss ->
mochiweb:stop_http('mqtt:wss', ListenOn);
% stop_listener({Proto, ListenOn, _Opts}) when Proto == api ->
% mochiweb:stop_http('mqtt:api', ListenOn);
stop_listener({Proto, ListenOn, _Opts}) ->
esockd:close(Proto, ListenOn).
%% @doc Restart Listeners
-spec(restart_listeners() -> ok).
restart_listeners() -> lists:foreach(fun restart_listener/1, env(listeners, [])).
-spec(restart_listener(listener()) -> any()).
restart_listener({tcp, ListenOn, _Opts}) ->
esockd:reopen('mqtt:tcp', ListenOn);
restart_listener({ssl, ListenOn, _Opts}) ->
esockd:reopen('mqtt:ssl', ListenOn);
restart_listener({Proto, ListenOn, _Opts}) when Proto == http; Proto == ws ->
mochiweb:restart_http('mqtt:ws', ListenOn);
restart_listener({Proto, ListenOn, _Opts}) when Proto == https; Proto == wss ->
mochiweb:restart_http('mqtt:wss', ListenOn);
restart_listener({Proto, ListenOn, _Opts}) when Proto == api ->
mochiweb:restart_http('mqtt:api', ListenOn);
restart_listener({Proto, ListenOn, _Opts}) ->
esockd:reopen(Proto, ListenOn).
merge_sockopts(Options) ->
SockOpts = emqx_misc:merge_opts(
?MQTT_SOCKOPTS, proplists:get_value(sockopts, Options, [])),
emqx_misc:merge_opts(Options, [{sockopts, SockOpts}]).
%%--------------------------------------------------------------------
%% PubSub API

View File

@ -26,13 +26,15 @@
-define(APP, emqx).
%%--------------------------------------------------------------------
%% Application Callbacks
%% Application callbacks
%%--------------------------------------------------------------------
start(_Type, _Args) ->
print_banner(),
ekka:start(),
{ok, Sup} = emqx_sup:start_link(),
%%TODO: fixme later
emqx_mqtt_metrics:init(),
ok = register_acl_mod(),
emqx_modules:load(),
start_autocluster(),
@ -43,7 +45,7 @@ start(_Type, _Args) ->
-spec(stop(State :: term()) -> term()).
stop(_State) ->
emqx_modules:unload(),
catch emqx:stop_listeners().
catch emqx_mqtt:shutdown().
%%--------------------------------------------------------------------
%% Print Banner
@ -78,5 +80,5 @@ start_autocluster() ->
after_autocluster() ->
emqx_plugins:init(),
emqx_plugins:load(),
emqx:start_listeners().
emqx_mqtt:bootstrap().

View File

@ -167,11 +167,11 @@ dequeue(State = #state{mqueue = MQ}) ->
{empty, MQ1} ->
State#state{mqueue = MQ1};
{{value, Msg}, MQ1} ->
handle_info({dispatch, Msg#mqtt_message.topic, Msg}, State),
handle_info({dispatch, Msg#message.topic, Msg}, State),
dequeue(State#state{mqueue = MQ1})
end.
transform(Msg = #mqtt_message{topic = Topic}, #state{topic_prefix = Prefix,
transform(Msg = #message{topic = Topic}, #state{topic_prefix = Prefix,
topic_suffix = Suffix}) ->
Msg#mqtt_message{topic = <<Prefix/binary, Topic/binary, Suffix/binary>>}.
Msg#message{topic = <<Prefix/binary, Topic/binary, Suffix/binary>>}.

View File

@ -203,7 +203,7 @@ handle_info({suback, PacketId, GrantedQos}, State) ->
%% Fastlane
handle_info({dispatch, _Topic, Message}, State) ->
handle_info({deliver, Message#mqtt_message{qos = ?QOS_0}}, State);
handle_info({deliver, Message#message{qos = ?QOS_0}}, State);
handle_info({deliver, Message}, State) ->
with_proto(
@ -316,7 +316,7 @@ received(Bytes, State = #state{parser = Parser,
{more, NewParser} ->
{noreply, run_socket(State#state{parser = NewParser}), IdleTimeout};
{ok, Packet, Rest} ->
emqx_metrics:received(Packet),
emqx_mqtt_metrics:received(Packet),
case emqx_protocol:received(Packet, ProtoState) of
{ok, ProtoState1} ->
received(Rest, State#state{parser = emqx_parser:initial_state(PacketSize),

View File

@ -30,13 +30,13 @@
-type(msg_from() :: atom() | {binary(), undefined | binary()}).
%% @doc Make a message
-spec(make(msg_from(), binary(), binary()) -> mqtt_message()).
-spec(make(msg_from(), binary(), binary()) -> message()).
make(From, Topic, Payload) ->
make(From, ?QOS_0, Topic, Payload).
-spec(make(msg_from(), mqtt_qos(), binary(), binary()) -> mqtt_message()).
-spec(make(msg_from(), mqtt_qos(), binary(), binary()) -> message()).
make(From, Qos, Topic, Payload) ->
#mqtt_message{id = msgid(),
#message{id = msgid(),
from = From,
qos = ?QOS_I(Qos),
topic = Topic,
@ -44,7 +44,7 @@ make(From, Qos, Topic, Payload) ->
timestamp = os:timestamp()}.
%% @doc Message from Packet
-spec(from_packet(mqtt_packet()) -> mqtt_message()).
-spec(from_packet(mqtt_packet()) -> message()).
from_packet(#mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH,
retain = Retain,
qos = Qos,
@ -52,7 +52,7 @@ from_packet(#mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH,
variable = #mqtt_packet_publish{topic_name = Topic,
packet_id = PacketId},
payload = Payload}) ->
#mqtt_message{id = msgid(),
#message{id = msgid(),
packet_id = PacketId,
qos = Qos,
retain = Retain,
@ -70,7 +70,7 @@ from_packet(#mqtt_packet_connect{client_id = ClientId,
will_qos = Qos,
will_topic = Topic,
will_msg = Msg}) ->
#mqtt_message{id = msgid(),
#message{id = msgid(),
topic = Topic,
from = {ClientId, Username},
retain = Retain,
@ -81,17 +81,17 @@ from_packet(#mqtt_packet_connect{client_id = ClientId,
from_packet(ClientId, Packet) ->
Msg = from_packet(Packet),
Msg#mqtt_message{from = ClientId}.
Msg#message{from = ClientId}.
from_packet(Username, ClientId, Packet) ->
Msg = from_packet(Packet),
Msg#mqtt_message{from = {ClientId, Username}}.
Msg#message{from = {ClientId, Username}}.
msgid() -> emqx_guid:gen().
%% @doc Message to Packet
-spec(to_packet(mqtt_message()) -> mqtt_packet()).
to_packet(#mqtt_message{packet_id = PkgId,
-spec(to_packet(message()) -> mqtt_packet()).
to_packet(#message{packet_id = PkgId,
qos = Qos,
retain = Retain,
dup = Dup,
@ -111,39 +111,41 @@ to_packet(#mqtt_message{packet_id = PkgId,
payload = Payload}.
%% @doc set dup, retain flag
-spec(set_flag(mqtt_message()) -> mqtt_message()).
-spec(set_flag(message()) -> message()).
set_flag(Msg) ->
Msg#mqtt_message{dup = true, retain = true}.
Msg#message{dup = true, retain = true}.
-spec(set_flag(atom(), mqtt_message()) -> mqtt_message()).
set_flag(dup, Msg = #mqtt_message{dup = false}) ->
Msg#mqtt_message{dup = true};
set_flag(sys, Msg = #mqtt_message{sys = false}) ->
Msg#mqtt_message{sys = true};
set_flag(retain, Msg = #mqtt_message{retain = false}) ->
Msg#mqtt_message{retain = true};
set_flag(Flag, Msg) when Flag =:= dup orelse Flag =:= retain -> Msg.
-spec(set_flag(atom(), message()) -> message()).
set_flag(dup, Msg = #message{dup = false}) ->
Msg#message{dup = true};
set_flag(sys, Msg = #message{sys = false}) ->
Msg#message{sys = true};
set_flag(retain, Msg = #message{retain = false}) ->
Msg#message{retain = true};
set_flag(Flag, Msg) when Flag =:= dup;
Flag =:= retain;
Flag =:= sys -> Msg.
%% @doc Unset dup, retain flag
-spec(unset_flag(mqtt_message()) -> mqtt_message()).
-spec(unset_flag(message()) -> message()).
unset_flag(Msg) ->
Msg#mqtt_message{dup = false, retain = false}.
Msg#message{dup = false, retain = false}.
-spec(unset_flag(dup | retain | atom(), mqtt_message()) -> mqtt_message()).
unset_flag(dup, Msg = #mqtt_message{dup = true}) ->
Msg#mqtt_message{dup = false};
unset_flag(retain, Msg = #mqtt_message{retain = true}) ->
Msg#mqtt_message{retain = false};
-spec(unset_flag(dup | retain | atom(), message()) -> message()).
unset_flag(dup, Msg = #message{dup = true}) ->
Msg#message{dup = false};
unset_flag(retain, Msg = #message{retain = true}) ->
Msg#message{retain = false};
unset_flag(Flag, Msg) when Flag =:= dup orelse Flag =:= retain -> Msg.
%% @doc Format MQTT Message
format(#mqtt_message{id = MsgId, packet_id = PktId, from = {ClientId, Username},
format(#message{id = MsgId, packet_id = PktId, from = {ClientId, Username},
qos = Qos, retain = Retain, dup = Dup, topic =Topic}) ->
io_lib:format("Message(Q~p, R~p, D~p, MsgId=~p, PktId=~p, From=~s/~s, Topic=~s)",
[i(Qos), i(Retain), i(Dup), MsgId, PktId, Username, ClientId, Topic]);
%% TODO:...
format(#mqtt_message{id = MsgId, packet_id = PktId, from = From,
format(#message{id = MsgId, packet_id = PktId, from = From,
qos = Qos, retain = Retain, dup = Dup, topic =Topic}) ->
io_lib:format("Message(Q~p, R~p, D~p, MsgId=~p, PktId=~p, From=~s, Topic=~s)",
[i(Qos), i(Retain), i(Dup), MsgId, PktId, From, Topic]).

View File

@ -29,7 +29,7 @@
-record(state, {tick}).
-define(TAB, ?MODULE).
-define(TAB, metrics).
-define(SERVER, ?MODULE).
@ -117,9 +117,10 @@ key(counter, Metric) ->
init([]) ->
emqx_time:seed(),
% Create metrics table
ets:new(?TAB, [set, public, named_table, {write_concurrency, true}]),
_ = ets:new(?TAB, [set, public, named_table, {write_concurrency, true}]),
% Tick to publish metrics
{ok, #state{tick = emqx_broker:start_tick(tick)}, hibernate}.
{ok, TRef} = timer:send_after(emqx_sys:sys_interval(), tick),
{ok, #state{tick = TRef}, hibernate}.
handle_call(_Req, _From, State) ->
{reply, error, State}.
@ -136,7 +137,8 @@ handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, #state{tick = TRef}) ->
emqx_broker:stop_tick(TRef).
%%TODO:
timer:cancel(TRef).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
@ -145,6 +147,8 @@ code_change(_OldVsn, State, _Extra) ->
%% Internal functions
%%--------------------------------------------------------------------
%% TODO: the depencies are not right
publish(Metric, Val) ->
Msg = emqx_message:make(metrics, metric_topic(Metric), bin(Val)),
emqx_broker:publish(emqx_message:set_flag(sys, Msg)).

View File

@ -42,7 +42,7 @@ rewrite_unsubscribe(_ClientId, _Username, TopicTable, Rules) ->
lager:info("Rewrite unsubscribe: ~p", [TopicTable]),
{ok, [{match_rule(Topic, Rules), Opts} || {Topic, Opts} <- TopicTable]}.
rewrite_publish(Message=#mqtt_message{topic = Topic}, Rules) ->
rewrite_publish(Message = #message{topic = Topic}, Rules) ->
%%TODO: this will not work if the client is always online.
RewriteTopic =
case get({rewrite, Topic}) of
@ -52,7 +52,7 @@ rewrite_publish(Message=#mqtt_message{topic = Topic}, Rules) ->
DestTopic ->
DestTopic
end,
{ok, Message#mqtt_message{topic = RewriteTopic}}.
{ok, Message#message{topic = RewriteTopic}}.
unload(_) ->
emqx:unhook('client.subscribe', fun ?MODULE:rewrite_subscribe/4),

115
src/emqx_mqtt.erl Normal file
View File

@ -0,0 +1,115 @@
%%--------------------------------------------------------------------
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mqtt).
-include("emqx_mqtt.hrl").
-export([bootstrap/0, shutdown/0]).
-export([start_listeners/0, start_listener/1]).
-export([stop_listeners/0, stop_listener/1]).
-export([restart_listeners/0, restart_listener/1]).
-export([listeners/0]).
-type(listener() :: {atom(), esockd:listen_on(), [esockd:option()]}).
bootstrap() ->
start_listeners().
shutdown() ->
stop_listeners().
%%--------------------------------------------------------------------
%% Start/Stop Listeners
%%--------------------------------------------------------------------
%% @doc Start Listeners.
-spec(start_listeners() -> ok).
start_listeners() ->
lists:foreach(fun start_listener/1, emqx:env(listeners, [])).
%% Start mqtt listener
-spec(start_listener(listener()) -> {ok, pid()} | {error, any()}).
start_listener({tcp, ListenOn, Opts}) ->
start_listener('mqtt:tcp', ListenOn, Opts);
%% Start mqtt(SSL) listener
start_listener({ssl, ListenOn, Opts}) ->
start_listener('mqtt:ssl', ListenOn, Opts);
%% Start http listener
start_listener({Proto, ListenOn, Opts}) when Proto == http; Proto == ws ->
{ok, _} = mochiweb:start_http('mqtt:ws', ListenOn, Opts, {emqx_ws, handle_request, []});
%% Start https listener
start_listener({Proto, ListenOn, Opts}) when Proto == https; Proto == wss ->
{ok, _} = mochiweb:start_http('mqtt:wss', ListenOn, Opts, {emqx_ws, handle_request, []}).
start_listener(Proto, ListenOn, Opts) ->
Env = lists:append(emqx:env(client, []), emqx:env(protocol, [])),
MFArgs = {emqx_connection, start_link, [Env]},
{ok, _} = esockd:open(Proto, ListenOn, merge_sockopts(Opts), MFArgs).
listeners() ->
[Listener || Listener = {{Proto, _}, _Pid} <- esockd:listeners(), is_mqtt(Proto)].
is_mqtt('mqtt:tcp') -> true;
is_mqtt('mqtt:ssl') -> true;
is_mqtt('mqtt:ws') -> true;
is_mqtt('mqtt:wss') -> true;
is_mqtt(_Proto) -> false.
%% @doc Stop Listeners
-spec(stop_listeners() -> ok).
stop_listeners() -> lists:foreach(fun stop_listener/1, emqx:env(listeners, [])).
-spec(stop_listener(listener()) -> ok | {error, any()}).
stop_listener({tcp, ListenOn, _Opts}) ->
esockd:close('mqtt:tcp', ListenOn);
stop_listener({ssl, ListenOn, _Opts}) ->
esockd:close('mqtt:ssl', ListenOn);
stop_listener({Proto, ListenOn, _Opts}) when Proto == http; Proto == ws ->
mochiweb:stop_http('mqtt:ws', ListenOn);
stop_listener({Proto, ListenOn, _Opts}) when Proto == https; Proto == wss ->
mochiweb:stop_http('mqtt:wss', ListenOn);
% stop_listener({Proto, ListenOn, _Opts}) when Proto == api ->
% mochiweb:stop_http('mqtt:api', ListenOn);
stop_listener({Proto, ListenOn, _Opts}) ->
esockd:close(Proto, ListenOn).
%% @doc Restart Listeners
-spec(restart_listeners() -> ok).
restart_listeners() -> lists:foreach(fun restart_listener/1, emqx:env(listeners, [])).
-spec(restart_listener(listener()) -> any()).
restart_listener({tcp, ListenOn, _Opts}) ->
esockd:reopen('mqtt:tcp', ListenOn);
restart_listener({ssl, ListenOn, _Opts}) ->
esockd:reopen('mqtt:ssl', ListenOn);
restart_listener({Proto, ListenOn, _Opts}) when Proto == http; Proto == ws ->
mochiweb:restart_http('mqtt:ws', ListenOn);
restart_listener({Proto, ListenOn, _Opts}) when Proto == https; Proto == wss ->
mochiweb:restart_http('mqtt:wss', ListenOn);
restart_listener({Proto, ListenOn, _Opts}) when Proto == api ->
mochiweb:restart_http('mqtt:api', ListenOn);
restart_listener({Proto, ListenOn, _Opts}) ->
esockd:reopen(Proto, ListenOn).
merge_sockopts(Options) ->
SockOpts = emqx_misc:merge_opts(
?MQTT_SOCKOPTS, proplists:get_value(sockopts, Options, [])),
emqx_misc:merge_opts(Options, [{sockopts, SockOpts}]).

View File

@ -1,29 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright © 2013-2018 EMQ Inc. All rights reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mqtt_app).
-behaviour(application).
-export([start/2, stop/1]).
start(_Type, _Args) ->
emqx_mqtt_metrics:init(),
emqx_mqtt_sup:start_link().
stop(_State) ->
ok.

View File

@ -154,8 +154,8 @@ stats(#mqueue{type = Type, q = Q, max_len = MaxLen, len = Len, dropped = Dropped
end} | [{max_len, MaxLen}, {dropped, Dropped}]].
%% @doc Enqueue a message.
-spec(in(mqtt_message(), mqueue()) -> mqueue()).
in(#mqtt_message{qos = ?QOS_0}, MQ = #mqueue{qos0 = false}) ->
-spec(in(message(), mqueue()) -> mqueue()).
in(#message{qos = ?QOS_0}, MQ = #mqueue{qos0 = false}) ->
MQ;
in(Msg, MQ = #mqueue{type = simple, q = Q, len = Len, max_len = 0}) ->
MQ#mqueue{q = queue:in(Msg, Q), len = Len + 1};
@ -166,7 +166,7 @@ in(Msg, MQ = #mqueue{type = simple, q = Q, len = Len, max_len = MaxLen, dropped
in(Msg, MQ = #mqueue{type = simple, q = Q, len = Len}) ->
maybe_set_alarm(MQ#mqueue{q = queue:in(Msg, Q), len = Len + 1});
in(Msg = #mqtt_message{topic = Topic}, MQ = #mqueue{type = priority, q = Q,
in(Msg = #message{topic = Topic}, MQ = #mqueue{type = priority, q = Q,
priorities = Priorities,
max_len = 0}) ->
case lists:keysearch(Topic, 1, Priorities) of
@ -176,7 +176,7 @@ in(Msg = #mqtt_message{topic = Topic}, MQ = #mqueue{type = priority, q = Q,
{Pri, MQ1} = insert_p(Topic, 0, MQ),
MQ1#mqueue{q = ?PQUEUE:in(Msg, Pri, Q)}
end;
in(Msg = #mqtt_message{topic = Topic}, MQ = #mqueue{type = priority, q = Q,
in(Msg = #message{topic = Topic}, MQ = #mqueue{type = priority, q = Q,
priorities = Priorities,
max_len = MaxLen}) ->
case lists:keysearch(Topic, 1, Priorities) of

View File

@ -120,7 +120,7 @@ client(#proto_state{client_id = ClientId,
connected_at = Time}) ->
WillTopic = if
WillMsg =:= undefined -> undefined;
true -> WillMsg#mqtt_message.topic
true -> WillMsg#message.topic
end,
#mqtt_client{client_id = ClientId,
client_pid = ClientPid,
@ -352,18 +352,18 @@ with_puback(Type, Packet = ?PUBLISH_PACKET(_Qos, PacketId),
?LOG(error, "PUBLISH ~p error: ~p", [PacketId, Error], State)
end.
-spec(send(mqtt_message() | mqtt_packet(), proto_state()) -> {ok, proto_state()}).
-spec(send(message() | mqtt_packet(), proto_state()) -> {ok, proto_state()}).
send(Msg, State = #proto_state{client_id = ClientId,
username = Username,
mountpoint = MountPoint,
is_bridge = IsBridge})
when is_record(Msg, mqtt_message) ->
when is_record(Msg, message) ->
emqx_hooks:run('message.delivered', [ClientId, Username], Msg),
send(emqx_message:to_packet(unmount(MountPoint, clean_retain(IsBridge, Msg))), State);
send(Packet = ?PACKET(Type), State = #proto_state{sendfun = SendFun, stats_data = Stats}) ->
trace(send, Packet, State),
emqx_metrics:sent(Packet),
emqx_mqtt_metrics:sent(Packet),
SendFun(Packet),
{ok, State#proto_state{stats_data = inc_stats(send, Type, Stats)}}.
@ -439,7 +439,7 @@ maybe_set_clientid(State) ->
send_willmsg(_Client, undefined) ->
ignore;
send_willmsg(#mqtt_client{client_id = ClientId, username = Username}, WillMsg) ->
emqx_broker:publish(WillMsg#mqtt_message{from = {ClientId, Username}}).
emqx_broker:publish(WillMsg#message{from = {ClientId, Username}}).
start_keepalive(0, _State) -> ignore;
@ -570,10 +570,10 @@ sp(false) -> 0.
%% The retained flag should be propagated for bridge.
%%--------------------------------------------------------------------
clean_retain(false, Msg = #mqtt_message{retain = true, headers = Headers}) ->
clean_retain(false, Msg = #message{retain = true, headers = Headers}) ->
case lists:member(retained, Headers) of
true -> Msg;
false -> Msg#mqtt_message{retain = false}
false -> Msg#message{retain = false}
end;
clean_retain(_IsBridge, Msg) ->
Msg.
@ -596,16 +596,16 @@ feed_var({<<"%u">>, Username}, MountPoint) ->
mount(undefined, Any) ->
Any;
mount(MountPoint, Msg = #mqtt_message{topic = Topic}) ->
Msg#mqtt_message{topic = <<MountPoint/binary, Topic/binary>>};
mount(MountPoint, Msg = #message{topic = Topic}) ->
Msg#message{topic = <<MountPoint/binary, Topic/binary>>};
mount(MountPoint, TopicTable) when is_list(TopicTable) ->
[{<<MountPoint/binary, Topic/binary>>, Opts} || {Topic, Opts} <- TopicTable].
unmount(undefined, Any) ->
Any;
unmount(MountPoint, Msg = #mqtt_message{topic = Topic}) ->
unmount(MountPoint, Msg = #message{topic = Topic}) ->
case catch split_binary(Topic, byte_size(MountPoint)) of
{MountPoint, Topic0} -> Msg#mqtt_message{topic = Topic0};
{MountPoint, Topic0} -> Msg#message{topic = Topic0};
_ -> Msg
end.

View File

@ -185,15 +185,15 @@ subscribe(Session, PacketId, TopicTable) -> %%TODO: the ack function??...
%% @doc Publish Message
-spec(publish(pid(), message()) -> ok | {error, term()}).
publish(_Session, Msg = #mqtt_message{qos = ?QOS_0}) ->
publish(_Session, Msg = #message{qos = ?QOS_0}) ->
%% Publish QoS0 Directly
emqx_broker:publish(Msg), ok;
publish(_Session, Msg = #mqtt_message{qos = ?QOS_1}) ->
publish(_Session, Msg = #message{qos = ?QOS_1}) ->
%% Publish QoS1 message directly for client will PubAck automatically
emqx_broker:publish(Msg), ok;
publish(Session, Msg = #mqtt_message{qos = ?QOS_2}) ->
publish(Session, Msg = #message{qos = ?QOS_2}) ->
%% Publish QoS2 to Session
gen_server:call(Session, {publish, Msg}, ?TIMEOUT).
@ -313,7 +313,7 @@ binding(ClientPid) ->
handle_pre_hibernate(State) ->
{hibernate, emqx_gc:reset_conn_gc_count(#state.force_gc_count, emit_stats(State))}.
handle_call({publish, Msg = #mqtt_message{qos = ?QOS_2, packet_id = PacketId}}, _From,
handle_call({publish, Msg = #message{qos = ?QOS_2, packet_id = PacketId}}, _From,
State = #state{awaiting_rel = AwaitingRel,
await_rel_timer = Timer,
await_rel_timeout = Timeout}) ->
@ -510,12 +510,12 @@ handle_cast(Msg, State) ->
{noreply, State}.
%% Ignore Messages delivered by self
handle_info({dispatch, _Topic, #mqtt_message{from = {ClientId, _}}},
handle_info({dispatch, _Topic, #message{from = {ClientId, _}}},
State = #state{client_id = ClientId, ignore_loop_deliver = true}) ->
{noreply, State};
%% Dispatch Message
handle_info({dispatch, Topic, Msg}, State) when is_record(Msg, mqtt_message) ->
handle_info({dispatch, Topic, Msg}, State) when is_record(Msg, message) ->
{noreply, gc(dispatch(tune_qos(Topic, reset_dup(Msg), State), State))};
%% Do nothing if the client has been disconnected.
@ -604,7 +604,7 @@ retry_delivery(Force, [{Type, Msg, Ts} | Msgs], Now,
if
Force orelse (Diff >= Interval) ->
case {Type, Msg} of
{publish, Msg = #mqtt_message{packet_id = PacketId}} ->
{publish, Msg = #message{packet_id = PacketId}} ->
redeliver(Msg, State),
Inflight1 = Inflight:update(PacketId, {publish, Msg, Now}),
retry_delivery(Force, Msgs, Now, State#state{inflight = Inflight1});
@ -631,7 +631,7 @@ expire_awaiting_rel(State = #state{awaiting_rel = AwaitingRel}) ->
expire_awaiting_rel([], _Now, State) ->
State#state{await_rel_timer = undefined};
expire_awaiting_rel([{PacketId, Msg = #mqtt_message{timestamp = TS}} | Msgs],
expire_awaiting_rel([{PacketId, Msg = #message{timestamp = TS}} | Msgs],
Now, State = #state{awaiting_rel = AwaitingRel,
await_rel_timeout = Timeout}) ->
case (timer:now_diff(Now, TS) div 1000) of
@ -651,8 +651,8 @@ sortfun(inflight) ->
fun({_, _, Ts1}, {_, _, Ts2}) -> Ts1 < Ts2 end;
sortfun(awaiting_rel) ->
fun({_, #mqtt_message{timestamp = Ts1}},
{_, #mqtt_message{timestamp = Ts2}}) ->
fun({_, #message{timestamp = Ts1}},
{_, #message{timestamp = Ts2}}) ->
Ts1 < Ts2
end.
@ -677,17 +677,17 @@ dispatch(Msg, State = #state{client_id = ClientId, client_pid = undefined}) ->
end;
%% Deliver qos0 message directly to client
dispatch(Msg = #mqtt_message{qos = ?QOS0}, State) ->
dispatch(Msg = #message{qos = ?QOS0}, State) ->
deliver(Msg, State), State;
dispatch(Msg = #mqtt_message{qos = QoS},
dispatch(Msg = #message{qos = QoS},
State = #state{next_msg_id = MsgId, inflight = Inflight})
when QoS =:= ?QOS1 orelse QoS =:= ?QOS2 ->
case Inflight:is_full() of
true ->
enqueue_msg(Msg, State);
false ->
Msg1 = Msg#mqtt_message{packet_id = MsgId},
Msg1 = Msg#message{packet_id = MsgId},
deliver(Msg1, State),
await(Msg1, next_msg_id(State))
end.
@ -700,8 +700,8 @@ enqueue_msg(Msg, State = #state{mqueue = Q}) ->
%% Deliver
%%--------------------------------------------------------------------
redeliver(Msg = #mqtt_message{qos = QoS}, State) ->
deliver(Msg#mqtt_message{dup = if QoS =:= ?QOS2 -> false; true -> true end}, State);
redeliver(Msg = #message{qos = QoS}, State) ->
deliver(Msg#message{dup = if QoS =:= ?QOS2 -> false; true -> true end}, State);
redeliver({pubrel, PacketId}, #state{client_pid = Pid}) ->
Pid ! {redeliver, {?PUBREL, PacketId}}.
@ -715,7 +715,7 @@ deliver(Msg, #state{client_pid = Pid, binding = remote}) ->
%% Awaiting ACK for QoS1/QoS2 Messages
%%--------------------------------------------------------------------
await(Msg = #mqtt_message{packet_id = PacketId},
await(Msg = #message{packet_id = PacketId},
State = #state{inflight = Inflight,
retry_timer = RetryTimer,
retry_interval = Interval}) ->
@ -780,13 +780,13 @@ dequeue2(State = #state{mqueue = Q}) ->
%% Tune QoS
%%--------------------------------------------------------------------
tune_qos(Topic, Msg = #mqtt_message{qos = PubQoS},
tune_qos(Topic, Msg = #message{qos = PubQoS},
#state{subscriptions = SubMap, upgrade_qos = UpgradeQoS}) ->
case maps:find(Topic, SubMap) of
{ok, SubQoS} when UpgradeQoS andalso (SubQoS > PubQoS) ->
Msg#mqtt_message{qos = SubQoS};
Msg#message{qos = SubQoS};
{ok, SubQoS} when (not UpgradeQoS) andalso (SubQoS < PubQoS) ->
Msg#mqtt_message{qos = SubQoS};
Msg#message{qos = SubQoS};
{ok, _} ->
Msg;
error ->
@ -797,8 +797,8 @@ tune_qos(Topic, Msg = #mqtt_message{qos = PubQoS},
%% Reset Dup
%%--------------------------------------------------------------------
reset_dup(Msg = #mqtt_message{dup = true}) ->
Msg#mqtt_message{dup = false};
reset_dup(Msg = #message{dup = true}) ->
Msg#message{dup = false};
reset_dup(Msg) -> Msg.
%%--------------------------------------------------------------------

View File

@ -163,7 +163,8 @@ init([]) ->
Topics = ?SYSTOP_CLIENTS ++ ?SYSTOP_SESSIONS ++ ?SYSTOP_PUBSUB ++ ?SYSTOP_RETAINED,
ets:insert(?STATS_TAB, [{Topic, 0} || Topic <- Topics]),
% Tick to publish stats
{ok, #state{tick = emqx_broker:start_tick(tick)}, hibernate}.
{ok, TRef} = timer:send_after(emqx_sys:sys_interval(), tick),
{ok, #state{tick = TRef}, hibernate}.
handle_call(stop, _From, State) ->
{stop, normal, ok, State};
@ -194,7 +195,7 @@ handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, #state{tick = TRef}) ->
emqx_broker:stop_tick(TRef).
timer:cancel(TRef).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.

View File

@ -54,16 +54,17 @@ stop_child(ChildId) ->
%%--------------------------------------------------------------------
init([]) ->
{ok, {{one_for_all, 0, 1},
{ok, {{one_for_all, 10, 3600},
[?CHILD(emqx_ctl, worker),
?CHILD(emqx_hooks, worker),
?CHILD(emqx_locker, worker),
?CHILD(emqx_stats, worker),
?CHILD(emqx_metrics, worker),
?CHILD(emqx_sys, worker),
?CHILD(emqx_router_sup, supervisor),
?CHILD(emqx_broker_sup, supervisor),
?CHILD(emqx_pooler, supervisor),
?CHILD(emqx_trace_sup, supervisor),
?CHILD(emqx_tracer_sup, supervisor),
?CHILD(emqx_cm_sup, supervisor),
?CHILD(emqx_sm_sup, supervisor),
?CHILD(emqx_session_sup, supervisor),

View File

@ -140,7 +140,7 @@ handle_call(Req, _From, State) ->
reply({error, unexpected_request}, State).
handle_cast({received, Packet}, State = #wsclient_state{proto_state = ProtoState}) ->
emqx_metrics:received(Packet),
emqx_mqtt_metrics:received(Packet),
case emqx_protocol:received(Packet, ProtoState) of
{ok, ProtoState1} ->
{noreply, gc(State#wsclient_state{proto_state = ProtoState1}), hibernate};
@ -178,7 +178,7 @@ handle_info({suback, PacketId, GrantedQos}, State) ->
%% Fastlane
handle_info({dispatch, _Topic, Message}, State) ->
handle_info({deliver, Message#mqtt_message{qos = ?QOS_0}}, State);
handle_info({deliver, Message#message{qos = ?QOS_0}}, State);
handle_info({deliver, Message}, State) ->
with_proto(