From f7f0f27e4d91b745dc29bc844819281d0da7b729 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Tue, 27 Feb 2018 23:45:55 +0800 Subject: [PATCH] Parse and serialize MQTT 5.0 protocol packets --- include/emqx.hrl | 14 +-- src/emqx_alarm.erl | 16 +-- src/emqx_cli.erl | 32 ++++++ src/emqx_ctl.erl | 39 +++---- src/emqx_metrics.erl | 176 ++++------------------------- src/emqx_mqtt5_props.erl | 113 +++++++++++++++++++ src/emqx_mqtt5_rscode.erl | 195 ++++++++++++++++++++++++++++++++ src/emqx_mqtt_app.erl | 29 +++++ src/emqx_mqtt_metrics.erl | 159 ++++++++++++++++++++++++++ src/emqx_mqueue.erl | 12 +- src/emqx_parser.erl | 230 ++++++++++++++++++++++++++++++-------- src/emqx_plugins.erl | 12 +- src/emqx_pool_sup.erl | 5 +- src/emqx_pooler.erl | 19 ++-- src/emqx_serializer.erl | 129 ++++++++++++++++++--- src/emqx_stats.erl | 7 +- src/emqx_sysmon.erl | 26 ++--- src/emqx_trace.erl | 11 +- 18 files changed, 921 insertions(+), 303 deletions(-) create mode 100644 src/emqx_cli.erl create mode 100644 src/emqx_mqtt5_props.erl create mode 100644 src/emqx_mqtt5_rscode.erl create mode 100644 src/emqx_mqtt_app.erl create mode 100644 src/emqx_mqtt_metrics.erl diff --git a/include/emqx.hrl b/include/emqx.hrl index 1d80f009e..ccfaad6d0 100644 --- a/include/emqx.hrl +++ b/include/emqx.hrl @@ -164,26 +164,26 @@ -type(route() :: #route{}). %%-------------------------------------------------------------------- -%% MQTT Alarm +%% Alarm %%-------------------------------------------------------------------- --record(mqtt_alarm, +-record(alarm, { id :: binary(), - severity :: warning | error | critical, + severity :: notice | warning | error | critical, title :: iolist() | binary(), summary :: iolist() | binary(), timestamp :: erlang:timestamp() }). --type(mqtt_alarm() :: #mqtt_alarm{}). +-type(alarm() :: #alarm{}). %%-------------------------------------------------------------------- -%% MQTT Plugin +%% Plugin %%-------------------------------------------------------------------- --record(mqtt_plugin, { name, version, descr, active = false }). +-record(plugin, { name, version, descr, active = false }). --type(mqtt_plugin() :: #mqtt_plugin{}). +-type(plugin() :: #plugin{}). %%-------------------------------------------------------------------- %% MQTT CLI Command. For example: 'broker metrics' diff --git a/src/emqx_alarm.erl b/src/emqx_alarm.erl index df2134b94..f82ed5f2d 100644 --- a/src/emqx_alarm.erl +++ b/src/emqx_alarm.erl @@ -56,15 +56,15 @@ alarm_fun(Bool) -> (clear, _AlarmId) when Bool =:= false -> alarm_fun(false) end. --spec(set_alarm(mqtt_alarm()) -> ok). -set_alarm(Alarm) when is_record(Alarm, mqtt_alarm) -> +-spec(set_alarm(alarm()) -> ok). +set_alarm(Alarm) when is_record(Alarm, alarm) -> gen_event:notify(?ALARM_MGR, {set_alarm, Alarm}). -spec(clear_alarm(any()) -> ok). clear_alarm(AlarmId) when is_binary(AlarmId) -> gen_event:notify(?ALARM_MGR, {clear_alarm, AlarmId}). --spec(get_alarms() -> list(mqtt_alarm())). +-spec(get_alarms() -> list(alarm())). get_alarms() -> gen_event:call(?ALARM_MGR, ?MODULE, get_alarms). @@ -83,10 +83,10 @@ delete_alarm_handler(Module) when is_atom(Module) -> init(_) -> {ok, []}. -handle_event({set_alarm, Alarm = #mqtt_alarm{id = AlarmId, - severity = Severity, - title = Title, - summary = Summary}}, Alarms)-> +handle_event({set_alarm, Alarm = #alarm{id = AlarmId, + severity = Severity, + title = Title, + summary = Summary}}, Alarms)-> TS = os:timestamp(), Json = mochijson2:encode([{id, AlarmId}, {severity, Severity}, @@ -94,7 +94,7 @@ handle_event({set_alarm, Alarm = #mqtt_alarm{id = AlarmId, {summary, iolist_to_binary(Summary)}, {ts, emqx_time:now_secs(TS)}]), emqx:publish(alarm_msg(alert, AlarmId, Json)), - {ok, [Alarm#mqtt_alarm{timestamp = TS} | Alarms]}; + {ok, [Alarm#alarm{timestamp = TS} | Alarms]}; handle_event({clear_alarm, AlarmId}, Alarms) -> Json = mochijson2:encode([{id, AlarmId}, {ts, emqx_time:now_secs()}]), diff --git a/src/emqx_cli.erl b/src/emqx_cli.erl new file mode 100644 index 000000000..97a5ca298 --- /dev/null +++ b/src/emqx_cli.erl @@ -0,0 +1,32 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_cli). + +-export([print/1, print/2, usage/1]). + +print(Msg) -> + io:format(Msg). + +print(Format, Args) -> + io:format(Format, Args). + +usage(CmdList) -> + lists:foreach( + fun({Cmd, Descr}) -> + io:format("~-48s# ~s~n", [Cmd, Descr]) + end, CmdList). + diff --git a/src/emqx_ctl.erl b/src/emqx_ctl.erl index 5f07125b2..ee1616b52 100644 --- a/src/emqx_ctl.erl +++ b/src/emqx_ctl.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io) +%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -18,14 +18,6 @@ -behaviour(gen_server). --author("Feng Lee "). - --include("emqx.hrl"). - --include("emqx_cli.hrl"). - --define(SERVER, ?MODULE). - %% API Function Exports -export([start_link/0, register_cmd/2, register_cmd/3, unregister_cmd/1, lookup/1, run/1]). @@ -36,7 +28,9 @@ -record(state, {seq = 0}). --define(CMD_TAB, mqttd_ctl_cmd). +-define(SERVER, ?MODULE). + +-define(TAB, ?MODULE). %%-------------------------------------------------------------------- %% API @@ -87,40 +81,40 @@ run([CmdS|Args]) -> %% @doc Lookup a command -spec(lookup(atom()) -> [{module(), atom()}]). lookup(Cmd) -> - case ets:match(?CMD_TAB, {{'_', Cmd}, '$1', '_'}) of + case ets:match(?TAB, {{'_', Cmd}, '$1', '_'}) of [El] -> El; [] -> [] end. %% @doc Usage usage() -> - ?PRINT("Usage: ~s~n", [?MODULE]), - [begin ?PRINT("~80..-s~n", [""]), Mod:Cmd(usage) end - || {_, {Mod, Cmd}, _} <- ets:tab2list(?CMD_TAB)]. + io:format("Usage: ~s~n", [?MODULE]), + [begin io:format("~80..-s~n", [""]), Mod:Cmd(usage) end + || {_, {Mod, Cmd}, _} <- ets:tab2list(?TAB)]. %%-------------------------------------------------------------------- %% gen_server callbacks %%-------------------------------------------------------------------- init([]) -> - ets:new(?CMD_TAB, [ordered_set, named_table, protected]), + ets:new(?TAB, [ordered_set, named_table, protected]), {ok, #state{seq = 0}}. handle_call(_Request, _From, State) -> {reply, ok, State}. handle_cast({register_cmd, Cmd, MF, Opts}, State = #state{seq = Seq}) -> - case ets:match(?CMD_TAB, {{'$1', Cmd}, '_', '_'}) of + case ets:match(?TAB, {{'$1', Cmd}, '_', '_'}) of [] -> - ets:insert(?CMD_TAB, {{Seq, Cmd}, MF, Opts}); + ets:insert(?TAB, {{Seq, Cmd}, MF, Opts}); [[OriginSeq] | _] -> lager:warning("CLI: ~s is overidden by ~p", [Cmd, MF]), - ets:insert(?CMD_TAB, {{OriginSeq, Cmd}, MF, Opts}) + ets:insert(?TAB, {{OriginSeq, Cmd}, MF, Opts}) end, noreply(next_seq(State)); handle_cast({unregister_cmd, Cmd}, State) -> - ets:match_delete(?CMD_TAB, {{'_', Cmd}, '_', '_'}), + ets:match_delete(?TAB, {{'_', Cmd}, '_', '_'}), noreply(State); handle_cast(_Msg, State) -> @@ -136,7 +130,7 @@ code_change(_OldVsn, State, _Extra) -> {ok, State}. %%-------------------------------------------------------------------- -%% Internal Function Definitions +%% Internal Function %%-------------------------------------------------------------------- noreply(State) -> @@ -159,9 +153,10 @@ register_cmd_test_() -> ok = emqx_ctl:terminate(shutdown, State) end, fun(State = #state{seq = Seq}) -> - emqx_ctl:handle_cast({register_cmd, test0, {?MODULE, test0}, []}, State), - [?_assertMatch([{{0,test0},{?MODULE, test0}, []}], ets:lookup(?CMD_TAB, {Seq,test0}))] + emqx_ctl:handle_cast({register_cmd, test0, {?MODULE, test0}, []}, State), + [?_assertMatch([{{0,test0},{?MODULE, test0}, []}], ets:lookup(?TAB, {Seq,test0}))] end }. -endif. + diff --git a/src/emqx_metrics.erl b/src/emqx_metrics.erl index 7c3d8f763..cb477aaa9 100644 --- a/src/emqx_metrics.erl +++ b/src/emqx_metrics.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io) +%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -18,19 +18,8 @@ -behaviour(gen_server). --author("Feng Lee "). - --include("emqx.hrl"). - --include("emqx_mqtt.hrl"). - --define(SERVER, ?MODULE). - %% API Function Exports --export([start_link/0]). - -%% Received/Sent Metrics --export([received/1, sent/1]). +-export([start_link/0, create/1]). -export([all/0, value/1, inc/1, inc/2, inc/3, dec/2, dec/3, set/2]). @@ -40,58 +29,9 @@ -record(state, {tick}). --define(METRIC_TAB, mqtt_metric). +-define(TAB, ?MODULE). -%% Bytes sent and received of Broker --define(SYSTOP_BYTES, [ - {counter, 'bytes/received'}, % Total bytes received - {counter, 'bytes/sent'} % Total bytes sent -]). - -%% Packets sent and received of Broker --define(SYSTOP_PACKETS, [ - {counter, 'packets/received'}, % All Packets received - {counter, 'packets/sent'}, % All Packets sent - {counter, 'packets/connect'}, % CONNECT Packets received - {counter, 'packets/connack'}, % CONNACK Packets sent - {counter, 'packets/publish/received'}, % PUBLISH packets received - {counter, 'packets/publish/sent'}, % PUBLISH packets sent - {counter, 'packets/puback/received'}, % PUBACK packets received - {counter, 'packets/puback/sent'}, % PUBACK packets sent - {counter, 'packets/puback/missed'}, % PUBACK packets missed - {counter, 'packets/pubrec/received'}, % PUBREC packets received - {counter, 'packets/pubrec/sent'}, % PUBREC packets sent - {counter, 'packets/pubrec/missed'}, % PUBREC packets missed - {counter, 'packets/pubrel/received'}, % PUBREL packets received - {counter, 'packets/pubrel/sent'}, % PUBREL packets sent - {counter, 'packets/pubrel/missed'}, % PUBREL packets missed - {counter, 'packets/pubcomp/received'}, % PUBCOMP packets received - {counter, 'packets/pubcomp/sent'}, % PUBCOMP packets sent - {counter, 'packets/pubcomp/missed'}, % PUBCOMP packets missed - {counter, 'packets/subscribe'}, % SUBSCRIBE Packets received - {counter, 'packets/suback'}, % SUBACK packets sent - {counter, 'packets/unsubscribe'}, % UNSUBSCRIBE Packets received - {counter, 'packets/unsuback'}, % UNSUBACK Packets sent - {counter, 'packets/pingreq'}, % PINGREQ packets received - {counter, 'packets/pingresp'}, % PINGRESP Packets sent - {counter, 'packets/disconnect'} % DISCONNECT Packets received -]). - -%% Messages sent and received of broker --define(SYSTOP_MESSAGES, [ - {counter, 'messages/received'}, % All Messages received - {counter, 'messages/sent'}, % All Messages sent - {counter, 'messages/qos0/received'}, % QoS0 Messages received - {counter, 'messages/qos0/sent'}, % QoS0 Messages sent - {counter, 'messages/qos1/received'}, % QoS1 Messages received - {counter, 'messages/qos1/sent'}, % QoS1 Messages sent - {counter, 'messages/qos2/received'}, % QoS2 Messages received - {counter, 'messages/qos2/sent'}, % QoS2 Messages sent - {counter, 'messages/qos2/dropped'}, % QoS2 Messages dropped - {gauge, 'messages/retained'}, % Messagea retained - {counter, 'messages/dropped'}, % Messages dropped - {counter, 'messages/forward'} % Messages forward -]). +-define(SERVER, ?MODULE). %%-------------------------------------------------------------------- %% API @@ -102,81 +42,13 @@ start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). -%% @doc Count packets received. --spec(received(mqtt_packet()) -> ignore | non_neg_integer()). -received(Packet) -> - inc('packets/received'), - received1(Packet). -received1(?PUBLISH_PACKET(Qos, _PktId)) -> - inc('packets/publish/received'), - inc('messages/received'), - qos_received(Qos); -received1(?PACKET(Type)) -> - received2(Type). -received2(?CONNECT) -> - inc('packets/connect'); -received2(?PUBACK) -> - inc('packets/puback/received'); -received2(?PUBREC) -> - inc('packets/pubrec/received'); -received2(?PUBREL) -> - inc('packets/pubrel/received'); -received2(?PUBCOMP) -> - inc('packets/pubcomp/received'); -received2(?SUBSCRIBE) -> - inc('packets/subscribe'); -received2(?UNSUBSCRIBE) -> - inc('packets/unsubscribe'); -received2(?PINGREQ) -> - inc('packets/pingreq'); -received2(?DISCONNECT) -> - inc('packets/disconnect'); -received2(_) -> - ignore. -qos_received(?QOS_0) -> - inc('messages/qos0/received'); -qos_received(?QOS_1) -> - inc('messages/qos1/received'); -qos_received(?QOS_2) -> - inc('messages/qos2/received'). +create({gauge, Name}) -> + ets:insert(?TAB, {{Name, 0}, 0}); + +create({counter, Name}) -> + Schedulers = lists:seq(1, erlang:system_info(schedulers)), + ets:insert(?TAB, [{{Name, I}, 0} || I <- Schedulers]). -%% @doc Count packets received. Will not count $SYS PUBLISH. --spec(sent(mqtt_packet()) -> ignore | non_neg_integer()). -sent(?PUBLISH_PACKET(_Qos, <<"$SYS/", _/binary>>, _, _)) -> - ignore; -sent(Packet) -> - inc('packets/sent'), - sent1(Packet). -sent1(?PUBLISH_PACKET(Qos, _PktId)) -> - inc('packets/publish/sent'), - inc('messages/sent'), - qos_sent(Qos); -sent1(?PACKET(Type)) -> - sent2(Type). -sent2(?CONNACK) -> - inc('packets/connack'); -sent2(?PUBACK) -> - inc('packets/puback/sent'); -sent2(?PUBREC) -> - inc('packets/pubrec/sent'); -sent2(?PUBREL) -> - inc('packets/pubrel/sent'); -sent2(?PUBCOMP) -> - inc('packets/pubcomp/sent'); -sent2(?SUBACK) -> - inc('packets/suback'); -sent2(?UNSUBACK) -> - inc('packets/unsuback'); -sent2(?PINGRESP) -> - inc('packets/pingresp'); -sent2(_Type) -> - ignore. -qos_sent(?QOS_0) -> - inc('messages/qos0/sent'); -qos_sent(?QOS_1) -> - inc('messages/qos1/sent'); -qos_sent(?QOS_2) -> - inc('messages/qos2/sent'). %% @doc Get all metrics -spec(all() -> [{atom(), non_neg_integer()}]). @@ -188,12 +60,12 @@ all() -> {ok, Count} -> maps:put(Metric, Count+Val, Map); error -> maps:put(Metric, Val, Map) end - end, #{}, ?METRIC_TAB)). + end, #{}, ?TAB)). %% @doc Get metric value -spec(value(atom()) -> non_neg_integer()). value(Metric) -> - lists:sum(ets:select(?METRIC_TAB, [{{{Metric, '_'}, '$1'}, [], ['$1']}])). + lists:sum(ets:select(?TAB, [{{{Metric, '_'}, '$1'}, [], ['$1']}])). %% @doc Increase counter -spec(inc(atom()) -> non_neg_integer()). @@ -212,9 +84,9 @@ inc(Metric, Val) when is_atom(Metric) -> %% @doc Increase metric value -spec(inc(counter | gauge, atom(), pos_integer()) -> pos_integer()). inc(gauge, Metric, Val) -> - ets:update_counter(?METRIC_TAB, key(gauge, Metric), {2, Val}); + ets:update_counter(?TAB, key(gauge, Metric), {2, Val}); inc(counter, Metric, Val) -> - ets:update_counter(?METRIC_TAB, key(counter, Metric), {2, Val}). + ets:update_counter(?TAB, key(counter, Metric), {2, Val}). %% @doc Decrease metric value -spec(dec(gauge, atom()) -> integer()). @@ -224,13 +96,13 @@ dec(gauge, Metric) -> %% @doc Decrease metric value -spec(dec(gauge, atom(), pos_integer()) -> integer()). dec(gauge, Metric, Val) -> - ets:update_counter(?METRIC_TAB, key(gauge, Metric), {2, -Val}). + ets:update_counter(?TAB, key(gauge, Metric), {2, -Val}). %% @doc Set metric value set(Metric, Val) when is_atom(Metric) -> set(gauge, Metric, Val). set(gauge, Metric, Val) -> - ets:insert(?METRIC_TAB, {key(gauge, Metric), Val}). + ets:insert(?TAB, {key(gauge, Metric), Val}). %% @doc Metric Key key(gauge, Metric) -> @@ -239,16 +111,13 @@ key(counter, Metric) -> {Metric, erlang:system_info(scheduler_id)}. %%-------------------------------------------------------------------- -%% gen_server callbacks +%% gen_server Callbacks %%-------------------------------------------------------------------- init([]) -> emqx_time:seed(), - Metrics = ?SYSTOP_BYTES ++ ?SYSTOP_PACKETS ++ ?SYSTOP_MESSAGES, % Create metrics table - ets:new(?METRIC_TAB, [set, public, named_table, {write_concurrency, true}]), - % Init metrics - [create_metric(Metric) || Metric <- Metrics], + ets:new(?TAB, [set, public, named_table, {write_concurrency, true}]), % Tick to publish metrics {ok, #state{tick = emqx_broker:start_tick(tick)}, hibernate}. @@ -278,14 +147,7 @@ code_change(_OldVsn, State, _Extra) -> publish(Metric, Val) -> Msg = emqx_message:make(metrics, metric_topic(Metric), bin(Val)), - emqx:publish(emqx_message:set_flag(sys, Msg)). - -create_metric({gauge, Name}) -> - ets:insert(?METRIC_TAB, {{Name, 0}, 0}); - -create_metric({counter, Name}) -> - Schedulers = lists:seq(1, erlang:system_info(schedulers)), - ets:insert(?METRIC_TAB, [{{Name, I}, 0} || I <- Schedulers]). + emqx_broker:publish(emqx_message:set_flag(sys, Msg)). metric_topic(Metric) -> emqx_topic:systop(list_to_binary(lists:concat(['metrics/', Metric]))). diff --git a/src/emqx_mqtt5_props.erl b/src/emqx_mqtt5_props.erl new file mode 100644 index 000000000..70857d18c --- /dev/null +++ b/src/emqx_mqtt5_props.erl @@ -0,0 +1,113 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqx.io) +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_mqtt5_props). + +-author("Feng Lee "). + +-export([name/1, id/1]). + +%%-------------------------------------------------------------------- +%% Property id to name +%%-------------------------------------------------------------------- + +%% 01: Byte; PUBLISH, Will Properties +name(16#01) -> 'Payload-Format-Indicator'; +%% 02: Four Byte Integer; PUBLISH, Will Properties +name(16#02) -> 'Message-Expiry-Interval'; +%% 03: UTF-8 Encoded String; PUBLISH, Will Properties +name(16#03) -> 'Content-Type'; +%% 08: UTF-8 Encoded String; PUBLISH, Will Properties +name(16#08) -> 'Response-Topic'; +%% 09: Binary Data; PUBLISH, Will Properties +name(16#09) -> 'Correlation-Data'; +%% 11: Variable Byte Integer; PUBLISH, SUBSCRIBE +name(16#0B) -> 'Subscription-Identifier'; +%% 17: Four Byte Integer; CONNECT, CONNACK, DISCONNECT +name(16#11) -> 'Session-Expiry-Interval'; +%% 18: UTF-8 Encoded String; CONNACK +name(16#12) -> 'Assigned-Client-Identifier'; +%% 19: Two Byte Integer; CONNACK +name(16#13) -> 'Server-Keep-Alive'; +%% 21: UTF-8 Encoded String; CONNECT, CONNACK, AUTH +name(16#15) -> 'Authentication-Method'; +%% 22: Binary Data; CONNECT, CONNACK, AUTH +name(16#16) -> 'Authentication-Data'; +%% 23: Byte; CONNECT +name(16#17) -> 'Request-Problem-Information'; +%% 24: Four Byte Integer; Will Properties +name(16#18) -> 'Will-Delay-Interval'; +%% 25: Byte; CONNECT +name(16#19) -> 'Request-Response-Information'; +%% 26: UTF-8 Encoded String; CONNACK +name(16#1A) -> 'Response Information'; +%% 28: UTF-8 Encoded String; CONNACK, DISCONNECT +name(16#1C) -> 'Server-Reference'; +%% 31: UTF-8 Encoded String; CONNACK, PUBACK, PUBREC, PUBREL, PUBCOMP, SUBACK, UNSUBACK, DISCONNECT, AUTH +name(16#1F) -> 'Reason-String'; +%% 33: Two Byte Integer; CONNECT, CONNACK +name(16#21) -> 'Receive-Maximum'; +%% 34: Two Byte Integer; CONNECT, CONNACK +name(16#22) -> 'Topic-Alias-Maximum'; +%% 35: Two Byte Integer; PUBLISH +name(16#23) -> 'Topic Alias'; +%% 36: Byte; CONNACK +name(16#24) -> 'Maximum-QoS'; +%% 37: Byte; CONNACK +name(16#25) -> 'Retain-Available'; +%% 38: UTF-8 String Pair; ALL +name(16#26) -> 'User-Property'; +%% 39: Four Byte Integer; CONNECT, CONNACK +name(16#27) -> 'Maximum-Packet-Size'; +%% 40: Byte; CONNACK +name(16#28) -> 'Wildcard-Subscription-Available'; +%% 41: Byte; CONNACK +name(16#29) -> 'Subscription-Identifier-Available'; +%% 42: Byte; CONNACK +name(16#2A) -> 'Shared-Subscription-Available'. + +%%-------------------------------------------------------------------- +%% Property name to id +%%-------------------------------------------------------------------- + +id('Payload-Format-Indicator') -> 16#01; +id('Message-Expiry-Interval') -> 16#02; +id('Content-Type') -> 16#03; +id('Response-Topic') -> 16#08; +id('Correlation-Data') -> 16#09; +id('Subscription-Identifier') -> 16#0B; +id('Session-Expiry-Interval') -> 16#11; +id('Assigned-Client-Identifier') -> 16#12; +id('Server-Keep-Alive') -> 16#13; +id('Authentication-Method') -> 16#15; +id('Authentication Data') -> 16#16; +id('Request-Problem-Information') -> 16#17; +id('Will-Delay-Interval') -> 16#18; +id('Request-Response-Information') -> 16#19; +id('Response Information') -> 16#1A; +id('Server-Reference') -> 16#1C; +id('Reason-String') -> 16#1F; +id('Receive-Maximum') -> 16#21; +id('Topic-Alias-Maximum') -> 16#22; +id('Topic Alias') -> 16#23; +id('Maximum-QoS') -> 16#24; +id('Retain-Available') -> 16#25; +id('User-Property') -> 16#26; +id('Maximum-Packet-Size') -> 16#27; +id('Wildcard-Subscription-Available') -> 16#28; +id('Subscription-Identifier-Available') -> 16#29; +id('Shared-Subscription-Available') -> 16#2A. + diff --git a/src/emqx_mqtt5_rscode.erl b/src/emqx_mqtt5_rscode.erl new file mode 100644 index 000000000..10c137678 --- /dev/null +++ b/src/emqx_mqtt5_rscode.erl @@ -0,0 +1,195 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqx.io) +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_mqtt5_rscode). + +-author("Feng Lee "). + +-export([name/1, value/1]). + +%%-------------------------------------------------------------------- +%% Reason code to name +%%-------------------------------------------------------------------- + +0 +name(0x00 +Success +CONNACK, PUBACK, PUBREC, PUBREL, PUBCOMP, UNSUBACK, AUTH +0 +name(0x00 +Normal disconnection +DISCONNECT +0 +name(0x00 +Granted QoS 0 +SUBACK +1 +name(0x01 +Granted QoS 1 +SUBACK +2 +name(0x02 +Granted QoS 2 +SUBACK +4 +name(0x04 +Disconnect with Will Message +DISCONNECT +16 +name(0x10 +No matching subscribers +PUBACK, PUBREC +17 +name(0x11 +No subscription existed +UNSUBACK +24 +name(0x18 +Continue authentication +AUTH +25 +name(0x19 +Re-authenticate +AUTH +128 +name(0x80 +Unspecified error +CONNACK, PUBACK, PUBREC, SUBACK, UNSUBACK, DISCONNECT +129 +name(0x81 +Malformed Packet +CONNACK, DISCONNECT +130 +name(0x82 +Protocol Error +CONNACK, DISCONNECT +131 +name(0x83 +Implementation specific error +CONNACK, PUBACK, PUBREC, SUBACK, UNSUBACK, DISCONNECT +132 +name(0x84 +Unsupported Protocol Version +CONNACK +133 +name(0x85 +Client Identifier not valid +CONNACK +134 +name(0x86 +Bad User Name or Password +CONNACK +135 +name(0x87 +Not authorized +CONNACK, PUBACK, PUBREC, SUBACK, UNSUBACK, DISCONNECT +136 +name(0x88 +Server unavailable +CONNACK +137 +name(0x89 +Server busy +CONNACK, DISCONNECT +138 +name(0x8A +Banned +CONNACK +139 +name(0x8B +Server shutting down +DISCONNECT +140 +name(0x8C +Bad authentication method +CONNACK, DISCONNECT +141 +name(0x8D +Keep Alive timeout +DISCONNECT +142 +name(0x8E +Session taken over +DISCONNECT +143 +name(0x8F +Topic Filter invalid +SUBACK, UNSUBACK, DISCONNECT +144 +name(0x90 +Topic Name invalid +CONNACK, PUBACK, PUBREC, DISCONNECT +145 +name(0x91 +Packet Identifier in use +PUBACK, PUBREC, SUBACK, UNSUBACK +146 +name(0x92 +Packet Identifier not found +PUBREL, PUBCOMP +147 +name(0x93 +Receive Maximum exceeded +DISCONNECT +148 +name(0x94 +Topic Alias invalid +DISCONNECT +149 +name(0x95 +Packet too large +CONNACK, DISCONNECT +150 +name(0x96 +Message rate too high +DISCONNECT +151 +name(0x97 +Quota exceeded +CONNACK, PUBACK, PUBREC, SUBACK, DISCONNECT +%% 152 +name(0x98 +Administrative action +DISCONNECT +%% 153 +name(0x99 +Payload format invalid +CONNACK, PUBACK, PUBREC, DISCONNECT +%% 154 +name(0x9A +Retain not supported +CONNACK, DISCONNECT +%% 155 +name(0x9B +QoS not supported +CONNACK, DISCONNECT +%% 156 +name(0x9C +Use another server +CONNACK, DISCONNECT +%% 157: CONNACK, DISCONNECT +name(0x9D) -> 'Server-Moved'; +%% 158: SUBACK, DISCONNECT +name(0x9E) -> 'Shared-Subscriptions-Not-Supported'; +%% 159: CONNACK, DISCONNECT +name(0x9F) -> 'Connection-Rate-Exceeded'; +%% 160: DISCONNECT +name(0xA0) -> 'Maximum-Connect-Time'; +%% 161: SUBACK, DISCONNECT +name(0xA1) -> 'Subscription-Identifiers-Not-Supported'; +%% 162: SUBACK, DISCONNECT +name(0xA2) -> 'Wildcard-Subscriptions-Not-Supported'; + diff --git a/src/emqx_mqtt_app.erl b/src/emqx_mqtt_app.erl new file mode 100644 index 000000000..bde8f4633 --- /dev/null +++ b/src/emqx_mqtt_app.erl @@ -0,0 +1,29 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io) +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_mqtt_app). + +-behaviour(application). + +-export([start/2, stop/1]). + +start(_Type, _Args) -> + emqx_mqtt_metrics:init(), + emqx_mqtt_sup:start_link(). + +stop(_State) -> + ok. + diff --git a/src/emqx_mqtt_metrics.erl b/src/emqx_mqtt_metrics.erl new file mode 100644 index 000000000..c9be6bdfc --- /dev/null +++ b/src/emqx_mqtt_metrics.erl @@ -0,0 +1,159 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_mqtt_metrics). + +-include("emqx_mqtt.hrl"). + +-import(emqx_metrics, [inc/1]). + +-export([init/0]). + +%% Received/Sent Metrics +-export([received/1, sent/1]). + +%% Bytes sent and received of Broker +-define(SYSTOP_BYTES, [ + {counter, 'bytes/received'}, % Total bytes received + {counter, 'bytes/sent'} % Total bytes sent +]). + +%% Packets sent and received of Broker +-define(SYSTOP_PACKETS, [ + {counter, 'packets/received'}, % All Packets received + {counter, 'packets/sent'}, % All Packets sent + {counter, 'packets/connect'}, % CONNECT Packets received + {counter, 'packets/connack'}, % CONNACK Packets sent + {counter, 'packets/publish/received'}, % PUBLISH packets received + {counter, 'packets/publish/sent'}, % PUBLISH packets sent + {counter, 'packets/puback/received'}, % PUBACK packets received + {counter, 'packets/puback/sent'}, % PUBACK packets sent + {counter, 'packets/puback/missed'}, % PUBACK packets missed + {counter, 'packets/pubrec/received'}, % PUBREC packets received + {counter, 'packets/pubrec/sent'}, % PUBREC packets sent + {counter, 'packets/pubrec/missed'}, % PUBREC packets missed + {counter, 'packets/pubrel/received'}, % PUBREL packets received + {counter, 'packets/pubrel/sent'}, % PUBREL packets sent + {counter, 'packets/pubrel/missed'}, % PUBREL packets missed + {counter, 'packets/pubcomp/received'}, % PUBCOMP packets received + {counter, 'packets/pubcomp/sent'}, % PUBCOMP packets sent + {counter, 'packets/pubcomp/missed'}, % PUBCOMP packets missed + {counter, 'packets/subscribe'}, % SUBSCRIBE Packets received + {counter, 'packets/suback'}, % SUBACK packets sent + {counter, 'packets/unsubscribe'}, % UNSUBSCRIBE Packets received + {counter, 'packets/unsuback'}, % UNSUBACK Packets sent + {counter, 'packets/pingreq'}, % PINGREQ packets received + {counter, 'packets/pingresp'}, % PINGRESP Packets sent + {counter, 'packets/disconnect'} % DISCONNECT Packets received +]). + +%% Messages sent and received of broker +-define(SYSTOP_MESSAGES, [ + {counter, 'messages/received'}, % All Messages received + {counter, 'messages/sent'}, % All Messages sent + {counter, 'messages/qos0/received'}, % QoS0 Messages received + {counter, 'messages/qos0/sent'}, % QoS0 Messages sent + {counter, 'messages/qos1/received'}, % QoS1 Messages received + {counter, 'messages/qos1/sent'}, % QoS1 Messages sent + {counter, 'messages/qos2/received'}, % QoS2 Messages received + {counter, 'messages/qos2/sent'}, % QoS2 Messages sent + {counter, 'messages/qos2/dropped'}, % QoS2 Messages dropped + {gauge, 'messages/retained'}, % Messagea retained + {counter, 'messages/dropped'}, % Messages dropped + {counter, 'messages/forward'} % Messages forward +]). + +% Init metrics +init() -> + lists:foreach(fun emqx_metrics:create/1, + ?SYSTOP_BYTES ++ ?SYSTOP_PACKETS ++ ?SYSTOP_MESSAGES). + +%% @doc Count packets received. +-spec(received(mqtt_packet()) -> ok). +received(Packet) -> + inc('packets/received'), + received1(Packet). +received1(?PUBLISH_PACKET(Qos, _PktId)) -> + inc('packets/publish/received'), + inc('messages/received'), + qos_received(Qos); +received1(?PACKET(Type)) -> + received2(Type). +received2(?CONNECT) -> + inc('packets/connect'); +received2(?PUBACK) -> + inc('packets/puback/received'); +received2(?PUBREC) -> + inc('packets/pubrec/received'); +received2(?PUBREL) -> + inc('packets/pubrel/received'); +received2(?PUBCOMP) -> + inc('packets/pubcomp/received'); +received2(?SUBSCRIBE) -> + inc('packets/subscribe'); +received2(?UNSUBSCRIBE) -> + inc('packets/unsubscribe'); +received2(?PINGREQ) -> + inc('packets/pingreq'); +received2(?DISCONNECT) -> + inc('packets/disconnect'); +received2(_) -> + ignore. +qos_received(?QOS_0) -> + inc('messages/qos0/received'); +qos_received(?QOS_1) -> + inc('messages/qos1/received'); +qos_received(?QOS_2) -> + inc('messages/qos2/received'). + +%% @doc Count packets received. Will not count $SYS PUBLISH. +-spec(sent(mqtt_packet()) -> ignore | non_neg_integer()). +sent(?PUBLISH_PACKET(_Qos, <<"$SYS/", _/binary>>, _, _)) -> + ignore; +sent(Packet) -> + inc('packets/sent'), + sent1(Packet). +sent1(?PUBLISH_PACKET(Qos, _PktId)) -> + inc('packets/publish/sent'), + inc('messages/sent'), + qos_sent(Qos); +sent1(?PACKET(Type)) -> + sent2(Type). +sent2(?CONNACK) -> + inc('packets/connack'); +sent2(?PUBACK) -> + inc('packets/puback/sent'); +sent2(?PUBREC) -> + inc('packets/pubrec/sent'); +sent2(?PUBREL) -> + inc('packets/pubrel/sent'); +sent2(?PUBCOMP) -> + inc('packets/pubcomp/sent'); +sent2(?SUBACK) -> + inc('packets/suback'); +sent2(?UNSUBACK) -> + inc('packets/unsuback'); +sent2(?PINGRESP) -> + inc('packets/pingresp'); +sent2(_Type) -> + ignore. +qos_sent(?QOS_0) -> + inc('messages/qos0/sent'); +qos_sent(?QOS_1) -> + inc('messages/qos1/sent'); +qos_sent(?QOS_2) -> + inc('messages/qos2/sent'). + diff --git a/src/emqx_mqueue.erl b/src/emqx_mqueue.erl index 5692577a2..190e525fc 100644 --- a/src/emqx_mqueue.erl +++ b/src/emqx_mqueue.erl @@ -43,8 +43,8 @@ -module(emqx_mqueue). --author("Feng Lee "). - +%% TODO: XYZ +%% -include("emqx.hrl"). -include("emqx_mqtt.hrl"). @@ -209,10 +209,10 @@ maybe_set_alarm(MQ = #mqueue{high_wm = undefined}) -> MQ; maybe_set_alarm(MQ = #mqueue{name = Name, len = Len, high_wm = HighWM, alarm_fun = AlarmFun}) when Len > HighWM -> - Alarm = #mqtt_alarm{id = iolist_to_binary(["queue_high_watermark.", Name]), - severity = warning, - title = io_lib:format("Queue ~s high-water mark", [Name]), - summary = io_lib:format("queue len ~p > high_watermark ~p", [Len, HighWM])}, + Alarm = #alarm{id = iolist_to_binary(["queue_high_watermark.", Name]), + severity = warning, + title = io_lib:format("Queue ~s high-water mark", [Name]), + summary = io_lib:format("queue len ~p > high_watermark ~p", [Len, HighWM])}, MQ#mqueue{alarm_fun = AlarmFun(alert, Alarm)}; maybe_set_alarm(MQ) -> MQ. diff --git a/src/emqx_parser.erl b/src/emqx_parser.erl index 7d8f0d314..c7539112e 100644 --- a/src/emqx_parser.erl +++ b/src/emqx_parser.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io) +%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -14,7 +14,6 @@ %% limitations under the License. %%-------------------------------------------------------------------- -%% @doc MQTT Packet Parser -module(emqx_parser). -author("Feng Lee "). @@ -74,12 +73,12 @@ parse_remaining_len(<<0:1, Len:7, Rest/binary>>, Header, Multiplier, Value, Max true -> parse_frame(Rest, Header, FrameLen) end. -parse_frame(Bin, #mqtt_packet_header{type = Type, qos = Qos} = Header, Length) -> +parse_frame(Bin, #mqtt_packet_header{type = Type, qos = Qos} = Header, Length) -> case {Type, Bin} of {?CONNECT, <>} -> {ProtoName, Rest1} = parse_utf(FrameBin), %% Fix mosquitto bridge: 0x83, 0x84 - <> = Rest1, + <> = Rest1, <> = Rest2, - {ClientId, Rest4} = parse_utf(Rest3), - {WillTopic, Rest5} = parse_utf(Rest4, WillFlag), - {WillMsg, Rest6} = parse_msg(Rest5, WillFlag), - {UserName, Rest7} = parse_utf(Rest6, UsernameFlag), - {PasssWord, <<>>} = parse_utf(Rest7, PasswordFlag), + {Properties, Rest4} = parse_properties(ProtoVer, Rest3), + {ClientId, Rest5} = parse_utf(Rest4), + {WillProps, Rest6} = parse_will_props(Rest5, ProtoVer, WillFlag), + {WillTopic, Rest7} = parse_utf(Rest6, WillFlag), + {WillMsg, Rest8} = parse_msg(Rest7, WillFlag), + {UserName, Rest9} = parse_utf(Rest8, UsernameFlag), + {PasssWord, <<>>} = parse_utf(Rest9, PasswordFlag), case protocol_name_approved(ProtoVersion, ProtoName) of true -> wrap(Header, #mqtt_packet_connect{ - proto_ver = ProtoVersion, + proto_ver = ProtoVer, proto_name = ProtoName, will_retain = bool(WillRetain), will_qos = WillQos, @@ -106,11 +107,13 @@ parse_frame(Bin, #mqtt_packet_header{type = Type, qos = Qos} = Header, Length) clean_sess = bool(CleanSess), keep_alive = KeepAlive, client_id = ClientId, + will_props = WillProps, will_topic = WillTopic, will_msg = WillMsg, username = UserName, password = PasssWord, - is_bridge = (BridgeTag =:= 8)}, Rest); + is_bridge = (BridgeTag =:= 8), + properties = Properties}, Rest); false -> {error, protocol_header_corrupt} end; @@ -120,55 +123,72 @@ parse_frame(Bin, #mqtt_packet_header{type = Type, qos = Qos} = Header, Length) % return_code = ReturnCode }, Rest); {?PUBLISH, <>} -> {TopicName, Rest1} = parse_utf(FrameBin), - {PacketId, Payload} = case Qos of - 0 -> {undefined, Rest1}; - _ -> <> = Rest1, - {Id, R} - end, + {PacketId, Rest2} = case Qos of + 0 -> {undefined, Rest1}; + _ -> <> = Rest1, + {Id, R} + end, + {Properties, Payload} = parse_properties(ProtoVer, Rest), wrap(fixdup(Header), #mqtt_packet_publish{topic_name = TopicName, - packet_id = PacketId}, + packet_id = PacketId, + properties = Properties}, Payload, Rest); - {?PUBACK, <>} -> - <> = FrameBin, - wrap(Header, #mqtt_packet_puback{packet_id = PacketId}, Rest); - {?PUBREC, <>} -> - <> = FrameBin, - wrap(Header, #mqtt_packet_puback{packet_id = PacketId}, Rest); - {?PUBREL, <>} -> - %% 1 = Qos, - <> = FrameBin, - wrap(Header, #mqtt_packet_puback{packet_id = PacketId}, Rest); - {?PUBCOMP, <>} -> - <> = FrameBin, - wrap(Header, #mqtt_packet_puback{packet_id = PacketId}, Rest); + {PubAck, <>} + when PubAck == ?PUBACK; PubAck == ?PUBREC; PubAck == ?PUBREL; PubAck == ?PUBCOMP -> + <> = FrameBin, + case ProtoVer == ?MQTT_PROTO_V5 of + true -> + <> = Rest1, + {Properties, Rest3} = parse_properties(ProtoVer, Rest2), + wrap(Header, #mqtt_packet_puback{packet_id = PacketId, + reason_code = ReasonCode, + properties = Properties}, Rest3); + false -> + wrap(Header, #mqtt_packet_puback{packet_id = PacketId}, Rest) + end; {?SUBSCRIBE, <>} -> %% 1 = Qos, <> = FrameBin, + {Properties, Rest2} = parse_properties(ProtoVer, Rest1), TopicTable = parse_topics(?SUBSCRIBE, Rest1, []), wrap(Header, #mqtt_packet_subscribe{packet_id = PacketId, + properties = Properties, topic_table = TopicTable}, Rest); %{?SUBACK, <>} -> % <> = FrameBin, - % wrap(Header, #mqtt_packet_suback{packet_id = PacketId, - % qos_table = parse_qos(Rest1, []) }, Rest); + % {Properties, Rest2/binary>> = parse_properties(ProtoVer, Rest1), + % wrap(Header, #mqtt_packet_suback{packet_id = PacketId, properties = Properties, + % reason_codes = parse_qos(Rest1, [])}, Rest); {?UNSUBSCRIBE, <>} -> %% 1 = Qos, <> = FrameBin, - Topics = parse_topics(?UNSUBSCRIBE, Rest1, []), - wrap(Header, #mqtt_packet_unsubscribe{packet_id = PacketId, - topics = Topics}, Rest); + {Properties, Rest2} = parse_properties(ProtoVer, Rest1), + Topics = parse_topics(?UNSUBSCRIBE, Rest2, []), + wrap(Header, #mqtt_packet_unsubscribe{packet_id = PacketId, + properties = Properties, + topics = Topics}, Rest); %{?UNSUBACK, <>} -> - % <> = FrameBin, - % wrap(Header, #mqtt_packet_unsuback { packet_id = PacketId }, Rest); + % <> = FrameBin, + % {Properties, Rest2} = parse_properties(ProtoVer, Rest1), + % wrap(Header, #mqtt_packet_unsuback { + % packet_id = PacketId, + % properties = Properties }, Rest); {?PINGREQ, Rest} -> Length = 0, wrap(Header, Rest); %{?PINGRESP, Rest} -> % Length = 0, % wrap(Header, Rest); - {?DISCONNECT, Rest} -> - Length = 0, - wrap(Header, Rest); + {?DISCONNECT, <>} -> + case ProtoVer == ?MQTT_PROTO_V5 of + true -> + <> = Rest, + {Properties, Rest2} = parse_properties(ProtoVer, Rest1), + wrap(Header, #mqtt_packet_disconnect{reason_code = Reason, + properties = Properties}, Rest2); + false -> + Lenght = 0, wrap(Header, Rest) + end; {_, TooShortBin} -> {more, fun(BinMore) -> parse_frame(<>, @@ -183,21 +203,134 @@ wrap(Header, Variable, Rest) -> wrap(Header, Rest) -> {ok, #mqtt_packet{header = Header}, Rest}. -%client function -%parse_qos(<<>>, Acc) -> -% lists:reverse(Acc); -%parse_qos(<>, Acc) -> -% parse_qos(Rest, [QoS | Acc]). +parse_will_props(Bin, ProtoVer = ?MQTT_PROTO_V5, 1) -> + parse_properties(ProtoVer, Bin); +parse_will_props(Bin, _ProtoVer, _WillFlag), + {#{}, Bin}. -parse_topics(_, <<>>, Topics) -> +parse_properties(?MQTT_PROTO_V5, Bin) -> + {Len, Rest} = parse_variable_byte_integer(Bin), + < + {#{}, Bin}. %% No properties. + +parse_property(<<>>, Props) -> + Props; +%% 01: 'Payload-Format-Indicator', Byte; +parse_property(<<16#01, Val, Bin/binary>>, Props) -> + parse_property(Bin, Props#{'Payload-Format-Indicator' => Val}); +%% 02: 'Message-Expiry-Interval', Four Byte Integer; +parse_property(<<16#02, Val:32/big, Bin/binary>>, Props) -> + parse_property(Bin, Props#{'Message-Expiry-Interval' => Val}); +%% 03: 'Content-Type', UTF-8 Encoded String; +parse_property(<<16#03, Bin/binary>>, Props) -> + {Val, Rest} = parse_utf(Bin), + parse_property(Rest, Props#{'Content-Type' => Val}); +%% 08: 'Response-Topic', UTF-8 Encoded String; +parse_property(<<16#08, Bin/binary>>) -> + {Val, Rest} = parse_utf(Bin), + parse_property(Rest, Props#{'Response-Topic' => Val}); +%% 09: 'Correlation-Data', Binary Data; +parse_property(<<16#09, Len:16/big, Val:Len/binary, Bin/binary>>) -> + parse_property(Bin, Props#{'Correlation-Data' => Val}); +%% 11: 'Subscription-Identifier', Variable Byte Integer; +parse_property(<<16#0B, Bin/binary>>, Props) -> + {Val, Rest} = parse_variable_byte_integer(Bin), + parse_property(Rest, Props#{'Subscription-Identifier' => Val}); +%% 17: 'Session-Expiry-Interval', Four Byte Integer; +parse_property(<<16#11, Val:32/big, Bin/binary>>, Props) -> + parse_property(Bin, Props#{'Session-Expiry-Interval' => Val}); +%% 18: 'Assigned-Client-Identifier', UTF-8 Encoded String; +parse_property(<<16#12, Bin/binary>>) -> + {Val, Rest} = parse_utf(Bin), + parse_property(Rest, Props#{'Assigned-Client-Identifier' => Val}); +%% 19: 'Server-Keep-Alive', Two Byte Integer; +parse_property(<<16#13, Val:16, Bin/binary>>) -> + parse_property(Bin, Props#{'Server-Keep-Alive' => Val}); +%% 21: 'Authentication-Method', UTF-8 Encoded String; +parse_property(<<16#15, Bin/binary>>, Props) -> + {Val, Rest} = parse_utf(Bin), + parse_property(Rest, Props#{'Authentication-Method' => Val}) +%% 22: 'Authentication-Data', Binary Data; +parse_property(<<16#16, Len:16/big, Val:Len/binary, Bin/binary>>) -> + parse_property(Bin, Props#{'Authentication-Data' => Val}); +%% 23: 'Request-Problem-Information', Byte; +parse_property(<<16#17, Val, Bin/binary>>, Props) -> + parse_property(Bin, Props#{'Request-Problem-Information' => Val}); +%% 24: 'Will-Delay-Interval', Four Byte Integer; +parse_property(<<16#18, Val:32, Bin/binary>>, Props) -> + parse_property(Bin, Props#{'Will-Delay-Interval' => Val}); +%% 25: 'Request-Response-Information', Byte; +parse_property(<<16#19, Val, Bin/binary>>, Props) -> + parse_property(Bin, Props#{'Request-Response-Information' => Val}); +%% 26: 'Response Information', UTF-8 Encoded String; +parse_property(<<16#1A, Bin/binary>>, Props) -> + {Val, Rest} = parse_utf(Bin), + parse_property(Rest, Props#{'Response-Information' => Val}); +%% 28: 'Server-Reference', UTF-8 Encoded String; +parse_property(<<16#1C, Bin/binary>>, Props) -> + {Val, Rest} = parse_utf(Bin), + parse_property(Rest, Props#{'Server-Reference' => Val}); +%% 31: 'Reason-String', UTF-8 Encoded String; +parse_property(<<16#1F, Bin/binary, Props) -> + {Val, Rest} = parse_utf(Bin), + parse_property(Rest, Props#{'Reason-String' => Val}); +%% 33: 'Receive-Maximum', Two Byte Integer; +parse_property(<<16#21, Val:16/big, Bin/binary>>, Props) -> + parse_property(Bin, Props#{'Receive-Maximum' => Val}); +%% 34: 'Topic-Alias-Maximum', Two Byte Integer; +parse_property(<<16#22, Val:16/big, Bin/binary>>, Props) -> + parse_property(Bin, Props#{'Topic-Alias-Maximum' => Val}); +%% 35: 'Topic-Alias', Two Byte Integer; +parse_property(<<16#23, Val:16/big, Bin/binary>>, Props) -> + parse_property(Bin, Props#{'Topic-Alias' => Val}); +%% 36: 'Maximum-QoS', Byte; +parse_property(<<16#24, Val, Bin/binary>>, Props) -> + parse_property(Bin, Props#{'Maximum-QoS' => Val}); +%% 37: 'Retain-Available', Byte; +parse_property(<<16#25, Val, Bin/binary>>, Props) -> + parse_property(Bin, Props#{'Retain-Available' => Val}); +%% 38: 'User-Property', UTF-8 String Pair; +parse_property(<<16#26, Bin/binary>>, Props) -> + {Pair, Rest} = parse_utf_pair(Bin), + parse_property(Rest, case maps:find('User-Property', Props) of + {ok, UserProps} -> Props#{'User-Property' := [Pair | UserProps]}; + error -> Props#{'User-Property' := [Pair]} + end); +%% 39: 'Maximum-Packet-Size', Four Byte Integer; +parse_property(<<16#27, Val:32, Bin/binary>>, Props) -> + parse_property(Rest, Props#{'Maximum-Packet-Size' => Val}); +%% 40: 'Wildcard-Subscription-Available', Byte; +parse_property(<<16#28, Val, Bin/binary>>, Props) -> + parse_property(Bin, Props#{'Wildcard-Subscription-Available' => Val}); +%% 41: 'Subscription-Identifier-Available', Byte; +parse_property(<<16#29, Val, Bin/binary>>, Props) -> + parse_property(Bin, Props#{'Subscription-Identifier-Available' => Val}); +%% 42: 'Shared-Subscription-Available', Byte; +parse_property(<<16#2A, Val, Bin/binary>>, Props) -> + parse_property(Bin, Props#{'Shared-Subscription-Available' => Val}). + +parse_variable_byte_integer(Bin) -> + parse_variable_byte_integer(Bin, 1, 0). +parse_variable_byte_integer(<<1:1, Len:7, Rest/binary>>, Multiplier, Value) -> + parse_variable_byte_integer(Rest, Multiplier * ?HIGHBIT, Value + Len * Multiplier); +parse_variable_byte_integer(<<0:1, Len:7, Rest/binary>>, Multiplier, Value) -> + {Value + Len * Multiplier, Rest}. + +parse_topics(_Packet, <<>>, Topics) -> lists:reverse(Topics); parse_topics(?SUBSCRIBE = Sub, Bin, Topics) -> - {Name, <<_:6, QoS:2, Rest/binary>>} = parse_utf(Bin), - parse_topics(Sub, Rest, [{Name, QoS}| Topics]); + {Name, <<<<_Reserved:2, RetainHandling:2, KeepRetain:1, NoLocal:1, QoS:2>>, Rest/binary>>} = parse_utf(Bin), + SubOpts = [{qos, Qos}, {retain_handling, RetainHandling}, {keep_retain, KeepRetain}, {no_local, NoLocal}], + parse_topics(Sub, Rest, [{Name, SubOpts}| Topics]); parse_topics(?UNSUBSCRIBE = Sub, Bin, Topics) -> {Name, <>} = parse_utf(Bin), parse_topics(Sub, Rest, [Name | Topics]). +parse_utf_pair(Bin) -> + [{Name, Value} || <> <= Bin]. + parse_utf(Bin, 0) -> {undefined, Bin}; parse_utf(Bin, _) -> @@ -229,3 +362,4 @@ fixdup(Header = #mqtt_packet_header{qos = ?QOS0, dup = true}) -> fixdup(Header = #mqtt_packet_header{qos = ?QOS2, dup = true}) -> Header#mqtt_packet_header{dup = false}; fixdup(Header) -> Header. + diff --git a/src/emqx_plugins.erl b/src/emqx_plugins.erl index a9b3c5c25..4eddf7331 100644 --- a/src/emqx_plugins.erl +++ b/src/emqx_plugins.erl @@ -157,16 +157,16 @@ stop_plugins(Names) -> [stop_app(App) || App <- Names]. %% @doc List all available plugins --spec(list() -> [mqtt_plugin()]). +-spec(list() -> [plugin()]). list() -> case emqx:env(plugins_etc_dir) of {ok, PluginsEtc} -> CfgFiles = filelib:wildcard("*.{conf,config}", PluginsEtc) ++ get_expand_plugin_config(), Plugins = [plugin(CfgFile) || CfgFile <- CfgFiles], StartedApps = names(started_app), - lists:map(fun(Plugin = #mqtt_plugin{name = Name}) -> + lists:map(fun(Plugin = #plugin{name = Name}) -> case lists:member(Name, StartedApps) of - true -> Plugin#mqtt_plugin{active = true}; + true -> Plugin#plugin{active = true}; false -> Plugin end end, Plugins); @@ -179,7 +179,7 @@ plugin(CfgFile) -> {ok, Attrs} = application:get_all_key(AppName), Ver = proplists:get_value(vsn, Attrs, "0"), Descr = proplists:get_value(description, Attrs, ""), - #mqtt_plugin{name = AppName, version = Ver, descr = Descr}. + #plugin{name = AppName, version = Ver, descr = Descr}. %% @doc Load a Plugin -spec(load(atom()) -> ok | {error, term()}). @@ -198,7 +198,7 @@ load(PluginName) when is_atom(PluginName) -> end end. -load_plugin(#mqtt_plugin{name = Name}, Persistent) -> +load_plugin(#plugin{name = Name}, Persistent) -> case load_app(Name) of ok -> start_app(Name, fun(App) -> plugin_loaded(App, Persistent) end); @@ -280,7 +280,7 @@ names(started_app) -> [Name || {Name, _Descr, _Ver} <- application:which_applications()]; names(Plugins) -> - [Name || #mqtt_plugin{name = Name} <- Plugins]. + [Name || #plugin{name = Name} <- Plugins]. plugin_loaded(_Name, false) -> ok; diff --git a/src/emqx_pool_sup.erl b/src/emqx_pool_sup.erl index 8cdc31910..45af0b81b 100644 --- a/src/emqx_pool_sup.erl +++ b/src/emqx_pool_sup.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io) +%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -14,11 +14,8 @@ %% limitations under the License. %%-------------------------------------------------------------------- -%% @doc Common Pool Supervisor -module(emqx_pool_sup). --author("Feng Lee "). - -behaviour(supervisor). %% API diff --git a/src/emqx_pooler.erl b/src/emqx_pooler.erl index 499c096b8..3cae2264d 100644 --- a/src/emqx_pooler.erl +++ b/src/emqx_pooler.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. +%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -20,8 +20,6 @@ -behaviour(gen_server). --include("emqx_internal.hrl"). - %% Start the pool supervisor -export([start_link/0]). @@ -44,7 +42,9 @@ start_link() -> -spec(start_link(atom(), pos_integer()) -> {ok, pid()} | ignore | {error, term()}). start_link(Pool, Id) -> - gen_server:start_link({local, ?PROC_NAME(?MODULE, Id)}, ?MODULE, [Pool, Id], []). + gen_server:start_link({local, name(Id)}, ?MODULE, [Pool, Id], []). + +name(Id) -> list_to_atom(lists:concat([?MODULE, "_", Id])). %% @doc Submit work to pooler submit(Fun) -> gen_server:call(worker(), {submit, Fun}, infinity). @@ -61,20 +61,17 @@ worker() -> %%-------------------------------------------------------------------- init([Pool, Id]) -> - ?GPROC_POOL(join, Pool, Id), + gproc_pool:connect_worker(Pool, {Pool, Id}), {ok, #state{pool = Pool, id = Id}}. handle_call({submit, Fun}, _From, State) -> - {reply, run(Fun), State}; + {reply, catch run(Fun), State}; handle_call(_Req, _From, State) -> {reply, ok, State}. handle_cast({async_submit, Fun}, State) -> - try run(Fun) - catch _:Error -> - lager:error("Pooler Error: ~p, ~p", [Error, erlang:get_stacktrace()]) - end, + try run(Fun) catch _:Error -> lager:error("Pooler Error: ~p, ~p", [Error, erlang:get_stacktrace()]) end, {noreply, State}; handle_cast(_Msg, State) -> @@ -84,7 +81,7 @@ handle_info(_Info, State) -> {noreply, State}. terminate(_Reason, #state{pool = Pool, id = Id}) -> - ?GPROC_POOL(leave, Pool, Id), ok. + gproc_pool:disconnect_worker(Pool, {Pool, Id}). code_change(_OldVsn, State, _Extra) -> {ok, State}. diff --git a/src/emqx_serializer.erl b/src/emqx_serializer.erl index 977b934ce..f99497d84 100644 --- a/src/emqx_serializer.erl +++ b/src/emqx_serializer.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io) +%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -14,7 +14,6 @@ %% limitations under the License. %%-------------------------------------------------------------------- -%% @doc MQTT Packet Serializer -module(emqx_serializer). -author("Feng Lee "). @@ -81,23 +80,28 @@ serialize_variable(?CONNECT, #mqtt_packet_connect{client_id = ClientId, {VariableBin, <>}; serialize_variable(?CONNACK, #mqtt_packet_connack{ack_flags = AckFlags, - return_code = ReturnCode}, undefined) -> - {<>, <<>>}; + reason_code = ReasonCode, + properties = Properties}, undefined) -> + PropsBin = serialize_properties(Properties), + {<>, <<>>}; serialize_variable(?SUBSCRIBE, #mqtt_packet_subscribe{packet_id = PacketId, topic_table = Topics }, undefined) -> {<>, serialize_topics(Topics)}; serialize_variable(?SUBACK, #mqtt_packet_suback{packet_id = PacketId, - qos_table = QosTable}, undefined) -> - {<>, << <> || Q <- QosTable >>}; + properties = Properties, + reason_codes = ReasonCodes}, undefined) -> + {<>, << <> || Code <- ReasonCodes >>}; serialize_variable(?UNSUBSCRIBE, #mqtt_packet_unsubscribe{packet_id = PacketId, topics = Topics }, undefined) -> {<>, serialize_topics(Topics)}; -serialize_variable(?UNSUBACK, #mqtt_packet_unsuback{packet_id = PacketId}, undefined) -> - {<>, <<>>}; +serialize_variable(?UNSUBACK, #mqtt_packet_unsuback{packet_id = PacketId, + properties = Properties, + reason_codes = ReasonCodes}, undefined) -> + {<>, << <> || Code <- ReasonCodes >>}; serialize_variable(?PUBLISH, #mqtt_packet_publish{topic_name = TopicName, packet_id = PacketId }, PayloadBin) -> @@ -118,33 +122,132 @@ serialize_variable(?PINGREQ, undefined, undefined) -> serialize_variable(?PINGRESP, undefined, undefined) -> {<<>>, <<>>}; -serialize_variable(?DISCONNECT, undefined, undefined) -> - {<<>>, <<>>}. +serialize_variable(?DISCONNECT, #mqtt_packet_disconnect{reason_code = ReasonCode, + properties = Properties}, undefined) -> + {<>, <<>>}. + +serialize_variable(?AUTH, #mqtt_packet_auth{reason_code = ReasonCode, + properties = Properties}, undefined) -> + {<>, <<>>}. serialize_payload(undefined) -> undefined; serialize_payload(Bin) when is_binary(Bin) -> Bin. +serialize_properties(undefined) -> + <<>>; +serialize_properties(Props) -> + << serialize_property(Prop, Val) || {Prop, Val} <= (maps:to_list(Props)) >>. + +%% 01: Byte; +serialize_property('Payload-Format-Indicator', Val) -> + <<16#01, Val>>; +%% 02: Four Byte Integer; +serialize_property('Message-Expiry-Interval', Val) -> + <<16#02, Val:32/big>>; +%% 03: UTF-8 Encoded String; +serialize_property('Content-Type', Val) -> + <<16#03, (serialize_utf(Val))/binary>>; +%% 08: UTF-8 Encoded String; +serialize_property('Response-Topic', Val) -> + <<16#08, (serialize_utf(Val))/binary>>; +%% 09: Binary Data; +serialize_property('Correlation-Data', Val) -> + <<16#09, (iolist_size(Val)):16, Val/binary>>; +%% 11: Variable Byte Integer; +serialize_property('Subscription-Identifier', Val) -> + <<16#0B, (serialize_variable_byte_integer(Val))/binary>>; +%% 17: Four Byte Integer; +serialize_property('Session-Expiry-Interval', Val) -> + <<16#11, Val:32/big>>; +%% 18: UTF-8 Encoded String; +serialize_property('Assigned-Client-Identifier', Val) -> + <<16#12, (serialize_utf(Val))/binary>>; +%% 19: Two Byte Integer; +serialize_property('Server-Keep-Alive', Val) -> + <<16#13, Val:16/big>>; +%% 21: UTF-8 Encoded String; +serialize_property('Authentication-Method', Val) -> + <<16#15, (serialize_utf(Val))/binary>>; +%% 22: Binary Data; +serialize_property('Authentication-Data', Val) -> + <<16#16, (iolist_size(Val)):16, Val/binary>>; +%% 23: Byte; +serialize_property('Request-Problem-Information', Val) -> + <<16#17, Val>>; +%% 24: Four Byte Integer; +serialize_property('Will-Delay-Interval', Val) -> + <<16#18, Val:32/big>>; +%% 25: Byte; +serialize_property('Request-Response-Information', Val) -> + <<16#19, Val>>; +%% 26: UTF-8 Encoded String; +serialize_property('Response-Information', Val) -> + <<16#1A, (serialize_utf(Val))/binary>>; +%% 28: UTF-8 Encoded String; +serialize_property('Server-Reference', Val) -> + <<16#1C, (serialize_utf(Val))/binary>>; +%% 31: UTF-8 Encoded String; +serialize_property('Reason-String', Val) -> + <<16#1F, (serialize_utf(Val))/binary>>; +%% 33: Two Byte Integer; +serialize_property('Receive-Maximum', Val) -> + <<16#21, Val:16/big>>; +%% 34: Two Byte Integer; +serialize_property('Topic-Alias-Maximum', Val) -> + <<16#22, Val:16/big>>; +%% 35: Two Byte Integer; +serialize_property('Topic-Alias', Val) -> + <<16#23, Val:16/big>>; +%% 36: Byte; +serialize_property('Maximum-QoS', Val) -> + <<16#24, Val>>; +%% 37: Byte; +serialize_property('Retain-Available', Val) -> + <<16#25, Val>>; +%% 38: UTF-8 String Pair; +serialize_property('User-Property', Val) -> + <<16#26, (serialize_utf_pair(Val))/binary>>; +%% 39: Four Byte Integer; +serialize_property('Maximum-Packet-Size', Val) -> + <<16#27, Val:32/big>>; +%% 40: Byte; +serialize_property('Wildcard-Subscription-Available', Val) -> + <<16#28, Val>>; +%% 41: Byte; +serialize_property('Subscription-Identifier-Available', Val) -> + <<16#29, Val>>; +%% 42: Byte; +serialize_property('Shared-Subscription-Available', Val) -> + <<16#2A, Val>>. + serialize_topics([{_Topic, _Qos}|_] = Topics) -> << <<(serialize_utf(Topic))/binary, ?RESERVED:6, Qos:2>> || {Topic, Qos} <- Topics >>; serialize_topics([H|_] = Topics) when is_binary(H) -> << <<(serialize_utf(Topic))/binary>> || Topic <- Topics >>. +serialize_utf_pair({Name, Value}) -> + << <<(serialize_utf(S))/binary, (serialize_utf(S))/binary>> || S <- [Name, Value] >>. + serialize_utf(String) -> StringBin = unicode:characters_to_binary(String), Len = byte_size(StringBin), true = (Len =< 16#ffff), <>. -serialize_len(N) when N =< ?LOWBITS -> +serialize_len(I) -> + serialize_variable_byte_integer(I). %%TODO: refactor later. + +serialize_variable_byte_integer(N) when N =< ?LOWBITS -> <<0:1, N:7>>; -serialize_len(N) -> - <<1:1, (N rem ?HIGHBIT):7, (serialize_len(N div ?HIGHBIT))/binary>>. +serialize_variable_byte_integer(N) -> + <<1:1, (N rem ?HIGHBIT):7, (serialize_variable_byte_integer(N div ?HIGHBIT))/binary>>. opt(undefined) -> ?RESERVED; opt(false) -> 0; opt(true) -> 1; opt(X) when is_integer(X) -> X; opt(B) when is_binary(B) -> 1. + diff --git a/src/emqx_stats.erl b/src/emqx_stats.erl index fe89955ee..7146beb96 100644 --- a/src/emqx_stats.erl +++ b/src/emqx_stats.erl @@ -18,12 +18,13 @@ -behaviour(gen_server). --author("Feng Lee "). - -include("emqx.hrl"). -export([start_link/0, stop/0]). +%% Get all Stats +-export([all/0]). + %% Client and Session Stats -export([set_client_stats/2, get_client_stats/1, del_client_stats/1, set_session_stats/2, get_session_stats/1, del_session_stats/1]). @@ -115,6 +116,8 @@ get_session_stats(ClientId) -> del_session_stats(ClientId) -> ets:delete(?SESSION_STATS_TAB, ClientId). +all() -> ets:tab2list(?STATS_TAB). + %% @doc Generate stats fun -spec(statsfun(Stat :: atom()) -> fun()). statsfun(Stat) -> diff --git a/src/emqx_sysmon.erl b/src/emqx_sysmon.erl index c76f78dcd..7f4a2490d 100644 --- a/src/emqx_sysmon.erl +++ b/src/emqx_sysmon.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io) +%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -16,12 +16,8 @@ -module(emqx_sysmon). --author("Feng Lee "). - -behavior(gen_server). --include("emqx_internal.hrl"). - -export([start_link/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, @@ -29,22 +25,21 @@ -record(state, {tickref, events = [], tracelog}). --define(LOG_FMT, [{formatter_config, [time, " ", message, "\n"]}]). +%%-define(LOG_FMT, [{formatter_config, [time, " ", message, "\n"]}]). -define(LOG(Msg, ProcInfo), - lager:warning([{sysmon, true}], "~s~n~p", [WarnMsg, ProcInfo])). + lager:warning([{sysmon, true}], "[SYSMON] ~s~n~p", [WarnMsg, ProcInfo])). -define(LOG(Msg, ProcInfo, PortInfo), - lager:warning([{sysmon, true}], "~s~n~p~n~p", [WarnMsg, ProcInfo, PortInfo])). + lager:warning([{sysmon, true}], "[SYSMON] ~s~n~p~n~p", [WarnMsg, ProcInfo, PortInfo])). %% @doc Start system monitor --spec(start_link(Opts :: list(tuple())) -> - {ok, pid()} | ignore | {error, term()}). +-spec(start_link(Opts :: list(tuple())) -> {ok, pid()} | ignore | {error, term()}). start_link(Opts) -> gen_server:start_link({local, ?MODULE}, ?MODULE, [Opts], []). %%-------------------------------------------------------------------- -%% gen_server callbacks +%% gen_server Callbacks %%-------------------------------------------------------------------- init([Opts]) -> @@ -80,10 +75,12 @@ parse_opt([_Opt|Opts], Acc) -> parse_opt(Opts, Acc). handle_call(Req, _From, State) -> - ?UNEXPECTED_REQ(Req, State). + lager:error("[SYSMON] Unexpected Call: ~p", [Req]), + {reply, ignore, State}. handle_cast(Msg, State) -> - ?UNEXPECTED_MSG(Msg, State). + lager:error("[SYSMON] Unexpected Cast: ~p", [Msg]), + {noreply, State}. handle_info({monitor, Pid, long_gc, Info}, State) -> suppress({long_gc, Pid}, fun() -> @@ -131,7 +128,8 @@ handle_info(reset, State) -> {noreply, State#state{events = []}, hibernate}; handle_info(Info, State) -> - ?UNEXPECTED_INFO(Info, State). + lager:error("[SYSMON] Unexpected Info: ~p", [Info]), + {noreply, State}. terminate(_Reason, #state{tickref = TRef, tracelog = TraceLog}) -> timer:cancel(TRef), diff --git a/src/emqx_trace.erl b/src/emqx_trace.erl index 81da9989c..2cc4cd29e 100644 --- a/src/emqx_trace.erl +++ b/src/emqx_trace.erl @@ -20,8 +20,6 @@ -author("Feng Lee "). --include("emqx_internal.hrl"). - %% API Function Exports -export([start_link/0]). @@ -100,13 +98,16 @@ handle_call(all_traces, _From, State = #state{traces = Traces}) -> <- maps:to_list(Traces)], State}; handle_call(Req, _From, State) -> - ?UNEXPECTED_REQ(Req, State). + lager:error("[TRACE] Unexpected Call: ~p", [Req]), + {reply, ignore, State}. handle_cast(Msg, State) -> - ?UNEXPECTED_MSG(Msg, State). + lager:error("[TRACE] Unexpected Cast: ~p", [Msg]), + {noreply, State}. handle_info(Info, State) -> - ?UNEXPECTED_INFO(Info, State). + lager:error("[TRACE] Unexpected Info: ~p", [Info]), + {noreply, State}. terminate(_Reason, _State) -> ok.