diff --git a/TODO b/TODO index 055d2b8d4..87e6dea16 100644 --- a/TODO +++ b/TODO @@ -1,5 +1,7 @@ 1. Update the README.md 2. Update the Documentation -3. Shared subscription strategy and dispatch strategy +3. Shared subscription and dispatch strategy +4. Remove lager syslog: + dep_lager_syslog = git https://github.com/basho/lager_syslog diff --git a/etc/emqx.conf b/etc/emqx.conf index 5aa4bf3ea..7101c3a7b 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -702,25 +702,25 @@ listener.tcp.external.acceptors = 16 ## Value: Number listener.tcp.external.max_clients = 102400 -## TODO: Zone of the external MQTT/TCP listener belonged to. +## Zone of the external MQTT/TCP listener belonged to. ## ## Value: String -## listener.tcp.external.zone = external +listener.tcp.external.zone = devicebound ## Mountpoint of the MQTT/TCP Listener. All the topics of this ## listener will be prefixed with the mount point if this option ## is enabled. -## Notice that EMQ X supports wildcard mount:%c clientid, %u username +## Notice that supports wildcard mount:%c clientid, %u username ## ## Value: String -## listener.tcp.external.mountpoint = external/ +listener.tcp.external.mountpoint = devicebound/ ## Rate limit for the external MQTT/TCP connections. ## Format is 'burst,rate'. ## ## Value: burst,rate ## Unit: KB/sec -## listener.tcp.external.rate_limit = 100,10 +listener.tcp.external.rate_limit = 100,10 ## The access control rules for the MQTT/TCP listener. ## diff --git a/priv/emqx.schema b/priv/emqx.schema index b4187397a..fe43f8f53 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -1409,21 +1409,18 @@ end}. MountPoint = fun(undefined) -> undefined; (S) -> list_to_binary(S) end, - ConnOpts = fun(Prefix) -> - Filter([{zone, Atom(cuttlefish:conf_get(Prefix ++ ".zone", Conf, undefined))}, - {rate_limit, cuttlefish:conf_get(Prefix ++ ".rate_limit", Conf, undefined)}, - {proxy_protocol, cuttlefish:conf_get(Prefix ++ ".proxy_protocol", Conf, undefined)}, - {proxy_protocol_timeout, cuttlefish:conf_get(Prefix ++ ".proxy_protocol_timeout", Conf, undefined)}, - {mountpoint, MountPoint(cuttlefish:conf_get(Prefix ++ ".mountpoint", Conf, undefined))}, - {peer_cert_as_username, cuttlefish:conf_get(Prefix ++ ".peer_cert_as_username", Conf, undefined)}, - {proxy_port_header, cuttlefish:conf_get(Prefix ++ ".proxy_port_header", Conf, undefined)}, - {proxy_address_header, cuttlefish:conf_get(Prefix ++ ".proxy_address_header", Conf, undefined)}]) - end, - LisOpts = fun(Prefix) -> Filter([{acceptors, cuttlefish:conf_get(Prefix ++ ".acceptors", Conf)}, {max_clients, cuttlefish:conf_get(Prefix ++ ".max_clients", Conf)}, - {tune_buffer, cuttlefish:conf_get(Prefix ++ ".tune_buffer", Conf, undefined)} | AccOpts(Prefix)]) + {tune_buffer, cuttlefish:conf_get(Prefix ++ ".tune_buffer", Conf, undefined)}, + {zone, Atom(cuttlefish:conf_get(Prefix ++ ".zone", Conf, undefined))}, + {rate_limit, cuttlefish:conf_get(Prefix ++ ".rate_limit", Conf, undefined)}, + {proxy_protocol, cuttlefish:conf_get(Prefix ++ ".proxy_protocol", Conf, undefined)}, + {proxy_protocol_timeout, cuttlefish:conf_get(Prefix ++ ".proxy_protocol_timeout", Conf, undefined)}, + {mountpoint, MountPoint(cuttlefish:conf_get(Prefix ++ ".mountpoint", Conf, undefined))}, + {peer_cert_as_username, cuttlefish:conf_get(Prefix ++ ".peer_cert_as_username", Conf, undefined)}, + {proxy_port_header, cuttlefish:conf_get(Prefix ++ ".proxy_port_header", Conf, undefined)}, + {proxy_address_header, cuttlefish:conf_get(Prefix ++ ".proxy_address_header", Conf, undefined)} | AccOpts(Prefix)]) end, TcpOpts = fun(Prefix) -> Filter([{backlog, cuttlefish:conf_get(Prefix ++ ".backlog", Conf, undefined)}, @@ -1460,11 +1457,9 @@ end}. TcpListeners = fun(Type, Name) -> Prefix = string:join(["listener", Type, Name], "."), case cuttlefish:conf_get(Prefix, Conf, undefined) of - undefined -> - []; - ListenOn -> - [{Atom(Type), ListenOn, [{connopts, ConnOpts(Prefix)}, - {sockopts, TcpOpts(Prefix)} | LisOpts(Prefix)]}] + undefined -> []; + ListenOn -> + [{Atom(Type), ListenOn, [{tcp_options, TcpOpts(Prefix)} | LisOpts(Prefix)]}] end end, @@ -1474,9 +1469,8 @@ end}. undefined -> []; ListenOn -> - [{Atom(Type), ListenOn, [{connopts, ConnOpts(Prefix)}, - {sockopts, TcpOpts(Prefix)}, - {sslopts, SslOpts(Prefix)} | LisOpts(Prefix)]}] + [{Atom(Type), ListenOn, [{tcp_options, TcpOpts(Prefix)}, + {ssl_options, SslOpts(Prefix)} | LisOpts(Prefix)]}] end end, @@ -1486,12 +1480,8 @@ end}. undefined -> []; ListenOn -> - SslOpts1 = case SslOpts(Prefix) of - [] -> []; - SslOpts0 -> [{sslopts, SslOpts0}] - end, - [{Atom(Type), ListenOn, [{connopts, ConnOpts(Prefix)}, - {sockopts, TcpOpts(Prefix)}| LisOpts(Prefix)] ++ SslOpts1}] + SslOpts1 = case SslOpts(Prefix) of [] -> []; SslOpts0 -> [{ssl_options, SslOpts0}] end, + [{Atom(Type), ListenOn, [{tcp_options, TcpOpts(Prefix)}|LisOpts(Prefix)] ++ SslOpts1}] end end, diff --git a/src/emqx.app.src b/src/emqx.app.src index f3ccb8fa3..36807e47b 100644 --- a/src/emqx.app.src +++ b/src/emqx.app.src @@ -3,7 +3,7 @@ {vsn,"3.0"}, {modules,[]}, {registered,[emqx_sup]}, - {applications,[kernel,stdlib,gproc,lager,esockd,mochiweb,lager_syslog,pbkdf2,bcrypt,clique,jsx]}, + {applications,[kernel, stdlib,jsx,gproc,gen_rpc,lager,ekka,esockd,mochiweb]}, {env,[]}, {mod,{emqx_app,[]}}, {maintainers,["Feng Lee "]}, diff --git a/src/emqx_alarm_mgr.erl b/src/emqx_alarm_mgr.erl index f6901c325..e041341e2 100644 --- a/src/emqx_alarm_mgr.erl +++ b/src/emqx_alarm_mgr.erl @@ -131,7 +131,7 @@ encode_alarm(#alarm{id = AlarmId, severity = Severity, title = Title, {ts, emqx_time:now_secs(Ts)}]). alarm_msg(Type, AlarmId, Json) -> - emqx_message:make(?ALARM_MGR, #{sys => true, qos => 0}, topic(Type, AlarmId), Json). + emqx_message:make(?ALARM_MGR, #{sys => true}, topic(Type, AlarmId), Json). topic(alert, AlarmId) -> emqx_topic:systop(<<"alarms/", AlarmId/binary, "/alert">>); diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index 544646701..cd71b1af4 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -17,24 +17,17 @@ -behaviour(gen_server). -include("emqx.hrl"). - -include("emqx_mqtt.hrl"). - -include("emqx_misc.hrl"). -import(proplists, [get_value/2, get_value/3]). -%% API Function Exports -export([start_link/3]). - %% Management and Monitor API -export([info/1, stats/1, kick/1, clean_acl_cache/2]). - -export([set_rate_limit/2, get_rate_limit/1]). - %% SUB/UNSUB Asynchronously. Called by plugins. -export([subscribe/2, unsubscribe/2]). - %% Get the session proc? -export([session/1]). @@ -48,15 +41,12 @@ keepalive, enable_stats, idle_timeout, force_gc_count}). -define(INFO_KEYS, [peername, conn_state, await_recv]). - -define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]). - -define(LOG(Level, Format, Args, State), - emqx_logger:Level("Client(~s): " ++ Format, - [esockd_net:format(State#state.peername) | Args])). + emqx_logger:Level("Conn(~s): " ++ Format, [esockd_net:format(State#state.peername) | Args])). -start_link(Transport, Sock, Env) -> - {ok, proc_lib:spawn_link(?MODULE, init, [[Transport, Sock, Env]])}. +start_link(Transport, Sock, Options) -> + {ok, proc_lib:spawn_link(?MODULE, init, [[Transport, Sock, Options]])}. info(CPid) -> gen_server:call(CPid, info). @@ -85,26 +75,27 @@ session(CPid) -> clean_acl_cache(CPid, Topic) -> gen_server:call(CPid, {clean_acl_cache, Topic}). -%%-------------------------------------------------------------------- +%%------------------------------------------------------------------------------ %% gen_server callbacks -%%-------------------------------------------------------------------- +%%------------------------------------------------------------------------------ -init([Transport, Sock, Env]) -> +init([Transport, Sock, Options]) -> case Transport:wait(Sock) of {ok, NewSock} -> {ok, Peername} = Transport:ensure_ok_or_exit(peername, [NewSock]), - do_init(Transport, Sock, Peername, Env); + do_init(Transport, Sock, Peername, Options); {error, Reason} -> {stop, Reason} end. -do_init(Transport, Sock, Peername, Env) -> - RateLimit = get_value(rate_limit, Env), - PacketSize = get_value(max_packet_size, Env, ?MAX_PACKET_SIZE), +do_init(Transport, Sock, Peername, Options) -> + io:format("Options: ~p~n", [Options]), + RateLimit = get_value(rate_limit, Options), + PacketSize = get_value(max_packet_size, Options, ?MAX_PACKET_SIZE), SendFun = send_fun(Transport, Sock, Peername), - ProtoState = emqx_protocol:init(Transport, Sock, Peername, SendFun, Env), - EnableStats = get_value(client_enable_stats, Env, false), - IdleTimout = get_value(client_idle_timeout, Env, 30000), + ProtoState = emqx_protocol:init(Transport, Sock, Peername, SendFun, Options), + EnableStats = get_value(client_enable_stats, Options, false), + IdleTimout = get_value(client_idle_timeout, Options, 30000), ForceGcCount = emqx_gc:conn_max_gc_count(), State = run_socket(#state{transport = Transport, socket = Sock, @@ -136,8 +127,7 @@ send_fun(Transport, Sock, Peername) -> init_parse_state(State = #state{max_packet_size = Size, proto_state = ProtoState}) -> Version = emqx_protocol:get(proto_ver, ProtoState), - State#state{parse_state = emqx_frame:initial_state( - #{max_packet_size => Size, version => Version})}. + State#state{parse_state = emqx_frame:initial_state(#{max_packet_size => Size, version => Version})}. handle_call(info, From, State = #state{proto_state = ProtoState}) -> ProtoInfo = emqx_protocol:info(ProtoState), @@ -194,10 +184,6 @@ handle_info({suback, PacketId, GrantedQos}, State) -> emqx_protocol:send(Packet, ProtoState) end, State); -%% Fastlane -handle_info({dispatch, _Topic, Msg}, State) -> - handle_info({deliver, emqx_message:set_flag(qos, ?QOS_0, Msg)}, State); - handle_info({deliver, Message}, State) -> with_proto( fun(ProtoState) -> diff --git a/src/emqx_listeners.erl b/src/emqx_listeners.erl index 02de39123..c9697f0ca 100644 --- a/src/emqx_listeners.erl +++ b/src/emqx_listeners.erl @@ -42,10 +42,14 @@ start_listener({Proto, ListenOn, Options}) when Proto == https; Proto == wss -> start_http_listener('mqtt:wss', ListenOn, Options). start_mqtt_listener(Name, ListenOn, Options) -> - {ok, _} = esockd:open(Name, ListenOn, merge_sockopts(Options), {emqx_connection, start_link, []}). + SockOpts = esockd:parse_opt(Options), + MFA = {emqx_connection, start_link, [Options -- SockOpts]}, + {ok, _} = esockd:open(Name, ListenOn, merge_default(SockOpts), MFA). start_http_listener(Name, ListenOn, Options) -> - {ok, _} = mochiweb:start_http(Name, ListenOn, Options, {emqx_ws, handle_request, []}). + SockOpts = esockd:parse_opt(Options), + MFA = {emqx_ws, handle_request, [Options -- SockOpts]}, + {ok, _} = mochiweb:start_http(Name, ListenOn, SockOpts, MFA). %% @doc Restart all listeners -spec(restart_all() -> ok). @@ -56,7 +60,7 @@ restart_all() -> restart_listener({tcp, ListenOn, _Opts}) -> esockd:reopen('mqtt:tcp', ListenOn); restart_listener({Proto, ListenOn, _Opts}) when Proto == ssl; Proto == tls -> - esockd:reopen('mqtt:tls', ListenOn); + esockd:reopen('mqtt:ssl', ListenOn); restart_listener({Proto, ListenOn, _Opts}) when Proto == http; Proto == ws -> mochiweb:restart_http('mqtt:ws', ListenOn); restart_listener({Proto, ListenOn, _Opts}) when Proto == https; Proto == wss -> @@ -73,7 +77,7 @@ stop_all() -> stop_listener({tcp, ListenOn, _Opts}) -> esockd:close('mqtt:tcp', ListenOn); stop_listener({Proto, ListenOn, _Opts}) when Proto == ssl; Proto == tls -> - esockd:close('mqtt:tls', ListenOn); + esockd:close('mqtt:ssl', ListenOn); stop_listener({Proto, ListenOn, _Opts}) when Proto == http; Proto == ws -> mochiweb:stop_http('mqtt:ws', ListenOn); stop_listener({Proto, ListenOn, _Opts}) when Proto == https; Proto == wss -> @@ -81,7 +85,7 @@ stop_listener({Proto, ListenOn, _Opts}) when Proto == https; Proto == wss -> stop_listener({Proto, ListenOn, _Opts}) -> esockd:close(Proto, ListenOn). -merge_sockopts(Options) -> +merge_default(Options) -> case lists:keytake(tcp_options, 1, Options) of {value, {tcp_options, TcpOpts}, Options1} -> [{tcp_options, emqx_misc:merge_opts(?MQTT_SOCKOPTS, TcpOpts)} | Options1]; @@ -89,11 +93,3 @@ merge_sockopts(Options) -> [{tcp_options, ?MQTT_SOCKOPTS} | Options] end. -%% all() -> -%% [Listener || Listener = {{Proto, _}, _Pid} <- esockd:listeners(), is_mqtt(Proto)]. -%%is_mqtt('mqtt:tcp') -> true; -%%is_mqtt('mqtt:tls') -> true; -%%is_mqtt('mqtt:ws') -> true; -%%is_mqtt('mqtt:wss') -> true; -%%is_mqtt(_Proto) -> false. - diff --git a/src/emqx_message.erl b/src/emqx_message.erl index 3a96f75a6..77dbfbf82 100644 --- a/src/emqx_message.erl +++ b/src/emqx_message.erl @@ -37,6 +37,7 @@ new(From, Flags, Topic, Payload) when is_atom(From); is_record(From, client) -> -spec(new(atom() | client(), message_flags(), message_headers(), topic(), payload()) -> message()). new(From, Flags, Headers, Topic, Payload) when is_atom(From); is_record(From, client) -> #message{id = msgid(), + qos = ?QOS0, from = From, sender = self(), flags = Flags, diff --git a/src/emqx_mqueue.erl b/src/emqx_mqueue.erl index e6bd16540..db2f30cb6 100644 --- a/src/emqx_mqueue.erl +++ b/src/emqx_mqueue.erl @@ -21,7 +21,7 @@ %% This module implements a simple in-memory queue for MQTT persistent session. %% %% If the broker restarted or crashed, all the messages queued will be gone. -%% +%% %% Concept of Message Queue and Inflight Window: %% %% |<----------------- Max Len ----------------->| @@ -154,7 +154,7 @@ stats(#mqueue{type = Type, q = Q, max_len = MaxLen, len = Len, dropped = Dropped %% @doc Enqueue a message. -spec(in(message(), mqueue()) -> mqueue()). -in(#message{flags = #{qos := ?QOS_0}}, MQ = #mqueue{qos0 = false}) -> +in(#message{qos = ?QOS_0}, MQ = #mqueue{qos0 = false}) -> MQ; in(Msg, MQ = #mqueue{type = simple, q = Q, len = Len, max_len = 0}) -> MQ#mqueue{q = queue:in(Msg, Q), len = Len + 1}; diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index c932089e7..b2e9965e8 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -1,44 +1,36 @@ -%%%=================================================================== -%%% Copyright (c) 2013-2018 EMQ Inc. All rights reserved. -%%% -%%% Licensed under the Apache License, Version 2.0 (the "License"); -%%% you may not use this file except in compliance with the License. -%%% You may obtain a copy of the License at -%%% -%%% http://www.apache.org/licenses/LICENSE-2.0 -%%% -%%% Unless required by applicable law or agreed to in writing, software -%%% distributed under the License is distributed on an "AS IS" BASIS, -%%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%%% See the License for the specific language governing permissions and -%%% limitations under the License. -%%%=================================================================== +%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. -module(emqx_protocol). -include("emqx.hrl"). - -include("emqx_mqtt.hrl"). - -include("emqx_misc.hrl"). -import(proplists, [get_value/2, get_value/3]). %% API -export([init/3, init/5, get/2, info/1, stats/1, clientid/1, client/1, session/1]). - -export([subscribe/2, unsubscribe/2, pubrel/2, shutdown/2]). - -export([received/2, send/2]). - -export([process/2]). -ifdef(TEST). -compile(export_all). -endif. --record(proto_stats, {enable_stats = false, recv_pkt = 0, recv_msg = 0, - send_pkt = 0, send_msg = 0}). +-record(proto_stats, {enable_stats = false, recv_pkt = 0, recv_msg = 0, send_pkt = 0, send_msg = 0}). %% Protocol State %% ws_initial_headers: Headers from first HTTP request for WebSocket Client. @@ -76,23 +68,22 @@ init(Peername, SendFun, Opts) -> keepalive_backoff = Backoff, stats_data = #proto_stats{enable_stats = EnableStats}}. -init(_Transport, _Sock, Peername, SendFun, Opts) -> - init(Peername, SendFun, Opts). - %%enrich_opt(Conn:opts(), Conn, ). +init(_Transport, _Sock, Peername, SendFun, Options) -> + enrich_opt(Options, init(Peername, SendFun, Options)). -enrich_opt([], _Conn, State) -> +enrich_opt([], State) -> State; -enrich_opt([{mountpoint, MountPoint} | ConnOpts], Conn, State) -> - enrich_opt(ConnOpts, Conn, State#proto_state{mountpoint = MountPoint}); -enrich_opt([{peer_cert_as_username, N} | ConnOpts], Conn, State) -> - enrich_opt(ConnOpts, Conn, State#proto_state{peercert_username = peercert_username(N, Conn)}); -enrich_opt([_ | ConnOpts], Conn, State) -> - enrich_opt(ConnOpts, Conn, State). +enrich_opt([{mountpoint, MountPoint} | ConnOpts], State) -> + enrich_opt(ConnOpts, State#proto_state{mountpoint = MountPoint}); +%%enrich_opt([{peer_cert_as_username, N} | ConnOpts], State) -> +%% enrich_opt(ConnOpts, State#proto_state{peercert_username = peercert_username(N, Conn)}); +enrich_opt([_ | ConnOpts], State) -> + enrich_opt(ConnOpts, State). -peercert_username(cn, Conn) -> - Conn:peer_cert_common_name(); -peercert_username(dn, Conn) -> - Conn:peer_cert_subject(). +%%peercert_username(cn, Conn) -> +%% Conn:peer_cert_common_name(); +%%peercert_username(dn, Conn) -> +%% Conn:peer_cert_subject(). repl_username_with_peercert(State = #proto_state{peercert_username = undefined}) -> State; @@ -122,17 +113,14 @@ client(#proto_state{client_id = ClientId, proto_ver = ProtoVer, keepalive = Keepalive, will_msg = WillMsg, - ws_initial_headers = WsInitialHeaders, - mountpoint = MountPoint, - connected_at = Time}) -> + ws_initial_headers = _WsInitialHeaders, + mountpoint = _MountPoint, + connected_at = _Time}) -> WillTopic = if WillMsg =:= undefined -> undefined; true -> WillMsg#message.topic end, - #client{id = ClientId, - pid = ClientPid, - username = Username, - peername = Peername}. + #client{id = ClientId, pid = ClientPid, username = Username, peername = Peername}. session(#proto_state{session = Session}) -> Session. diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 83d53abc6..607a3644f 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -1,48 +1,16 @@ -%%%=================================================================== -%%% Copyright (c) 2013-2018 EMQ Inc. All rights reserved. -%%% -%%% Licensed under the Apache License, Version 2.0 (the "License"); -%%% you may not use this file except in compliance with the License. -%%% You may obtain a copy of the License at -%%% -%%% http://www.apache.org/licenses/LICENSE-2.0 -%%% -%%% Unless required by applicable law or agreed to in writing, software -%%% distributed under the License is distributed on an "AS IS" BASIS, -%%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%%% See the License for the specific language governing permissions and -%%% limitations under the License. -%%%=================================================================== - --module(emqx_session). - --behaviour(gen_server). - --include("emqx.hrl"). - --include("emqx_mqtt.hrl"). - --include("emqx_misc.hrl"). - --import(emqx_misc, [start_timer/2]). - --import(proplists, [get_value/2, get_value/3]). - -%% Session API --export([start_link/1, resume/2, discard/2]). - -%% Management and Monitor API --export([state/1, info/1, stats/1]). - -%% PubSub API --export([subscribe/2, subscribe/3, publish/2, puback/2, pubrec/2, - pubrel/2, pubcomp/2, unsubscribe/2]). - -%% gen_server Function Exports --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). - --define(MQueue, emqx_mqueue). +%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. %% A stateful interaction between a Client and a Server. Some Sessions %% last only as long as the Network Connection, others can span multiple @@ -66,6 +34,32 @@ %% %% If the session is currently disconnected, the time at which the Session state %% will be deleted. +-module(emqx_session). + +-behaviour(gen_server). + +-include("emqx.hrl"). +-include("emqx_mqtt.hrl"). +-include("emqx_misc.hrl"). + +-import(emqx_misc, [start_timer/2]). +-import(proplists, [get_value/2, get_value/3]). + +%% Session API +-export([start_link/1, resume/2, discard/2]). +%% Management and Monitor API +-export([state/1, info/1, stats/1]). +%% PubSub API +-export([subscribe/2, subscribe/3]). +-export([publish/2, puback/2, pubrec/2, pubrel/2, pubcomp/2]). +-export([unsubscribe/2]). + +%% gen_server Function Exports +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + code_change/3]). + +-define(MQueue, emqx_mqueue). + -record(state, { %% Clean Start Flag clean_start = false :: boolean(), @@ -145,9 +139,7 @@ }). -define(TIMEOUT, 60000). - -define(INFO_KEYS, [clean_start, client_id, username, client_pid, binding, created_at]). - -define(STATE_KEYS, [clean_start, client_id, username, binding, client_pid, old_client_pid, next_msg_id, max_subscriptions, subscriptions, upgrade_qos, inflight, max_inflight, retry_interval, mqueue, awaiting_rel, max_awaiting_rel, @@ -169,71 +161,71 @@ start_link(Attrs) -> %% @doc Subscribe topics -spec(subscribe(pid(), [{binary(), [emqx_topic:option()]}]) -> ok). -subscribe(SessionPid, TopicTable) -> %%TODO: the ack function??... - gen_server:cast(SessionPid, {subscribe, self(), TopicTable, fun(_) -> ok end}). +subscribe(SPid, TopicTable) -> %%TODO: the ack function??... + gen_server:cast(SPid, {subscribe, self(), TopicTable, fun(_) -> ok end}). -spec(subscribe(pid(), mqtt_packet_id(), [{binary(), [emqx_topic:option()]}]) -> ok). -subscribe(SessionPid, PacketId, TopicTable) -> %%TODO: the ack function??... +subscribe(SPid, PacketId, TopicTable) -> %%TODO: the ack function??... From = self(), AckFun = fun(GrantedQos) -> From ! {suback, PacketId, GrantedQos} end, - gen_server:cast(SessionPid, {subscribe, From, TopicTable, AckFun}). + gen_server:cast(SPid, {subscribe, From, TopicTable, AckFun}). %% @doc Publish Message --spec(publish(pid(), message()) -> ok | {error, term()}). -publish(_SessionPid, Msg = #message{qos = ?QOS_0}) -> +-spec(publish(pid(), message()) -> {ok, delivery()} | {error, term()}). +publish(_SPid, Msg = #message{qos = ?QOS_0}) -> %% Publish QoS0 Directly - emqx_broker:publish(Msg), ok; + emqx_broker:publish(Msg); -publish(_SessionPid, Msg = #message{qos = ?QOS_1}) -> +publish(_SPid, Msg = #message{qos = ?QOS_1}) -> %% Publish QoS1 message directly for client will PubAck automatically - emqx_broker:publish(Msg), ok; + emqx_broker:publish(Msg); -publish(SessionPid, Msg = #message{qos = ?QOS_2}) -> +publish(SPid, Msg = #message{qos = ?QOS_2}) -> %% Publish QoS2 to Session - gen_server:call(SessionPid, {publish, Msg}, ?TIMEOUT). + gen_server:call(SPid, {publish, Msg}, infinity). %% @doc PubAck Message -spec(puback(pid(), mqtt_packet_id()) -> ok). -puback(SessionPid, PacketId) -> - gen_server:cast(SessionPid, {puback, PacketId}). +puback(SPid, PacketId) -> + gen_server:cast(SPid, {puback, PacketId}). -spec(pubrec(pid(), mqtt_packet_id()) -> ok). -pubrec(SessionPid, PacketId) -> - gen_server:cast(SessionPid, {pubrec, PacketId}). +pubrec(SPid, PacketId) -> + gen_server:cast(SPid, {pubrec, PacketId}). -spec(pubrel(pid(), mqtt_packet_id()) -> ok). -pubrel(SessionPid, PacketId) -> - gen_server:cast(SessionPid, {pubrel, PacketId}). +pubrel(SPid, PacketId) -> + gen_server:cast(SPid, {pubrel, PacketId}). -spec(pubcomp(pid(), mqtt_packet_id()) -> ok). -pubcomp(SessionPid, PacketId) -> - gen_server:cast(SessionPid, {pubcomp, PacketId}). +pubcomp(SPid, PacketId) -> + gen_server:cast(SPid, {pubcomp, PacketId}). %% @doc Unsubscribe the topics -spec(unsubscribe(pid(), [{binary(), [suboption()]}]) -> ok). -unsubscribe(SessionPid, TopicTable) -> - gen_server:cast(SessionPid, {unsubscribe, self(), TopicTable}). +unsubscribe(SPid, TopicTable) -> + gen_server:cast(SPid, {unsubscribe, self(), TopicTable}). %% @doc Resume the session -spec(resume(pid(), pid()) -> ok). -resume(SessionPid, ClientPid) -> - gen_server:cast(SessionPid, {resume, ClientPid}). +resume(SPid, ClientPid) -> + gen_server:cast(SPid, {resume, ClientPid}). %% @doc Get session state -state(SessionPid) when is_pid(SessionPid) -> - gen_server:call(SessionPid, state). +state(SPid) when is_pid(SPid) -> + gen_server:call(SPid, state). %% @doc Get session info -spec(info(pid() | #state{}) -> list(tuple())). -info(SessionPid) when is_pid(SessionPid) -> - gen_server:call(SessionPid, info); +info(SPid) when is_pid(SPid) -> + gen_server:call(SPid, info); info(State) when is_record(State, state) -> ?record_to_proplist(state, State, ?INFO_KEYS). -spec(stats(pid() | #state{}) -> list({atom(), non_neg_integer()})). -stats(SessionPid) when is_pid(SessionPid) -> - gen_server:call(SessionPid, stats); +stats(SPid) when is_pid(SPid) -> + gen_server:call(SPid, stats); stats(#state{max_subscriptions = MaxSubscriptions, subscriptions = Subscriptions, @@ -257,8 +249,8 @@ stats(#state{max_subscriptions = MaxSubscriptions, %% @doc Discard the session -spec(discard(pid(), client_id()) -> ok). -discard(SessionPid, ClientId) -> - gen_server:call(SessionPid, {discard, ClientId}). +discard(SPid, ClientId) -> + gen_server:call(SPid, {discard, ClientId}). %%-------------------------------------------------------------------- %% gen_server Callbacks @@ -342,41 +334,34 @@ handle_call(state, _From, State) -> reply(?record_to_proplist(state, State, ?STATE_KEYS), State); handle_call(Req, _From, State) -> - emqx_logger:error("[Session] Unexpected request: ~p", [Req]), - {reply, ignore, State}. + emqx_logger:error("[Session] unexpected call: ~p", [Req]), + {reply, ignored, State}. handle_cast({subscribe, From, TopicTable, AckFun}, - State = #state{client_id = ClientId, - username = Username, - subscriptions = Subscriptions}) -> + State = #state{client_id = ClientId, username = Username, subscriptions = Subscriptions}) -> ?LOG(info, "Subscribe ~p", [TopicTable], State), {GrantedQos, Subscriptions1} = lists:foldl(fun({Topic, Opts}, {QosAcc, SubMap}) -> - io:format("SubOpts: ~p~n", [Opts]), - Fastlane = lists:member(fastlane, Opts), - NewQos = if Fastlane == true -> ?QOS_0; true -> get_value(qos, Opts) end, + NewQos = get_value(qos, Opts), SubMap1 = case maps:find(Topic, SubMap) of {ok, NewQos} -> ?LOG(warning, "Duplicated subscribe: ~s, qos = ~w", [Topic, NewQos], State), SubMap; {ok, OldQos} -> - emqx_broker:setopts(Topic, ClientId, [{qos, NewQos}]), + %% TODO:.... + emqx_broker:set_subopts(Topic, ClientId, [{qos, NewQos}]), emqx_hooks:run('session.subscribed', [ClientId, Username], {Topic, Opts}), - ?LOG(warning, "Duplicated subscribe ~s, old_qos=~w, new_qos=~w", - [Topic, OldQos, NewQos], State), + ?LOG(warning, "Duplicated subscribe ~s, old_qos=~w, new_qos=~w", [Topic, OldQos, NewQos], State), maps:put(Topic, NewQos, SubMap); error -> - case Fastlane of - true -> emqx:subscribe(Topic, From, Opts); - false -> emqx:subscribe(Topic, ClientId, Opts) - end, + %% TODO:.... + emqx:subscribe(Topic, ClientId, Opts), emqx_hooks:run('session.subscribed', [ClientId, Username], {Topic, Opts}), maps:put(Topic, NewQos, SubMap) end, {[NewQos|QosAcc], SubMap1} end, {[], Subscriptions}, TopicTable), - io:format("GrantedQos: ~p~n", [GrantedQos]), AckFun(lists:reverse(GrantedQos)), {noreply, emit_stats(State#state{subscriptions = Subscriptions1}), hibernate}; @@ -501,7 +486,7 @@ handle_cast({resume, ClientPid}, {noreply, emit_stats(dequeue(retry_delivery(true, State1)))}; handle_cast(Msg, State) -> - emqx_logger:error("[Session] Unexpected msg: ~p", [Msg]), + emqx_logger:error("[Session] unexpected cast: ~p", [Msg]), {noreply, State}. %% Ignore Messages delivered by self @@ -546,16 +531,15 @@ handle_info({'EXIT', Pid, _Reason}, State = #state{old_client_pid = Pid}) -> handle_info({'EXIT', Pid, Reason}, State = #state{client_pid = ClientPid}) -> - ?LOG(error, "Unexpected EXIT: client_pid=~p, exit_pid=~p, reason=~p", + ?LOG(error, "unexpected EXIT: client_pid=~p, exit_pid=~p, reason=~p", [ClientPid, Pid, Reason], State), {noreply, State, hibernate}; handle_info(Info, State) -> - emqx_logger:error("[Session] Unexpected info: ~p", [Info]), + emqx_logger:error("[Session] unexpected info: ~p", [Info]), {noreply, State}. terminate(Reason, #state{client_id = ClientId, username = Username}) -> - emqx_hooks:run('session.terminated', [ClientId, Username, Reason]), emqx_sm:unregister_session(ClientId). diff --git a/src/emqx_sup.erl b/src/emqx_sup.erl index 61d48042e..3df468f1e 100644 --- a/src/emqx_sup.erl +++ b/src/emqx_sup.erl @@ -1,18 +1,16 @@ -%%%=================================================================== -%%% Copyright (c) 2013-2018 EMQ Inc. All rights reserved. -%%% -%%% Licensed under the Apache License, Version 2.0 (the "License"); -%%% you may not use this file except in compliance with the License. -%%% You may obtain a copy of the License at -%%% -%%% http://www.apache.org/licenses/LICENSE-2.0 -%%% -%%% Unless required by applicable law or agreed to in writing, software -%%% distributed under the License is distributed on an "AS IS" BASIS, -%%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%%% See the License for the specific language governing permissions and -%%% limitations under the License. -%%%=================================================================== +%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. -module(emqx_sup). @@ -74,7 +72,7 @@ init([]) -> %% Connection Manager CMSup = supervisor_spec(emqx_cm_sup), %% WebSocket Connection Sup - WSConnSup = supervisor_spec(emqx_ws_connection_sup), + %% WSConnSup = supervisor_spec(emqx_ws_connection_sup), %% Sys Sup SysSup = supervisor_spec(emqx_sys_sup), {ok, {{one_for_all, 0, 1}, @@ -86,7 +84,7 @@ init([]) -> SMSup, SessionSup, CMSup, - WSConnSup, + %%WSConnSup, SysSup]}}. %%-------------------------------------------------------------------- diff --git a/src/emqx_sys.erl b/src/emqx_sys.erl index 8644f8801..667ef0f1a 100644 --- a/src/emqx_sys.erl +++ b/src/emqx_sys.erl @@ -171,6 +171,6 @@ publish(metrics, Metrics) -> safe_publish(Topic, Payload) -> safe_publish(Topic, #{}, Payload). safe_publish(Topic, Flags, Payload) -> - Flags1 = maps:merge(#{sys => true, qos => 0}, Flags), + Flags1 = maps:merge(#{sys => true}, Flags), emqx_broker:safe_publish(emqx_message:new(?SYS, Flags1, Topic, iolist_to_binary(Payload))). diff --git a/src/emqx_sys_mon.erl b/src/emqx_sys_mon.erl index af6d39138..435dfaaee 100644 --- a/src/emqx_sys_mon.erl +++ b/src/emqx_sys_mon.erl @@ -158,5 +158,5 @@ safe_publish(Event, WarnMsg) -> emqx_broker:safe_publish(sysmon_msg(Topic, iolist_to_binary(WarnMsg))). sysmon_msg(Topic, Payload) -> - emqx_message:new(?SYSMON, #{sys => true, qos => 0}, Topic, Payload). + emqx_message:new(?SYSMON, #{sys => true}, Topic, Payload).