Add 'qos' field to message record

This commit is contained in:
Feng Lee 2018-07-18 23:41:00 +08:00
parent 1735786ffb
commit 7ee54aac28
14 changed files with 183 additions and 238 deletions

4
TODO
View File

@ -1,5 +1,7 @@
1. Update the README.md 1. Update the README.md
2. Update the Documentation 2. Update the Documentation
3. Shared subscription strategy and dispatch strategy 3. Shared subscription and dispatch strategy
4. Remove lager syslog:
dep_lager_syslog = git https://github.com/basho/lager_syslog

View File

@ -702,25 +702,25 @@ listener.tcp.external.acceptors = 16
## Value: Number ## Value: Number
listener.tcp.external.max_clients = 102400 listener.tcp.external.max_clients = 102400
## TODO: Zone of the external MQTT/TCP listener belonged to. ## Zone of the external MQTT/TCP listener belonged to.
## ##
## Value: String ## Value: String
## listener.tcp.external.zone = external listener.tcp.external.zone = devicebound
## Mountpoint of the MQTT/TCP Listener. All the topics of this ## Mountpoint of the MQTT/TCP Listener. All the topics of this
## listener will be prefixed with the mount point if this option ## listener will be prefixed with the mount point if this option
## is enabled. ## is enabled.
## Notice that EMQ X supports wildcard mount:%c clientid, %u username ## Notice that supports wildcard mount:%c clientid, %u username
## ##
## Value: String ## Value: String
## listener.tcp.external.mountpoint = external/ listener.tcp.external.mountpoint = devicebound/
## Rate limit for the external MQTT/TCP connections. ## Rate limit for the external MQTT/TCP connections.
## Format is 'burst,rate'. ## Format is 'burst,rate'.
## ##
## Value: burst,rate ## Value: burst,rate
## Unit: KB/sec ## Unit: KB/sec
## listener.tcp.external.rate_limit = 100,10 listener.tcp.external.rate_limit = 100,10
## The access control rules for the MQTT/TCP listener. ## The access control rules for the MQTT/TCP listener.
## ##

View File

