Parse and serialize MQTT 5.0 protocol packets

This commit is contained in:
Feng Lee 2018-02-27 23:45:55 +08:00
parent a8aeb5ac17
commit f7f0f27e4d
18 changed files with 921 additions and 303 deletions

View File

@ -164,26 +164,26 @@
-type(route() :: #route{}).
%%--------------------------------------------------------------------
%% MQTT Alarm
%% Alarm
%%--------------------------------------------------------------------
-record(mqtt_alarm,
-record(alarm,
{ id :: binary(),
severity :: warning | error | critical,
severity :: notice | warning | error | critical,
title :: iolist() | binary(),
summary :: iolist() | binary(),
timestamp :: erlang:timestamp()
}).
-type(mqtt_alarm() :: #mqtt_alarm{}).
-type(alarm() :: #alarm{}).
%%--------------------------------------------------------------------
%% MQTT Plugin
%% Plugin
%%--------------------------------------------------------------------
-record(mqtt_plugin, { name, version, descr, active = false }).
-record(plugin, { name, version, descr, active = false }).
-type(mqtt_plugin() :: #mqtt_plugin{}).
-type(plugin() :: #plugin{}).
%%--------------------------------------------------------------------
%% MQTT CLI Command. For example: 'broker metrics'

View File

@ -56,15 +56,15 @@ alarm_fun(Bool) ->
(clear, _AlarmId) when Bool =:= false -> alarm_fun(false)
end.
-spec(set_alarm(mqtt_alarm()) -> ok).
set_alarm(Alarm) when is_record(Alarm, mqtt_alarm) ->
-spec(set_alarm(alarm()) -> ok).
set_alarm(Alarm) when is_record(Alarm, alarm) ->
gen_event:notify(?ALARM_MGR, {set_alarm, Alarm}).
-spec(clear_alarm(any()) -> ok).
clear_alarm(AlarmId) when is_binary(AlarmId) ->
gen_event:notify(?ALARM_MGR, {clear_alarm, AlarmId}).
-spec(get_alarms() -> list(mqtt_alarm())).
-spec(get_alarms() -> list(alarm())).
get_alarms() ->
gen_event:call(?ALARM_MGR, ?MODULE, get_alarms).
@ -83,10 +83,10 @@ delete_alarm_handler(Module) when is_atom(Module) ->
init(_) -> {ok, []}.
handle_event({set_alarm, Alarm = #mqtt_alarm{id = AlarmId,
severity = Severity,
title = Title,
summary = Summary}}, Alarms)->
handle_event({set_alarm, Alarm = #alarm{id = AlarmId,
severity = Severity,
title = Title,
summary = Summary}}, Alarms)->
TS = os:timestamp(),
Json = mochijson2:encode([{id, AlarmId},
{severity, Severity},
@ -94,7 +94,7 @@ handle_event({set_alarm, Alarm = #mqtt_alarm{id = AlarmId,
{summary, iolist_to_binary(Summary)},
{ts, emqx_time:now_secs(TS)}]),
emqx:publish(alarm_msg(alert, AlarmId, Json)),
{ok, [Alarm#mqtt_alarm{timestamp = TS} | Alarms]};
{ok, [Alarm#alarm{timestamp = TS} | Alarms]};
handle_event({clear_alarm, AlarmId}, Alarms) ->
Json = mochijson2:encode([{id, AlarmId}, {ts, emqx_time:now_secs()}]),

32
src/emqx_cli.erl Normal file
View File

@ -0,0 +1,32 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_cli).
-export([print/1, print/2, usage/1]).
print(Msg) ->
io:format(Msg).
print(Format, Args) ->
io:format(Format, Args).
usage(CmdList) ->
lists:foreach(
fun({Cmd, Descr}) ->
io:format("~-48s# ~s~n", [Cmd, Descr])
end, CmdList).

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -18,14 +18,6 @@
-behaviour(gen_server).
-author("Feng Lee <feng@emqtt.io>").
-include("emqx.hrl").
-include("emqx_cli.hrl").
-define(SERVER, ?MODULE).
%% API Function Exports
-export([start_link/0, register_cmd/2, register_cmd/3, unregister_cmd/1,
lookup/1, run/1]).
@ -36,7 +28,9 @@
-record(state, {seq = 0}).
-define(CMD_TAB, mqttd_ctl_cmd).
-define(SERVER, ?MODULE).
-define(TAB, ?MODULE).
%%--------------------------------------------------------------------
%% API
@ -87,40 +81,40 @@ run([CmdS|Args]) ->
%% @doc Lookup a command
-spec(lookup(atom()) -> [{module(), atom()}]).
lookup(Cmd) ->
case ets:match(?CMD_TAB, {{'_', Cmd}, '$1', '_'}) of
case ets:match(?TAB, {{'_', Cmd}, '$1', '_'}) of
[El] -> El;
[] -> []
end.
%% @doc Usage
usage() ->
?PRINT("Usage: ~s~n", [?MODULE]),
[begin ?PRINT("~80..-s~n", [""]), Mod:Cmd(usage) end
|| {_, {Mod, Cmd}, _} <- ets:tab2list(?CMD_TAB)].
io:format("Usage: ~s~n", [?MODULE]),
[begin io:format("~80..-s~n", [""]), Mod:Cmd(usage) end
|| {_, {Mod, Cmd}, _} <- ets:tab2list(?TAB)].
%%--------------------------------------------------------------------
%% gen_server callbacks
%%--------------------------------------------------------------------
init([]) ->
ets:new(?CMD_TAB, [ordered_set, named_table, protected]),
ets:new(?TAB, [ordered_set, named_table, protected]),
{ok, #state{seq = 0}}.
handle_call(_Request, _From, State) ->
{reply, ok, State}.
handle_cast({register_cmd, Cmd, MF, Opts}, State = #state{seq = Seq}) ->
case ets:match(?CMD_TAB, {{'$1', Cmd}, '_', '_'}) of
case ets:match(?TAB, {{'$1', Cmd}, '_', '_'}) of
[] ->
ets:insert(?CMD_TAB, {{Seq, Cmd}, MF, Opts});
ets:insert(?TAB, {{Seq, Cmd}, MF, Opts});
[[OriginSeq] | _] ->
lager:warning("CLI: ~s is overidden by ~p", [Cmd, MF]),
ets:insert(?CMD_TAB, {{OriginSeq, Cmd}, MF, Opts})
ets:insert(?TAB, {{OriginSeq, Cmd}, MF, Opts})
end,
noreply(next_seq(State));
handle_cast({unregister_cmd, Cmd}, State) ->
ets:match_delete(?CMD_TAB, {{'_', Cmd}, '_', '_'}),
ets:match_delete(?TAB, {{'_', Cmd}, '_', '_'}),
noreply(State);
handle_cast(_Msg, State) ->
@ -136,7 +130,7 @@ code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%--------------------------------------------------------------------
%% Internal Function Definitions
%% Internal Function
%%--------------------------------------------------------------------
noreply(State) ->
@ -159,9 +153,10 @@ register_cmd_test_() ->
ok = emqx_ctl:terminate(shutdown, State)
end,
fun(State = #state{seq = Seq}) ->
emqx_ctl:handle_cast({register_cmd, test0, {?MODULE, test0}, []}, State),
[?_assertMatch([{{0,test0},{?MODULE, test0}, []}], ets:lookup(?CMD_TAB, {Seq,test0}))]
emqx_ctl:handle_cast({register_cmd, test0, {?MODULE, test0}, []}, State),
[?_assertMatch([{{0,test0},{?MODULE, test0}, []}], ets:lookup(?TAB, {Seq,test0}))]
end
}.
-endif.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -18,19 +18,8 @@
-behaviour(gen_server).
-author("Feng Lee <feng@emqtt.io>").
-include("emqx.hrl").
-include("emqx_mqtt.hrl").
-define(SERVER, ?MODULE).
%% API Function Exports
-export([start_link/0]).
%% Received/Sent Metrics
-export([received/1, sent/1]).
-export([start_link/0, create/1]).
-export([all/0, value/1, inc/1, inc/2, inc/3, dec/2, dec/3, set/2]).
@ -40,58 +29,9 @@
-record(state, {tick}).
-define(METRIC_TAB, mqtt_metric).
-define(TAB, ?MODULE).
%% Bytes sent and received of Broker
-define(SYSTOP_BYTES, [
{counter, 'bytes/received'}, % Total bytes received
{counter, 'bytes/sent'} % Total bytes sent
]).
%% Packets sent and received of Broker
-define(SYSTOP_PACKETS, [
{counter, 'packets/received'}, % All Packets received
{counter, 'packets/sent'}, % All Packets sent
{counter, 'packets/connect'}, % CONNECT Packets received
{counter, 'packets/connack'}, % CONNACK Packets sent
{counter, 'packets/publish/received'}, % PUBLISH packets received
{counter, 'packets/publish/sent'}, % PUBLISH packets sent
{counter, 'packets/puback/received'}, % PUBACK packets received
{counter, 'packets/puback/sent'}, % PUBACK packets sent
{counter, 'packets/puback/missed'}, % PUBACK packets missed
{counter, 'packets/pubrec/received'}, % PUBREC packets received
{counter, 'packets/pubrec/sent'}, % PUBREC packets sent
{counter, 'packets/pubrec/missed'}, % PUBREC packets missed
{counter, 'packets/pubrel/received'}, % PUBREL packets received
{counter, 'packets/pubrel/sent'}, % PUBREL packets sent
{counter, 'packets/pubrel/missed'}, % PUBREL packets missed
{counter, 'packets/pubcomp/received'}, % PUBCOMP packets received
{counter, 'packets/pubcomp/sent'}, % PUBCOMP packets sent
{counter, 'packets/pubcomp/missed'}, % PUBCOMP packets missed
{counter, 'packets/subscribe'}, % SUBSCRIBE Packets received
{counter, 'packets/suback'}, % SUBACK packets sent
{counter, 'packets/unsubscribe'}, % UNSUBSCRIBE Packets received
{counter, 'packets/unsuback'}, % UNSUBACK Packets sent
{counter, 'packets/pingreq'}, % PINGREQ packets received
{counter, 'packets/pingresp'}, % PINGRESP Packets sent
{counter, 'packets/disconnect'} % DISCONNECT Packets received
]).
%% Messages sent and received of broker
-define(SYSTOP_MESSAGES, [
{counter, 'messages/received'}, % All Messages received
{counter, 'messages/sent'}, % All Messages sent
{counter, 'messages/qos0/received'}, % QoS0 Messages received
{counter, 'messages/qos0/sent'}, % QoS0 Messages sent
{counter, 'messages/qos1/received'}, % QoS1 Messages received
{counter, 'messages/qos1/sent'}, % QoS1 Messages sent
{counter, 'messages/qos2/received'}, % QoS2 Messages received
{counter, 'messages/qos2/sent'}, % QoS2 Messages sent
{counter, 'messages/qos2/dropped'}, % QoS2 Messages dropped
{gauge, 'messages/retained'}, % Messagea retained
{counter, 'messages/dropped'}, % Messages dropped
{counter, 'messages/forward'} % Messages forward
]).
-define(SERVER, ?MODULE).
%%--------------------------------------------------------------------
%% API
@ -102,81 +42,13 @@
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
%% @doc Count packets received.
-spec(received(mqtt_packet()) -> ignore | non_neg_integer()).
received(Packet) ->
inc('packets/received'),
received1(Packet).
received1(?PUBLISH_PACKET(Qos, _PktId)) ->
inc('packets/publish/received'),
inc('messages/received'),
qos_received(Qos);
received1(?PACKET(Type)) ->
received2(Type).
received2(?CONNECT) ->
inc('packets/connect');
received2(?PUBACK) ->
inc('packets/puback/received');
received2(?PUBREC) ->
inc('packets/pubrec/received');
received2(?PUBREL) ->
inc('packets/pubrel/received');
received2(?PUBCOMP) ->
inc('packets/pubcomp/received');
received2(?SUBSCRIBE) ->
inc('packets/subscribe');
received2(?UNSUBSCRIBE) ->
inc('packets/unsubscribe');
received2(?PINGREQ) ->
inc('packets/pingreq');
received2(?DISCONNECT) ->
inc('packets/disconnect');
received2(_) ->
ignore.
qos_received(?QOS_0) ->
inc('messages/qos0/received');
qos_received(?QOS_1) ->
inc('messages/qos1/received');
qos_received(?QOS_2) ->
inc('messages/qos2/received').
create({gauge, Name}) ->
ets:insert(?TAB, {{Name, 0}, 0});
create({counter, Name}) ->
Schedulers = lists:seq(1, erlang:system_info(schedulers)),
ets:insert(?TAB, [{{Name, I}, 0} || I <- Schedulers]).
%% @doc Count packets received. Will not count $SYS PUBLISH.
-spec(sent(mqtt_packet()) -> ignore | non_neg_integer()).
sent(?PUBLISH_PACKET(_Qos, <<"$SYS/", _/binary>>, _, _)) ->
ignore;
sent(Packet) ->
inc('packets/sent'),
sent1(Packet).
sent1(?PUBLISH_PACKET(Qos, _PktId)) ->
inc('packets/publish/sent'),
inc('messages/sent'),
qos_sent(Qos);
sent1(?PACKET(Type)) ->
sent2(Type).
sent2(?CONNACK) ->
inc('packets/connack');
sent2(?PUBACK) ->
inc('packets/puback/sent');
sent2(?PUBREC) ->
inc('packets/pubrec/sent');
sent2(?PUBREL) ->
inc('packets/pubrel/sent');
sent2(?PUBCOMP) ->
inc('packets/pubcomp/sent');
sent2(?SUBACK) ->
inc('packets/suback');
sent2(?UNSUBACK) ->
inc('packets/unsuback');
sent2(?PINGRESP) ->
inc('packets/pingresp');
sent2(_Type) ->
ignore.
qos_sent(?QOS_0) ->
inc('messages/qos0/sent');
qos_sent(?QOS_1) ->
inc('messages/qos1/sent');
qos_sent(?QOS_2) ->
inc('messages/qos2/sent').
%% @doc Get all metrics
-spec(all() -> [{atom(), non_neg_integer()}]).
@ -188,12 +60,12 @@ all() ->
{ok, Count} -> maps:put(Metric, Count+Val, Map);
error -> maps:put(Metric, Val, Map)
end
end, #{}, ?METRIC_TAB)).
end, #{}, ?TAB)).
%% @doc Get metric value
-spec(value(atom()) -> non_neg_integer()).
value(Metric) ->
lists:sum(ets:select(?METRIC_TAB, [{{{Metric, '_'}, '$1'}, [], ['$1']}])).
lists:sum(ets:select(?TAB, [{{{Metric, '_'}, '$1'}, [], ['$1']}])).
%% @doc Increase counter
-spec(inc(atom()) -> non_neg_integer()).
@ -212,9 +84,9 @@ inc(Metric, Val) when is_atom(Metric) ->
%% @doc Increase metric value
-spec(inc(counter | gauge, atom(), pos_integer()) -> pos_integer()).
inc(gauge, Metric, Val) ->
ets:update_counter(?METRIC_TAB, key(gauge, Metric), {2, Val});
ets:update_counter(?TAB, key(gauge, Metric), {2, Val});
inc(counter, Metric, Val) ->
ets:update_counter(?METRIC_TAB, key(counter, Metric), {2, Val}).
ets:update_counter(?TAB, key(counter, Metric), {2, Val}).
%% @doc Decrease metric value
-spec(dec(gauge, atom()) -> integer()).
@ -224,13 +96,13 @@ dec(gauge, Metric) ->
%% @doc Decrease metric value
-spec(dec(gauge, atom(), pos_integer()) -> integer()).
dec(gauge, Metric, Val) ->
ets:update_counter(?METRIC_TAB, key(gauge, Metric), {2, -Val}).
ets:update_counter(?TAB, key(gauge, Metric), {2, -Val}).
%% @doc Set metric value
set(Metric, Val) when is_atom(Metric) ->
set(gauge, Metric, Val).
set(gauge, Metric, Val) ->
ets:insert(?METRIC_TAB, {key(gauge, Metric), Val}).
ets:insert(?TAB, {key(gauge, Metric), Val}).
%% @doc Metric Key
key(gauge, Metric) ->
@ -239,16 +111,13 @@ key(counter, Metric) ->
{Metric, erlang:system_info(scheduler_id)}.
%%--------------------------------------------------------------------
%% gen_server callbacks
%% gen_server Callbacks
%%--------------------------------------------------------------------
init([]) ->
emqx_time:seed(),
Metrics = ?SYSTOP_BYTES ++ ?SYSTOP_PACKETS ++ ?SYSTOP_MESSAGES,
% Create metrics table
ets:new(?METRIC_TAB, [set, public, named_table, {write_concurrency, true}]),
% Init metrics
[create_metric(Metric) || Metric <- Metrics],
ets:new(?TAB, [set, public, named_table, {write_concurrency, true}]),
% Tick to publish metrics
{ok, #state{tick = emqx_broker:start_tick(tick)}, hibernate}.
@ -278,14 +147,7 @@ code_change(_OldVsn, State, _Extra) ->
publish(Metric, Val) ->
Msg = emqx_message:make(metrics, metric_topic(Metric), bin(Val)),
emqx:publish(emqx_message:set_flag(sys, Msg)).
create_metric({gauge, Name}) ->
ets:insert(?METRIC_TAB, {{Name, 0}, 0});
create_metric({counter, Name}) ->
Schedulers = lists:seq(1, erlang:system_info(schedulers)),
ets:insert(?METRIC_TAB, [{{Name, I}, 0} || I <- Schedulers]).
emqx_broker:publish(emqx_message:set_flag(sys, Msg)).
metric_topic(Metric) ->
emqx_topic:systop(list_to_binary(lists:concat(['metrics/', Metric]))).

113
src/emqx_mqtt5_props.erl Normal file
View File

@ -0,0 +1,113 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqx.io)
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mqtt5_props).
-author("Feng Lee <feng@emqtt.io>").
-export([name/1, id/1]).
%%--------------------------------------------------------------------
%% Property id to name
%%--------------------------------------------------------------------
%% 01: Byte; PUBLISH, Will Properties
name(16#01) -> 'Payload-Format-Indicator';
%% 02: Four Byte Integer; PUBLISH, Will Properties
name(16#02) -> 'Message-Expiry-Interval';
%% 03: UTF-8 Encoded String; PUBLISH, Will Properties
name(16#03) -> 'Content-Type';
%% 08: UTF-8 Encoded String; PUBLISH, Will Properties
name(16#08) -> 'Response-Topic';
%% 09: Binary Data; PUBLISH, Will Properties
name(16#09) -> 'Correlation-Data';
%% 11: Variable Byte Integer; PUBLISH, SUBSCRIBE
name(16#0B) -> 'Subscription-Identifier';
%% 17: Four Byte Integer; CONNECT, CONNACK, DISCONNECT
name(16#11) -> 'Session-Expiry-Interval';
%% 18: UTF-8 Encoded String; CONNACK
name(16#12) -> 'Assigned-Client-Identifier';
%% 19: Two Byte Integer; CONNACK
name(16#13) -> 'Server-Keep-Alive';
%% 21: UTF-8 Encoded String; CONNECT, CONNACK, AUTH
name(16#15) -> 'Authentication-Method';
%% 22: Binary Data; CONNECT, CONNACK, AUTH
name(16#16) -> 'Authentication-Data';
%% 23: Byte; CONNECT
name(16#17) -> 'Request-Problem-Information';
%% 24: Four Byte Integer; Will Properties
name(16#18) -> 'Will-Delay-Interval';
%% 25: Byte; CONNECT
name(16#19) -> 'Request-Response-Information';
%% 26: UTF-8 Encoded String; CONNACK
name(16#1A) -> 'Response Information';
%% 28: UTF-8 Encoded String; CONNACK, DISCONNECT
name(16#1C) -> 'Server-Reference';
%% 31: UTF-8 Encoded String; CONNACK, PUBACK, PUBREC, PUBREL, PUBCOMP, SUBACK, UNSUBACK, DISCONNECT, AUTH
name(16#1F) -> 'Reason-String';
%% 33: Two Byte Integer; CONNECT, CONNACK
name(16#21) -> 'Receive-Maximum';
%% 34: Two Byte Integer; CONNECT, CONNACK
name(16#22) -> 'Topic-Alias-Maximum';
%% 35: Two Byte Integer; PUBLISH
name(16#23) -> 'Topic Alias';
%% 36: Byte; CONNACK
name(16#24) -> 'Maximum-QoS';
%% 37: Byte; CONNACK
name(16#25) -> 'Retain-Available';
%% 38: UTF-8 String Pair; ALL
name(16#26) -> 'User-Property';
%% 39: Four Byte Integer; CONNECT, CONNACK
name(16#27) -> 'Maximum-Packet-Size';
%% 40: Byte; CONNACK
name(16#28) -> 'Wildcard-Subscription-Available';
%% 41: Byte; CONNACK
name(16#29) -> 'Subscription-Identifier-Available';
%% 42: Byte; CONNACK
name(16#2A) -> 'Shared-Subscription-Available'.
%%--------------------------------------------------------------------
%% Property name to id
%%--------------------------------------------------------------------
id('Payload-Format-Indicator') -> 16#01;
id('Message-Expiry-Interval') -> 16#02;
id('Content-Type') -> 16#03;
id('Response-Topic') -> 16#08;
id('Correlation-Data') -> 16#09;
id('Subscription-Identifier') -> 16#0B;
id('Session-Expiry-Interval') -> 16#11;
id('Assigned-Client-Identifier') -> 16#12;
id('Server-Keep-Alive') -> 16#13;
id('Authentication-Method') -> 16#15;
id('Authentication Data') -> 16#16;
id('Request-Problem-Information') -> 16#17;
id('Will-Delay-Interval') -> 16#18;
id('Request-Response-Information') -> 16#19;
id('Response Information') -> 16#1A;
id('Server-Reference') -> 16#1C;
id('Reason-String') -> 16#1F;
id('Receive-Maximum') -> 16#21;
id('Topic-Alias-Maximum') -> 16#22;
id('Topic Alias') -> 16#23;
id('Maximum-QoS') -> 16#24;
id('Retain-Available') -> 16#25;
id('User-Property') -> 16#26;
id('Maximum-Packet-Size') -> 16#27;
id('Wildcard-Subscription-Available') -> 16#28;
id('Subscription-Identifier-Available') -> 16#29;
id('Shared-Subscription-Available') -> 16#2A.

195
src/emqx_mqtt5_rscode.erl Normal file
View File

@ -0,0 +1,195 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqx.io)
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mqtt5_rscode).
-author("Feng Lee <feng@emqtt.io>").
-export([name/1, value/1]).
%%--------------------------------------------------------------------
%% Reason code to name
%%--------------------------------------------------------------------
0
name(0x00
Success
CONNACK, PUBACK, PUBREC, PUBREL, PUBCOMP, UNSUBACK, AUTH
0
name(0x00
Normal disconnection
DISCONNECT
0
name(0x00
Granted QoS 0
SUBACK
1
name(0x01
Granted QoS 1
SUBACK
2
name(0x02
Granted QoS 2
SUBACK
4
name(0x04
Disconnect with Will Message
DISCONNECT
16
name(0x10
No matching subscribers
PUBACK, PUBREC
17
name(0x11
No subscription existed
UNSUBACK
24
name(0x18
Continue authentication
AUTH
25
name(0x19
Re-authenticate
AUTH
128
name(0x80
Unspecified error
CONNACK, PUBACK, PUBREC, SUBACK, UNSUBACK, DISCONNECT
129
name(0x81
Malformed Packet
CONNACK, DISCONNECT
130
name(0x82
Protocol Error
CONNACK, DISCONNECT
131
name(0x83
Implementation specific error
CONNACK, PUBACK, PUBREC, SUBACK, UNSUBACK, DISCONNECT
132
name(0x84
Unsupported Protocol Version
CONNACK
133
name(0x85
Client Identifier not valid
CONNACK
134
name(0x86
Bad User Name or Password
CONNACK
135
name(0x87
Not authorized
CONNACK, PUBACK, PUBREC, SUBACK, UNSUBACK, DISCONNECT
136
name(0x88
Server unavailable
CONNACK
137
name(0x89
Server busy
CONNACK, DISCONNECT
138
name(0x8A
Banned
CONNACK
139
name(0x8B
Server shutting down
DISCONNECT
140
name(0x8C
Bad authentication method
CONNACK, DISCONNECT
141
name(0x8D
Keep Alive timeout
DISCONNECT
142
name(0x8E
Session taken over
DISCONNECT
143
name(0x8F
Topic Filter invalid
SUBACK, UNSUBACK, DISCONNECT
144
name(0x90
Topic Name invalid
CONNACK, PUBACK, PUBREC, DISCONNECT
145
name(0x91
Packet Identifier in use
PUBACK, PUBREC, SUBACK, UNSUBACK
146
name(0x92
Packet Identifier not found
PUBREL, PUBCOMP
147
name(0x93
Receive Maximum exceeded
DISCONNECT
148
name(0x94
Topic Alias invalid
DISCONNECT
149
name(0x95
Packet too large
CONNACK, DISCONNECT
150
name(0x96
Message rate too high
DISCONNECT
151
name(0x97
Quota exceeded
CONNACK, PUBACK, PUBREC, SUBACK, DISCONNECT
%% 152
name(0x98
Administrative action
DISCONNECT
%% 153
name(0x99
Payload format invalid
CONNACK, PUBACK, PUBREC, DISCONNECT
%% 154
name(0x9A
Retain not supported
CONNACK, DISCONNECT
%% 155
name(0x9B
QoS not supported
CONNACK, DISCONNECT
%% 156
name(0x9C
Use another server
CONNACK, DISCONNECT
%% 157: CONNACK, DISCONNECT
name(0x9D) -> 'Server-Moved';
%% 158: SUBACK, DISCONNECT
name(0x9E) -> 'Shared-Subscriptions-Not-Supported';
%% 159: CONNACK, DISCONNECT
name(0x9F) -> 'Connection-Rate-Exceeded';
%% 160: DISCONNECT
name(0xA0) -> 'Maximum-Connect-Time';
%% 161: SUBACK, DISCONNECT
name(0xA1) -> 'Subscription-Identifiers-Not-Supported';
%% 162: SUBACK, DISCONNECT
name(0xA2) -> 'Wildcard-Subscriptions-Not-Supported';

29
src/emqx_mqtt_app.erl Normal file
View File

@ -0,0 +1,29 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mqtt_app).
-behaviour(application).
-export([start/2, stop/1]).
start(_Type, _Args) ->
emqx_mqtt_metrics:init(),
emqx_mqtt_sup:start_link().
stop(_State) ->
ok.

159
src/emqx_mqtt_metrics.erl Normal file
View File

@ -0,0 +1,159 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_mqtt_metrics).
-include("emqx_mqtt.hrl").
-import(emqx_metrics, [inc/1]).
-export([init/0]).
%% Received/Sent Metrics
-export([received/1, sent/1]).
%% Bytes sent and received of Broker
-define(SYSTOP_BYTES, [
{counter, 'bytes/received'}, % Total bytes received
{counter, 'bytes/sent'} % Total bytes sent
]).
%% Packets sent and received of Broker
-define(SYSTOP_PACKETS, [
{counter, 'packets/received'}, % All Packets received
{counter, 'packets/sent'}, % All Packets sent
{counter, 'packets/connect'}, % CONNECT Packets received
{counter, 'packets/connack'}, % CONNACK Packets sent
{counter, 'packets/publish/received'}, % PUBLISH packets received
{counter, 'packets/publish/sent'}, % PUBLISH packets sent
{counter, 'packets/puback/received'}, % PUBACK packets received
{counter, 'packets/puback/sent'}, % PUBACK packets sent
{counter, 'packets/puback/missed'}, % PUBACK packets missed
{counter, 'packets/pubrec/received'}, % PUBREC packets received
{counter, 'packets/pubrec/sent'}, % PUBREC packets sent
{counter, 'packets/pubrec/missed'}, % PUBREC packets missed
{counter, 'packets/pubrel/received'}, % PUBREL packets received
{counter, 'packets/pubrel/sent'}, % PUBREL packets sent
{counter, 'packets/pubrel/missed'}, % PUBREL packets missed
{counter, 'packets/pubcomp/received'}, % PUBCOMP packets received
{counter, 'packets/pubcomp/sent'}, % PUBCOMP packets sent
{counter, 'packets/pubcomp/missed'}, % PUBCOMP packets missed
{counter, 'packets/subscribe'}, % SUBSCRIBE Packets received
{counter, 'packets/suback'}, % SUBACK packets sent
{counter, 'packets/unsubscribe'}, % UNSUBSCRIBE Packets received
{counter, 'packets/unsuback'}, % UNSUBACK Packets sent
{counter, 'packets/pingreq'}, % PINGREQ packets received
{counter, 'packets/pingresp'}, % PINGRESP Packets sent
{counter, 'packets/disconnect'} % DISCONNECT Packets received
]).
%% Messages sent and received of broker
-define(SYSTOP_MESSAGES, [
{counter, 'messages/received'}, % All Messages received
{counter, 'messages/sent'}, % All Messages sent
{counter, 'messages/qos0/received'}, % QoS0 Messages received
{counter, 'messages/qos0/sent'}, % QoS0 Messages sent
{counter, 'messages/qos1/received'}, % QoS1 Messages received
{counter, 'messages/qos1/sent'}, % QoS1 Messages sent
{counter, 'messages/qos2/received'}, % QoS2 Messages received
{counter, 'messages/qos2/sent'}, % QoS2 Messages sent
{counter, 'messages/qos2/dropped'}, % QoS2 Messages dropped
{gauge, 'messages/retained'}, % Messagea retained
{counter, 'messages/dropped'}, % Messages dropped
{counter, 'messages/forward'} % Messages forward
]).
% Init metrics
init() ->
lists:foreach(fun emqx_metrics:create/1,
?SYSTOP_BYTES ++ ?SYSTOP_PACKETS ++ ?SYSTOP_MESSAGES).
%% @doc Count packets received.
-spec(received(mqtt_packet()) -> ok).
received(Packet) ->
inc('packets/received'),
received1(Packet).
received1(?PUBLISH_PACKET(Qos, _PktId)) ->
inc('packets/publish/received'),
inc('messages/received'),
qos_received(Qos);
received1(?PACKET(Type)) ->
received2(Type).
received2(?CONNECT) ->
inc('packets/connect');
received2(?PUBACK) ->
inc('packets/puback/received');
received2(?PUBREC) ->
inc('packets/pubrec/received');
received2(?PUBREL) ->
inc('packets/pubrel/received');
received2(?PUBCOMP) ->
inc('packets/pubcomp/received');
received2(?SUBSCRIBE) ->
inc('packets/subscribe');
received2(?UNSUBSCRIBE) ->
inc('packets/unsubscribe');
received2(?PINGREQ) ->
inc('packets/pingreq');
received2(?DISCONNECT) ->
inc('packets/disconnect');
received2(_) ->
ignore.
qos_received(?QOS_0) ->
inc('messages/qos0/received');
qos_received(?QOS_1) ->
inc('messages/qos1/received');
qos_received(?QOS_2) ->
inc('messages/qos2/received').
%% @doc Count packets received. Will not count $SYS PUBLISH.
-spec(sent(mqtt_packet()) -> ignore | non_neg_integer()).
sent(?PUBLISH_PACKET(_Qos, <<"$SYS/", _/binary>>, _, _)) ->
ignore;
sent(Packet) ->
inc('packets/sent'),
sent1(Packet).
sent1(?PUBLISH_PACKET(Qos, _PktId)) ->
inc('packets/publish/sent'),
inc('messages/sent'),
qos_sent(Qos);
sent1(?PACKET(Type)) ->
sent2(Type).
sent2(?CONNACK) ->
inc('packets/connack');
sent2(?PUBACK) ->
inc('packets/puback/sent');
sent2(?PUBREC) ->
inc('packets/pubrec/sent');
sent2(?PUBREL) ->
inc('packets/pubrel/sent');
sent2(?PUBCOMP) ->
inc('packets/pubcomp/sent');
sent2(?SUBACK) ->
inc('packets/suback');
sent2(?UNSUBACK) ->
inc('packets/unsuback');
sent2(?PINGRESP) ->
inc('packets/pingresp');
sent2(_Type) ->
ignore.
qos_sent(?QOS_0) ->
inc('messages/qos0/sent');
qos_sent(?QOS_1) ->
inc('messages/qos1/sent');
qos_sent(?QOS_2) ->
inc('messages/qos2/sent').

View File

@ -43,8 +43,8 @@
-module(emqx_mqueue).
-author("Feng Lee <feng@emqtt.io>").
%% TODO: XYZ
%%
-include("emqx.hrl").
-include("emqx_mqtt.hrl").
@ -209,10 +209,10 @@ maybe_set_alarm(MQ = #mqueue{high_wm = undefined}) ->
MQ;
maybe_set_alarm(MQ = #mqueue{name = Name, len = Len, high_wm = HighWM, alarm_fun = AlarmFun})
when Len > HighWM ->
Alarm = #mqtt_alarm{id = iolist_to_binary(["queue_high_watermark.", Name]),
severity = warning,
title = io_lib:format("Queue ~s high-water mark", [Name]),
summary = io_lib:format("queue len ~p > high_watermark ~p", [Len, HighWM])},
Alarm = #alarm{id = iolist_to_binary(["queue_high_watermark.", Name]),
severity = warning,
title = io_lib:format("Queue ~s high-water mark", [Name]),
summary = io_lib:format("queue len ~p > high_watermark ~p", [Len, HighWM])},
MQ#mqueue{alarm_fun = AlarmFun(alert, Alarm)};
maybe_set_alarm(MQ) ->
MQ.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -14,7 +14,6 @@
%% limitations under the License.
%%--------------------------------------------------------------------
%% @doc MQTT Packet Parser
-module(emqx_parser).
-author("Feng Lee <feng@emqtt.io>").
@ -74,12 +73,12 @@ parse_remaining_len(<<0:1, Len:7, Rest/binary>>, Header, Multiplier, Value, Max
true -> parse_frame(Rest, Header, FrameLen)
end.
parse_frame(Bin, #mqtt_packet_header{type = Type, qos = Qos} = Header, Length) ->
parse_frame(Bin, #mqtt_packet_header{type = Type, qos = Qos} = Header, Length) ->
case {Type, Bin} of
{?CONNECT, <<FrameBin:Length/binary, Rest/binary>>} ->
{ProtoName, Rest1} = parse_utf(FrameBin),
%% Fix mosquitto bridge: 0x83, 0x84
<<BridgeTag:4, ProtoVersion:4, Rest2/binary>> = Rest1,
<<BridgeTag:4, ProtoVer:4, Rest2/binary>> = Rest1,
<<UsernameFlag : 1,
PasswordFlag : 1,
WillRetain : 1,
@ -89,16 +88,18 @@ parse_frame(Bin, #mqtt_packet_header{type = Type, qos = Qos} = Header, Length)
_Reserved : 1,
KeepAlive : 16/big,
Rest3/binary>> = Rest2,
{ClientId, Rest4} = parse_utf(Rest3),
{WillTopic, Rest5} = parse_utf(Rest4, WillFlag),
{WillMsg, Rest6} = parse_msg(Rest5, WillFlag),
{UserName, Rest7} = parse_utf(Rest6, UsernameFlag),
{PasssWord, <<>>} = parse_utf(Rest7, PasswordFlag),
{Properties, Rest4} = parse_properties(ProtoVer, Rest3),
{ClientId, Rest5} = parse_utf(Rest4),
{WillProps, Rest6} = parse_will_props(Rest5, ProtoVer, WillFlag),
{WillTopic, Rest7} = parse_utf(Rest6, WillFlag),
{WillMsg, Rest8} = parse_msg(Rest7, WillFlag),
{UserName, Rest9} = parse_utf(Rest8, UsernameFlag),
{PasssWord, <<>>} = parse_utf(Rest9, PasswordFlag),
case protocol_name_approved(ProtoVersion, ProtoName) of
true ->
wrap(Header,
#mqtt_packet_connect{
proto_ver = ProtoVersion,
proto_ver = ProtoVer,
proto_name = ProtoName,
will_retain = bool(WillRetain),
will_qos = WillQos,
@ -106,11 +107,13 @@ parse_frame(Bin, #mqtt_packet_header{type = Type, qos = Qos} = Header, Length)
clean_sess = bool(CleanSess),
keep_alive = KeepAlive,
client_id = ClientId,
will_props = WillProps,
will_topic = WillTopic,
will_msg = WillMsg,
username = UserName,
password = PasssWord,
is_bridge = (BridgeTag =:= 8)}, Rest);
is_bridge = (BridgeTag =:= 8),
properties = Properties}, Rest);
false ->
{error, protocol_header_corrupt}
end;
@ -120,55 +123,72 @@ parse_frame(Bin, #mqtt_packet_header{type = Type, qos = Qos} = Header, Length)
% return_code = ReturnCode }, Rest);
{?PUBLISH, <<FrameBin:Length/binary, Rest/binary>>} ->
{TopicName, Rest1} = parse_utf(FrameBin),
{PacketId, Payload} = case Qos of
0 -> {undefined, Rest1};
_ -> <<Id:16/big, R/binary>> = Rest1,
{Id, R}
end,
{PacketId, Rest2} = case Qos of
0 -> {undefined, Rest1};
_ -> <<Id:16/big, R/binary>> = Rest1,
{Id, R}
end,
{Properties, Payload} = parse_properties(ProtoVer, Rest),
wrap(fixdup(Header), #mqtt_packet_publish{topic_name = TopicName,
packet_id = PacketId},
packet_id = PacketId,
properties = Properties},
Payload, Rest);
{?PUBACK, <<FrameBin:Length/binary, Rest/binary>>} ->
<<PacketId:16/big>> = FrameBin,
wrap(Header, #mqtt_packet_puback{packet_id = PacketId}, Rest);
{?PUBREC, <<FrameBin:Length/binary, Rest/binary>>} ->
<<PacketId:16/big>> = FrameBin,
wrap(Header, #mqtt_packet_puback{packet_id = PacketId}, Rest);
{?PUBREL, <<FrameBin:Length/binary, Rest/binary>>} ->
%% 1 = Qos,
<<PacketId:16/big>> = FrameBin,
wrap(Header, #mqtt_packet_puback{packet_id = PacketId}, Rest);
{?PUBCOMP, <<FrameBin:Length/binary, Rest/binary>>} ->
<<PacketId:16/big>> = FrameBin,
wrap(Header, #mqtt_packet_puback{packet_id = PacketId}, Rest);
{PubAck, <<FrameBin:Length/binary, Rest/binary>>}
when PubAck == ?PUBACK; PubAck == ?PUBREC; PubAck == ?PUBREL; PubAck == ?PUBCOMP ->
<<PacketId:16/big, Rest1/binary>> = FrameBin,
case ProtoVer == ?MQTT_PROTO_V5 of
true ->
<<ReasonCode, Rest2/binary>> = Rest1,
{Properties, Rest3} = parse_properties(ProtoVer, Rest2),
wrap(Header, #mqtt_packet_puback{packet_id = PacketId,
reason_code = ReasonCode,
properties = Properties}, Rest3);
false ->
wrap(Header, #mqtt_packet_puback{packet_id = PacketId}, Rest)
end;
{?SUBSCRIBE, <<FrameBin:Length/binary, Rest/binary>>} ->
%% 1 = Qos,
<<PacketId:16/big, Rest1/binary>> = FrameBin,
{Properties, Rest2} = parse_properties(ProtoVer, Rest1),
TopicTable = parse_topics(?SUBSCRIBE, Rest1, []),
wrap(Header, #mqtt_packet_subscribe{packet_id = PacketId,
properties = Properties,
topic_table = TopicTable}, Rest);
%{?SUBACK, <<FrameBin:Length/binary, Rest/binary>>} ->
% <<PacketId:16/big, Rest1/binary>> = FrameBin,
% wrap(Header, #mqtt_packet_suback{packet_id = PacketId,
% qos_table = parse_qos(Rest1, []) }, Rest);
% {Properties, Rest2/binary>> = parse_properties(ProtoVer, Rest1),
% wrap(Header, #mqtt_packet_suback{packet_id = PacketId, properties = Properties,
% reason_codes = parse_qos(Rest1, [])}, Rest);
{?UNSUBSCRIBE, <<FrameBin:Length/binary, Rest/binary>>} ->
%% 1 = Qos,
<<PacketId:16/big, Rest1/binary>> = FrameBin,
Topics = parse_topics(?UNSUBSCRIBE, Rest1, []),
wrap(Header, #mqtt_packet_unsubscribe{packet_id = PacketId,
topics = Topics}, Rest);
{Properties, Rest2} = parse_properties(ProtoVer, Rest1),
Topics = parse_topics(?UNSUBSCRIBE, Rest2, []),
wrap(Header, #mqtt_packet_unsubscribe{packet_id = PacketId,
properties = Properties,
topics = Topics}, Rest);
%{?UNSUBACK, <<FrameBin:Length/binary, Rest/binary>>} ->
% <<PacketId:16/big>> = FrameBin,
% wrap(Header, #mqtt_packet_unsuback { packet_id = PacketId }, Rest);
% <<PacketId:16/big, Rest1/binary>> = FrameBin,
% {Properties, Rest2} = parse_properties(ProtoVer, Rest1),
% wrap(Header, #mqtt_packet_unsuback {
% packet_id = PacketId,
% properties = Properties }, Rest);
{?PINGREQ, Rest} ->
Length = 0,
wrap(Header, Rest);
%{?PINGRESP, Rest} ->
% Length = 0,
% wrap(Header, Rest);
{?DISCONNECT, Rest} ->
Length = 0,
wrap(Header, Rest);
{?DISCONNECT, <<FrameBin:Length/binary, Rest/binary>>} ->
case ProtoVer == ?MQTT_PROTO_V5 of
true ->
<<ReasonCode, Rest1/binary>> = Rest,
{Properties, Rest2} = parse_properties(ProtoVer, Rest1),
wrap(Header, #mqtt_packet_disconnect{reason_code = Reason,
properties = Properties}, Rest2);
false ->
Lenght = 0, wrap(Header, Rest)
end;
{_, TooShortBin} ->
{more, fun(BinMore) ->
parse_frame(<<TooShortBin/binary, BinMore/binary>>,
@ -183,21 +203,134 @@ wrap(Header, Variable, Rest) ->
wrap(Header, Rest) ->
{ok, #mqtt_packet{header = Header}, Rest}.
%client function
%parse_qos(<<>>, Acc) ->
% lists:reverse(Acc);
%parse_qos(<<QoS:8/unsigned, Rest/binary>>, Acc) ->
% parse_qos(Rest, [QoS | Acc]).
parse_will_props(Bin, ProtoVer = ?MQTT_PROTO_V5, 1) ->
parse_properties(ProtoVer, Bin);
parse_will_props(Bin, _ProtoVer, _WillFlag),
{#{}, Bin}.
parse_topics(_, <<>>, Topics) ->
parse_properties(?MQTT_PROTO_V5, Bin) ->
{Len, Rest} = parse_variable_byte_integer(Bin),
<<PropsBin:Len/binary, Rest1} = Rest,
{parse_property(PropsBin, #{}), Rest1};
parse_properties(_MQTT_PROTO_V3, Bin) ->
{#{}, Bin}. %% No properties.
parse_property(<<>>, Props) ->
Props;
%% 01: 'Payload-Format-Indicator', Byte;
parse_property(<<16#01, Val, Bin/binary>>, Props) ->
parse_property(Bin, Props#{'Payload-Format-Indicator' => Val});
%% 02: 'Message-Expiry-Interval', Four Byte Integer;
parse_property(<<16#02, Val:32/big, Bin/binary>>, Props) ->
parse_property(Bin, Props#{'Message-Expiry-Interval' => Val});
%% 03: 'Content-Type', UTF-8 Encoded String;
parse_property(<<16#03, Bin/binary>>, Props) ->
{Val, Rest} = parse_utf(Bin),
parse_property(Rest, Props#{'Content-Type' => Val});
%% 08: 'Response-Topic', UTF-8 Encoded String;
parse_property(<<16#08, Bin/binary>>) ->
{Val, Rest} = parse_utf(Bin),
parse_property(Rest, Props#{'Response-Topic' => Val});
%% 09: 'Correlation-Data', Binary Data;
parse_property(<<16#09, Len:16/big, Val:Len/binary, Bin/binary>>) ->
parse_property(Bin, Props#{'Correlation-Data' => Val});
%% 11: 'Subscription-Identifier', Variable Byte Integer;
parse_property(<<16#0B, Bin/binary>>, Props) ->
{Val, Rest} = parse_variable_byte_integer(Bin),
parse_property(Rest, Props#{'Subscription-Identifier' => Val});
%% 17: 'Session-Expiry-Interval', Four Byte Integer;
parse_property(<<16#11, Val:32/big, Bin/binary>>, Props) ->
parse_property(Bin, Props#{'Session-Expiry-Interval' => Val});
%% 18: 'Assigned-Client-Identifier', UTF-8 Encoded String;
parse_property(<<16#12, Bin/binary>>) ->
{Val, Rest} = parse_utf(Bin),
parse_property(Rest, Props#{'Assigned-Client-Identifier' => Val});
%% 19: 'Server-Keep-Alive', Two Byte Integer;
parse_property(<<16#13, Val:16, Bin/binary>>) ->
parse_property(Bin, Props#{'Server-Keep-Alive' => Val});
%% 21: 'Authentication-Method', UTF-8 Encoded String;
parse_property(<<16#15, Bin/binary>>, Props) ->
{Val, Rest} = parse_utf(Bin),
parse_property(Rest, Props#{'Authentication-Method' => Val})
%% 22: 'Authentication-Data', Binary Data;
parse_property(<<16#16, Len:16/big, Val:Len/binary, Bin/binary>>) ->
parse_property(Bin, Props#{'Authentication-Data' => Val});
%% 23: 'Request-Problem-Information', Byte;
parse_property(<<16#17, Val, Bin/binary>>, Props) ->
parse_property(Bin, Props#{'Request-Problem-Information' => Val});
%% 24: 'Will-Delay-Interval', Four Byte Integer;
parse_property(<<16#18, Val:32, Bin/binary>>, Props) ->
parse_property(Bin, Props#{'Will-Delay-Interval' => Val});
%% 25: 'Request-Response-Information', Byte;
parse_property(<<16#19, Val, Bin/binary>>, Props) ->
parse_property(Bin, Props#{'Request-Response-Information' => Val});
%% 26: 'Response Information', UTF-8 Encoded String;
parse_property(<<16#1A, Bin/binary>>, Props) ->
{Val, Rest} = parse_utf(Bin),
parse_property(Rest, Props#{'Response-Information' => Val});
%% 28: 'Server-Reference', UTF-8 Encoded String;
parse_property(<<16#1C, Bin/binary>>, Props) ->
{Val, Rest} = parse_utf(Bin),
parse_property(Rest, Props#{'Server-Reference' => Val});
%% 31: 'Reason-String', UTF-8 Encoded String;
parse_property(<<16#1F, Bin/binary, Props) ->
{Val, Rest} = parse_utf(Bin),
parse_property(Rest, Props#{'Reason-String' => Val});
%% 33: 'Receive-Maximum', Two Byte Integer;
parse_property(<<16#21, Val:16/big, Bin/binary>>, Props) ->
parse_property(Bin, Props#{'Receive-Maximum' => Val});
%% 34: 'Topic-Alias-Maximum', Two Byte Integer;
parse_property(<<16#22, Val:16/big, Bin/binary>>, Props) ->
parse_property(Bin, Props#{'Topic-Alias-Maximum' => Val});
%% 35: 'Topic-Alias', Two Byte Integer;
parse_property(<<16#23, Val:16/big, Bin/binary>>, Props) ->
parse_property(Bin, Props#{'Topic-Alias' => Val});
%% 36: 'Maximum-QoS', Byte;
parse_property(<<16#24, Val, Bin/binary>>, Props) ->
parse_property(Bin, Props#{'Maximum-QoS' => Val});
%% 37: 'Retain-Available', Byte;
parse_property(<<16#25, Val, Bin/binary>>, Props) ->
parse_property(Bin, Props#{'Retain-Available' => Val});
%% 38: 'User-Property', UTF-8 String Pair;
parse_property(<<16#26, Bin/binary>>, Props) ->
{Pair, Rest} = parse_utf_pair(Bin),
parse_property(Rest, case maps:find('User-Property', Props) of
{ok, UserProps} -> Props#{'User-Property' := [Pair | UserProps]};
error -> Props#{'User-Property' := [Pair]}
end);
%% 39: 'Maximum-Packet-Size', Four Byte Integer;
parse_property(<<16#27, Val:32, Bin/binary>>, Props) ->
parse_property(Rest, Props#{'Maximum-Packet-Size' => Val});
%% 40: 'Wildcard-Subscription-Available', Byte;
parse_property(<<16#28, Val, Bin/binary>>, Props) ->
parse_property(Bin, Props#{'Wildcard-Subscription-Available' => Val});
%% 41: 'Subscription-Identifier-Available', Byte;
parse_property(<<16#29, Val, Bin/binary>>, Props) ->
parse_property(Bin, Props#{'Subscription-Identifier-Available' => Val});
%% 42: 'Shared-Subscription-Available', Byte;
parse_property(<<16#2A, Val, Bin/binary>>, Props) ->
parse_property(Bin, Props#{'Shared-Subscription-Available' => Val}).
parse_variable_byte_integer(Bin) ->
parse_variable_byte_integer(Bin, 1, 0).
parse_variable_byte_integer(<<1:1, Len:7, Rest/binary>>, Multiplier, Value) ->
parse_variable_byte_integer(Rest, Multiplier * ?HIGHBIT, Value + Len * Multiplier);
parse_variable_byte_integer(<<0:1, Len:7, Rest/binary>>, Multiplier, Value) ->
{Value + Len * Multiplier, Rest}.
parse_topics(_Packet, <<>>, Topics) ->
lists:reverse(Topics);
parse_topics(?SUBSCRIBE = Sub, Bin, Topics) ->
{Name, <<_:6, QoS:2, Rest/binary>>} = parse_utf(Bin),
parse_topics(Sub, Rest, [{Name, QoS}| Topics]);
{Name, <<<<_Reserved:2, RetainHandling:2, KeepRetain:1, NoLocal:1, QoS:2>>, Rest/binary>>} = parse_utf(Bin),
SubOpts = [{qos, Qos}, {retain_handling, RetainHandling}, {keep_retain, KeepRetain}, {no_local, NoLocal}],
parse_topics(Sub, Rest, [{Name, SubOpts}| Topics]);
parse_topics(?UNSUBSCRIBE = Sub, Bin, Topics) ->
{Name, <<Rest/binary>>} = parse_utf(Bin),
parse_topics(Sub, Rest, [Name | Topics]).
parse_utf_pair(Bin) ->
[{Name, Value} || <<Len:16/big, Name:Len/binary, Len2:16/big, Value:Len2/binary>> <= Bin].
parse_utf(Bin, 0) ->
{undefined, Bin};
parse_utf(Bin, _) ->
@ -229,3 +362,4 @@ fixdup(Header = #mqtt_packet_header{qos = ?QOS0, dup = true}) ->
fixdup(Header = #mqtt_packet_header{qos = ?QOS2, dup = true}) ->
Header#mqtt_packet_header{dup = false};
fixdup(Header) -> Header.

View File

@ -157,16 +157,16 @@ stop_plugins(Names) ->
[stop_app(App) || App <- Names].
%% @doc List all available plugins
-spec(list() -> [mqtt_plugin()]).
-spec(list() -> [plugin()]).
list() ->
case emqx:env(plugins_etc_dir) of
{ok, PluginsEtc} ->
CfgFiles = filelib:wildcard("*.{conf,config}", PluginsEtc) ++ get_expand_plugin_config(),
Plugins = [plugin(CfgFile) || CfgFile <- CfgFiles],
StartedApps = names(started_app),
lists:map(fun(Plugin = #mqtt_plugin{name = Name}) ->
lists:map(fun(Plugin = #plugin{name = Name}) ->
case lists:member(Name, StartedApps) of
true -> Plugin#mqtt_plugin{active = true};
true -> Plugin#plugin{active = true};
false -> Plugin
end
end, Plugins);
@ -179,7 +179,7 @@ plugin(CfgFile) ->
{ok, Attrs} = application:get_all_key(AppName),
Ver = proplists:get_value(vsn, Attrs, "0"),
Descr = proplists:get_value(description, Attrs, ""),
#mqtt_plugin{name = AppName, version = Ver, descr = Descr}.
#plugin{name = AppName, version = Ver, descr = Descr}.
%% @doc Load a Plugin
-spec(load(atom()) -> ok | {error, term()}).
@ -198,7 +198,7 @@ load(PluginName) when is_atom(PluginName) ->
end
end.
load_plugin(#mqtt_plugin{name = Name}, Persistent) ->
load_plugin(#plugin{name = Name}, Persistent) ->
case load_app(Name) of
ok ->
start_app(Name, fun(App) -> plugin_loaded(App, Persistent) end);
@ -280,7 +280,7 @@ names(started_app) ->
[Name || {Name, _Descr, _Ver} <- application:which_applications()];
names(Plugins) ->
[Name || #mqtt_plugin{name = Name} <- Plugins].
[Name || #plugin{name = Name} <- Plugins].
plugin_loaded(_Name, false) ->
ok;

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -14,11 +14,8 @@
%% limitations under the License.
%%--------------------------------------------------------------------
%% @doc Common Pool Supervisor
-module(emqx_pool_sup).
-author("Feng Lee <feng@emqtt.io>").
-behaviour(supervisor).
%% API

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc.
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -20,8 +20,6 @@
-behaviour(gen_server).
-include("emqx_internal.hrl").
%% Start the pool supervisor
-export([start_link/0]).
@ -44,7 +42,9 @@ start_link() ->
-spec(start_link(atom(), pos_integer()) -> {ok, pid()} | ignore | {error, term()}).
start_link(Pool, Id) ->
gen_server:start_link({local, ?PROC_NAME(?MODULE, Id)}, ?MODULE, [Pool, Id], []).
gen_server:start_link({local, name(Id)}, ?MODULE, [Pool, Id], []).
name(Id) -> list_to_atom(lists:concat([?MODULE, "_", Id])).
%% @doc Submit work to pooler
submit(Fun) -> gen_server:call(worker(), {submit, Fun}, infinity).
@ -61,20 +61,17 @@ worker() ->
%%--------------------------------------------------------------------
init([Pool, Id]) ->
?GPROC_POOL(join, Pool, Id),
gproc_pool:connect_worker(Pool, {Pool, Id}),
{ok, #state{pool = Pool, id = Id}}.
handle_call({submit, Fun}, _From, State) ->
{reply, run(Fun), State};
{reply, catch run(Fun), State};
handle_call(_Req, _From, State) ->
{reply, ok, State}.
handle_cast({async_submit, Fun}, State) ->
try run(Fun)
catch _:Error ->
lager:error("Pooler Error: ~p, ~p", [Error, erlang:get_stacktrace()])
end,
try run(Fun) catch _:Error -> lager:error("Pooler Error: ~p, ~p", [Error, erlang:get_stacktrace()]) end,
{noreply, State};
handle_cast(_Msg, State) ->
@ -84,7 +81,7 @@ handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, #state{pool = Pool, id = Id}) ->
?GPROC_POOL(leave, Pool, Id), ok.
gproc_pool:disconnect_worker(Pool, {Pool, Id}).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -14,7 +14,6 @@
%% limitations under the License.
%%--------------------------------------------------------------------
%% @doc MQTT Packet Serializer
-module(emqx_serializer).
-author("Feng Lee <feng@emqtt.io>").
@ -81,23 +80,28 @@ serialize_variable(?CONNECT, #mqtt_packet_connect{client_id = ClientId,
{VariableBin, <<PayloadBin1/binary, UserPasswd/binary>>};
serialize_variable(?CONNACK, #mqtt_packet_connack{ack_flags = AckFlags,
return_code = ReturnCode}, undefined) ->
{<<AckFlags:8, ReturnCode:8>>, <<>>};
reason_code = ReasonCode,
properties = Properties}, undefined) ->
PropsBin = serialize_properties(Properties),
{<<AckFlags:8, ReturnCode:8, PropsBin/binary>>, <<>>};
serialize_variable(?SUBSCRIBE, #mqtt_packet_subscribe{packet_id = PacketId,
topic_table = Topics }, undefined) ->
{<<PacketId:16/big>>, serialize_topics(Topics)};
serialize_variable(?SUBACK, #mqtt_packet_suback{packet_id = PacketId,
qos_table = QosTable}, undefined) ->
{<<PacketId:16/big>>, << <<Q:8>> || Q <- QosTable >>};
properties = Properties,
reason_codes = ReasonCodes}, undefined) ->
{<<PacketId:16/big, (serialize_properties(Properties))/binary>>, << <<Code>> || Code <- ReasonCodes >>};
serialize_variable(?UNSUBSCRIBE, #mqtt_packet_unsubscribe{packet_id = PacketId,
topics = Topics }, undefined) ->
{<<PacketId:16/big>>, serialize_topics(Topics)};
serialize_variable(?UNSUBACK, #mqtt_packet_unsuback{packet_id = PacketId}, undefined) ->
{<<PacketId:16/big>>, <<>>};
serialize_variable(?UNSUBACK, #mqtt_packet_unsuback{packet_id = PacketId,
properties = Properties,
reason_codes = ReasonCodes}, undefined) ->
{<<PacketId:16/big, (serialize_properties(Properties))/binary>>, << <<Code>> || Code <- ReasonCodes >>};
serialize_variable(?PUBLISH, #mqtt_packet_publish{topic_name = TopicName,
packet_id = PacketId }, PayloadBin) ->
@ -118,33 +122,132 @@ serialize_variable(?PINGREQ, undefined, undefined) ->
serialize_variable(?PINGRESP, undefined, undefined) ->
{<<>>, <<>>};
serialize_variable(?DISCONNECT, undefined, undefined) ->
{<<>>, <<>>}.
serialize_variable(?DISCONNECT, #mqtt_packet_disconnect{reason_code = ReasonCode,
properties = Properties}, undefined) ->
{<<ReasonCode, (serialize_properties(Properties))/binary>>, <<>>}.
serialize_variable(?AUTH, #mqtt_packet_auth{reason_code = ReasonCode,
properties = Properties}, undefined) ->
{<<ReasonCode, (serialize_properties(Properties))/binary>>, <<>>}.
serialize_payload(undefined) ->
undefined;
serialize_payload(Bin) when is_binary(Bin) ->
Bin.
serialize_properties(undefined) ->
<<>>;
serialize_properties(Props) ->
<< serialize_property(Prop, Val) || {Prop, Val} <= (maps:to_list(Props)) >>.
%% 01: Byte;
serialize_property('Payload-Format-Indicator', Val) ->
<<16#01, Val>>;
%% 02: Four Byte Integer;
serialize_property('Message-Expiry-Interval', Val) ->
<<16#02, Val:32/big>>;
%% 03: UTF-8 Encoded String;
serialize_property('Content-Type', Val) ->
<<16#03, (serialize_utf(Val))/binary>>;
%% 08: UTF-8 Encoded String;
serialize_property('Response-Topic', Val) ->
<<16#08, (serialize_utf(Val))/binary>>;
%% 09: Binary Data;
serialize_property('Correlation-Data', Val) ->
<<16#09, (iolist_size(Val)):16, Val/binary>>;
%% 11: Variable Byte Integer;
serialize_property('Subscription-Identifier', Val) ->
<<16#0B, (serialize_variable_byte_integer(Val))/binary>>;
%% 17: Four Byte Integer;
serialize_property('Session-Expiry-Interval', Val) ->
<<16#11, Val:32/big>>;
%% 18: UTF-8 Encoded String;
serialize_property('Assigned-Client-Identifier', Val) ->
<<16#12, (serialize_utf(Val))/binary>>;
%% 19: Two Byte Integer;
serialize_property('Server-Keep-Alive', Val) ->
<<16#13, Val:16/big>>;
%% 21: UTF-8 Encoded String;
serialize_property('Authentication-Method', Val) ->
<<16#15, (serialize_utf(Val))/binary>>;
%% 22: Binary Data;
serialize_property('Authentication-Data', Val) ->
<<16#16, (iolist_size(Val)):16, Val/binary>>;
%% 23: Byte;
serialize_property('Request-Problem-Information', Val) ->
<<16#17, Val>>;
%% 24: Four Byte Integer;
serialize_property('Will-Delay-Interval', Val) ->
<<16#18, Val:32/big>>;
%% 25: Byte;
serialize_property('Request-Response-Information', Val) ->
<<16#19, Val>>;
%% 26: UTF-8 Encoded String;
serialize_property('Response-Information', Val) ->
<<16#1A, (serialize_utf(Val))/binary>>;
%% 28: UTF-8 Encoded String;
serialize_property('Server-Reference', Val) ->
<<16#1C, (serialize_utf(Val))/binary>>;
%% 31: UTF-8 Encoded String;
serialize_property('Reason-String', Val) ->
<<16#1F, (serialize_utf(Val))/binary>>;
%% 33: Two Byte Integer;
serialize_property('Receive-Maximum', Val) ->
<<16#21, Val:16/big>>;
%% 34: Two Byte Integer;
serialize_property('Topic-Alias-Maximum', Val) ->
<<16#22, Val:16/big>>;
%% 35: Two Byte Integer;
serialize_property('Topic-Alias', Val) ->
<<16#23, Val:16/big>>;
%% 36: Byte;
serialize_property('Maximum-QoS', Val) ->
<<16#24, Val>>;
%% 37: Byte;
serialize_property('Retain-Available', Val) ->
<<16#25, Val>>;
%% 38: UTF-8 String Pair;
serialize_property('User-Property', Val) ->
<<16#26, (serialize_utf_pair(Val))/binary>>;
%% 39: Four Byte Integer;
serialize_property('Maximum-Packet-Size', Val) ->
<<16#27, Val:32/big>>;
%% 40: Byte;
serialize_property('Wildcard-Subscription-Available', Val) ->
<<16#28, Val>>;
%% 41: Byte;
serialize_property('Subscription-Identifier-Available', Val) ->
<<16#29, Val>>;
%% 42: Byte;
serialize_property('Shared-Subscription-Available', Val) ->
<<16#2A, Val>>.
serialize_topics([{_Topic, _Qos}|_] = Topics) ->
<< <<(serialize_utf(Topic))/binary, ?RESERVED:6, Qos:2>> || {Topic, Qos} <- Topics >>;
serialize_topics([H|_] = Topics) when is_binary(H) ->
<< <<(serialize_utf(Topic))/binary>> || Topic <- Topics >>.
serialize_utf_pair({Name, Value}) ->
<< <<(serialize_utf(S))/binary, (serialize_utf(S))/binary>> || S <- [Name, Value] >>.
serialize_utf(String) ->
StringBin = unicode:characters_to_binary(String),
Len = byte_size(StringBin),
true = (Len =< 16#ffff),
<<Len:16/big, StringBin/binary>>.
serialize_len(N) when N =< ?LOWBITS ->
serialize_len(I) ->
serialize_variable_byte_integer(I). %%TODO: refactor later.
serialize_variable_byte_integer(N) when N =< ?LOWBITS ->
<<0:1, N:7>>;
serialize_len(N) ->
<<1:1, (N rem ?HIGHBIT):7, (serialize_len(N div ?HIGHBIT))/binary>>.
serialize_variable_byte_integer(N) ->
<<1:1, (N rem ?HIGHBIT):7, (serialize_variable_byte_integer(N div ?HIGHBIT))/binary>>.
opt(undefined) -> ?RESERVED;
opt(false) -> 0;
opt(true) -> 1;
opt(X) when is_integer(X) -> X;
opt(B) when is_binary(B) -> 1.

View File

@ -18,12 +18,13 @@
-behaviour(gen_server).
-author("Feng Lee <feng@emqtt.io>").
-include("emqx.hrl").
-export([start_link/0, stop/0]).
%% Get all Stats
-export([all/0]).
%% Client and Session Stats
-export([set_client_stats/2, get_client_stats/1, del_client_stats/1,
set_session_stats/2, get_session_stats/1, del_session_stats/1]).
@ -115,6 +116,8 @@ get_session_stats(ClientId) ->
del_session_stats(ClientId) ->
ets:delete(?SESSION_STATS_TAB, ClientId).
all() -> ets:tab2list(?STATS_TAB).
%% @doc Generate stats fun
-spec(statsfun(Stat :: atom()) -> fun()).
statsfun(Stat) ->

View File

@ -1,5 +1,5 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@ -16,12 +16,8 @@
-module(emqx_sysmon).
-author("Feng Lee <feng@emqtt.io>").
-behavior(gen_server).
-include("emqx_internal.hrl").
-export([start_link/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
@ -29,22 +25,21 @@
-record(state, {tickref, events = [], tracelog}).
-define(LOG_FMT, [{formatter_config, [time, " ", message, "\n"]}]).
%%-define(LOG_FMT, [{formatter_config, [time, " ", message, "\n"]}]).
-define(LOG(Msg, ProcInfo),
lager:warning([{sysmon, true}], "~s~n~p", [WarnMsg, ProcInfo])).
lager:warning([{sysmon, true}], "[SYSMON] ~s~n~p", [WarnMsg, ProcInfo])).
-define(LOG(Msg, ProcInfo, PortInfo),
lager:warning([{sysmon, true}], "~s~n~p~n~p", [WarnMsg, ProcInfo, PortInfo])).
lager:warning([{sysmon, true}], "[SYSMON] ~s~n~p~n~p", [WarnMsg, ProcInfo, PortInfo])).
%% @doc Start system monitor
-spec(start_link(Opts :: list(tuple())) ->
{ok, pid()} | ignore | {error, term()}).
-spec(start_link(Opts :: list(tuple())) -> {ok, pid()} | ignore | {error, term()}).
start_link(Opts) ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [Opts], []).
%%--------------------------------------------------------------------
%% gen_server callbacks
%% gen_server Callbacks
%%--------------------------------------------------------------------
init([Opts]) ->
@ -80,10 +75,12 @@ parse_opt([_Opt|Opts], Acc) ->
parse_opt(Opts, Acc).
handle_call(Req, _From, State) ->
?UNEXPECTED_REQ(Req, State).
lager:error("[SYSMON] Unexpected Call: ~p", [Req]),
{reply, ignore, State}.
handle_cast(Msg, State) ->
?UNEXPECTED_MSG(Msg, State).
lager:error("[SYSMON] Unexpected Cast: ~p", [Msg]),
{noreply, State}.
handle_info({monitor, Pid, long_gc, Info}, State) ->
suppress({long_gc, Pid}, fun() ->
@ -131,7 +128,8 @@ handle_info(reset, State) ->
{noreply, State#state{events = []}, hibernate};
handle_info(Info, State) ->
?UNEXPECTED_INFO(Info, State).
lager:error("[SYSMON] Unexpected Info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, #state{tickref = TRef, tracelog = TraceLog}) ->
timer:cancel(TRef),

View File

@ -20,8 +20,6 @@
-author("Feng Lee <feng@emqtt.io>").
-include("emqx_internal.hrl").
%% API Function Exports
-export([start_link/0]).
@ -100,13 +98,16 @@ handle_call(all_traces, _From, State = #state{traces = Traces}) ->
<- maps:to_list(Traces)], State};
handle_call(Req, _From, State) ->
?UNEXPECTED_REQ(Req, State).
lager:error("[TRACE] Unexpected Call: ~p", [Req]),
{reply, ignore, State}.
handle_cast(Msg, State) ->
?UNEXPECTED_MSG(Msg, State).
lager:error("[TRACE] Unexpected Cast: ~p", [Msg]),
{noreply, State}.
handle_info(Info, State) ->
?UNEXPECTED_INFO(Info, State).
lager:error("[TRACE] Unexpected Info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, _State) ->
ok.