From d7eae533ac53815dd5c34ad109d2cb457f7923a1 Mon Sep 17 00:00:00 2001 From: Feng Lee Date: Wed, 28 Feb 2018 21:22:39 +0800 Subject: [PATCH] Merge the emqx-mqtt5 library --- include/emqx.hrl | 30 ++++- include/emqx_cli.hrl | 24 ---- include/emqx_internal.hrl | 7 - include/{emqx_common.hrl => emqx_misc.hrl} | 0 include/emqx_mqtt.hrl | 109 +++++++++++---- include/emqx_rest.hrl | 17 --- include/emqx_trie.hrl | 35 ----- src/emqx.erl | 2 +- src/emqx_access_control.erl | 2 +- src/emqx_bridge.erl | 5 +- src/emqx_cm.erl | 19 ++- src/emqx_cm_sup.erl | 12 +- src/{emqx_client.erl => emqx_conn.erl} | 124 +++++++++--------- src/emqx_misc.erl | 2 +- src/emqx_modules.erl | 2 +- src/emqx_pooler.erl | 2 - src/emqx_protocol.erl | 8 +- src/emqx_pubsub.erl | 9 +- src/emqx_server.erl | 7 +- src/emqx_session.erl | 14 +- src/emqx_session_sup.erl | 3 - src/emqx_sm.erl | 20 +-- src/emqx_sm_helper.erl | 12 +- src/emqx_sm_sup.erl | 7 +- src/emqx_trie.erl | 10 +- src/emqx_ws.erl | 2 +- src/{emqx_ws_client.erl => emqx_ws_conn.erl} | 10 +- ...ws_client_sup.erl => emqx_ws_conn_sup.erl} | 20 ++- test/emqx_trie_SUITE.erl | 6 +- 29 files changed, 233 insertions(+), 287 deletions(-) delete mode 100644 include/emqx_cli.hrl rename include/{emqx_common.hrl => emqx_misc.hrl} (100%) delete mode 100644 include/emqx_rest.hrl delete mode 100644 include/emqx_trie.hrl rename src/{emqx_client.erl => emqx_conn.erl} (71%) rename src/{emqx_ws_client.erl => emqx_ws_conn.erl} (98%) rename src/{emqx_ws_client_sup.erl => emqx_ws_conn_sup.erl} (71%) diff --git a/include/emqx.hrl b/include/emqx.hrl index ccfaad6d0..72fb71971 100644 --- a/include/emqx.hrl +++ b/include/emqx.hrl @@ -48,13 +48,6 @@ %% MQTT Topic %%-------------------------------------------------------------------- --record(mqtt_topic, - { topic :: binary(), - flags = [] :: [retained | static] - }). - --type(mqtt_topic() :: #mqtt_topic{}). - %%-------------------------------------------------------------------- %% MQTT Subscription %%-------------------------------------------------------------------- @@ -163,6 +156,29 @@ -type(route() :: #route{}). +%%-------------------------------------------------------------------- +%% Trie +%%-------------------------------------------------------------------- + +-type(trie_node_id() :: binary() | atom()). + +-record(trie_node, + { node_id :: trie_node_id(), + edge_count = 0 :: non_neg_integer(), + topic :: binary() | undefined, + flags :: list(atom()) + }). + +-record(trie_edge, + { node_id :: trie_node_id(), + word :: binary() | atom() + }). + +-record(trie, + { edge :: #trie_edge{}, + node_id :: trie_node_id() + }). + %%-------------------------------------------------------------------- %% Alarm %%-------------------------------------------------------------------- diff --git a/include/emqx_cli.hrl b/include/emqx_cli.hrl deleted file mode 100644 index b99038481..000000000 --- a/include/emqx_cli.hrl +++ /dev/null @@ -1,24 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io) -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --define(PRINT_MSG(Msg), io:format(Msg)). - --define(PRINT(Format, Args), io:format(Format, Args)). - --define(PRINT_CMD(Cmd, Descr), io:format("~-48s# ~s~n", [Cmd, Descr])). - --define(USAGE(CmdList), [?PRINT_CMD(Cmd, Descr) || {Cmd, Descr} <- CmdList]). - diff --git a/include/emqx_internal.hrl b/include/emqx_internal.hrl index 44138cefd..2241c2fc8 100644 --- a/include/emqx_internal.hrl +++ b/include/emqx_internal.hrl @@ -26,13 +26,6 @@ -define(PROC_NAME(M, I), (list_to_atom(lists:concat([M, "_", I])))). --define(record_to_proplist(Def, Rec), - lists:zip(record_info(fields, Def), tl(tuple_to_list(Rec)))). - --define(record_to_proplist(Def, Rec, Fields), - [{K, V} || {K, V} <- ?record_to_proplist(Def, Rec), - lists:member(K, Fields)]). - -define(UNEXPECTED_REQ(Req, State), (begin lager:error("[~s] Unexpected Request: ~p", [?MODULE, Req]), diff --git a/include/emqx_common.hrl b/include/emqx_misc.hrl similarity index 100% rename from include/emqx_common.hrl rename to include/emqx_misc.hrl diff --git a/include/emqx_mqtt.hrl b/include/emqx_mqtt.hrl index 8a0ad4478..a4946cc16 100644 --- a/include/emqx_mqtt.hrl +++ b/include/emqx_mqtt.hrl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io) +%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -115,7 +115,7 @@ 'DISCONNECT', 'AUTH']). --type(mqtt_packet_type() :: ?RESERVED..?DISCONNECT). +-type(mqtt_packet_type() :: ?RESERVED..?AUTH). %%-------------------------------------------------------------------- %% MQTT Connect Return Codes @@ -158,10 +158,23 @@ %% MQTT Packets %%-------------------------------------------------------------------- +-type(mqtt_topic() :: binary()). + -type(mqtt_client_id() :: binary()). + -type(mqtt_username() :: binary() | undefined). + -type(mqtt_packet_id() :: 1..16#ffff | undefined). +-type(mqtt_reason_code() :: 1..16#ff | undefined). + +-type(mqtt_properties() :: undefined | map()). + +-type(mqtt_subopt() :: list({qos, mqtt_qos()} + | {retain_handling, boolean()} + | {keep_retain, boolean()} + | {no_local, boolean()})). + -record(mqtt_packet_connect, { client_id = <<>> :: mqtt_client_id(), proto_ver = ?MQTT_PROTO_V4 :: mqtt_vsn(), @@ -170,44 +183,68 @@ will_qos = ?QOS_1 :: mqtt_qos(), will_flag = false :: boolean(), clean_sess = false :: boolean(), + clean_start = true :: boolean(), keep_alive = 60 :: non_neg_integer(), + will_props = undefined :: undefined | map(), will_topic = undefined :: undefined | binary(), will_msg = undefined :: undefined | binary(), username = undefined :: undefined | binary(), password = undefined :: undefined | binary(), - is_bridge = false :: boolean() + is_bridge = false :: boolean(), + properties = undefined :: mqtt_properties() %% MQTT Version 5.0 }). -record(mqtt_packet_connack, { ack_flags = ?RESERVED :: 0 | 1, - return_code :: mqtt_connack() + reason_code :: mqtt_connack(), + properties :: map() }). -record(mqtt_packet_publish, { topic_name :: binary(), - packet_id :: mqtt_packet_id() + packet_id :: mqtt_packet_id(), + properties :: mqtt_properties() }). -record(mqtt_packet_puback, - { packet_id :: mqtt_packet_id() }). + { packet_id :: mqtt_packet_id(), + reason_code :: mqtt_reason_code(), + properties :: mqtt_properties() + }). -record(mqtt_packet_subscribe, - { packet_id :: mqtt_packet_id(), - topic_table :: list({binary(), mqtt_qos()}) + { packet_id :: mqtt_packet_id(), + properties :: mqtt_properties(), + topic_filters :: list({binary(), mqtt_subopt()}) }). -record(mqtt_packet_unsubscribe, - { packet_id :: mqtt_packet_id(), - topics :: list(binary()) + { packet_id :: mqtt_packet_id(), + properties :: mqtt_properties(), + topics :: list(binary()) }). -record(mqtt_packet_suback, - { packet_id :: mqtt_packet_id(), - qos_table :: list(mqtt_qos() | 128) + { packet_id :: mqtt_packet_id(), + properties :: mqtt_properties(), + reason_codes :: list(mqtt_reason_code()) }). -record(mqtt_packet_unsuback, - { packet_id :: mqtt_packet_id() }). + { packet_id :: mqtt_packet_id(), + properties :: mqtt_properties(), + reason_codes :: list(mqtt_reason_code()) + }). + +-record(mqtt_packet_disconnect, + { reason_code :: mqtt_reason_code(), + properties :: mqtt_properties() + }). + +-record(mqtt_packet_auth, + { reason_code :: mqtt_reason_code(), + properties :: mqtt_properties() + }). %%-------------------------------------------------------------------- %% MQTT Control Packet @@ -215,11 +252,18 @@ -record(mqtt_packet, { header :: #mqtt_packet_header{}, - variable :: #mqtt_packet_connect{} | #mqtt_packet_connack{} - | #mqtt_packet_publish{} | #mqtt_packet_puback{} - | #mqtt_packet_subscribe{} | #mqtt_packet_suback{} - | #mqtt_packet_unsubscribe{} | #mqtt_packet_unsuback{} - | mqtt_packet_id() | undefined, + variable :: #mqtt_packet_connect{} + | #mqtt_packet_connack{} + | #mqtt_packet_publish{} + | #mqtt_packet_puback{} + | #mqtt_packet_subscribe{} + | #mqtt_packet_suback{} + | #mqtt_packet_unsubscribe{} + | #mqtt_packet_unsuback{} + | #mqtt_packet_disconnect{} + | #mqtt_packet_auth{} + | mqtt_packet_id() + | undefined, payload :: binary() | undefined }). @@ -232,14 +276,20 @@ -define(CONNECT_PACKET(Var), #mqtt_packet{header = #mqtt_packet_header{type = ?CONNECT}, variable = Var}). --define(CONNACK_PACKET(ReturnCode), +-define(CONNACK_PACKET(ReasonCode), #mqtt_packet{header = #mqtt_packet_header{type = ?CONNACK}, - variable = #mqtt_packet_connack{return_code = ReturnCode}}). + variable = #mqtt_packet_connack{reason_code = ReturnCode}}). --define(CONNACK_PACKET(ReturnCode, SessPresent), +-define(CONNACK_PACKET(ReasonCode, SessPresent), #mqtt_packet{header = #mqtt_packet_header{type = ?CONNACK}, variable = #mqtt_packet_connack{ack_flags = SessPresent, - return_code = ReturnCode}}). + reason_code = ReturnCode}}). + +-define(CONNACK_PACKET(ReasonCode, SessPresent, Properties), + #mqtt_packet{header = #mqtt_packet_header{type = ?CONNACK}, + variable = #mqtt_packet_connack{ack_flags = SessPresent, + reason_code = ReasonCode, + properties = Properties}}). -define(PUBLISH_PACKET(Qos, PacketId), #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, @@ -265,11 +315,18 @@ #mqtt_packet{header = #mqtt_packet_header{type = ?SUBSCRIBE, qos = ?QOS_1}, variable = #mqtt_packet_subscribe{packet_id = PacketId, topic_table = TopicTable}}). --define(SUBACK_PACKET(PacketId, QosTable), + +-define(SUBACK_PACKET(PacketId, ReasonCodes), #mqtt_packet{header = #mqtt_packet_header{type = ?SUBACK}, - variable = #mqtt_packet_suback{packet_id = PacketId, - qos_table = QosTable}}). --define(UNSUBSCRIBE_PACKET(PacketId, Topics), + variable = #mqtt_packet_suback{packet_id = PacketId, + reason_codes = ReasonCodes}}). + +-define(SUBACK_PACKET(PacketId, Properties, ReasonCodes), + #mqtt_packet{header = #mqtt_packet_header{type = ?SUBACK}, + variable = #mqtt_packet_suback{packet_id = PacketId, + properties = Properties, + reason_codes = ReasonCodes}}). +-define(UNSUBSCRIBE_PACKET(PacketId, Topics), #mqtt_packet{header = #mqtt_packet_header{type = ?UNSUBSCRIBE, qos = ?QOS_1}, variable = #mqtt_packet_unsubscribe{packet_id = PacketId, topics = Topics}}). diff --git a/include/emqx_rest.hrl b/include/emqx_rest.hrl deleted file mode 100644 index c1c6b219d..000000000 --- a/include/emqx_rest.hrl +++ /dev/null @@ -1,17 +0,0 @@ - --define(SUCCESS, 0). %% Success --define(ERROR1, 101). %% badrpc --define(ERROR2, 102). %% Unknown error --define(ERROR3, 103). %% Username or password error --define(ERROR4, 104). %% Empty username or password --define(ERROR5, 105). %% User does not exist --define(ERROR6, 106). %% Admin can not be deleted --define(ERROR7, 107). %% Missing request parameter --define(ERROR8, 108). %% Request parameter type error --define(ERROR9, 109). %% Request parameter is not a json --define(ERROR10, 110). %% Plugin has been loaded --define(ERROR11, 111). %% Plugin has been loaded --define(ERROR12, 112). %% Client not online --define(ERROR13, 113). %% User already exist --define(ERROR14, 114). %% OldPassword error - diff --git a/include/emqx_trie.hrl b/include/emqx_trie.hrl deleted file mode 100644 index ce57b4438..000000000 --- a/include/emqx_trie.hrl +++ /dev/null @@ -1,35 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --type(trie_node_id() :: binary() | atom()). - --record(trie_node, - { node_id :: trie_node_id(), - edge_count = 0 :: non_neg_integer(), - topic :: binary() | undefined, - flags :: list(atom()) - }). - --record(trie_edge, - { node_id :: trie_node_id(), - word :: binary() | atom() - }). - --record(trie, - { edge :: #trie_edge{}, - node_id :: trie_node_id() - }). - diff --git a/src/emqx.erl b/src/emqx.erl index 07e133cf7..6142cfa7d 100644 --- a/src/emqx.erl +++ b/src/emqx.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io) +%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/src/emqx_access_control.erl b/src/emqx_access_control.erl index 2c6460acc..409b98065 100644 --- a/src/emqx_access_control.erl +++ b/src/emqx_access_control.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io) +%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/src/emqx_bridge.erl b/src/emqx_bridge.erl index fc6f33535..e9858793f 100644 --- a/src/emqx_bridge.erl +++ b/src/emqx_bridge.erl @@ -66,7 +66,7 @@ start_link(Pool, Id, Node, Topic, Options) -> %%-------------------------------------------------------------------- init([Pool, Id, Node, Topic, Options]) -> - ?GPROC_POOL(join, Pool, Id), + gproc_pool:connect_worker(Pool, {Pool, Id}), process_flag(trap_exit, true), case net_kernel:connect_node(Node) of true -> @@ -151,8 +151,7 @@ handle_info(Info, State) -> ?UNEXPECTED_INFO(Info, State). terminate(_Reason, #state{pool = Pool, id = Id}) -> - ?GPROC_POOL(leave, Pool, Id), - ok. + gproc_pool:disconnect_worker(Pool, {Pool, Id}). code_change(_OldVsn, State, _Extra) -> {ok, State}. diff --git a/src/emqx_cm.erl b/src/emqx_cm.erl index f0b502170..4956b199d 100644 --- a/src/emqx_cm.erl +++ b/src/emqx_cm.erl @@ -14,18 +14,12 @@ %% limitations under the License. %%-------------------------------------------------------------------- -%% @doc MQTT Client Manager - -module(emqx_cm). -behaviour(gen_server). --author("Feng Lee "). - -include("emqx.hrl"). --include("emqx_internal.hrl"). - %% API Exports -export([start_link/3]). @@ -78,7 +72,7 @@ pick(ClientId) -> gproc_pool:pick_worker(?POOL, ClientId). %%-------------------------------------------------------------------- init([Pool, Id, StatsFun]) -> - ?GPROC_POOL(join, Pool, Id), + gproc_pool:connect_worker(Pool, {Pool, Id}), {ok, #state{pool = Pool, id = Id, statsfun = StatsFun, monitors = dict:new()}}. handle_call({reg, Client = #mqtt_client{client_id = ClientId, @@ -92,7 +86,8 @@ handle_call({reg, Client = #mqtt_client{client_id = ClientId, end; handle_call(Req, _From, State) -> - ?UNEXPECTED_REQ(Req, State). + lager:error("[MQTT-CM] Unexpected Call: ~p", [Req]), + {reply, ignore, State}. handle_cast({unreg, ClientId, Pid}, State) -> case lookup_proc(ClientId) of @@ -104,7 +99,8 @@ handle_cast({unreg, ClientId, Pid}, State) -> end; handle_cast(Msg, State) -> - ?UNEXPECTED_MSG(Msg, State). + lager:error("[MQTT-CM] Unexpected Cast: ~p", [Msg]), + {noreply, State}. handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) -> case dict:find(MRef, State#state.monitors) of @@ -123,10 +119,11 @@ handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) -> end; handle_info(Info, State) -> - ?UNEXPECTED_INFO(Info, State). + lager:error("[CM] Unexpected Info: ~p", [Info]), + {noreply, State}. terminate(_Reason, #state{pool = Pool, id = Id}) -> - ?GPROC_POOL(leave, Pool, Id), ok. + gproc_pool:disconnect_worker(Pool, {Pool, Id}). code_change(_OldVsn, State, _Extra) -> {ok, State}. diff --git a/src/emqx_cm_sup.erl b/src/emqx_cm_sup.erl index f28468d39..229e474bb 100644 --- a/src/emqx_cm_sup.erl +++ b/src/emqx_cm_sup.erl @@ -14,24 +14,16 @@ %% limitations under the License. %%-------------------------------------------------------------------- -%% @doc Client Manager Supervisor. - -module(emqx_cm_sup). -behaviour(supervisor). --author("Feng Lee "). - --include("emqx.hrl"). - %% API -export([start_link/0]). %% Supervisor callbacks -export([init/1]). --define(CM, emqx_cm). - -define(TAB, mqtt_client). start_link() -> @@ -42,8 +34,8 @@ init([]) -> create_client_tab(), %% CM Pool Sup - MFA = {?CM, start_link, [emqx_stats:statsfun('clients/count', 'clients/max')]}, - PoolSup = emqx_pool_sup:spec([?CM, hash, erlang:system_info(schedulers), MFA]), + MFA = {emqx_cm, start_link, [emqx_stats:statsfun('clients/count', 'clients/max')]}, + PoolSup = emqx_pool_sup:spec([emqx_cm, hash, erlang:system_info(schedulers), MFA]), {ok, {{one_for_all, 10, 3600}, [PoolSup]}}. diff --git a/src/emqx_client.erl b/src/emqx_conn.erl similarity index 71% rename from src/emqx_client.erl rename to src/emqx_conn.erl index 0a3d7d8ce..a731bacc9 100644 --- a/src/emqx_client.erl +++ b/src/emqx_conn.erl @@ -14,9 +14,7 @@ %% limitations under the License. %%-------------------------------------------------------------------- -%% @doc MQTT/TCP Connection. - --module(emqx_client). +-module(emqx_conn). -behaviour(gen_server). @@ -24,7 +22,7 @@ -include("emqx_mqtt.hrl"). --include("emqx_internal.hrl"). +-include("emqx_misc.hrl"). -import(proplists, [get_value/2, get_value/3]). @@ -49,11 +47,10 @@ %% TODO: How to emit stats? -export([handle_pre_hibernate/1]). -%% Client State %% Unused fields: connname, peerhost, peerport --record(client_state, {connection, peername, conn_state, await_recv, - rate_limit, packet_size, parser, proto_state, - keepalive, enable_stats, idle_timeout, force_gc_count}). +-record(state, {connection, peername, conn_state, await_recv, + rate_limit, packet_size, parser, proto_state, + keepalive, enable_stats, idle_timeout, force_gc_count}). -define(INFO_KEYS, [peername, conn_state, await_recv]). @@ -61,7 +58,7 @@ -define(LOG(Level, Format, Args, State), lager:Level("Client(~s): " ++ Format, - [esockd_net:format(State#client_state.peername) | Args])). + [esockd_net:format(State#state.peername) | Args])). start_link(Conn, Env) -> {ok, proc_lib:spawn_link(?MODULE, init, [[Conn, Env]])}. @@ -117,17 +114,17 @@ do_init(Conn, Env, Peername) -> EnableStats = get_value(client_enable_stats, Env, false), IdleTimout = get_value(client_idle_timeout, Env, 30000), ForceGcCount = emqx_gc:conn_max_gc_count(), - State = run_socket(#client_state{connection = Conn, - peername = Peername, - await_recv = false, - conn_state = running, - rate_limit = RateLimit, - packet_size = PacketSize, - parser = Parser, - proto_state = ProtoState, - enable_stats = EnableStats, - idle_timeout = IdleTimout, - force_gc_count = ForceGcCount}), + State = run_socket(#state{connection = Conn, + peername = Peername, + await_recv = false, + conn_state = running, + rate_limit = RateLimit, + packet_size = PacketSize, + parser = Parser, + proto_state = ProtoState, + enable_stats = EnableStats, + idle_timeout = IdleTimout, + force_gc_count = ForceGcCount}), gen_server:enter_loop(?MODULE, [{hibernate_after, 10000}], State, self(), IdleTimout). @@ -135,7 +132,7 @@ send_fun(Conn, Peername) -> Self = self(), fun(Packet) -> Data = emqx_serializer:serialize(Packet), - ?LOG(debug, "SEND ~p", [Data], #client_state{peername = Peername}), + ?LOG(debug, "SEND ~p", [Data], #state{peername = Peername}), emqx_metrics:inc('bytes/sent', iolist_size(Data)), try Conn:async_send(Data) of ok -> ok; @@ -147,15 +144,15 @@ send_fun(Conn, Peername) -> end. handle_pre_hibernate(State) -> - {hibernate, emqx_gc:reset_conn_gc_count(#client_state.force_gc_count, emit_stats(State))}. + {hibernate, emqx_gc:reset_conn_gc_count(#state.force_gc_count, emit_stats(State))}. -handle_call(info, From, State = #client_state{proto_state = ProtoState}) -> +handle_call(info, From, State = #state{proto_state = ProtoState}) -> ProtoInfo = emqx_protocol:info(ProtoState), - ClientInfo = ?record_to_proplist(client_state, State, ?INFO_KEYS), + ClientInfo = ?record_to_proplist(state, State, ?INFO_KEYS), {reply, Stats, _, _} = handle_call(stats, From, State), reply(lists:append([ClientInfo, ProtoInfo, Stats]), State); -handle_call(stats, _From, State = #client_state{proto_state = ProtoState}) -> +handle_call(stats, _From, State = #state{proto_state = ProtoState}) -> reply(lists:append([emqx_misc:proc_stats(), emqx_protocol:stats(ProtoState), sock_stats(State)]), State); @@ -164,12 +161,12 @@ handle_call(kick, _From, State) -> {stop, {shutdown, kick}, ok, State}; handle_call({set_rate_limit, Rl}, _From, State) -> - reply(ok, State#client_state{rate_limit = Rl}); + reply(ok, State#state{rate_limit = Rl}); -handle_call(get_rate_limit, _From, State = #client_state{rate_limit = Rl}) -> +handle_call(get_rate_limit, _From, State = #state{rate_limit = Rl}) -> reply(Rl, State); -handle_call(session, _From, State = #client_state{proto_state = ProtoState}) -> +handle_call(session, _From, State = #state{proto_state = ProtoState}) -> reply(emqx_protocol:session(ProtoState), State); handle_call({clean_acl_cache, Topic}, _From, State) -> @@ -177,10 +174,12 @@ handle_call({clean_acl_cache, Topic}, _From, State) -> reply(ok, State); handle_call(Req, _From, State) -> - ?UNEXPECTED_REQ(Req, State). + ?LOG(error, "Unexpected Call: ~p", [Req], State), + {reply, ignore, State}. handle_cast(Msg, State) -> - ?UNEXPECTED_MSG(Msg, State). + ?LOG(error, "Unexpected Cast: ~p", [Msg], State), + {noreply, State}. handle_info({subscribe, TopicTable}, State) -> with_proto( @@ -204,7 +203,7 @@ handle_info({suback, PacketId, GrantedQos}, State) -> %% Fastlane handle_info({dispatch, _Topic, Message}, State) -> - handle_info({deliver, Message#mqtt_message{qos = ?QOS_0}}, State); + handle_info({deliver, Message#message{qos = ?QOS_0}}, State); handle_info({deliver, Message}, State) -> with_proto( @@ -233,13 +232,13 @@ handle_info({shutdown, conflict, {ClientId, NewPid}}, State) -> shutdown(conflict, State); handle_info(activate_sock, State) -> - {noreply, run_socket(State#client_state{conn_state = running})}; + {noreply, run_socket(State#state{conn_state = running})}; handle_info({inet_async, _Sock, _Ref, {ok, Data}}, State) -> Size = iolist_size(Data), ?LOG(debug, "RECV ~p", [Data], State), emqx_metrics:inc('bytes/received', Size), - received(Data, rate_limit(Size, State#client_state{await_recv = false})); + received(Data, rate_limit(Size, State#state{await_recv = false})); handle_info({inet_async, _Sock, _Ref, {error, Reason}}, State) -> shutdown(Reason, State); @@ -250,7 +249,7 @@ handle_info({inet_reply, _Sock, ok}, State) -> handle_info({inet_reply, _Sock, {error, Reason}}, State) -> shutdown(Reason, State); -handle_info({keepalive, start, Interval}, State = #client_state{connection = Conn}) -> +handle_info({keepalive, start, Interval}, State = #state{connection = Conn}) -> ?LOG(debug, "Keepalive at the interval of ~p", [Interval], State), StatFun = fun() -> case Conn:getstat([recv_oct]) of @@ -279,11 +278,12 @@ handle_info({keepalive, check}, State = #client_state{keepalive = KeepAlive}) -> end; handle_info(Info, State) -> - ?UNEXPECTED_INFO(Info, State). + ?LOG(error, "Unexpected Info: ~p", [Info], State), + {noreply, State}. -terminate(Reason, State = #client_state{connection = Conn, - keepalive = KeepAlive, - proto_state = ProtoState}) -> +terminate(Reason, State = #state{connection = Conn, + keepalive = KeepAlive, + proto_state = ProtoState}) -> ?LOG(debug, "Terminated for ~p", [Reason], State), Conn:fast_close(), @@ -308,26 +308,26 @@ code_change(_OldVsn, State, _Extra) -> received(<<>>, State) -> {noreply, gc(State)}; -received(Bytes, State = #client_state{parser = Parser, - packet_size = PacketSize, - proto_state = ProtoState, - idle_timeout = IdleTimeout}) -> +received(Bytes, State = #state{parser = Parser, + packet_size = PacketSize, + proto_state = ProtoState, + idle_timeout = IdleTimeout}) -> case catch emqx_parser:parse(Bytes, Parser) of {more, NewParser} -> - {noreply, run_socket(State#client_state{parser = NewParser}), IdleTimeout}; + {noreply, run_socket(State#state{parser = NewParser}), IdleTimeout}; {ok, Packet, Rest} -> emqx_metrics:received(Packet), case emqx_protocol:received(Packet, ProtoState) of {ok, ProtoState1} -> - received(Rest, State#client_state{parser = emqx_parser:initial_state(PacketSize), - proto_state = ProtoState1}); + received(Rest, State#state{parser = emqx_parser:initial_state(PacketSize), + proto_state = ProtoState1}); {error, Error} -> ?LOG(error, "Protocol error - ~p", [Error], State), shutdown(Error, State); {error, Error, ProtoState1} -> - shutdown(Error, State#client_state{proto_state = ProtoState1}); + shutdown(Error, State#state{proto_state = ProtoState1}); {stop, Reason, ProtoState1} -> - stop(Reason, State#client_state{proto_state = ProtoState1}) + stop(Reason, State#state{proto_state = ProtoState1}) end; {error, Error} -> ?LOG(error, "Framing error - ~p", [Error], State), @@ -338,34 +338,34 @@ received(Bytes, State = #client_state{parser = Parser, shutdown(parser_error, State) end. -rate_limit(_Size, State = #client_state{rate_limit = undefined}) -> +rate_limit(_Size, State = #state{rate_limit = undefined}) -> run_socket(State); -rate_limit(Size, State = #client_state{rate_limit = Rl}) -> +rate_limit(Size, State = #state{rate_limit = Rl}) -> case Rl:check(Size) of {0, Rl1} -> - run_socket(State#client_state{conn_state = running, rate_limit = Rl1}); + run_socket(State#state{conn_state = running, rate_limit = Rl1}); {Pause, Rl1} -> ?LOG(warning, "Rate limiter pause for ~p", [Pause], State), erlang:send_after(Pause, self(), activate_sock), - State#client_state{conn_state = blocked, rate_limit = Rl1} + State#state{conn_state = blocked, rate_limit = Rl1} end. -run_socket(State = #client_state{conn_state = blocked}) -> +run_socket(State = #state{conn_state = blocked}) -> State; -run_socket(State = #client_state{await_recv = true}) -> +run_socket(State = #state{await_recv = true}) -> State; -run_socket(State = #client_state{connection = Conn}) -> +run_socket(State = #state{connection = Conn}) -> Conn:async_recv(0, infinity), - State#client_state{await_recv = true}. + State#state{await_recv = true}. -with_proto(Fun, State = #client_state{proto_state = ProtoState}) -> +with_proto(Fun, State = #state{proto_state = ProtoState}) -> {ok, ProtoState1} = Fun(ProtoState), - {noreply, State#client_state{proto_state = ProtoState1}}. + {noreply, State#state{proto_state = ProtoState1}}. -emit_stats(State = #client_state{proto_state = ProtoState}) -> +emit_stats(State = #state{proto_state = ProtoState}) -> emit_stats(emqx_protocol:clientid(ProtoState), State). -emit_stats(_ClientId, State = #client_state{enable_stats = false}) -> +emit_stats(_ClientId, State = #state{enable_stats = false}) -> State; emit_stats(undefined, State) -> State; @@ -374,7 +374,7 @@ emit_stats(ClientId, State) -> emqx_stats:set_client_stats(ClientId, Stats), State. -sock_stats(#client_state{connection = Conn}) -> +sock_stats(#state{connection = Conn}) -> case Conn:getstat(?SOCK_STATS) of {ok, Ss} -> Ss; {error, _} -> [] end. reply(Reply, State) -> @@ -386,7 +386,7 @@ shutdown(Reason, State) -> stop(Reason, State) -> {stop, Reason, State}. -gc(State = #client_state{connection = Conn}) -> +gc(State = #state{connection = Conn}) -> Cb = fun() -> Conn:gc(), emit_stats(State) end, - emqx_gc:maybe_force_gc(#client_state.force_gc_count, State, Cb). + emqx_gc:maybe_force_gc(#state.force_gc_count, State, Cb). diff --git a/src/emqx_misc.erl b/src/emqx_misc.erl index 54a5bbd01..d10eb6415 100644 --- a/src/emqx_misc.erl +++ b/src/emqx_misc.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. +%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/src/emqx_modules.erl b/src/emqx_modules.erl index 3f51dbcdb..b3b8293f6 100644 --- a/src/emqx_modules.erl +++ b/src/emqx_modules.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io) +%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. diff --git a/src/emqx_pooler.erl b/src/emqx_pooler.erl index 3cae2264d..71af6b279 100644 --- a/src/emqx_pooler.erl +++ b/src/emqx_pooler.erl @@ -16,8 +16,6 @@ -module(emqx_pooler). --author("Feng Lee "). - -behaviour(gen_server). %% Start the pool supervisor diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 7e5cd2402..c754774a3 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -22,8 +22,6 @@ -include("emqx_mqtt.hrl"). --include("emqx_internal.hrl"). - -import(proplists, [get_value/2, get_value/3]). %% API @@ -64,7 +62,7 @@ %% @doc Init protocol init(Peername, SendFun, Opts) -> - Backoff = get_value(keepalive_backoff, Opts, 1.25), + Backoff = get_value(keepalive_backoff, Opts, 0.75), EnableStats = get_value(client_enable_stats, Opts, false), MaxLen = get_value(max_clientid_len, Opts, ?MAX_CLIENTID_LEN), WsInitialHeaders = get_value(ws_initial_headers, Opts), @@ -569,10 +567,10 @@ sp(false) -> 0. %% The retained flag should be propagated for bridge. %%-------------------------------------------------------------------- -clean_retain(false, Msg = #mqtt_message{retain = true, headers = Headers}) -> +clean_retain(false, Msg = #message{retain = true, headers = Headers}) -> case lists:member(retained, Headers) of true -> Msg; - false -> Msg#mqtt_message{retain = false} + false -> Msg#message{retain = false} end; clean_retain(_IsBridge, Msg) -> Msg. diff --git a/src/emqx_pubsub.erl b/src/emqx_pubsub.erl index b4ac146a8..9554f30ad 100644 --- a/src/emqx_pubsub.erl +++ b/src/emqx_pubsub.erl @@ -20,8 +20,6 @@ -include("emqx.hrl"). --include("emqx_internal.hrl"). - -export([start_link/3]). %% PubSub API. @@ -46,8 +44,7 @@ -spec(start_link(atom(), pos_integer(), list()) -> {ok, pid()} | ignore | {error, term()}). start_link(Pool, Id, Env) -> - gen_server:start_link({local, ?PROC_NAME(?MODULE, Id)}, - ?MODULE, [Pool, Id, Env], [{hibernate_after, 10000}]). + gen_server:start_link(?MODULE, [Pool, Id, Env], [{hibernate_after, 10000}]). %%-------------------------------------------------------------------- %% PubSub API @@ -164,7 +161,7 @@ pick(Topic) -> %%-------------------------------------------------------------------- init([Pool, Id, Env]) -> - ?GPROC_POOL(join, Pool, Id), + gproc_pool:connect_worker(Pool, {Pool, Id}), {ok, #state{pool = Pool, id = Id, env = Env}, hibernate}. handle_call({subscribe, Topic, Subscriber, Options}, _From, State) -> @@ -193,7 +190,7 @@ handle_info(Info, State) -> ?UNEXPECTED_INFO(Info, State). terminate(_Reason, #state{pool = Pool, id = Id}) -> - ?GPROC_POOL(leave, Pool, Id). + gproc_pool:disconnect_worker(Pool, {Pool, Id}). code_change(_OldVsn, State, _Extra) -> {ok, State}. diff --git a/src/emqx_server.erl b/src/emqx_server.erl index 47aafad49..6aa9acce8 100644 --- a/src/emqx_server.erl +++ b/src/emqx_server.erl @@ -49,8 +49,7 @@ %% @doc Start the server -spec(start_link(atom(), pos_integer(), list()) -> {ok, pid()} | ignore | {error, term()}). start_link(Pool, Id, Env) -> - gen_server:start_link({local, ?PROC_NAME(?MODULE, Id)}, - ?MODULE, [Pool, Id, Env], [{hibernate_after, 10000}]). + gen_server:start_link(?MODULE, [Pool, Id, Env], [{hibernate_after, 10000}]). %%-------------------------------------------------------------------- %% PubSub API @@ -186,7 +185,7 @@ dump() -> %%-------------------------------------------------------------------- init([Pool, Id, Env]) -> - ?GPROC_POOL(join, Pool, Id), + gproc_pool:connect_worker(Pool, {Pool, Id}), State = #state{pool = Pool, id = Id, env = Env, subids = #{}, submon = emqx_pmon:new()}, {ok, State, hibernate}. @@ -245,7 +244,7 @@ handle_info(Info, State) -> ?UNEXPECTED_INFO(Info, State). terminate(_Reason, #state{pool = Pool, id = Id}) -> - ?GPROC_POOL(leave, Pool, Id). + gproc_pool:disconnect_worker(Pool, {Pool, Id}). code_change(_OldVsn, State, _Extra) -> {ok, State}. diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 44cd44e00..4ce6f7cfd 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -53,8 +53,6 @@ -include("emqx_mqtt.hrl"). --include("emqx_internal.hrl"). - -import(emqx_misc, [start_timer/2]). -import(proplists, [get_value/2, get_value/3]). @@ -193,16 +191,16 @@ subscribe(Session, PacketId, TopicTable) -> %%TODO: the ack function??... gen_server:cast(Session, {subscribe, From, TopicTable, AckFun}). %% @doc Publish Message --spec(publish(pid(), mqtt_message()) -> ok | {error, term()}). -publish(_Session, Msg = #mqtt_message{qos = ?QOS_0}) -> +-spec(publish(pid(), message()) -> ok | {error, term()}). +publish(_Session, Msg = #message{qos = ?QOS_0}) -> %% Publish QoS0 Directly emqx_server:publish(Msg), ok; -publish(_Session, Msg = #mqtt_message{qos = ?QOS_1}) -> +publish(_Session, Msg = #message{qos = ?QOS_1}) -> %% Publish QoS1 message directly for client will PubAck automatically emqx_server:publish(Msg), ok; -publish(Session, Msg = #mqtt_message{qos = ?QOS_2}) -> +publish(Session, Msg = #message{qos = ?QOS_2}) -> %% Publish QoS2 to Session gen_server:call(Session, {publish, Msg}, ?TIMEOUT). @@ -517,7 +515,7 @@ handle_cast(Msg, State) -> ?UNEXPECTED_MSG(Msg, State). %% Ignore Messages delivered by self -handle_info({dispatch, _Topic, #mqtt_message{from = {ClientId, _}}}, +handle_info({dispatch, _Topic, #message{from = {ClientId, _}}}, State = #state{client_id = ClientId, ignore_loop_deliver = true}) -> {noreply, State}; @@ -637,7 +635,7 @@ expire_awaiting_rel(State = #state{awaiting_rel = AwaitingRel}) -> expire_awaiting_rel([], _Now, State) -> State#state{await_rel_timer = undefined}; -expire_awaiting_rel([{PacketId, Msg = #mqtt_message{timestamp = TS}} | Msgs], +expire_awaiting_rel([{PacketId, Msg = #message{timestamp = TS}} | Msgs], Now, State = #state{awaiting_rel = AwaitingRel, await_rel_timeout = Timeout}) -> case (timer:now_diff(Now, TS) div 1000) of diff --git a/src/emqx_session_sup.erl b/src/emqx_session_sup.erl index 35a22779a..6453bcba9 100644 --- a/src/emqx_session_sup.erl +++ b/src/emqx_session_sup.erl @@ -14,11 +14,8 @@ %% limitations under the License. %%-------------------------------------------------------------------- -%% @doc Session Supervisor. -module(emqx_session_sup). --author("Feng Lee "). - -behavior(supervisor). -export([start_link/0, start_session/3]). diff --git a/src/emqx_sm.erl b/src/emqx_sm.erl index d75f68984..5440bbded 100644 --- a/src/emqx_sm.erl +++ b/src/emqx_sm.erl @@ -20,8 +20,6 @@ -include("emqx.hrl"). --include("emqx_internal.hrl"). - %% Mnesia Callbacks -export([mnesia/1]). @@ -73,7 +71,7 @@ mnesia(copy) -> %% @doc Start a session manager -spec(start_link(atom(), pos_integer()) -> {ok, pid()} | ignore | {error, term()}). start_link(Pool, Id) -> - gen_server:start_link({local, ?PROC_NAME(?MODULE, Id)}, ?MODULE, [Pool, Id], []). + gen_server:start_link(?MODULE, [Pool, Id], []). %% @doc Start a session -spec(start_session(boolean(), {binary(), binary() | undefined}) -> {ok, pid(), boolean()} | {error, term()}). @@ -129,7 +127,7 @@ local_sessions() -> %%-------------------------------------------------------------------- init([Pool, Id]) -> - ?GPROC_POOL(join, Pool, Id), + gproc_pool:connect_worker(Pool, {Pool, Id}), {ok, #state{pool = Pool, id = Id, monitors = dict:new()}}. %% Persistent Session @@ -163,10 +161,12 @@ handle_call({start_session, true, {ClientId, Username}, ClientPid}, _From, State end; handle_call(Req, _From, State) -> - ?UNEXPECTED_REQ(Req, State). + lager:error("[MQTT-SM] Unexpected Request: ~p", [Req]), + {reply, ignore, State}. handle_cast(Msg, State) -> - ?UNEXPECTED_MSG(Msg, State). + lager:error("[MQTT-SM] Unexpected Message: ~p", [Msg]), + {noreply, State}. handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) -> case dict:find(MRef, State#state.monitors) of @@ -186,10 +186,11 @@ handle_info({'DOWN', MRef, process, DownPid, _Reason}, State) -> end; handle_info(Info, State) -> - ?UNEXPECTED_INFO(Info, State). + lager:error("[MQTT-SM] Unexpected Info: ~p", [Info]), + {noreply, State}. terminate(_Reason, #state{pool = Pool, id = Id}) -> - ?GPROC_POOL(leave, Pool, Id). + gproc_pool:disconnect_worker(Pool, {Pool, Id}). code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -202,7 +203,8 @@ code_change(_OldVsn, State, _Extra) -> create_session({CleanSess, {ClientId, Username}, ClientPid}, State) -> case create_session(CleanSess, {ClientId, Username}, ClientPid) of {ok, SessPid} -> - {reply, {ok, SessPid, false}, monitor_session(ClientId, SessPid, State)}; + {reply, {ok, SessPid, false}, + monitor_session(ClientId, SessPid, State)}; {error, Error} -> {reply, {error, Error}, State} end. diff --git a/src/emqx_sm_helper.erl b/src/emqx_sm_helper.erl index 905260c4f..9f8970c08 100644 --- a/src/emqx_sm_helper.erl +++ b/src/emqx_sm_helper.erl @@ -14,7 +14,6 @@ %% limitations under the License. %%-------------------------------------------------------------------- -%% @doc Session Helper. -module(emqx_sm_helper). -author("Feng Lee "). @@ -23,8 +22,6 @@ -include("emqx.hrl"). --include("emqx_internal.hrl"). - -include_lib("stdlib/include/ms_transform.hrl"). %% API Function Exports @@ -49,10 +46,12 @@ init([StatsFun]) -> {ok, #state{stats_fun = StatsFun, ticker = TRef}}. handle_call(Req, _From, State) -> - ?UNEXPECTED_REQ(Req, State). + lager:error("[SM-HELPER] Unexpected Call: ~p", [Req]), + {reply, ignore, State}. handle_cast(Msg, State) -> - ?UNEXPECTED_MSG(Msg, State). + lager:error("[SM-HELPER] Unexpected Cast: ~p", [Msg]), + {noreply, State}. handle_info({membership, {mnesia, down, Node}}, State) -> Fun = fun() -> @@ -71,7 +70,8 @@ handle_info(tick, State) -> {noreply, setstats(State), hibernate}; handle_info(Info, State) -> - ?UNEXPECTED_INFO(Info, State). + lager:error("[SM-HELPER] Unexpected Info: ~p", [Info]), + {noreply, State}. terminate(_Reason, _State = #state{ticker = TRef}) -> timer:cancel(TRef), diff --git a/src/emqx_sm_sup.erl b/src/emqx_sm_sup.erl index 76734ed9c..1c9098715 100644 --- a/src/emqx_sm_sup.erl +++ b/src/emqx_sm_sup.erl @@ -14,15 +14,10 @@ %% limitations under the License. %%-------------------------------------------------------------------- -%% @doc Session Manager Supervisor. - -module(emqx_sm_sup). -behaviour(supervisor). --author("Feng Lee "). - --include("emqx.hrl"). -define(SM, emqx_sm). @@ -39,7 +34,7 @@ start_link() -> init([]) -> %% Create session tables - ets:new(mqtt_local_session, [public, ordered_set, named_table, {write_concurrency, true}]), + _ = ets:new(mqtt_local_session, [public, ordered_set, named_table, {write_concurrency, true}]), %% Helper StatsFun = emqx_stats:statsfun('sessions/count', 'sessions/max'), diff --git a/src/emqx_trie.erl b/src/emqx_trie.erl index f99bbb725..ba2ead2d2 100644 --- a/src/emqx_trie.erl +++ b/src/emqx_trie.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io) +%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -14,15 +14,9 @@ %% limitations under the License. %%-------------------------------------------------------------------- -%% @doc MQTT Topic Trie: -%% [Trie](http://en.wikipedia.org/wiki/Trie) -%% @end - -module(emqx_trie). --author("Feng Lee "). - --include("emqx_trie.hrl"). +-include("emqx.hrl"). %% Mnesia Callbacks -export([mnesia/1]). diff --git a/src/emqx_ws.erl b/src/emqx_ws.erl index ff9ea3a5b..3ff2551bb 100644 --- a/src/emqx_ws.erl +++ b/src/emqx_ws.erl @@ -52,7 +52,7 @@ handle_request('GET', "/mqtt", Req) -> Parser = emqx_parser:initial_state(PacketSize), %% Upgrade WebSocket. {ReentryWs, ReplyChannel} = mochiweb_websocket:upgrade_connection(Req, fun ?MODULE:ws_loop/3), - {ok, ClientPid} = emqx_ws_client_sup:start_client(self(), Req, ReplyChannel), + {ok, ClientPid} = emqx_ws_conn_sup:start_connection(self(), Req, ReplyChannel), ReentryWs(#wsocket_state{peername = Peername, parser = Parser, max_packet_size = PacketSize, diff --git a/src/emqx_ws_client.erl b/src/emqx_ws_conn.erl similarity index 98% rename from src/emqx_ws_client.erl rename to src/emqx_ws_conn.erl index b5f8d7b51..61e717a5c 100644 --- a/src/emqx_ws_client.erl +++ b/src/emqx_ws_conn.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io) +%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -14,20 +14,14 @@ %% limitations under the License. %%-------------------------------------------------------------------- -%% @doc MQTT WebSocket Connection. - --module(emqx_ws_client). +-module(emqx_ws_conn). -behaviour(gen_server). --author("Feng Lee "). - -include("emqx.hrl"). -include("emqx_mqtt.hrl"). --include("emqx_internal.hrl"). - -import(proplists, [get_value/2, get_value/3]). %% API Exports diff --git a/src/emqx_ws_client_sup.erl b/src/emqx_ws_conn_sup.erl similarity index 71% rename from src/emqx_ws_client_sup.erl rename to src/emqx_ws_conn_sup.erl index d7806f3a6..f1a26a91b 100644 --- a/src/emqx_ws_client_sup.erl +++ b/src/emqx_ws_conn_sup.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io) +%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -14,24 +14,21 @@ %% limitations under the License. %%-------------------------------------------------------------------- --module(emqx_ws_client_sup). - --author("Feng Lee "). +-module(emqx_ws_conn_sup). -behavior(supervisor). --export([start_link/0, start_client/3]). +-export([start_link/0, start_connection/3]). -export([init/1]). -%% @doc Start websocket client supervisor -spec(start_link() -> {ok, pid()}). start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). -%% @doc Start a WebSocket Connection. --spec(start_client(pid(), mochiweb_request:request(), fun()) -> {ok, pid()}). -start_client(WsPid, Req, ReplyChannel) -> +%% @doc Start a MQTT/WebSocket Connection. +-spec(start_connection(pid(), mochiweb_request:request(), fun()) -> {ok, pid()}). +start_connection(WsPid, Req, ReplyChannel) -> supervisor:start_child(?MODULE, [WsPid, Req, ReplyChannel]). %%-------------------------------------------------------------------- @@ -39,8 +36,9 @@ start_client(WsPid, Req, ReplyChannel) -> %%-------------------------------------------------------------------- init([]) -> + %%TODO: Cannot upgrade the environments, Use zone? Env = lists:append(emqx:env(client, []), emqx:env(protocol, [])), {ok, {{simple_one_for_one, 0, 1}, - [{ws_client, {emqx_ws_client, start_link, [Env]}, - temporary, 5000, worker, [emqx_ws_client]}]}}. + [{ws_conn, {emqx_ws_conn, start_link, [Env]}, + temporary, 5000, worker, [emqx_ws_conn]}]}}. diff --git a/test/emqx_trie_SUITE.erl b/test/emqx_trie_SUITE.erl index 69f251372..e0b0b569d 100644 --- a/test/emqx_trie_SUITE.erl +++ b/test/emqx_trie_SUITE.erl @@ -1,5 +1,5 @@ %%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io) +%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -16,11 +16,9 @@ -module(emqx_trie_SUITE). --author("Feng Lee "). - -compile(export_all). --include("emqx_trie.hrl"). +-include("emqx.hrl"). -define(TRIE, emqx_trie).