@ -1409,21 +1409,18 @@ end}.
MountPoint = fun(undefined) -> undefined; (S) -> list_to_binary(S) end, MountPoint = fun(undefined) -> undefined; (S) -> list_to_binary(S) end,
ConnOpts = fun(Prefix) ->
Filter([{zone, Atom(cuttlefish:conf_get(Prefix ++ ".zone", Conf, undefined))},
{rate_limit, cuttlefish:conf_get(Prefix ++ ".rate_limit", Conf, undefined)},
{proxy_protocol, cuttlefish:conf_get(Prefix ++ ".proxy_protocol", Conf, undefined)},
{proxy_protocol_timeout, cuttlefish:conf_get(Prefix ++ ".proxy_protocol_timeout", Conf, undefined)},
{mountpoint, MountPoint(cuttlefish:conf_get(Prefix ++ ".mountpoint", Conf, undefined))},
{peer_cert_as_username, cuttlefish:conf_get(Prefix ++ ".peer_cert_as_username", Conf, undefined)},
{proxy_port_header, cuttlefish:conf_get(Prefix ++ ".proxy_port_header", Conf, undefined)},
{proxy_address_header, cuttlefish:conf_get(Prefix ++ ".proxy_address_header", Conf, undefined)}])
end,
LisOpts = fun(Prefix) -> LisOpts = fun(Prefix) ->
Filter([{acceptors, cuttlefish:conf_get(Prefix ++ ".acceptors", Conf)}, Filter([{acceptors, cuttlefish:conf_get(Prefix ++ ".acceptors", Conf)},
{max_clients, cuttlefish:conf_get(Prefix ++ ".max_clients", Conf)}, {max_clients, cuttlefish:conf_get(Prefix ++ ".max_clients", Conf)},
{tune_buffer, cuttlefish:conf_get(Prefix ++ ".tune_buffer", Conf, undefined)} | AccOpts(Prefix)]) {tune_buffer, cuttlefish:conf_get(Prefix ++ ".tune_buffer", Conf, undefined)},
{zone, Atom(cuttlefish:conf_get(Prefix ++ ".zone", Conf, undefined))},
{rate_limit, cuttlefish:conf_get(Prefix ++ ".rate_limit", Conf, undefined)},
{proxy_protocol, cuttlefish:conf_get(Prefix ++ ".proxy_protocol", Conf, undefined)},
{proxy_protocol_timeout, cuttlefish:conf_get(Prefix ++ ".proxy_protocol_timeout", Conf, undefined)},
{mountpoint, MountPoint(cuttlefish:conf_get(Prefix ++ ".mountpoint", Conf, undefined))},
{peer_cert_as_username, cuttlefish:conf_get(Prefix ++ ".peer_cert_as_username", Conf, undefined)},
{proxy_port_header, cuttlefish:conf_get(Prefix ++ ".proxy_port_header", Conf, undefined)},
{proxy_address_header, cuttlefish:conf_get(Prefix ++ ".proxy_address_header", Conf, undefined)} | AccOpts(Prefix)])
end, end,
TcpOpts = fun(Prefix) -> TcpOpts = fun(Prefix) ->
Filter([{backlog, cuttlefish:conf_get(Prefix ++ ".backlog", Conf, undefined)}, Filter([{backlog, cuttlefish:conf_get(Prefix ++ ".backlog", Conf, undefined)},
@ -1460,11 +1457,9 @@ end}.
TcpListeners = fun(Type, Name) -> TcpListeners = fun(Type, Name) ->
Prefix = string:join(["listener", Type, Name], "."), Prefix = string:join(["listener", Type, Name], "."),
case cuttlefish:conf_get(Prefix, Conf, undefined) of case cuttlefish:conf_get(Prefix, Conf, undefined) of
undefined -> undefined -> [];
[]; ListenOn ->
ListenOn -> [{Atom(Type), ListenOn, [{tcp_options, TcpOpts(Prefix)} | LisOpts(Prefix)]}]
[{Atom(Type), ListenOn, [{connopts, ConnOpts(Prefix)},
{sockopts, TcpOpts(Prefix)} | LisOpts(Prefix)]}]
end end
end, end,
@ -1474,9 +1469,8 @@ end}.
undefined -> undefined ->
[]; [];
ListenOn -> ListenOn ->
[{Atom(Type), ListenOn, [{connopts, ConnOpts(Prefix)}, [{Atom(Type), ListenOn, [{tcp_options, TcpOpts(Prefix)},
{sockopts, TcpOpts(Prefix)}, {ssl_options, SslOpts(Prefix)} | LisOpts(Prefix)]}]
{sslopts, SslOpts(Prefix)} | LisOpts(Prefix)]}]
end end
end, end,
@ -1486,12 +1480,8 @@ end}.
undefined -> undefined ->
[]; [];
ListenOn -> ListenOn ->
SslOpts1 = case SslOpts(Prefix) of SslOpts1 = case SslOpts(Prefix) of [] -> []; SslOpts0 -> [{ssl_options, SslOpts0}] end,
[] -> []; [{Atom(Type), ListenOn, [{tcp_options, TcpOpts(Prefix)}|LisOpts(Prefix)] ++ SslOpts1}]
SslOpts0 -> [{sslopts, SslOpts0}]
end,
[{Atom(Type), ListenOn, [{connopts, ConnOpts(Prefix)},
{sockopts, TcpOpts(Prefix)}| LisOpts(Prefix)] ++ SslOpts1}]
end end
end, end,

View File

@ -3,7 +3,7 @@
{vsn,"3.0"}, {vsn,"3.0"},
{modules,[]}, {modules,[]},
{registered,[emqx_sup]}, {registered,[emqx_sup]},
{applications,[kernel,stdlib,gproc,lager,esockd,mochiweb,lager_syslog,pbkdf2,bcrypt,clique,jsx]}, {applications,[kernel, stdlib,jsx,gproc,gen_rpc,lager,ekka,esockd,mochiweb]},
{env,[]}, {env,[]},
{mod,{emqx_app,[]}}, {mod,{emqx_app,[]}},
{maintainers,["Feng Lee <feng@emqx.io>"]}, {maintainers,["Feng Lee <feng@emqx.io>"]},

View File

@ -131,7 +131,7 @@ encode_alarm(#alarm{id = AlarmId, severity = Severity, title = Title,
{ts, emqx_time:now_secs(Ts)}]). {ts, emqx_time:now_secs(Ts)}]).
alarm_msg(Type, AlarmId, Json) -> alarm_msg(Type, AlarmId, Json) ->
emqx_message:make(?ALARM_MGR, #{sys => true, qos => 0}, topic(Type, AlarmId), Json). emqx_message:make(?ALARM_MGR, #{sys => true}, topic(Type, AlarmId), Json).
topic(alert, AlarmId) -> topic(alert, AlarmId) ->
emqx_topic:systop(<<"alarms/", AlarmId/binary, "/alert">>); emqx_topic:systop(<<"alarms/", AlarmId/binary, "/alert">>);

View File

@ -17,24 +17,17 @@
-behaviour(gen_server). -behaviour(gen_server).
-include("emqx.hrl"). -include("emqx.hrl").
-include("emqx_mqtt.hrl"). -include("emqx_mqtt.hrl").
-include("emqx_misc.hrl"). -include("emqx_misc.hrl").
-import(proplists, [get_value/2, get_value/3]). -import(proplists, [get_value/2, get_value/3]).
%% API Function Exports
-export([start_link/3]). -export([start_link/3]).
%% Management and Monitor API %% Management and Monitor API
-export([info/1, stats/1, kick/1, clean_acl_cache/2]). -export([info/1, stats/1, kick/1, clean_acl_cache/2]).
-export([set_rate_limit/2, get_rate_limit/1]). -export([set_rate_limit/2, get_rate_limit/1]).
%% SUB/UNSUB Asynchronously. Called by plugins. %% SUB/UNSUB Asynchronously. Called by plugins.
-export([subscribe/2, unsubscribe/2]). -export([subscribe/2, unsubscribe/2]).
%% Get the session proc? %% Get the session proc?
-export([session/1]). -export([session/1]).
@ -48,15 +41,12 @@
keepalive, enable_stats, idle_timeout, force_gc_count}). keepalive, enable_stats, idle_timeout, force_gc_count}).
-define(INFO_KEYS, [peername, conn_state, await_recv]). -define(INFO_KEYS, [peername, conn_state, await_recv]).
-define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]). -define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]).
-define(LOG(Level, Format, Args, State), -define(LOG(Level, Format, Args, State),
emqx_logger:Level("Client(~s): " ++ Format, emqx_logger:Level("Conn(~s): " ++ Format, [esockd_net:format(State#state.peername) | Args])).
[esockd_net:format(State#state.peername) | Args])).
start_link(Transport, Sock, Env) -> start_link(Transport, Sock, Options) ->
{ok, proc_lib:spawn_link(?MODULE, init, [[Transport, Sock, Env]])}. {ok, proc_lib:spawn_link(?MODULE, init, [[Transport, Sock, Options]])}.
info(CPid) -> info(CPid) ->
gen_server:call(CPid, info). gen_server:call(CPid, info).
@ -85,26 +75,27 @@ session(CPid) ->
clean_acl_cache(CPid, Topic) -> clean_acl_cache(CPid, Topic) ->
gen_server:call(CPid, {clean_acl_cache, Topic}). gen_server:call(CPid, {clean_acl_cache, Topic}).
%%-------------------------------------------------------------------- %%------------------------------------------------------------------------------
%% gen_server callbacks %% gen_server callbacks
%%-------------------------------------------------------------------- %%------------------------------------------------------------------------------
init([Transport, Sock, Env]) -> init([Transport, Sock, Options]) ->
case Transport:wait(Sock) of case Transport:wait(Sock) of
{ok, NewSock} -> {ok, NewSock} ->
{ok, Peername} = Transport:ensure_ok_or_exit(peername, [NewSock]), {ok, Peername} = Transport:ensure_ok_or_exit(peername, [NewSock]),
do_init(Transport, Sock, Peername, Env); do_init(Transport, Sock, Peername, Options);
{error, Reason} -> {error, Reason} ->
{stop, Reason} {stop, Reason}
end. end.
do_init(Transport, Sock, Peername, Env) -> do_init(Transport, Sock, Peername, Options) ->
RateLimit = get_value(rate_limit, Env), io:format("Options: ~p~n", [Options]),
PacketSize = get_value(max_packet_size, Env, ?MAX_PACKET_SIZE), RateLimit = get_value(rate_limit, Options),
PacketSize = get_value(max_packet_size, Options, ?MAX_PACKET_SIZE),
SendFun = send_fun(Transport, Sock, Peername), SendFun = send_fun(Transport, Sock, Peername),
ProtoState = emqx_protocol:init(Transport, Sock, Peername, SendFun, Env), ProtoState = emqx_protocol:init(Transport, Sock, Peername, SendFun, Options),
EnableStats = get_value(client_enable_stats, Env, false), EnableStats = get_value(client_enable_stats, Options, false),
IdleTimout = get_value(client_idle_timeout, Env, 30000), IdleTimout = get_value(client_idle_timeout, Options, 30000),
ForceGcCount = emqx_gc:conn_max_gc_count(), ForceGcCount = emqx_gc:conn_max_gc_count(),
State = run_socket(#state{transport = Transport, State = run_socket(#state{transport = Transport,
socket = Sock, socket = Sock,
@ -136,8 +127,7 @@ send_fun(Transport, Sock, Peername) ->
init_parse_state(State = #state{max_packet_size = Size, proto_state = ProtoState}) -> init_parse_state(State = #state{max_packet_size = Size, proto_state = ProtoState}) ->
Version = emqx_protocol:get(proto_ver, ProtoState), Version = emqx_protocol:get(proto_ver, ProtoState),
State#state{parse_state = emqx_frame:initial_state( State#state{parse_state = emqx_frame:initial_state(#{max_packet_size => Size, version => Version})}.
#{max_packet_size => Size, version => Version})}.
handle_call(info, From, State = #state{proto_state = ProtoState}) -> handle_call(info, From, State = #state{proto_state = ProtoState}) ->
ProtoInfo = emqx_protocol:info(ProtoState), ProtoInfo = emqx_protocol:info(ProtoState),
@ -194,10 +184,6 @@ handle_info({suback, PacketId, GrantedQos}, State) ->
emqx_protocol:send(Packet, ProtoState) emqx_protocol:send(Packet, ProtoState)
end, State); end, State);
%% Fastlane
handle_info({dispatch, _Topic, Msg}, State) ->
handle_info({deliver, emqx_message:set_flag(qos, ?QOS_0, Msg)}, State);
handle_info({deliver, Message}, State) -> handle_info({deliver, Message}, State) ->
with_proto( with_proto(
fun(ProtoState) -> fun(ProtoState) ->

View File

@ -42,10 +42,14 @@ start_listener({Proto, ListenOn, Options}) when Proto == https; Proto == wss ->
start_http_listener('mqtt:wss', ListenOn, Options). start_http_listener('mqtt:wss', ListenOn, Options).
start_mqtt_listener(Name, ListenOn, Options) -> start_mqtt_listener(Name, ListenOn, Options) ->
{ok, _} = esockd:open(Name, ListenOn, merge_sockopts(Options), {emqx_connection, start_link, []}). SockOpts = esockd:parse_opt(Options),
MFA = {emqx_connection, start_link, [Options -- SockOpts]},
{ok, _} = esockd:open(Name, ListenOn, merge_default(SockOpts), MFA).
start_http_listener(Name, ListenOn, Options) -> start_http_listener(Name, ListenOn, Options) ->
{ok, _} = mochiweb:start_http(Name, ListenOn, Options, {emqx_ws, handle_request, []}). SockOpts = esockd:parse_opt(Options),
MFA = {emqx_ws, handle_request, [Options -- SockOpts]},
{ok, _} = mochiweb:start_http(Name, ListenOn, SockOpts, MFA).
%% @doc Restart all listeners %% @doc Restart all listeners
-spec(restart_all() -> ok). -spec(restart_all() -> ok).
@ -56,7 +60,7 @@ restart_all() ->
restart_listener({tcp, ListenOn, _Opts}) -> restart_listener({tcp, ListenOn, _Opts}) ->
esockd:reopen('mqtt:tcp', ListenOn); esockd:reopen('mqtt:tcp', ListenOn);
restart_listener({Proto, ListenOn, _Opts}) when Proto == ssl; Proto == tls -> restart_listener({Proto, ListenOn, _Opts}) when Proto == ssl; Proto == tls ->
esockd:reopen('mqtt:tls', ListenOn); esockd:reopen('mqtt:ssl', ListenOn);
restart_listener({Proto, ListenOn, _Opts}) when Proto == http; Proto == ws -> restart_listener({Proto, ListenOn, _Opts}) when Proto == http; Proto == ws ->
mochiweb:restart_http('mqtt:ws', ListenOn); mochiweb:restart_http('mqtt:ws', ListenOn);
restart_listener({Proto, ListenOn, _Opts}) when Proto == https; Proto == wss -> restart_listener({Proto, ListenOn, _Opts}) when Proto == https; Proto == wss ->
@ -73,7 +77,7 @@ stop_all() ->
stop_listener({tcp, ListenOn, _Opts}) -> stop_listener({tcp, ListenOn, _Opts}) ->
esockd:close('mqtt:tcp', ListenOn); esockd:close('mqtt:tcp', ListenOn);
stop_listener({Proto, ListenOn, _Opts}) when Proto == ssl; Proto == tls -> stop_listener({Proto, ListenOn, _Opts}) when Proto == ssl; Proto == tls ->
esockd:close('mqtt:tls', ListenOn); esockd:close('mqtt:ssl', ListenOn);
stop_listener({Proto, ListenOn, _Opts}) when Proto == http; Proto == ws -> stop_listener({Proto, ListenOn, _Opts}) when Proto == http; Proto == ws ->
mochiweb:stop_http('mqtt:ws', ListenOn); mochiweb:stop_http('mqtt:ws', ListenOn);
stop_listener({Proto, ListenOn, _Opts}) when Proto == https; Proto == wss -> stop_listener({Proto, ListenOn, _Opts}) when Proto == https; Proto == wss ->
@ -81,7 +85,7 @@ stop_listener({Proto, ListenOn, _Opts}) when Proto == https; Proto == wss ->
stop_listener({Proto, ListenOn, _Opts}) -> stop_listener({Proto, ListenOn, _Opts}) ->
esockd:close(Proto, ListenOn). esockd:close(Proto, ListenOn).
merge_sockopts(Options) -> merge_default(Options) ->
case lists:keytake(tcp_options, 1, Options) of case lists:keytake(tcp_options, 1, Options) of
{value, {tcp_options, TcpOpts}, Options1} -> {value, {tcp_options, TcpOpts}, Options1} ->
[{tcp_options, emqx_misc:merge_opts(?MQTT_SOCKOPTS, TcpOpts)} | Options1]; [{tcp_options, emqx_misc:merge_opts(?MQTT_SOCKOPTS, TcpOpts)} | Options1];
@ -89,11 +93,3 @@ merge_sockopts(Options) ->
[{tcp_options, ?MQTT_SOCKOPTS} | Options] [{tcp_options, ?MQTT_SOCKOPTS} | Options]
end. end.
%% all() ->
%% [Listener || Listener = {{Proto, _}, _Pid} <- esockd:listeners(), is_mqtt(Proto)].
%%is_mqtt('mqtt:tcp') -> true;
%%is_mqtt('mqtt:tls') -> true;
%%is_mqtt('mqtt:ws') -> true;
%%is_mqtt('mqtt:wss') -> true;
%%is_mqtt(_Proto) -> false.

View File

@ -37,6 +37,7 @@ new(From, Flags, Topic, Payload) when is_atom(From); is_record(From, client) ->
-spec(new(atom() | client(), message_flags(), message_headers(), topic(), payload()) -> message()). -spec(new(atom() | client(), message_flags(), message_headers(), topic(), payload()) -> message()).
new(From, Flags, Headers, Topic, Payload) when is_atom(From); is_record(From, client) -> new(From, Flags, Headers, Topic, Payload) when is_atom(From); is_record(From, client) ->
#message{id = msgid(), #message{id = msgid(),
qos = ?QOS0,
from = From, from = From,
sender = self(), sender = self(),
flags = Flags, flags = Flags,

View File

@ -21,7 +21,7 @@
%% This module implements a simple in-memory queue for MQTT persistent session. %% This module implements a simple in-memory queue for MQTT persistent session.
%% %%
%% If the broker restarted or crashed, all the messages queued will be gone. %% If the broker restarted or crashed, all the messages queued will be gone.
%% %%
%% Concept of Message Queue and Inflight Window: %% Concept of Message Queue and Inflight Window:
%% %%
%% |<----------------- Max Len ----------------->| %% |<----------------- Max Len ----------------->|
@ -154,7 +154,7 @@ stats(#mqueue{type = Type, q = Q, max_len = MaxLen, len = Len, dropped = Dropped
%% @doc Enqueue a message. %% @doc Enqueue a message.
-spec(in(message(), mqueue()) -> mqueue()). -spec(in(message(), mqueue()) -> mqueue()).
in(#message{flags = #{qos := ?QOS_0}}, MQ = #mqueue{qos0 = false}) -> in(#message{qos = ?QOS_0}, MQ = #mqueue{qos0 = false}) ->
MQ; MQ;
in(Msg, MQ = #mqueue{type = simple, q = Q, len = Len, max_len = 0}) -> in(Msg, MQ = #mqueue{type = simple, q = Q, len = Len, max_len = 0}) ->
MQ#mqueue{q = queue:in(Msg, Q), len = Len + 1}; MQ#mqueue{q = queue:in(Msg, Q), len = Len + 1};

View File

@ -1,44 +1,36 @@
%%%=================================================================== %% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
%%% Copyright (c) 2013-2018 EMQ Inc. All rights reserved. %%
%%% %% Licensed under the Apache License, Version 2.0 (the "License");
%%% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License.
%%% you may not use this file except in compliance with the License. %% You may obtain a copy of the License at
%%% You may obtain a copy of the License at %%
%%% %% http://www.apache.org/licenses/LICENSE-2.0
%%% http://www.apache.org/licenses/LICENSE-2.0 %%
%%% %% Unless required by applicable law or agreed to in writing, software
%%% Unless required by applicable law or agreed to in writing, software %% distributed under the License is distributed on an "AS IS" BASIS,
%%% distributed under the License is distributed on an "AS IS" BASIS, %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and
%%% See the License for the specific language governing permissions and %% limitations under the License.
%%% limitations under the License.
%%%===================================================================
-module(emqx_protocol). -module(emqx_protocol).
-include("emqx.hrl"). -include("emqx.hrl").
-include("emqx_mqtt.hrl"). -include("emqx_mqtt.hrl").
-include("emqx_misc.hrl"). -include("emqx_misc.hrl").
-import(proplists, [get_value/2, get_value/3]). -import(proplists, [get_value/2, get_value/3]).
%% API %% API
-export([init/3, init/5, get/2, info/1, stats/1, clientid/1, client/1, session/1]). -export([init/3, init/5, get/2, info/1, stats/1, clientid/1, client/1, session/1]).
-export([subscribe/2, unsubscribe/2, pubrel/2, shutdown/2]). -export([subscribe/2, unsubscribe/2, pubrel/2, shutdown/2]).
-export([received/2, send/2]). -export([received/2, send/2]).
-export([process/2]). -export([process/2]).
-ifdef(TEST). -ifdef(TEST).
-compile(export_all). -compile(export_all).
-endif. -endif.
-record(proto_stats, {enable_stats = false, recv_pkt = 0, recv_msg = 0, -record(proto_stats, {enable_stats = false, recv_pkt = 0, recv_msg = 0, send_pkt = 0, send_msg = 0}).
send_pkt = 0, send_msg = 0}).
%% Protocol State %% Protocol State
%% ws_initial_headers: Headers from first HTTP request for WebSocket Client. %% ws_initial_headers: Headers from first HTTP request for WebSocket Client.
@ -76,23 +68,22 @@ init(Peername, SendFun, Opts) ->
keepalive_backoff = Backoff, keepalive_backoff = Backoff,
stats_data = #proto_stats{enable_stats = EnableStats}}. stats_data = #proto_stats{enable_stats = EnableStats}}.
init(_Transport, _Sock, Peername, SendFun, Opts) -> init(_Transport, _Sock, Peername, SendFun, Options) ->
init(Peername, SendFun, Opts). enrich_opt(Options, init(Peername, SendFun, Options)).
%%enrich_opt(Conn:opts(), Conn, ).
enrich_opt([], _Conn, State) -> enrich_opt([], State) ->
State; State;
enrich_opt([{mountpoint, MountPoint} | ConnOpts], Conn, State) -> enrich_opt([{mountpoint, MountPoint} | ConnOpts], State) ->
enrich_opt(ConnOpts, Conn, State#proto_state{mountpoint = MountPoint}); enrich_opt(ConnOpts, State#proto_state{mountpoint = MountPoint});
enrich_opt([{peer_cert_as_username, N} | ConnOpts], Conn, State) -> %%enrich_opt([{peer_cert_as_username, N} | ConnOpts], State) ->
enrich_opt(ConnOpts, Conn, State#proto_state{peercert_username = peercert_username(N, Conn)}); %% enrich_opt(ConnOpts, State#proto_state{peercert_username = peercert_username(N, Conn)});
enrich_opt([_ | ConnOpts], Conn, State) -> enrich_opt([_ | ConnOpts], State) ->
enrich_opt(ConnOpts, Conn, State). enrich_opt(ConnOpts, State).
peercert_username(cn, Conn) -> %%peercert_username(cn, Conn) ->
Conn:peer_cert_common_name(); %% Conn:peer_cert_common_name();
peercert_username(dn, Conn) -> %%peercert_username(dn, Conn) ->
Conn:peer_cert_subject(). %% Conn:peer_cert_subject().
repl_username_with_peercert(State = #proto_state{peercert_username = undefined}) -> repl_username_with_peercert(State = #proto_state{peercert_username = undefined}) ->
State; State;
@ -122,17 +113,14 @@ client(#proto_state{client_id = ClientId,
proto_ver = ProtoVer, proto_ver = ProtoVer,
keepalive = Keepalive, keepalive = Keepalive,
will_msg = WillMsg, will_msg = WillMsg,
ws_initial_headers = WsInitialHeaders, ws_initial_headers = _WsInitialHeaders,
mountpoint = MountPoint, mountpoint = _MountPoint,
connected_at = Time}) -> connected_at = _Time}) ->
WillTopic = if WillTopic = if
WillMsg =:= undefined -> undefined; WillMsg =:= undefined -> undefined;
true -> WillMsg#message.topic true -> WillMsg#message.topic
end, end,
#client{id = ClientId, #client{id = ClientId, pid = ClientPid, username = Username, peername = Peername}.
pid = ClientPid,
username = Username,
peername = Peername}.
session(#proto_state{session = Session}) -> session(#proto_state{session = Session}) ->
Session. Session.

View File

@ -1,48 +1,16 @@
%%%=================================================================== %% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
%%% Copyright (c) 2013-2018 EMQ Inc. All rights reserved. %%
%%% %% Licensed under the Apache License, Version 2.0 (the "License");
%%% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License.
%%% you may not use this file except in compliance with the License. %% You may obtain a copy of the License at
%%% You may obtain a copy of the License at %%
%%% %% http://www.apache.org/licenses/LICENSE-2.0
%%% http://www.apache.org/licenses/LICENSE-2.0 %%
%%% %% Unless required by applicable law or agreed to in writing, software
%%% Unless required by applicable law or agreed to in writing, software %% distributed under the License is distributed on an "AS IS" BASIS,
%%% distributed under the License is distributed on an "AS IS" BASIS, %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and
%%% See the License for the specific language governing permissions and %% limitations under the License.
%%% limitations under the License.
%%%===================================================================
-module(emqx_session).
-behaviour(gen_server).
-include("emqx.hrl").
-include("emqx_mqtt.hrl").
-include("emqx_misc.hrl").
-import(emqx_misc, [start_timer/2]).
-import(proplists, [get_value/2, get_value/3]).
%% Session API
-export([start_link/1, resume/2, discard/2]).
%% Management and Monitor API
-export([state/1, info/1, stats/1]).
%% PubSub API
-export([subscribe/2, subscribe/3, publish/2, puback/2, pubrec/2,
pubrel/2, pubcomp/2, unsubscribe/2]).
%% gen_server Function Exports
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-define(MQueue, emqx_mqueue).
%% A stateful interaction between a Client and a Server. Some Sessions %% A stateful interaction between a Client and a Server. Some Sessions
%% last only as long as the Network Connection, others can span multiple %% last only as long as the Network Connection, others can span multiple
@ -66,6 +34,32 @@
%% %%
%% If the session is currently disconnected, the time at which the Session state %% If the session is currently disconnected, the time at which the Session state
%% will be deleted. %% will be deleted.
-module(emqx_session).
-behaviour(gen_server).
-include("emqx.hrl").
-include("emqx_mqtt.hrl").
-include("emqx_misc.hrl").
-import(emqx_misc, [start_timer/2]).
-import(proplists, [get_value/2, get_value/3]).
%% Session API
-export([start_link/1, resume/2, discard/2]).
%% Management and Monitor API
-export([state/1, info/1, stats/1]).
%% PubSub API
-export([subscribe/2, subscribe/3]).
-export([publish/2, puback/2, pubrec/2, pubrel/2, pubcomp/2]).
-export([unsubscribe/2]).
%% gen_server Function Exports
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
-define(MQueue, emqx_mqueue).
-record(state, -record(state,
{ %% Clean Start Flag { %% Clean Start Flag
clean_start = false :: boolean(), clean_start = false :: boolean(),
@ -145,9 +139,7 @@
}). }).
-define(TIMEOUT, 60000). -define(TIMEOUT, 60000).
-define(INFO_KEYS, [clean_start, client_id, username, client_pid, binding, created_at]). -define(INFO_KEYS, [clean_start, client_id, username, client_pid, binding, created_at]).
-define(STATE_KEYS, [clean_start, client_id, username, binding, client_pid, old_client_pid, -define(STATE_KEYS, [clean_start, client_id, username, binding, client_pid, old_client_pid,
next_msg_id, max_subscriptions, subscriptions, upgrade_qos, inflight, next_msg_id, max_subscriptions, subscriptions, upgrade_qos, inflight,
max_inflight, retry_interval, mqueue, awaiting_rel, max_awaiting_rel, max_inflight, retry_interval, mqueue, awaiting_rel, max_awaiting_rel,
@ -169,71 +161,71 @@ start_link(Attrs) ->
%% @doc Subscribe topics %% @doc Subscribe topics
-spec(subscribe(pid(), [{binary(), [emqx_topic:option()]}]) -> ok). -spec(subscribe(pid(), [{binary(), [emqx_topic:option()]}]) -> ok).
subscribe(SessionPid, TopicTable) -> %%TODO: the ack function??... subscribe(SPid, TopicTable) -> %%TODO: the ack function??...
gen_server:cast(SessionPid, {subscribe, self(), TopicTable, fun(_) -> ok end}). gen_server:cast(SPid, {subscribe, self(), TopicTable, fun(_) -> ok end}).
-spec(subscribe(pid(), mqtt_packet_id(), [{binary(), [emqx_topic:option()]}]) -> ok). -spec(subscribe(pid(), mqtt_packet_id(), [{binary(), [emqx_topic:option()]}]) -> ok).
subscribe(SessionPid, PacketId, TopicTable) -> %%TODO: the ack function??... subscribe(SPid, PacketId, TopicTable) -> %%TODO: the ack function??...
From = self(), From = self(),
AckFun = fun(GrantedQos) -> From ! {suback, PacketId, GrantedQos} end, AckFun = fun(GrantedQos) -> From ! {suback, PacketId, GrantedQos} end,
gen_server:cast(SessionPid, {subscribe, From, TopicTable, AckFun}). gen_server:cast(SPid, {subscribe, From, TopicTable, AckFun}).
%% @doc Publish Message %% @doc Publish Message
-spec(publish(pid(), message()) -> ok | {error, term()}). -spec(publish(pid(), message()) -> {ok, delivery()} | {error, term()}).
publish(_SessionPid, Msg = #message{qos = ?QOS_0}) -> publish(_SPid, Msg = #message{qos = ?QOS_0}) ->
%% Publish QoS0 Directly %% Publish QoS0 Directly
emqx_broker:publish(Msg), ok; emqx_broker:publish(Msg);
publish(_SessionPid, Msg = #message{qos = ?QOS_1}) -> publish(_SPid, Msg = #message{qos = ?QOS_1}) ->
%% Publish QoS1 message directly for client will PubAck automatically %% Publish QoS1 message directly for client will PubAck automatically
emqx_broker:publish(Msg), ok; emqx_broker:publish(Msg);
publish(SessionPid, Msg = #message{qos = ?QOS_2}) -> publish(SPid, Msg = #message{qos = ?QOS_2}) ->
%% Publish QoS2 to Session %% Publish QoS2 to Session
gen_server:call(SessionPid, {publish, Msg}, ?TIMEOUT). gen_server:call(SPid, {publish, Msg}, infinity).
%% @doc PubAck Message %% @doc PubAck Message
-spec(puback(pid(), mqtt_packet_id()) -> ok). -spec(puback(pid(), mqtt_packet_id()) -> ok).
puback(SessionPid, PacketId) -> puback(SPid, PacketId) ->
gen_server:cast(SessionPid, {puback, PacketId}). gen_server:cast(SPid, {puback, PacketId}).
-spec(pubrec(pid(), mqtt_packet_id()) -> ok). -spec(pubrec(pid(), mqtt_packet_id()) -> ok).
pubrec(SessionPid, PacketId) -> pubrec(SPid, PacketId) ->
gen_server:cast(SessionPid, {pubrec, PacketId}). gen_server:cast(SPid, {pubrec, PacketId}).
-spec(pubrel(pid(), mqtt_packet_id()) -> ok). -spec(pubrel(pid(), mqtt_packet_id()) -> ok).
pubrel(SessionPid, PacketId) -> pubrel(SPid, PacketId) ->
gen_server:cast(SessionPid, {pubrel, PacketId}). gen_server:cast(SPid, {pubrel, PacketId}).
-spec(pubcomp(pid(), mqtt_packet_id()) -> ok). -spec(pubcomp(pid(), mqtt_packet_id()) -> ok).
pubcomp(SessionPid, PacketId) -> pubcomp(SPid, PacketId) ->
gen_server:cast(SessionPid, {pubcomp, PacketId}). gen_server:cast(SPid, {pubcomp, PacketId}).
%% @doc Unsubscribe the topics %% @doc Unsubscribe the topics
-spec(unsubscribe(pid(), [{binary(), [suboption()]}]) -> ok). -spec(unsubscribe(pid(), [{binary(), [suboption()]}]) -> ok).
unsubscribe(SessionPid, TopicTable) -> unsubscribe(SPid, TopicTable) ->
gen_server:cast(SessionPid, {unsubscribe, self(), TopicTable}). gen_server:cast(SPid, {unsubscribe, self(), TopicTable}).
%% @doc Resume the session %% @doc Resume the session
-spec(resume(pid(), pid()) -> ok). -spec(resume(pid(), pid()) -> ok).
resume(SessionPid, ClientPid) -> resume(SPid, ClientPid) ->
gen_server:cast(SessionPid, {resume, ClientPid}). gen_server:cast(SPid, {resume, ClientPid}).
%% @doc Get session state %% @doc Get session state
state(SessionPid) when is_pid(SessionPid) -> state(SPid) when is_pid(SPid) ->
gen_server:call(SessionPid, state). gen_server:call(SPid, state).
%% @doc Get session info %% @doc Get session info
-spec(info(pid() | #state{}) -> list(tuple())). -spec(info(pid() | #state{}) -> list(tuple())).
info(SessionPid) when is_pid(SessionPid) -> info(SPid) when is_pid(SPid) ->
gen_server:call(SessionPid, info); gen_server:call(SPid, info);
info(State) when is_record(State, state) -> info(State) when is_record(State, state) ->
?record_to_proplist(state, State, ?INFO_KEYS). ?record_to_proplist(state, State, ?INFO_KEYS).
-spec(stats(pid() | #state{}) -> list({atom(), non_neg_integer()})). -spec(stats(pid() | #state{}) -> list({atom(), non_neg_integer()})).
stats(SessionPid) when is_pid(SessionPid) -> stats(SPid) when is_pid(SPid) ->
gen_server:call(SessionPid, stats); gen_server:call(SPid, stats);
stats(#state{max_subscriptions = MaxSubscriptions, stats(#state{max_subscriptions = MaxSubscriptions,
subscriptions = Subscriptions, subscriptions = Subscriptions,
@ -257,8 +249,8 @@ stats(#state{max_subscriptions = MaxSubscriptions,
%% @doc Discard the session %% @doc Discard the session
-spec(discard(pid(), client_id()) -> ok). -spec(discard(pid(), client_id()) -> ok).
discard(SessionPid, ClientId) -> discard(SPid, ClientId) ->
gen_server:call(SessionPid, {discard, ClientId}). gen_server:call(SPid, {discard, ClientId}).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% gen_server Callbacks %% gen_server Callbacks
@ -342,41 +334,34 @@ handle_call(state, _From, State) ->
reply(?record_to_proplist(state, State, ?STATE_KEYS), State); reply(?record_to_proplist(state, State, ?STATE_KEYS), State);
handle_call(Req, _From, State) -> handle_call(Req, _From, State) ->
emqx_logger:error("[Session] Unexpected request: ~p", [Req]), emqx_logger:error("[Session] unexpected call: ~p", [Req]),
{reply, ignore, State}. {reply, ignored, State}.
handle_cast({subscribe, From, TopicTable, AckFun}, handle_cast({subscribe, From, TopicTable, AckFun},
State = #state{client_id = ClientId, State = #state{client_id = ClientId, username = Username, subscriptions = Subscriptions}) ->
username = Username,
subscriptions = Subscriptions}) ->
?LOG(info, "Subscribe ~p", [TopicTable], State), ?LOG(info, "Subscribe ~p", [TopicTable], State),
{GrantedQos, Subscriptions1} = {GrantedQos, Subscriptions1} =
lists:foldl(fun({Topic, Opts}, {QosAcc, SubMap}) -> lists:foldl(fun({Topic, Opts}, {QosAcc, SubMap}) ->
io:format("SubOpts: ~p~n", [Opts]), NewQos = get_value(qos, Opts),
Fastlane = lists:member(fastlane, Opts),
NewQos = if Fastlane == true -> ?QOS_0; true -> get_value(qos, Opts) end,
SubMap1 = SubMap1 =
case maps:find(Topic, SubMap) of case maps:find(Topic, SubMap) of
{ok, NewQos} -> {ok, NewQos} ->
?LOG(warning, "Duplicated subscribe: ~s, qos = ~w", [Topic, NewQos], State), ?LOG(warning, "Duplicated subscribe: ~s, qos = ~w", [Topic, NewQos], State),
SubMap; SubMap;
{ok, OldQos} -> {ok, OldQos} ->
emqx_broker:setopts(Topic, ClientId, [{qos, NewQos}]), %% TODO:....
emqx_broker:set_subopts(Topic, ClientId, [{qos, NewQos}]),
emqx_hooks:run('session.subscribed', [ClientId, Username], {Topic, Opts}), emqx_hooks:run('session.subscribed', [ClientId, Username], {Topic, Opts}),
?LOG(warning, "Duplicated subscribe ~s, old_qos=~w, new_qos=~w", ?LOG(warning, "Duplicated subscribe ~s, old_qos=~w, new_qos=~w", [Topic, OldQos, NewQos], State),
[Topic, OldQos, NewQos], State),
maps:put(Topic, NewQos, SubMap); maps:put(Topic, NewQos, SubMap);
error -> error ->
case Fastlane of %% TODO:....
true -> emqx:subscribe(Topic, From, Opts); emqx:subscribe(Topic, ClientId, Opts),
false -> emqx:subscribe(Topic, ClientId, Opts)
end,
emqx_hooks:run('session.subscribed', [ClientId, Username], {Topic, Opts}), emqx_hooks:run('session.subscribed', [ClientId, Username], {Topic, Opts}),
maps:put(Topic, NewQos, SubMap) maps:put(Topic, NewQos, SubMap)
end, end,
{[NewQos|QosAcc], SubMap1} {[NewQos|QosAcc], SubMap1}
end, {[], Subscriptions}, TopicTable), end, {[], Subscriptions}, TopicTable),
io:format("GrantedQos: ~p~n", [GrantedQos]),
AckFun(lists:reverse(GrantedQos)), AckFun(lists:reverse(GrantedQos)),
{noreply, emit_stats(State#state{subscriptions = Subscriptions1}), hibernate}; {noreply, emit_stats(State#state{subscriptions = Subscriptions1}), hibernate};
@ -501,7 +486,7 @@ handle_cast({resume, ClientPid},
{noreply, emit_stats(dequeue(retry_delivery(true, State1)))}; {noreply, emit_stats(dequeue(retry_delivery(true, State1)))};
handle_cast(Msg, State) -> handle_cast(Msg, State) ->
emqx_logger:error("[Session] Unexpected msg: ~p", [Msg]), emqx_logger:error("[Session] unexpected cast: ~p", [Msg]),
{noreply, State}. {noreply, State}.
%% Ignore Messages delivered by self %% Ignore Messages delivered by self
@ -546,16 +531,15 @@ handle_info({'EXIT', Pid, _Reason}, State = #state{old_client_pid = Pid}) ->
handle_info({'EXIT', Pid, Reason}, State = #state{client_pid = ClientPid}) -> handle_info({'EXIT', Pid, Reason}, State = #state{client_pid = ClientPid}) ->
?LOG(error, "Unexpected EXIT: client_pid=~p, exit_pid=~p, reason=~p", ?LOG(error, "unexpected EXIT: client_pid=~p, exit_pid=~p, reason=~p",
[ClientPid, Pid, Reason], State), [ClientPid, Pid, Reason], State),
{noreply, State, hibernate}; {noreply, State, hibernate};
handle_info(Info, State) -> handle_info(Info, State) ->
emqx_logger:error("[Session] Unexpected info: ~p", [Info]), emqx_logger:error("[Session] unexpected info: ~p", [Info]),
{noreply, State}. {noreply, State}.
terminate(Reason, #state{client_id = ClientId, username = Username}) -> terminate(Reason, #state{client_id = ClientId, username = Username}) ->
emqx_hooks:run('session.terminated', [ClientId, Username, Reason]), emqx_hooks:run('session.terminated', [ClientId, Username, Reason]),
emqx_sm:unregister_session(ClientId). emqx_sm:unregister_session(ClientId).

View File

@ -1,18 +1,16 @@
%%%=================================================================== %% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
%%% Copyright (c) 2013-2018 EMQ Inc. All rights reserved. %%
%%% %% Licensed under the Apache License, Version 2.0 (the "License");
%%% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License.
%%% you may not use this file except in compliance with the License. %% You may obtain a copy of the License at
%%% You may obtain a copy of the License at %%
%%% %% http://www.apache.org/licenses/LICENSE-2.0
%%% http://www.apache.org/licenses/LICENSE-2.0 %%
%%% %% Unless required by applicable law or agreed to in writing, software
%%% Unless required by applicable law or agreed to in writing, software %% distributed under the License is distributed on an "AS IS" BASIS,
%%% distributed under the License is distributed on an "AS IS" BASIS, %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and
%%% See the License for the specific language governing permissions and %% limitations under the License.
%%% limitations under the License.
%%%===================================================================
-module(emqx_sup). -module(emqx_sup).
@ -74,7 +72,7 @@ init([]) ->
%% Connection Manager %% Connection Manager
CMSup = supervisor_spec(emqx_cm_sup), CMSup = supervisor_spec(emqx_cm_sup),
%% WebSocket Connection Sup %% WebSocket Connection Sup
WSConnSup = supervisor_spec(emqx_ws_connection_sup), %% WSConnSup = supervisor_spec(emqx_ws_connection_sup),
%% Sys Sup %% Sys Sup
SysSup = supervisor_spec(emqx_sys_sup), SysSup = supervisor_spec(emqx_sys_sup),
{ok, {{one_for_all, 0, 1}, {ok, {{one_for_all, 0, 1},
@ -86,7 +84,7 @@ init([]) ->
SMSup, SMSup,
SessionSup, SessionSup,
CMSup, CMSup,
WSConnSup, %%WSConnSup,
SysSup]}}. SysSup]}}.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------

View File

@ -171,6 +171,6 @@ publish(metrics, Metrics) ->
safe_publish(Topic, Payload) -> safe_publish(Topic, Payload) ->
safe_publish(Topic, #{}, Payload). safe_publish(Topic, #{}, Payload).
safe_publish(Topic, Flags, Payload) -> safe_publish(Topic, Flags, Payload) ->
Flags1 = maps:merge(#{sys => true, qos => 0}, Flags), Flags1 = maps:merge(#{sys => true}, Flags),
emqx_broker:safe_publish(emqx_message:new(?SYS, Flags1, Topic, iolist_to_binary(Payload))). emqx_broker:safe_publish(emqx_message:new(?SYS, Flags1, Topic, iolist_to_binary(Payload))).

View File

@ -158,5 +158,5 @@ safe_publish(Event, WarnMsg) ->
emqx_broker:safe_publish(sysmon_msg(Topic, iolist_to_binary(WarnMsg))). emqx_broker:safe_publish(sysmon_msg(Topic, iolist_to_binary(WarnMsg))).
sysmon_msg(Topic, Payload) -> sysmon_msg(Topic, Payload) ->
emqx_message:new(?SYSMON, #{sys => true, qos => 0}, Topic, Payload). emqx_message:new(?SYSMON, #{sys => true}, Topic, Payload).