Merge pull request #3084 from emqx/master

Auto-pull-request-by-2019-12-07
This commit is contained in:
turtleDeng 2019-12-07 19:57:31 +08:00 committed by GitHub
commit 0b6111a7dc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
34 changed files with 1342 additions and 1134 deletions

View File

@ -520,7 +520,7 @@ mqtt.max_qos_allowed = 2
## Maximum Topic Alias, 0 means no topic alias supported.
##
## Value: 0-65535
mqtt.max_topic_alias = 0
mqtt.max_topic_alias = 65535
## Whether the Server supports MQTT retained messages.
##
@ -542,6 +542,11 @@ mqtt.shared_subscription = true
## Value: true | false
mqtt.ignore_loop_deliver = false
## Whether to parse the MQTT frame in strict mode
##
## Value: true | false
mqtt.strict_mode = false
##--------------------------------------------------------------------
## Zones
##--------------------------------------------------------------------
@ -628,7 +633,7 @@ zone.external.force_gc_policy = 1000|1MB
## Maximum Topic Alias, 0 means no limit.
##
## Value: 0-65535
## zone.external.max_topic_alias = 0
## zone.external.max_topic_alias = 65535
## Whether the Server supports retained messages.
##
@ -674,7 +679,7 @@ zone.external.max_inflight = 32
## Retry interval for QoS1/2 message delivering.
##
## Value: Duration
zone.external.retry_interval = 20s
zone.external.retry_interval = 30s
## Maximum QoS2 packets (Client -> Broker) awaiting PUBREL, 0 means no limit.
##
@ -751,6 +756,11 @@ zone.external.use_username_as_clientid = false
## Value: true | false
zone.external.ignore_loop_deliver = false
## Whether to parse the MQTT frame in strict mode
##
## Value: true | false
zone.external.strict_mode = false
##--------------------------------------------------------------------
## Internal Zone
@ -832,6 +842,11 @@ zone.internal.use_username_as_clientid = false
## Value: true | false
zone.internal.ignore_loop_deliver = false
## Whether to parse the MQTT frame in strict mode
##
## Value: true | false
zone.internal.strict_mode = false
##--------------------------------------------------------------------
## Listeners
##--------------------------------------------------------------------
@ -906,7 +921,7 @@ listener.tcp.external.access.1 = allow all
## Enable the option for X.509 certificate based authentication.
## EMQX will use the common name of certificate as MQTT username.
##
## Value: cn | dn
## Value: cn | dn | crt
## listener.tcp.external.peer_cert_as_username = cn
## The TCP backlog defines the maximum length that the queue of pending
@ -1251,10 +1266,10 @@ listener.ssl.external.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-G
## Value: on | off
## listener.ssl.external.honor_cipher_order = on
## Use the CN, EN or CRT field from the client certificate as a username.
## Use the CN, DN or CRT field from the client certificate as a username.
## Notice that 'verify' should be set as 'verify_peer'.
##
## Value: cn | en | crt
## Value: cn | dn | crt
## listener.ssl.external.peer_cert_as_username = cn
## TCP backlog for the SSL connection.

View File

@ -178,8 +178,9 @@
%% Maximum MQTT Packet ID and Length
%%--------------------------------------------------------------------
-define(MAX_PACKET_ID, 16#ffff).
-define(MAX_PACKET_SIZE, 16#fffffff).
-define(MAX_PACKET_ID, 16#FFFF).
-define(MAX_PACKET_SIZE, 16#FFFFFFF).
-define(MAX_TOPIC_AlIAS, 16#FFFF).
%%--------------------------------------------------------------------
%% MQTT Frame Mask
@ -318,6 +319,9 @@
%% MQTT Packet Match
%%--------------------------------------------------------------------
-define(CONNECT_PACKET(),
#mqtt_packet{header = #mqtt_packet_header{type = ?CONNECT}}).
-define(CONNECT_PACKET(Var),
#mqtt_packet{header = #mqtt_packet_header{type = ?CONNECT},
variable = Var}).

View File

@ -685,9 +685,9 @@ end}.
{validators, ["range:0-2"]}
]}.
%% @doc Set the Maximum topic alias.
%% @doc Set the Maximum Topic Alias.
{mapping, "mqtt.max_topic_alias", "emqx.max_topic_alias", [
{default, 0},
{default, 65535},
{datatype, integer}
]}.
@ -715,6 +715,12 @@ end}.
{datatype, {enum, [true, false]}}
]}.
%% @doc Whether to parse the MQTT frame in strict mode
{mapping, "mqtt.strict_mode", "emqx.strict_mode", [
{default, false},
{datatype, {enum, [true, false]}}
]}.
%%--------------------------------------------------------------------
%% Zones
%%--------------------------------------------------------------------
@ -841,8 +847,8 @@ end}.
%% @doc Retry interval for redelivering QoS1/2 messages.
{mapping, "zone.$name.retry_interval", "emqx.zones", [
{default, "20s"},
{datatype, {duration, ms}}
{default, "30s"},
{datatype, {duration, s}}
]}.
%% @doc Max Packets that Awaiting PUBREL, 0 means no limit
@ -854,7 +860,7 @@ end}.
%% @doc Awaiting PUBREL timeout
{mapping, "zone.$name.await_rel_timeout", "emqx.zones", [
{default, "300s"},
{datatype, {duration, ms}}
{datatype, {duration, s}}
]}.
%% @doc Ignore loop delivery of messages
@ -931,14 +937,14 @@ end}.
{datatype, {enum, [true, false]}}
]}.
%% @doc Whether to parse the MQTT frame in strict mode
{mapping, "zone.$name.strict_mode", "emqx.zones", [
{default, false},
{datatype, {enum, [true, false]}}
]}.
{translation, "emqx.zones", fun(Conf) ->
Mapping = fun("retain_available", Val) ->
{retain_available, Val};
("wildcard_subscription", Val) ->
{wildcard_subscription, Val};
("shared_subscription", Val) ->
{shared_subscription, Val};
("publish_limit", Val) ->
Mapping = fun("publish_limit", Val) ->
[L, D] = string:tokens(Val, ", "),
Limit = list_to_integer(L),
Duration = case cuttlefish_duration:parse(D, s) of
@ -1000,8 +1006,6 @@ end}.
end, #{}, string:tokens(Val, ",")),
{mqueue_priorities, MqueuePriorities}
end;
("mountpoint", Val) ->
{mountpoint, iolist_to_binary(Val)};
(Opt, Val) ->
{list_to_atom(Opt), Val}
end,

34
src/emqx.app.src.script Normal file
View File

@ -0,0 +1,34 @@
%%-*- mode: erlang -*-
%% .app.src.script
Config = case os:getenv("EMQX_DESC") of
false -> CONFIG; % env var not defined
[] -> CONFIG; % env var set to empty string
Desc ->
[begin
AppConf0 = lists:keystore(description, 1, AppConf, {description, Desc}),
{application, App, AppConf0}
end || Conf = {application, App, AppConf} <- CONFIG]
end,
RemoveLeadingV =
fun(Tag) ->
case re:run(Tag, "v\[0-9\]+\.\[0-9\]+\.*") of
nomatch ->
Tag;
{match, _} ->
%% if it is a version number prefixed by 'v' then remove the 'v'
"v" ++ Vsn = Tag,
Vsn
end
end,
case os:getenv("EMQX_DEPS_DEFAULT_VSN") of
false -> Config; % env var not defined
[] -> Config; % env var set to empty string
Tag ->
[begin
AppConf0 = lists:keystore(vsn, 1, AppConf, {vsn, RemoveLeadingV(Tag)}),
{application, App, AppConf0}
end || Conf = {application, App, AppConf} <- Config]
end.

View File

@ -24,16 +24,17 @@
, reload_acl/0
]).
-type(result() :: #{auth_result := emqx_types:auth_result(),
anonymous := boolean()
}).
%%--------------------------------------------------------------------
%% APIs
%%--------------------------------------------------------------------
-spec(authenticate(emqx_types:clientinfo())
-> {ok, #{auth_result := emqx_types:auth_result(),
anonymous := boolean}} | {error, term()}).
authenticate(Client) ->
case emqx_hooks:run_fold('client.authenticate',
[Client], default_auth_result(maps:get(zone, Client, undefined))) of
-spec(authenticate(emqx_types:clientinfo()) -> {ok, result()} | {error, term()}).
authenticate(ClientInfo = #{zone := Zone}) ->
case emqx_hooks:run_fold('client.authenticate', [ClientInfo], default_auth_result(Zone)) of
Result = #{auth_result := success, anonymous := true} ->
emqx_metrics:inc('auth.mqtt.anonymous'),
{ok, Result};
@ -44,36 +45,33 @@ authenticate(Client) ->
end.
%% @doc Check ACL
-spec(check_acl(emqx_types:cient(), emqx_types:pubsub(), emqx_types:topic())
-spec(check_acl(emqx_types:clientinfo(), emqx_types:pubsub(), emqx_types:topic())
-> allow | deny).
check_acl(Client, PubSub, Topic) ->
check_acl(ClientInfo, PubSub, Topic) ->
case emqx_acl_cache:is_enabled() of
true ->
check_acl_cache(Client, PubSub, Topic);
false ->
do_check_acl(Client, PubSub, Topic)
true -> check_acl_cache(ClientInfo, PubSub, Topic);
false -> do_check_acl(ClientInfo, PubSub, Topic)
end.
check_acl_cache(Client, PubSub, Topic) ->
check_acl_cache(ClientInfo, PubSub, Topic) ->
case emqx_acl_cache:get_acl_cache(PubSub, Topic) of
not_found ->
AclResult = do_check_acl(Client, PubSub, Topic),
AclResult = do_check_acl(ClientInfo, PubSub, Topic),
emqx_acl_cache:put_acl_cache(PubSub, Topic, AclResult),
AclResult;
AclResult -> AclResult
end.
do_check_acl(#{zone := Zone} = Client, PubSub, Topic) ->
do_check_acl(ClientInfo = #{zone := Zone}, PubSub, Topic) ->
Default = emqx_zone:get_env(Zone, acl_nomatch, deny),
case emqx_hooks:run_fold('client.check_acl', [Client, PubSub, Topic], Default) of
case emqx_hooks:run_fold('client.check_acl', [ClientInfo, PubSub, Topic], Default) of
allow -> allow;
_Other -> deny
end.
-spec(reload_acl() -> ok | {error, term()}).
reload_acl() ->
emqx_acl_cache:is_enabled()
andalso emqx_acl_cache:empty_acl_cache(),
emqx_acl_cache:is_enabled() andalso emqx_acl_cache:empty_acl_cache(),
emqx_mod_acl_internal:reload_acl().
default_auth_result(Zone) ->

View File

@ -124,14 +124,16 @@ subscribe(Topic, SubOpts) when is_binary(Topic), is_map(SubOpts) ->
-spec(subscribe(emqx_topic:topic(), emqx_types:subid(), emqx_types:subopts()) -> ok).
subscribe(Topic, SubId, SubOpts) when is_binary(Topic), ?is_subid(SubId), is_map(SubOpts) ->
SubPid = self(),
case ets:member(?SUBOPTION, {SubPid, Topic}) of
false ->
case ets:member(?SUBOPTION, {SubPid = self(), Topic}) of
false -> %% New
ok = emqx_broker_helper:register_sub(SubPid, SubId),
do_subscribe(Topic, SubPid, with_subid(SubId, SubOpts));
true -> ok
true -> %% Existed
set_subopts(SubPid, Topic, with_subid(SubId, SubOpts)),
ok %% ensure to return 'ok'
end.
-compile({inline, [with_subid/2]}).
with_subid(undefined, SubOpts) ->
SubOpts;
with_subid(SubId, SubOpts) ->
@ -377,7 +379,11 @@ get_subopts(SubId, Topic) when ?is_subid(SubId) ->
-spec(set_subopts(emqx_topic:topic(), emqx_types:subopts()) -> boolean()).
set_subopts(Topic, NewOpts) when is_binary(Topic), is_map(NewOpts) ->
Sub = {self(), Topic},
set_subopts(self(), Topic, NewOpts).
%% @private
set_subopts(SubPid, Topic, NewOpts) ->
Sub = {SubPid, Topic},
case ets:lookup(?SUBOPTION, Sub) of
[{_, OldOpts}] ->
ets:insert(?SUBOPTION, {Sub, maps:merge(OldOpts, NewOpts)});

File diff suppressed because it is too large Load Diff

View File

@ -28,12 +28,16 @@
-export([start_link/0]).
-export([ register_channel/1
, register_channel/2
, register_channel/3
, unregister_channel/1
]).
-export([ get_chan_attrs/1
, get_chan_attrs/2
, set_chan_attrs/2
-export([connection_closed/1]).
-export([ get_chan_info/1
, get_chan_info/2
, set_chan_info/2
]).
-export([ get_chan_stats/1
@ -43,7 +47,11 @@
-export([ open_session/3
, discard_session/1
, discard_session/2
, takeover_session/1
, takeover_session/2
, kick_session/1
, kick_session/2
]).
-export([ lookup_channels/1
@ -66,15 +74,13 @@
%% Tables for channel management.
-define(CHAN_TAB, emqx_channel).
-define(CHAN_P_TAB, emqx_channel_p).
-define(CHAN_ATTRS_TAB, emqx_channel_attrs).
-define(CHAN_STATS_TAB, emqx_channel_stats).
-define(CHAN_CONN_TAB, emqx_channel_conn).
-define(CHAN_INFO_TAB, emqx_channel_info).
-define(CHAN_STATS,
[{?CHAN_TAB, 'channels.count', 'channels.max'},
{?CHAN_TAB, 'connections.count', 'connections.max'},
{?CHAN_TAB, 'sessions.count', 'sessions.max'},
{?CHAN_P_TAB, 'sessions.persistent.count', 'sessions.persistent.max'}
{?CHAN_CONN_TAB, 'connections.count', 'connections.max'}
]).
%% Batch drain
@ -94,17 +100,28 @@ start_link() ->
%% @doc Register a channel.
-spec(register_channel(emqx_types:clientid()) -> ok).
register_channel(ClientId) when is_binary(ClientId) ->
register_channel(ClientId) ->
register_channel(ClientId, self()).
%% @doc Register a channel with pid.
-spec(register_channel(emqx_types:clientid(), chan_pid()) -> ok).
register_channel(ClientId, ChanPid) ->
register_channel(ClientId, ChanPid) when is_pid(ChanPid) ->
Chan = {ClientId, ChanPid},
true = ets:insert(?CHAN_TAB, Chan),
true = ets:insert(?CHAN_CONN_TAB, Chan),
ok = emqx_cm_registry:register_channel(Chan),
cast({registered, Chan}).
%% @doc Register a channel with info and stats.
-spec(register_channel(emqx_types:clientid(),
emqx_types:infos(),
emqx_types:stats()) -> ok).
register_channel(ClientId, Info, Stats) ->
Chan = {ClientId, ChanPid = self()},
true = ets:insert(?CHAN_INFO_TAB, {Chan, Info, Stats}),
register_channel(ClientId, ChanPid).
%% @doc Unregister a channel.
-spec(unregister_channel(emqx_types:clientid()) -> ok).
unregister_channel(ClientId) when is_binary(ClientId) ->
true = do_unregister_channel({ClientId, self()}),
@ -113,52 +130,72 @@ unregister_channel(ClientId) when is_binary(ClientId) ->
%% @private
do_unregister_channel(Chan) ->
ok = emqx_cm_registry:unregister_channel(Chan),
true = ets:delete_object(?CHAN_P_TAB, Chan),
true = ets:delete(?CHAN_ATTRS_TAB, Chan),
true = ets:delete(?CHAN_STATS_TAB, Chan),
true = ets:delete_object(?CHAN_CONN_TAB, Chan),
true = ets:delete(?CHAN_INFO_TAB, Chan),
ets:delete_object(?CHAN_TAB, Chan).
-spec(connection_closed(emqx_types:clientid()) -> true).
connection_closed(ClientId) ->
connection_closed(ClientId, self()).
-spec(connection_closed(emqx_types:clientid(), chan_pid()) -> true).
connection_closed(ClientId, ChanPid) ->
ets:delete_object(?CHAN_CONN_TAB, {ClientId, ChanPid}).
%% @doc Get info of a channel.
-spec(get_chan_attrs(emqx_types:clientid()) -> maybe(emqx_types:attrs())).
get_chan_attrs(ClientId) ->
with_channel(ClientId, fun(ChanPid) -> get_chan_attrs(ClientId, ChanPid) end).
-spec(get_chan_info(emqx_types:clientid()) -> maybe(emqx_types:infos())).
get_chan_info(ClientId) ->
with_channel(ClientId, fun(ChanPid) -> get_chan_info(ClientId, ChanPid) end).
-spec(get_chan_attrs(emqx_types:clientid(), chan_pid()) -> maybe(emqx_types:attrs())).
get_chan_attrs(ClientId, ChanPid) when node(ChanPid) == node() ->
-spec(get_chan_info(emqx_types:clientid(), chan_pid())
-> maybe(emqx_types:infos())).
get_chan_info(ClientId, ChanPid) when node(ChanPid) == node() ->
Chan = {ClientId, ChanPid},
emqx_tables:lookup_value(?CHAN_ATTRS_TAB, Chan);
get_chan_attrs(ClientId, ChanPid) ->
rpc_call(node(ChanPid), get_chan_attrs, [ClientId, ChanPid]).
try ets:lookup_element(?CHAN_INFO_TAB, Chan, 2)
catch
error:badarg -> undefined
end;
get_chan_info(ClientId, ChanPid) ->
rpc_call(node(ChanPid), get_chan_info, [ClientId, ChanPid]).
%% @doc Set info of a channel.
-spec(set_chan_attrs(emqx_types:clientid(), emqx_types:attrs()) -> ok).
set_chan_attrs(ClientId, Info) when is_binary(ClientId) ->
%% @doc Update infos of the channel.
-spec(set_chan_info(emqx_types:clientid(), emqx_types:attrs()) -> boolean()).
set_chan_info(ClientId, Info) when is_binary(ClientId) ->
Chan = {ClientId, self()},
true = ets:insert(?CHAN_ATTRS_TAB, {Chan, Info}),
ok.
try ets:update_element(?CHAN_INFO_TAB, Chan, {2, Info})
catch
error:badarg -> false
end.
%% @doc Get channel's stats.
-spec(get_chan_stats(emqx_types:clientid()) -> maybe(emqx_types:stats())).
get_chan_stats(ClientId) ->
with_channel(ClientId, fun(ChanPid) -> get_chan_stats(ClientId, ChanPid) end).
-spec(get_chan_stats(emqx_types:clientid(), chan_pid()) -> maybe(emqx_types:stats())).
-spec(get_chan_stats(emqx_types:clientid(), chan_pid())
-> maybe(emqx_types:stats())).
get_chan_stats(ClientId, ChanPid) when node(ChanPid) == node() ->
Chan = {ClientId, ChanPid},
emqx_tables:lookup_value(?CHAN_STATS_TAB, Chan);
try ets:lookup_element(?CHAN_INFO_TAB, Chan, 3)
catch
error:badarg -> undefined
end;
get_chan_stats(ClientId, ChanPid) ->
rpc_call(node(ChanPid), get_chan_stats, [ClientId, ChanPid]).
%% @doc Set channel's stats.
-spec(set_chan_stats(emqx_types:clientid(), emqx_types:stats()) -> ok).
-spec(set_chan_stats(emqx_types:clientid(), emqx_types:stats()) -> boolean()).
set_chan_stats(ClientId, Stats) when is_binary(ClientId) ->
set_chan_stats(ClientId, self(), Stats).
-spec(set_chan_stats(emqx_types:clientid(), chan_pid(), emqx_types:stats()) -> ok).
-spec(set_chan_stats(emqx_types:clientid(), chan_pid(), emqx_types:stats())
-> boolean()).
set_chan_stats(ClientId, ChanPid, Stats) ->
Chan = {ClientId, ChanPid},
true = ets:insert(?CHAN_STATS_TAB, {Chan, Stats}),
ok.
try ets:update_element(?CHAN_INFO_TAB, Chan, {3, Stats})
catch
error:badarg -> false
end.
%% @doc Open a session.
-spec(open_session(boolean(), emqx_types:clientinfo(), emqx_types:conninfo())
@ -208,7 +245,7 @@ takeover_session(ClientId) ->
end.
takeover_session(ClientId, ChanPid) when node(ChanPid) == node() ->
case get_chan_attrs(ClientId, ChanPid) of
case get_chan_info(ClientId, ChanPid) of
#{conninfo := #{conn_mod := ConnMod}} ->
Session = ConnMod:call(ChanPid, {takeover, 'begin'}),
{ok, ConnMod, ChanPid, Session};
@ -237,7 +274,7 @@ discard_session(ClientId) when is_binary(ClientId) ->
end.
discard_session(ClientId, ChanPid) when node(ChanPid) == node() ->
case get_chan_attrs(ClientId, ChanPid) of
case get_chan_info(ClientId, ChanPid) of
#{conninfo := #{conn_mod := ConnMod}} ->
ConnMod:call(ChanPid, discard);
undefined -> ok
@ -246,6 +283,31 @@ discard_session(ClientId, ChanPid) when node(ChanPid) == node() ->
discard_session(ClientId, ChanPid) ->
rpc_call(node(ChanPid), discard_session, [ClientId, ChanPid]).
kick_session(ClientId) ->
case lookup_channels(ClientId) of
[] -> {error, not_found};
[ChanPid] ->
kick_session(ClientId, ChanPid);
ChanPids ->
[ChanPid|StalePids] = lists:reverse(ChanPids),
?LOG(error, "More than one channel found: ~p", [ChanPids]),
lists:foreach(fun(StalePid) ->
catch discard_session(ClientId, StalePid)
end, StalePids),
kick_session(ClientId, ChanPid)
end.
kick_session(ClientId, ChanPid) when node(ChanPid) == node() ->
case get_chan_info(ClientId, ChanPid) of
#{conninfo := #{conn_mod := ConnMod}} ->
ConnMod:call(ChanPid, kick);
undefined ->
{error, not_found}
end;
kick_session(ClientId, ChanPid) ->
rpc_call(node(ChanPid), kick_session, [ClientId, ChanPid]).
%% @doc Is clean start?
% is_clean_start(#{clean_start := false}) -> false;
% is_clean_start(_Attrs) -> true.
@ -292,9 +354,8 @@ cast(Msg) -> gen_server:cast(?CM, Msg).
init([]) ->
TabOpts = [public, {write_concurrency, true}],
ok = emqx_tables:new(?CHAN_TAB, [bag, {read_concurrency, true}|TabOpts]),
ok = emqx_tables:new(?CHAN_P_TAB, [bag | TabOpts]),
ok = emqx_tables:new(?CHAN_ATTRS_TAB, [set, compressed | TabOpts]),
ok = emqx_tables:new(?CHAN_STATS_TAB, [set | TabOpts]),
ok = emqx_tables:new(?CHAN_CONN_TAB, [bag | TabOpts]),
ok = emqx_tables:new(?CHAN_INFO_TAB, [set, compressed | TabOpts]),
ok = emqx_stats:update_interval(chan_stats, fun ?MODULE:stats_fun/0),
{ok, #{chan_pmon => emqx_pmon:new()}}.

View File

@ -92,7 +92,7 @@
-type(state() :: #state{}).
-define(ACTIVE_N, 100).
-define(INFO_KEYS, [socktype, peername, sockname, sockstate, active_n, limiter]).
-define(INFO_KEYS, [socktype, peername, sockname, sockstate, active_n]).
-define(CONN_STATS, [recv_pkt, recv_msg, send_pkt, send_msg]).
-define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]).
@ -115,8 +115,9 @@ info(CPid) when is_pid(CPid) ->
call(CPid, info);
info(State = #state{channel = Channel}) ->
ChanInfo = emqx_channel:info(Channel),
SockInfo = maps:from_list(info(?INFO_KEYS, State)),
maps:merge(ChanInfo, #{sockinfo => SockInfo}).
SockInfo = maps:from_list(
info(?INFO_KEYS, State)),
ChanInfo#{sockinfo => SockInfo}.
info(Keys, State) when is_list(Keys) ->
[{Key, info(Key, State)} || Key <- Keys];
@ -149,13 +150,6 @@ stats(#state{transport = Transport,
ProcStats = emqx_misc:proc_stats(),
lists:append([SockStats, ConnStats, ChanStats, ProcStats]).
attrs(#state{active_n = ActiveN, sockstate = SockSt, channel = Channel}) ->
SockAttrs = #{active_n => ActiveN,
sockstate => SockSt
},
ChanAttrs = emqx_channel:attrs(Channel),
maps:merge(ChanAttrs, #{sockinfo => SockAttrs}).
call(Pid, Req) ->
gen_server:call(Pid, Req, infinity).
@ -326,41 +320,42 @@ handle_msg({incoming, ?PACKET(?PINGREQ)}, State) ->
handle_msg({incoming, Packet}, State) ->
handle_incoming(Packet, State);
handle_msg({outgoing, Packets}, State) ->
handle_outgoing(Packets, State);
handle_msg({Error, _Sock, Reason}, State)
when Error == tcp_error; Error == ssl_error ->
handle_info({sock_error, Reason}, State);
handle_msg({Closed, _Sock}, State)
when Closed == tcp_closed; Closed == ssl_closed ->
handle_info(sock_closed, State);
handle_info({sock_closed, Closed}, close_socket(State));
handle_msg({Passive, _Sock}, State)
when Passive == tcp_passive; Passive == ssl_passive ->
InStats = #{cnt => emqx_pd:reset_counter(incoming_pubs),
oct => emqx_pd:reset_counter(incoming_bytes)
},
%% In Stats
Pubs = emqx_pd:reset_counter(incoming_pubs),
Bytes = emqx_pd:reset_counter(incoming_bytes),
InStats = #{cnt => Pubs, oct => Bytes},
%% Ensure Rate Limit
NState = ensure_rate_limit(InStats, State),
%% Run GC and Check OOM
NState1 = check_oom(run_gc(InStats, NState)),
handle_info(activate_socket, NState1);
handle_msg(Deliver = {deliver, _Topic, _Msg},
State = #state{channel = Channel}) ->
Delivers = [Deliver|emqx_misc:drain_deliver()],
Ret = emqx_channel:handle_out(Delivers, Channel),
handle_msg(Deliver = {deliver, _Topic, _Msg}, State =
#state{active_n = ActiveN, channel = Channel}) ->
Delivers = [Deliver|emqx_misc:drain_deliver(ActiveN)],
Ret = emqx_channel:handle_deliver(Delivers, Channel),
handle_chan_return(Ret, State);
handle_msg({outgoing, Packets}, State) ->
handle_outgoing(Packets, State);
%% Something sent
handle_msg({inet_reply, _Sock, ok}, State = #state{active_n = ActiveN}) ->
case emqx_pd:get_counter(outgoing_pubs) > ActiveN of
true ->
OutStats = #{cnt => emqx_pd:reset_counter(outgoing_pubs),
oct => emqx_pd:reset_counter(outgoing_bytes)
},
Pubs = emqx_pd:reset_counter(outgoing_pubs),
Bytes = emqx_pd:reset_counter(outgoing_bytes),
OutStats = #{cnt => Pubs, oct => Bytes},
{ok, check_oom(run_gc(OutStats, State))};
false -> ok
end;
@ -368,6 +363,29 @@ handle_msg({inet_reply, _Sock, ok}, State = #state{active_n = ActiveN}) ->
handle_msg({inet_reply, _Sock, {error, Reason}}, State) ->
handle_info({sock_error, Reason}, State);
handle_msg({connack, ConnAck}, State) ->
handle_outgoing(ConnAck, State);
handle_msg({close, Reason}, State) ->
?LOG(debug, "Force to close the socket due to ~p", [Reason]),
handle_info({sock_closed, Reason}, close_socket(State));
handle_msg({event, connected}, State = #state{channel = Channel}) ->
ClientId = emqx_channel:info(clientid, Channel),
emqx_cm:register_channel(ClientId, info(State), stats(State));
handle_msg({event, disconnected}, State = #state{channel = Channel}) ->
ClientId = emqx_channel:info(clientid, Channel),
emqx_cm:set_chan_info(ClientId, info(State)),
emqx_cm:connection_closed(ClientId),
{ok, State};
handle_msg({event, _Other}, State = #state{channel = Channel}) ->
ClientId = emqx_channel:info(clientid, Channel),
emqx_cm:set_chan_info(ClientId, info(State)),
emqx_cm:set_chan_stats(ClientId, stats(State)),
{ok, State};
handle_msg({timeout, TRef, TMsg}, State) ->
handle_timeout(TRef, TMsg, State);
@ -434,15 +452,14 @@ handle_timeout(TRef, limit_timeout, State = #state{limit_timer = TRef}) ->
},
handle_info(activate_socket, NState);
handle_timeout(TRef, emit_stats, State = #state{stats_timer = TRef,
channel = Channel}) ->
#{clientid := ClientId} = emqx_channel:info(clientinfo, Channel),
(ClientId =/= undefined) andalso
handle_timeout(TRef, emit_stats, State =
#state{stats_timer = TRef, channel = Channel}) ->
ClientId = emqx_channel:info(clientid, Channel),
emqx_cm:set_chan_stats(ClientId, stats(State)),
{ok, State#state{stats_timer = undefined}};
handle_timeout(TRef, keepalive, State = #state{transport = Transport,
socket = Socket}) ->
handle_timeout(TRef, keepalive, State =
#state{transport = Transport, socket = Socket}) ->
case Transport:getstat(Socket, [recv_oct]) of
{ok, [{recv_oct, RecvOct}]} ->
handle_timeout(TRef, {keepalive, RecvOct}, State);
@ -479,6 +496,7 @@ parse_incoming(Data, Packets, State = #state{parse_state = ParseState}) ->
{[{frame_error, Reason}|Packets], State}
end.
-compile({inline, [next_incoming_msgs/1]}).
next_incoming_msgs([Packet]) ->
{incoming, Packet};
next_incoming_msgs(Packets) ->
@ -552,51 +570,24 @@ send(IoData, #state{transport = Transport, socket = Socket}) ->
%%--------------------------------------------------------------------
%% Handle Info
handle_info({connack, ConnAck}, State = #state{channel = Channel}) ->
#{clientid := ClientId} = emqx_channel:info(clientinfo, Channel),
ok = emqx_cm:register_channel(ClientId),
ok = emqx_cm:set_chan_attrs(ClientId, attrs(State)),
ok = emqx_cm:set_chan_stats(ClientId, stats(State)),
ok = handle_outgoing(ConnAck, State);
handle_info({enter, disconnected}, State = #state{channel = Channel}) ->
#{clientid := ClientId} = emqx_channel:info(clientinfo, Channel),
emqx_cm:set_chan_attrs(ClientId, attrs(State)),
emqx_cm:set_chan_stats(ClientId, stats(State));
handle_info(activate_socket, State = #state{sockstate = OldSst}) ->
case activate_socket(State) of
{ok, NState = #state{sockstate = NewSst}} ->
if OldSst =/= NewSst ->
{ok, {event, sockstate_changed}, NState};
{ok, {event, NewSst}, NState};
true -> {ok, NState}
end;
{error, Reason} ->
handle_info({sock_error, Reason}, State)
end;
handle_info({event, sockstate_changed}, State = #state{channel = Channel}) ->
#{clientid := ClientId} = emqx_channel:info(clientinfo, Channel),
ClientId =/= undefined andalso emqx_cm:set_chan_attrs(ClientId, attrs(State));
%%TODO: this is not right
handle_info({sock_error, _Reason}, #state{sockstate = closed}) ->
ok;
handle_info({sock_error, Reason}, State) ->
?LOG(debug, "Socket error: ~p", [Reason]),
handle_info({sock_closed, Reason}, close_socket(State));
handle_info(sock_closed, #state{sockstate = closed}) -> ok;
handle_info(sock_closed, State) ->
?LOG(debug, "Socket closed"),
handle_info({sock_closed, closed}, close_socket(State));
handle_info({close, Reason}, State) ->
?LOG(debug, "Force close due to : ~p", [Reason]),
handle_info({sock_closed, Reason}, close_socket(State));
handle_info(Info, State = #state{channel = Channel}) ->
handle_chan_return(emqx_channel:handle_info(Info, Channel), State).
Ret = emqx_channel:handle_info(Info, Channel),
handle_chan_return(Ret, State).
%%--------------------------------------------------------------------
%% Ensure rate limit
@ -655,8 +646,7 @@ activate_socket(State = #state{transport = Transport,
%%--------------------------------------------------------------------
%% Close Socket
close_socket(State = #state{sockstate = closed}) ->
State;
close_socket(State = #state{sockstate = closed}) -> State;
close_socket(State = #state{transport = Transport, socket = Socket}) ->
ok = Transport:fast_close(Socket),
State#state{sockstate = closed}.

View File

@ -127,7 +127,7 @@ run_fold(HookPoint, Args, Acc) ->
do_run_fold(lookup(HookPoint), Args, Acc).
do_run([#callback{action = Action, filter = Filter} | Callbacks], Args) ->
case filter_passed(Filter, Args) andalso execute(Action, Args) of
case filter_passed(Filter, Args) andalso safe_execute(Action, Args) of
%% stop the hook chain and return
stop -> ok;
%% continue the hook chain, in following cases:
@ -140,7 +140,7 @@ do_run([], _Args) ->
do_run_fold([#callback{action = Action, filter = Filter} | Callbacks], Args, Acc) ->
Args1 = Args ++ [Acc],
case filter_passed(Filter, Args1) andalso execute(Action, Args1) of
case filter_passed(Filter, Args1) andalso safe_execute(Action, Args1) of
%% stop the hook chain
stop -> Acc;
%% stop the hook chain with NewAcc
@ -160,6 +160,15 @@ filter_passed(undefined, _Args) -> true;
filter_passed(Filter, Args) ->
execute(Filter, Args).
safe_execute(Fun, Args) ->
try execute(Fun, Args) of
Result -> Result
catch
_:Reason:Stacktrace ->
?LOG(error, "Failed to execute ~p(~p): ~p", [Fun, Args, {Reason, Stacktrace}]),
ok
end.
%% @doc execute a function.
execute(Fun, Args) when is_function(Fun) ->
erlang:apply(Fun, Args);

View File

@ -29,6 +29,7 @@
, delete/2
, values/1
, to_list/1
, to_list/2
, size/1
, max_size/1
, is_full/1
@ -105,6 +106,10 @@ values(?Inflight(Tree)) ->
to_list(?Inflight(Tree)) ->
gb_trees:to_list(Tree).
-spec(to_list(fun(), inflight()) -> list({key(), term()})).
to_list(SortFun, ?Inflight(Tree)) ->
lists:sort(SortFun, gb_trees:to_list(Tree)).
-spec(window(inflight()) -> list()).
window(Inflight = ?Inflight(Tree)) ->
case gb_trees:is_empty(Tree) of

View File

@ -101,6 +101,7 @@
{counter, 'packets.pubrel.missed'}, % PUBREL packets missed
{counter, 'packets.pubcomp.received'}, % PUBCOMP packets received
{counter, 'packets.pubcomp.sent'}, % PUBCOMP packets sent
{counter, 'packets.pubcomp.inuse'}, % PUBCOMP packet_id inuse
{counter, 'packets.pubcomp.missed'}, % PUBCOMP packets missed
{counter, 'packets.subscribe.received'}, % SUBSCRIBE Packets received
{counter, 'packets.subscribe.error'}, % SUBSCRIBE error
@ -363,7 +364,7 @@ init([]) ->
% Store reserved indices
lists:foreach(fun({Type, Name}) ->
Idx = reserved_idx(Name),
Metric = #metric{name = Name, type = Type, idx = reserved_idx(Name)},
Metric = #metric{name = Name, type = Type, idx = Idx},
true = ets:insert(?TAB, Metric),
ok = counters:put(CRef, Idx, 0)
end,?BYTES_METRICS ++ ?PACKET_METRICS ++ ?MESSAGE_METRICS ++ ?CHAN_METRICS ++ ?MQTT_METRICS),
@ -460,5 +461,6 @@ reserved_idx('messages.forward') -> 51;
reserved_idx('auth.mqtt.anonymous') -> 52;
reserved_idx('channel.gc.cnt') -> 53;
reserved_idx('packets.pubrec.inuse') -> 54;
reserved_idx('packets.pubcomp.inuse') -> 55;
reserved_idx(_) -> undefined.

View File

@ -29,6 +29,7 @@
, start_timer/3
, cancel_timer/1
, drain_deliver/0
, drain_deliver/1
, drain_down/1
, check_oom/1
, check_oom/2
@ -112,14 +113,19 @@ cancel_timer(Timer) when is_reference(Timer) ->
end;
cancel_timer(_) -> ok.
%% @doc Drain delivers from the channel proc's mailbox.
%% @doc Drain delivers
drain_deliver() ->
drain_deliver([]).
drain_deliver(-1).
drain_deliver(Acc) ->
drain_deliver(N) when is_integer(N) ->
drain_deliver(N, []).
drain_deliver(0, Acc) ->
lists:reverse(Acc);
drain_deliver(N, Acc) ->
receive
Deliver = {deliver, _Topic, _Msg} ->
drain_deliver([Deliver|Acc])
drain_deliver(N-1, [Deliver|Acc])
after 0 ->
lists:reverse(Acc)
end.

View File

@ -59,7 +59,7 @@
-define(DEFAULT_CAPS, #{max_packet_size => ?MAX_PACKET_SIZE,
max_clientid_len => ?MAX_CLIENTID_LEN,
max_topic_alias => ?UNLIMITED,
max_topic_alias => ?MAX_TOPIC_AlIAS,
max_topic_levels => ?UNLIMITED,
max_qos_allowed => ?QOS_2,
retain_available => true,

View File

@ -37,6 +37,7 @@
]).
-export([ to_message/2
, to_message/3
, will_msg/1
]).
@ -110,7 +111,10 @@ check(#mqtt_packet{variable = SubPkt}) when is_record(SubPkt, mqtt_packet_subscr
check(#mqtt_packet{variable = UnsubPkt}) when is_record(UnsubPkt, mqtt_packet_unsubscribe) ->
check(UnsubPkt);
check(#mqtt_packet_publish{topic_name = <<>>, properties = #{'Topic-Alias':= _TopicAlias}}) ->
%% A Topic Alias of 0 is not permitted.
check(#mqtt_packet_publish{topic_name = <<>>, properties = #{'Topic-Alias':= 0}}) ->
{error, ?RC_PROTOCOL_ERROR};
check(#mqtt_packet_publish{topic_name = <<>>, properties = #{'Topic-Alias':= _Alias}}) ->
ok;
check(#mqtt_packet_publish{topic_name = <<>>, properties = #{}}) ->
{error, ?RC_PROTOCOL_ERROR};
@ -174,9 +178,9 @@ check(ConnPkt, Opts) when is_record(ConnPkt, mqtt_packet_connect) ->
check_proto_ver(#mqtt_packet_connect{proto_ver = Ver,
proto_name = Name}, _Opts) ->
case lists:member({Ver, Name}, ?PROTOCOL_NAMES) of
true -> ok;
false -> {error, ?RC_UNSUPPORTED_PROTOCOL_VERSION}
case proplists:get_value(Ver, ?PROTOCOL_NAMES) of
Name -> ok;
_Other -> {error, ?RC_UNSUPPORTED_PROTOCOL_VERSION}
end.
%% MQTT3.1 does not allow null clientId
@ -191,7 +195,7 @@ check_client_id(#mqtt_packet_connect{clientid = <<>>,
clean_start = true}, _Opts) ->
ok;
check_client_id(#mqtt_packet_connect{clientid = ClientId},
_Opts = #{max_clientid_len := MaxLen}) ->
#{max_clientid_len := MaxLen} = _Opts) ->
case (1 =< (Len = byte_size(ClientId))) andalso (Len =< MaxLen) of
true -> ok;
false -> {error, ?RC_CLIENT_IDENTIFIER_NOT_VALID}
@ -226,7 +230,7 @@ run_checks([], _Packet, _Options) ->
run_checks([Check|More], Packet, Options) ->
case Check(Packet, Options) of
ok -> run_checks(More, Packet, Options);
{error, Reason} -> {error, Reason}
Error = {error, _Reason} -> Error
end.
%% @doc Validate MQTT Packet
@ -239,20 +243,34 @@ validate_topic_filters(TopicFilters) ->
emqx_topic:validate(TopicFilter)
end, TopicFilters).
%% @doc Publish Packet to Message.
-spec(to_message(emqx_types:clientinfo(), emqx_ypes:packet()) -> emqx_types:message()).
to_message(#{clientid := ClientId, username := Username, peerhost := PeerHost},
to_message(ClientInfo, Packet) ->
to_message(ClientInfo, #{}, Packet).
%% @doc Transform Publish Packet to Message.
-spec(to_message(emqx_types:clientinfo(), map(), emqx_ypes:packet())
-> emqx_types:message()).
to_message(#{protocol := Protocol,
clientid := ClientId,
username := Username,
peerhost := PeerHost
}, Headers,
#mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH,
retain = Retain,
qos = QoS,
dup = Dup},
dup = Dup
},
variable = #mqtt_packet_publish{topic_name = Topic,
properties = Props},
payload = Payload}) ->
properties = Props
},
payload = Payload
}) ->
Msg = emqx_message:make(ClientId, QoS, Topic, Payload),
Msg#message{flags = #{dup => Dup, retain => Retain},
headers = merge_props(#{username => Username,
peerhost => PeerHost}, Props)}.
Headers1 = merge_props(Headers#{protocol => Protocol,
username => Username,
peerhost => PeerHost
}, Props),
Msg#message{flags = #{dup => Dup, retain => Retain}, headers = Headers1}.
-spec(will_msg(#mqtt_packet_connect{}) -> emqx_types:message()).
will_msg(#mqtt_packet_connect{will_flag = false}) ->
@ -262,11 +280,11 @@ will_msg(#mqtt_packet_connect{clientid = ClientId,
will_retain = Retain,
will_qos = QoS,
will_topic = Topic,
will_props = Properties,
will_props = Props,
will_payload = Payload}) ->
Msg = emqx_message:make(ClientId, QoS, Topic, Payload),
Msg#message{flags = #{dup => false, retain => Retain},
headers = merge_props(#{username => Username}, Properties)}.
Headers = merge_props(#{username => Username}, Props),
Msg#message{flags = #{dup => false, retain => Retain}, headers = Headers}.
merge_props(Headers, undefined) ->
Headers;

View File

@ -23,9 +23,10 @@
, name/2
, text/1
, text/2
]).
-export([ frame_error/1
, connack_error/1
, mqtt_frame_error/1
, formalized/2
]).
-export([compat/2]).
@ -165,6 +166,9 @@ compat(suback, Code) when Code >= 16#80 -> 16#80;
compat(unsuback, _Code) -> undefined;
compat(_Other, _Code) -> undefined.
frame_error(frame_too_large) -> ?RC_PACKET_TOO_LARGE;
frame_error(_) -> ?RC_MALFORMED_PACKET.
connack_error(client_identifier_not_valid) -> ?RC_CLIENT_IDENTIFIER_NOT_VALID;
connack_error(bad_username_or_password) -> ?RC_BAD_USER_NAME_OR_PASSWORD;
connack_error(bad_clientid_or_password) -> ?RC_BAD_USER_NAME_OR_PASSWORD;
@ -175,11 +179,6 @@ connack_error(server_unavailable) -> ?RC_SERVER_UNAVAILABLE;
connack_error(server_busy) -> ?RC_SERVER_BUSY;
connack_error(banned) -> ?RC_BANNED;
connack_error(bad_authentication_method) -> ?RC_BAD_AUTHENTICATION_METHOD;
%% TODO: ???
connack_error(_) -> ?RC_NOT_AUTHORIZED.
mqtt_frame_error(mqtt_frame_too_large) -> ?RC_PACKET_TOO_LARGE;
mqtt_frame_error(_) -> ?RC_MALFORMED_PACKET.
formalized(connack, Code) when is_integer(Code) -> Code;
formalized(connack, _Code) ->
?RC_SERVER_UNAVAILABLE.

View File

@ -59,7 +59,6 @@
-export([ info/1
, info/2
, attrs/1
, stats/1
]).
@ -86,7 +85,7 @@
-export([expire/2]).
%% export for ct
%% Export for CT
-export([set_field/3]).
-export_type([session/0]).
@ -100,7 +99,8 @@
max_subscriptions :: non_neg_integer(),
%% Upgrade QoS?
upgrade_qos :: boolean(),
%% Client <- Broker: QoS1/2 messages sent to the client but unacked.
%% Client <- Broker: QoS1/2 messages sent to the client but
%% have not been unacked.
inflight :: emqx_inflight:inflight(),
%% All QoS1/2 messages published to when client is disconnected,
%% or QoS1/2 messages pending transmission to the Client.
@ -109,49 +109,31 @@
mqueue :: emqx_mqueue:mqueue(),
%% Next packet id of the session
next_pkt_id = 1 :: emqx_types:packet_id(),
%% Retry interval for redelivering QoS1/2 messages
%% Retry interval for redelivering QoS1/2 messages (Unit: millsecond)
retry_interval :: timeout(),
%% Client -> Broker: QoS2 messages received from client and waiting for pubrel.
%% Client -> Broker: QoS2 messages received from the client, but
%% have not been completely acknowledged
awaiting_rel :: map(),
%% Max Packets Awaiting PUBREL
%% Maximum number of awaiting QoS2 messages allowed
max_awaiting_rel :: non_neg_integer(),
%% Awaiting PUBREL Timeout
awaiting_rel_timeout :: timeout(),
%% Deliver Stats
deliver_stats :: emqx_types:stats(),
%% Awaiting PUBREL Timeout (Unit: millsecond)
await_rel_timeout :: timeout(),
%% Created at
created_at :: pos_integer()
}).
-opaque(session() :: #session{}).
-type(publish() :: {publish, emqx_types:packet_id(), emqx_types:message()}).
-type(publish() :: {maybe(emqx_types:packet_id()), emqx_types:message()}).
-define(DEFAULT_BATCH_N, 1000).
-type(pubrel() :: {pubrel, emqx_types:packet_id()}).
-define(ATTR_KEYS, [inflight_cnt,
inflight_max,
mqueue_len,
mqueue_max,
retry_interval,
awaiting_rel_max,
awaiting_rel_timeout,
created_at
]).
-type(replies() :: list(publish() | pubrel())).
-define(INFO_KEYS, [subscriptions,
subscriptions_max,
upgrade_qos,
inflight,
inflight_max,
retry_interval,
mqueue_len,
mqueue_max,
mqueue_dropped,
next_pkt_id,
awaiting_rel,
awaiting_rel_max,
awaiting_rel_timeout,
await_rel_timeout,
created_at
]).
@ -162,16 +144,17 @@
mqueue_len,
mqueue_max,
mqueue_dropped,
awaiting_rel,
awaiting_rel_max,
enqueue_cnt
next_pkt_id,
awaiting_rel_cnt,
awaiting_rel_max
]).
-define(DEFAULT_BATCH_N, 1000).
%%--------------------------------------------------------------------
%% Init a Session
%%--------------------------------------------------------------------
%% @doc Init a session.
-spec(init(emqx_types:clientinfo(), emqx_types:conninfo()) -> session()).
init(#{zone := Zone}, #{receive_maximum := MaxInflight}) ->
#session{max_subscriptions = get_env(Zone, max_subscriptions, 0),
@ -180,10 +163,10 @@ init(#{zone := Zone}, #{receive_maximum := MaxInflight}) ->
inflight = emqx_inflight:new(MaxInflight),
mqueue = init_mqueue(Zone),
next_pkt_id = 1,
retry_interval = get_env(Zone, retry_interval, 0),
retry_interval = timer:seconds(get_env(Zone, retry_interval, 0)),
awaiting_rel = #{},
max_awaiting_rel = get_env(Zone, max_awaiting_rel, 100),
awaiting_rel_timeout = get_env(Zone, awaiting_rel_timeout, 3600*1000),
await_rel_timeout = timer:seconds(get_env(Zone, await_rel_timeout, 300)),
created_at = erlang:system_time(second)
}.
@ -195,20 +178,15 @@ init_mqueue(Zone) ->
default_priority => get_env(Zone, mqueue_default_priority, lowest)
}).
%%--------------------------------------------------------------------
%% Info, Stats
%%--------------------------------------------------------------------
%% @doc Get infos of the session.
-spec(info(session()) -> emqx_types:infos()).
info(Session) ->
maps:from_list(info(?INFO_KEYS, Session)).
%% Get attrs of the session.
-spec(attrs(session()) -> emqx_types:attrs()).
attrs(Session) ->
maps:from_list(info(?ATTR_KEYS, Session)).
%% @doc Get stats of the session.
-spec(stats(session()) -> emqx_types:stats()).
stats(Session) -> info(?STATS_KEYS, Session).
info(Keys, Session) when is_list(Keys) ->
[{Key, info(Key, Session)} || Key <- Keys];
info(subscriptions, #session{subscriptions = Subs}) ->
@ -226,7 +204,9 @@ info(inflight_cnt, #session{inflight = Inflight}) ->
info(inflight_max, #session{inflight = Inflight}) ->
emqx_inflight:max_size(Inflight);
info(retry_interval, #session{retry_interval = Interval}) ->
Interval;
Interval div 1000;
info(mqueue, #session{mqueue = MQueue}) ->
MQueue;
info(mqueue_len, #session{mqueue = MQueue}) ->
emqx_mqueue:len(MQueue);
info(mqueue_max, #session{mqueue = MQueue}) ->
@ -239,88 +219,43 @@ info(awaiting_rel, #session{awaiting_rel = AwaitingRel}) ->
AwaitingRel;
info(awaiting_rel_cnt, #session{awaiting_rel = AwaitingRel}) ->
maps:size(AwaitingRel);
info(awaiting_rel_max, #session{max_awaiting_rel = MaxAwaitingRel}) ->
MaxAwaitingRel;
info(awaiting_rel_timeout, #session{awaiting_rel_timeout = Timeout}) ->
Timeout;
info(enqueue_cnt, #session{deliver_stats = undefined}) ->
0;
info(enqueue_cnt, #session{deliver_stats = Stats}) ->
maps:get(enqueue_cnt, Stats, 0);
info(deliver_stats, #session{deliver_stats = Stats}) ->
Stats;
info(awaiting_rel_max, #session{max_awaiting_rel = Max}) ->
Max;
info(await_rel_timeout, #session{await_rel_timeout = Timeout}) ->
Timeout div 1000;
info(created_at, #session{created_at = CreatedAt}) ->
CreatedAt.
%% For tests
set_field(Name, Val, Channel) ->
Fields = record_info(fields, session),
Pos = emqx_misc:index_of(Name, Fields),
setelement(Pos+1, Channel, Val).
-spec(takeover(session()) -> ok).
takeover(#session{subscriptions = Subs}) ->
lists:foreach(fun({TopicFilter, _SubOpts}) ->
ok = emqx_broker:unsubscribe(TopicFilter)
end, maps:to_list(Subs)).
-spec(resume(emqx_types:clientid(), session()) -> ok).
resume(ClientId, #session{subscriptions = Subs}) ->
%% 1. Subscribe again.
lists:foreach(fun({TopicFilter, SubOpts}) ->
ok = emqx_broker:subscribe(TopicFilter, ClientId, SubOpts)
end, maps:to_list(Subs)).
%% 2. Run hooks.
%% ok = emqx_hooks:run('session.resumed', [#{clientid => ClientId}, attrs(Session)]),
%% TODO: 3. Redeliver: Replay delivery and Dequeue pending messages
%%Session.
redeliver(Session = #session{inflight = Inflight}) ->
Publishes = lists:map(fun({PacketId, {pubrel, _Ts}}) ->
{pubrel, PacketId, ?RC_SUCCESS};
({PacketId, {Msg, _Ts}}) ->
{publish, PacketId, Msg}
end, emqx_inflight:to_list(Inflight)),
case dequeue(Session) of
{ok, NSession} ->
{ok, Publishes, NSession};
{ok, More, NSession} ->
{ok, lists:append(Publishes, More), NSession}
end.
%% @doc Get stats of the session.
-spec(stats(session()) -> emqx_types:stats()).
stats(Session) -> info(?STATS_KEYS, Session).
%%--------------------------------------------------------------------
%% Client -> Broker: SUBSCRIBE
%%--------------------------------------------------------------------
-spec(subscribe(emqx_types:clientinfo(), emqx_types:topic(), emqx_types:subopts(), session())
-spec(subscribe(emqx_types:clientinfo(), emqx_types:topic(),
emqx_types:subopts(), session())
-> {ok, session()} | {error, emqx_types:reason_code()}).
subscribe(ClientInfo, TopicFilter, SubOpts, Session = #session{subscriptions = Subs}) ->
case is_subscriptions_full(Session)
andalso (not maps:is_key(TopicFilter, Subs)) of
true -> {error, ?RC_QUOTA_EXCEEDED};
subscribe(ClientInfo = #{clientid := ClientId}, TopicFilter, SubOpts,
Session = #session{subscriptions = Subs}) ->
IsNew = not maps:is_key(TopicFilter, Subs),
case IsNew andalso is_subscriptions_full(Session) of
false ->
do_subscribe(ClientInfo, TopicFilter, SubOpts, Session)
ok = emqx_broker:subscribe(TopicFilter, ClientId, SubOpts),
ok = emqx_hooks:run('session.subscribed',
[ClientInfo, TopicFilter, SubOpts#{is_new => IsNew}]),
{ok, Session#session{subscriptions = maps:put(TopicFilter, SubOpts, Subs)}};
true -> {error, ?RC_QUOTA_EXCEEDED}
end.
-compile({inline, [is_subscriptions_full/1]}).
is_subscriptions_full(#session{max_subscriptions = 0}) ->
false;
is_subscriptions_full(#session{max_subscriptions = MaxLimit,
subscriptions = Subs}) ->
is_subscriptions_full(#session{subscriptions = Subs,
max_subscriptions = MaxLimit}) ->
maps:size(Subs) >= MaxLimit.
-compile({inline, [do_subscribe/4]}).
do_subscribe(Client = #{clientid := ClientId}, TopicFilter, SubOpts,
Session = #session{subscriptions = Subs}) ->
case IsNew = (not maps:is_key(TopicFilter, Subs)) of
true ->
ok = emqx_broker:subscribe(TopicFilter, ClientId, SubOpts);
false ->
_ = emqx_broker:set_subopts(TopicFilter, SubOpts)
end,
ok = emqx_hooks:run('session.subscribed',
[Client, TopicFilter, SubOpts#{is_new => IsNew}]),
{ok, Session#session{subscriptions = maps:put(TopicFilter, SubOpts, Subs)}}.
%%--------------------------------------------------------------------
%% Client -> Broker: UNSUBSCRIBE
%%--------------------------------------------------------------------
@ -342,46 +277,41 @@ unsubscribe(ClientInfo, TopicFilter, Session = #session{subscriptions = Subs}) -
%%--------------------------------------------------------------------
-spec(publish(emqx_types:packet_id(), emqx_types:message(), session())
-> {ok, emqx_types:publish_result(), session()} |
{error, emqx_types:reason_code()}).
publish(PacketId, Msg = #message{qos = ?QOS_2}, Session) ->
-> {ok, emqx_types:publish_result(), session()}
| {error, emqx_types:reason_code()}).
publish(PacketId, Msg = #message{qos = ?QOS_2, timestamp = Ts},
Session = #session{awaiting_rel = AwaitingRel}) ->
case is_awaiting_full(Session) of
false ->
do_publish(PacketId, Msg, Session);
case maps:is_key(PacketId, AwaitingRel) of
false ->
Results = emqx_broker:publish(Msg),
AwaitingRel1 = maps:put(PacketId, Ts, AwaitingRel),
{ok, Results, Session#session{awaiting_rel = AwaitingRel1}};
true ->
{error, ?RC_RECEIVE_MAXIMUM_EXCEEDED}
{error, ?RC_PACKET_IDENTIFIER_IN_USE}
end;
true -> {error, ?RC_RECEIVE_MAXIMUM_EXCEEDED}
end;
%% Publish QoS0/1 directly
publish(_PacketId, Msg, Session) ->
{ok, emqx_broker:publish(Msg), Session}.
-compile({inline, [is_awaiting_full/1]}).
is_awaiting_full(#session{max_awaiting_rel = 0}) ->
false;
is_awaiting_full(#session{awaiting_rel = AwaitingRel,
max_awaiting_rel = MaxLimit}) ->
maps:size(AwaitingRel) >= MaxLimit.
-compile({inline, [do_publish/3]}).
do_publish(PacketId, Msg = #message{timestamp = Ts},
Session = #session{awaiting_rel = AwaitingRel}) ->
case maps:is_key(PacketId, AwaitingRel) of
false ->
DeliverResults = emqx_broker:publish(Msg),
AwaitingRel1 = maps:put(PacketId, Ts, AwaitingRel),
Session1 = Session#session{awaiting_rel = AwaitingRel1},
{ok, DeliverResults, Session1};
true ->
{error, ?RC_PACKET_IDENTIFIER_IN_USE}
end.
%%--------------------------------------------------------------------
%% Client -> Broker: PUBACK
%%--------------------------------------------------------------------
-spec(puback(emqx_types:packet_id(), session())
-> {ok, emqx_types:message(), session()}
| {ok, emqx_types:message(), list(publish()), session()}
| {ok, emqx_types:message(), replies(), session()}
| {error, emqx_types:reason_code()}).
puback(PacketId, Session = #session{inflight = Inflight}) ->
case emqx_inflight:lookup(PacketId, Inflight) of
@ -394,6 +324,7 @@ puback(PacketId, Session = #session{inflight = Inflight}) ->
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}
end.
-compile({inline, [return_with/2]}).
return_with(Msg, {ok, Session}) ->
{ok, Msg, Session};
return_with(Msg, {ok, Publishes, Session}) ->
@ -404,11 +335,13 @@ return_with(Msg, {ok, Publishes, Session}) ->
%%--------------------------------------------------------------------
-spec(pubrec(emqx_types:packet_id(), session())
-> {ok, emqx_types:message(), session()} | {error, emqx_types:reason_code()}).
-> {ok, emqx_types:message(), session()}
| {error, emqx_types:reason_code()}).
pubrec(PacketId, Session = #session{inflight = Inflight}) ->
case emqx_inflight:lookup(PacketId, Inflight) of
{value, {Msg, _Ts}} when is_record(Msg, message) ->
Inflight1 = emqx_inflight:update(PacketId, {pubrel, os:timestamp()}, Inflight),
Inflight1 = emqx_inflight:update(
PacketId, {pubrel, os:timestamp()}, Inflight),
{ok, Msg, Session#session{inflight = Inflight1}};
{value, {pubrel, _Ts}} ->
{error, ?RC_PACKET_IDENTIFIER_IN_USE};
@ -435,14 +368,16 @@ pubrel(PacketId, Session = #session{awaiting_rel = AwaitingRel}) ->
%%--------------------------------------------------------------------
-spec(pubcomp(emqx_types:packet_id(), session())
-> {ok, session()} | {ok, list(publish()), session()}
-> {ok, session()} | {ok, replies(), session()}
| {error, emqx_types:reason_code()}).
pubcomp(PacketId, Session = #session{inflight = Inflight}) ->
case emqx_inflight:contain(PacketId, Inflight) of
true ->
case emqx_inflight:lookup(PacketId, Inflight) of
{value, {pubrel, _Ts}} ->
Inflight1 = emqx_inflight:delete(PacketId, Inflight),
dequeue(Session#session{inflight = Inflight1});
false ->
{value, _Other} ->
{error, ?RC_PACKET_IDENTIFIER_IN_USE};
none ->
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}
end.
@ -455,81 +390,90 @@ dequeue(Session = #session{inflight = Inflight, mqueue = Q}) ->
true -> {ok, Session};
false ->
{Msgs, Q1} = dequeue(batch_n(Inflight), [], Q),
deliver(lists:reverse(Msgs), [], Session#session{mqueue = Q1})
deliver(Msgs, [], Session#session{mqueue = Q1})
end.
dequeue(Cnt, Msgs, Q) when Cnt =< 0 ->
{Msgs, Q};
dequeue(0, Msgs, Q) ->
{lists:reverse(Msgs), Q};
dequeue(Cnt, Msgs, Q) ->
case emqx_mqueue:out(Q) of
{empty, _Q} -> {Msgs, Q};
{empty, _Q} -> dequeue(0, Msgs, Q);
{{value, Msg = #message{qos = ?QOS_0}}, Q1} ->
dequeue(Cnt, acc_msg(Msg, Msgs), Q1);
{{value, Msg}, Q1} ->
dequeue(Cnt-1, acc_msg(Msg, Msgs), Q1)
end.
-compile({inline, [acc_msg/2]}).
acc_msg(Msg, Msgs) ->
case emqx_message:is_expired(Msg) of
true ->
ok = emqx_metrics:inc('messages.expired'),
dequeue(Cnt-1, Msgs, Q1);
false ->
dequeue(Cnt-1, [Msg|Msgs], Q1)
end
end.
batch_n(Inflight) ->
case emqx_inflight:max_size(Inflight) of
0 -> ?DEFAULT_BATCH_N;
Sz -> Sz - emqx_inflight:size(Inflight)
true -> Msgs;
false -> [Msg|Msgs]
end.
%%--------------------------------------------------------------------
%% Broker -> Client: Publish | Msg
%% Broker -> Client: Deliver
%%--------------------------------------------------------------------
deliver(Delivers, Session = #session{subscriptions = Subs})
when is_list(Delivers) ->
Msgs = [enrich_subopt(get_subopts(Topic, Subs), Msg, Session)
|| {deliver, Topic, Msg} <- Delivers],
-spec(deliver(list(emqx_types:deliver()), session())
-> {ok, session()} | {ok, replies(), session()}).
deliver(Delivers, Session) ->
Msgs = lists:map(enrich_fun(Session), Delivers),
deliver(Msgs, [], Session).
deliver([], Publishes, Session) ->
{ok, lists:reverse(Publishes), Session};
deliver([Msg = #message{qos = ?QOS_0}|More], Acc, Session) ->
deliver(More, [{publish, undefined, Msg}|Acc], Session);
Publish = {undefined, maybe_ack(Msg)},
deliver(More, [Publish|Acc], Session);
deliver([Msg = #message{qos = QoS}|More], Acc,
Session = #session{next_pkt_id = PacketId, inflight = Inflight})
deliver([Msg = #message{qos = QoS}|More], Acc, Session =
#session{next_pkt_id = PacketId, inflight = Inflight})
when QoS =:= ?QOS_1 orelse QoS =:= ?QOS_2 ->
case emqx_inflight:is_full(Inflight) of
true ->
deliver(More, Acc, enqueue(Msg, Session));
Session1 = case maybe_nack(Msg) of
true -> Session;
false -> enqueue(Msg, Session)
end,
deliver(More, Acc, Session1);
false ->
Publish = {publish, PacketId, Msg},
Publish = {PacketId, maybe_ack(Msg)},
Session1 = await(PacketId, Msg, Session),
deliver(More, [Publish|Acc], next_pkt_id(Session1))
end.
enqueue(Delivers, Session = #session{subscriptions = Subs}) when is_list(Delivers) ->
Msgs = [enrich_subopt(get_subopts(Topic, Subs), Msg, Session)
|| {deliver, Topic, Msg} <- Delivers],
-spec(enqueue(list(emqx_types:deliver())|emqx_types:message(), session())
-> session()).
enqueue(Delivers, Session) when is_list(Delivers) ->
Msgs = lists:map(enrich_fun(Session), Delivers),
lists:foldl(fun enqueue/2, Session, Msgs);
enqueue(Msg, Session = #session{mqueue = Q})
when is_record(Msg, message) ->
enqueue(Msg, Session = #session{mqueue = Q}) when is_record(Msg, message) ->
{Dropped, NewQ} = emqx_mqueue:in(Msg, Q),
if is_record(Dropped, message) ->
?LOG(warning, "Dropped msg due to mqueue is full: ~s",
[emqx_message:format(Dropped)]);
true -> ok
end,
inc_deliver_stats(enqueue_cnt, Session#session{mqueue = NewQ}).
Session#session{mqueue = NewQ}.
%%--------------------------------------------------------------------
%% Awaiting ACK for QoS1/QoS2 Messages
%%--------------------------------------------------------------------
enrich_fun(Session = #session{subscriptions = Subs}) ->
fun({deliver, Topic, Msg}) ->
enrich_subopts(get_subopts(Topic, Subs), Msg, Session)
end.
await(PacketId, Msg, Session = #session{inflight = Inflight}) ->
Inflight1 = emqx_inflight:insert(PacketId, {Msg, os:timestamp()}, Inflight),
Session#session{inflight = Inflight1}.
maybe_ack(Msg) ->
case emqx_shared_sub:is_ack_required(Msg) of
true -> emqx_shared_sub:maybe_ack(Msg);
false -> Msg
end.
maybe_nack(Msg) ->
emqx_shared_sub:is_ack_required(Msg)
andalso (ok == emqx_shared_sub:maybe_nack_dropped(Msg)).
get_subopts(Topic, SubMap) ->
case maps:find(Topic, SubMap) of
@ -540,67 +484,69 @@ get_subopts(Topic, SubMap) ->
error -> []
end.
enrich_subopt([], Msg, _Session) -> Msg;
enrich_subopt([{nl, 1}|Opts], Msg, Session) ->
enrich_subopt(Opts, emqx_message:set_flag(nl, Msg), Session);
enrich_subopt([{nl, 0}|Opts], Msg, Session) ->
enrich_subopt(Opts, Msg, Session);
enrich_subopt([{qos, SubQoS}|Opts], Msg = #message{qos = PubQoS},
enrich_subopts([], Msg, _Session) -> Msg;
enrich_subopts([{nl, 1}|Opts], Msg, Session) ->
enrich_subopts(Opts, emqx_message:set_flag(nl, Msg), Session);
enrich_subopts([{nl, 0}|Opts], Msg, Session) ->
enrich_subopts(Opts, Msg, Session);
enrich_subopts([{qos, SubQoS}|Opts], Msg = #message{qos = PubQoS},
Session = #session{upgrade_qos = true}) ->
enrich_subopt(Opts, Msg#message{qos = max(SubQoS, PubQoS)}, Session);
enrich_subopt([{qos, SubQoS}|Opts], Msg = #message{qos = PubQoS},
enrich_subopts(Opts, Msg#message{qos = max(SubQoS, PubQoS)}, Session);
enrich_subopts([{qos, SubQoS}|Opts], Msg = #message{qos = PubQoS},
Session = #session{upgrade_qos = false}) ->
enrich_subopt(Opts, Msg#message{qos = min(SubQoS, PubQoS)}, Session);
enrich_subopt([{rap, _}|Opts], Msg = #message{headers = #{retained := true}}, Session) ->
enrich_subopt(Opts, emqx_message:set_flag(retain, true, Msg), Session);
enrich_subopt([{rap, 0}|Opts], Msg, Session) ->
enrich_subopt(Opts, emqx_message:set_flag(retain, false, Msg), Session);
enrich_subopt([{rap, 1}|Opts], Msg, Session) ->
enrich_subopt(Opts, Msg, Session);
enrich_subopt([{subid, SubId}|Opts], Msg, Session) ->
enrich_subopts(Opts, Msg#message{qos = min(SubQoS, PubQoS)}, Session);
enrich_subopts([{rap, _}|Opts], Msg = #message{headers = #{retained := true}}, Session) ->
enrich_subopts(Opts, emqx_message:set_flag(retain, true, Msg), Session);
enrich_subopts([{rap, 0}|Opts], Msg, Session) ->
enrich_subopts(Opts, emqx_message:set_flag(retain, false, Msg), Session);
enrich_subopts([{rap, 1}|Opts], Msg, Session) ->
enrich_subopts(Opts, Msg, Session);
enrich_subopts([{subid, SubId}|Opts], Msg, Session) ->
Msg1 = emqx_message:set_header('Subscription-Identifier', SubId, Msg),
enrich_subopt(Opts, Msg1, Session).
enrich_subopts(Opts, Msg1, Session).
%%--------------------------------------------------------------------
%% Awaiting ACK for QoS1/QoS2 Messages
%%--------------------------------------------------------------------
await(PacketId, Msg, Session = #session{inflight = Inflight}) ->
Inflight1 = emqx_inflight:insert(PacketId, {Msg, os:timestamp()}, Inflight),
Session#session{inflight = Inflight1}.
%%--------------------------------------------------------------------
%% Retry Delivery
%%--------------------------------------------------------------------
%% Redeliver at once if force is true
-spec(retry(session()) -> {ok, session()} | {ok, replies(), timeout(), session()}).
retry(Session = #session{inflight = Inflight}) ->
case emqx_inflight:is_empty(Inflight) of
true -> {ok, Session};
false ->
SortFun = fun({_, {_, Ts1}}, {_, {_, Ts2}}) -> Ts1 < Ts2 end,
Msgs = lists:sort(SortFun, emqx_inflight:to_list(Inflight)),
retry_delivery(Msgs, os:timestamp(), [], Session)
retry_delivery(emqx_inflight:to_list(sort_fun(), Inflight),
[], os:timestamp(), Session)
end.
retry_delivery([], _Now, Acc, Session) ->
%% Retry again...
{ok, lists:reverse(Acc), Session};
retry_delivery([], Acc, _Now, Session = #session{retry_interval = Interval}) ->
{ok, lists:reverse(Acc), Interval, Session};
retry_delivery([{PacketId, {Val, Ts}}|More], Now, Acc,
Session = #session{retry_interval = Interval,
inflight = Inflight}) ->
%% Microseconds -> MilliSeconds
Age = timer:now_diff(Now, Ts) div 1000,
if
Age >= Interval ->
{Acc1, Inflight1} = retry_delivery(PacketId, Val, Now, Acc, Inflight),
retry_delivery(More, Now, Acc1, Session#session{inflight = Inflight1});
retry_delivery([{PacketId, {Msg, Ts}}|More], Acc, Now, Session =
#session{retry_interval = Interval, inflight = Inflight}) ->
case (Age = age(Now, Ts)) >= Interval of
true ->
{Acc1, Inflight1} = retry_delivery(PacketId, Msg, Now, Acc, Inflight),
retry_delivery(More, Acc1, Now, Session#session{inflight = Inflight1});
false ->
{ok, lists:reverse(Acc), Interval - max(0, Age), Session}
end.
retry_delivery(PacketId, Msg, Now, Acc, Inflight) when is_record(Msg, message) ->
case emqx_message:is_expired(Msg) of
true ->
ok = emqx_metrics:inc('messages.expired'),
{Acc, emqx_inflight:delete(PacketId, Inflight)};
false ->
Msg1 = emqx_message:set_flag(dup, true, Msg),
{[{publish, PacketId, Msg1}|Acc],
emqx_inflight:update(PacketId, {Msg1, Now}, Inflight)}
Inflight1 = emqx_inflight:update(PacketId, {Msg1, Now}, Inflight),
{[{PacketId, Msg1}|Acc], Inflight1}
end;
retry_delivery(PacketId, pubrel, Now, Acc, Inflight) ->
@ -611,30 +557,58 @@ retry_delivery(PacketId, pubrel, Now, Acc, Inflight) ->
%% Expire Awaiting Rel
%%--------------------------------------------------------------------
-spec(expire(awaiting_rel, session()) -> {ok, session()} | {ok, timeout(), session()}).
expire(awaiting_rel, Session = #session{awaiting_rel = AwaitingRel}) ->
case maps:size(AwaitingRel) of
0 -> {ok, Session};
_ ->
AwaitingRel1 = lists:keysort(2, maps:to_list(AwaitingRel)),
expire_awaiting_rel(AwaitingRel1, os:timestamp(), Session)
_ -> expire_awaiting_rel(os:timestamp(), Session)
end.
expire_awaiting_rel([], _Now, Session) ->
{ok, Session};
expire_awaiting_rel(Now, Session = #session{awaiting_rel = AwaitingRel,
await_rel_timeout = Timeout}) ->
expire_awaiting_rel([{PacketId, Ts} | More], Now,
Session = #session{awaiting_rel = AwaitingRel,
awaiting_rel_timeout = Timeout}) ->
case (timer:now_diff(Now, Ts) div 1000) of
Age when Age >= Timeout ->
ok = emqx_metrics:inc('messages.qos2.expired'),
?LOG(warning, "Dropped qos2 packet ~s for awaiting_rel_timeout", [PacketId]),
Session1 = Session#session{awaiting_rel = maps:remove(PacketId, AwaitingRel)},
expire_awaiting_rel(More, Now, Session1);
Age ->
{ok, Timeout - max(0, Age), Session}
NotExpired = fun(_PacketId, Ts) -> age(Now, Ts) < Timeout end,
case maps:filter(NotExpired, AwaitingRel) of
[] -> {ok, Session};
AwaitingRel1 ->
{ok, Timeout, Session#session{awaiting_rel = AwaitingRel1}}
end.
%%--------------------------------------------------------------------
%% Takeover, Resume and Redeliver
%%--------------------------------------------------------------------
-spec(takeover(session()) -> ok).
takeover(#session{subscriptions = Subs}) ->
lists:foreach(fun({TopicFilter, _SubOpts}) ->
ok = emqx_broker:unsubscribe(TopicFilter)
end, maps:to_list(Subs)).
-spec(resume(emqx_types:clientid(), session()) -> ok).
resume(ClientId, #session{subscriptions = Subs}) ->
%% 1. Subscribe again.
lists:foreach(fun({TopicFilter, SubOpts}) ->
ok = emqx_broker:subscribe(TopicFilter, ClientId, SubOpts)
end, maps:to_list(Subs)).
%% 2. Run hooks.
%% ok = emqx_hooks:run('session.resumed', [#{clientid => ClientId}, attrs(Session)]),
%% TODO: 3. Redeliver: Replay delivery and Dequeue pending messages
%%Session.
-spec(redeliver(session()) -> {ok, replies(), session()}).
redeliver(Session = #session{inflight = Inflight}) ->
Pubs = lists:map(fun to_pub/1, emqx_inflight:to_list(Inflight)),
case dequeue(Session) of
{ok, NSession} -> {ok, Pubs, NSession};
{ok, More, NSession} ->
{ok, lists:append(Pubs, More), NSession}
end.
to_pub({PacketId, {pubrel, _Ts}}) ->
{pubrel, PacketId};
to_pub({PacketId, {Msg, _Ts}}) ->
{PacketId, emqx_message:set_flag(dup, true, Msg)}.
%%--------------------------------------------------------------------
%% Next Packet Id
%%--------------------------------------------------------------------
@ -649,11 +623,25 @@ next_pkt_id(Session = #session{next_pkt_id = Id}) ->
%% Helper functions
%%--------------------------------------------------------------------
inc_deliver_stats(Key, Session) ->
inc_deliver_stats(Key, 1, Session).
inc_deliver_stats(Key, I, Session = #session{deliver_stats = undefined}) ->
Session#session{deliver_stats = #{Key => I}};
inc_deliver_stats(Key, I, Session = #session{deliver_stats = Stats}) ->
NStats = maps:update_with(Key, fun(V) -> V+I end, I, Stats),
Session#session{deliver_stats = NStats}.
-compile({inline, [sort_fun/0, batch_n/1, age/2]}).
sort_fun() ->
fun({_, {_, Ts1}}, {_, {_, Ts2}}) -> Ts1 < Ts2 end.
batch_n(Inflight) ->
case emqx_inflight:max_size(Inflight) of
0 -> ?DEFAULT_BATCH_N;
Sz -> Sz - emqx_inflight:size(Inflight)
end.
age(Now, Ts) -> timer:now_diff(Now, Ts) div 1000.
%%--------------------------------------------------------------------
%% For CT tests
%%--------------------------------------------------------------------
set_field(Name, Val, Channel) ->
Fields = record_info(fields, session),
Pos = emqx_misc:index_of(Name, Fields),
setelement(Pos+1, Channel, Val).

View File

@ -64,7 +64,8 @@
, message/0
]).
-export_type([ delivery/0
-export_type([ deliver/0
, delivery/0
, publish_result/0
, deliver_result/0
]).
@ -172,6 +173,7 @@
-type(payload() :: binary() | iodata()).
-type(message() :: #message{}).
-type(banned() :: #banned{}).
-type(deliver() :: {deliver, topic(), message()}).
-type(delivery() :: #delivery{}).
-type(deliver_result() :: ok | {error, term()}).
-type(publish_result() :: [ {node(), topic(), deliver_result()}

View File

@ -14,7 +14,7 @@
%% limitations under the License.
%%--------------------------------------------------------------------
%% MQTT/WS Connection
%% MQTT/WS|WSS Connection
-module(emqx_ws_connection).
-include("emqx.hrl").
@ -86,7 +86,7 @@
-type(state() :: #state{}).
-define(ACTIVE_N, 100).
-define(INFO_KEYS, [socktype, peername, sockname, sockstate, active_n, limiter]).
-define(INFO_KEYS, [socktype, peername, sockname, sockstate, active_n]).
-define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt]).
-define(CONN_STATS, [recv_pkt, recv_msg, send_pkt, send_msg]).
@ -99,15 +99,15 @@
-spec(info(pid()|state()) -> emqx_types:infos()).
info(WsPid) when is_pid(WsPid) ->
call(WsPid, info);
info(WsConn = #state{channel = Channel}) ->
info(State = #state{channel = Channel}) ->
ChanInfo = emqx_channel:info(Channel),
SockInfo = maps:from_list(info(?INFO_KEYS, WsConn)),
maps:merge(ChanInfo, #{sockinfo => SockInfo}).
SockInfo = maps:from_list(
info(?INFO_KEYS, State)),
ChanInfo#{sockinfo => SockInfo}.
info(Keys, WsConn) when is_list(Keys) ->
[{Key, info(Key, WsConn)} || Key <- Keys];
info(socktype, _State) ->
ws;
info(Keys, State) when is_list(Keys) ->
[{Key, info(Key, State)} || Key <- Keys];
info(socktype, _State) -> ws;
info(peername, #state{peername = Peername}) ->
Peername;
info(sockname, #state{sockname = Sockname}) ->
@ -123,11 +123,6 @@ info(channel, #state{channel = Channel}) ->
info(stop_reason, #state{stop_reason = Reason}) ->
Reason.
attrs(State = #state{channel = Channel}) ->
ChanAttrs = emqx_channel:attrs(Channel),
SockAttrs = maps:from_list(info(?INFO_KEYS, State)),
maps:merge(ChanAttrs, #{sockinfo => SockAttrs}).
-spec(stats(pid()|state()) -> emqx_types:stats()).
stats(WsPid) when is_pid(WsPid) ->
call(WsPid, stats);
@ -286,9 +281,10 @@ websocket_info(rate_limit, State) ->
websocket_info({check_gc, Stats}, State) ->
{ok, check_oom(run_gc(Stats, State))};
websocket_info({deliver, _Topic, _Msg} = Deliver, State = #state{channel = Channel}) ->
Delivers = [Deliver|emqx_misc:drain_deliver()],
Ret = emqx_channel:handle_out(Delivers, Channel),
websocket_info(Deliver = {deliver, _Topic, _Msg},
State = #state{active_n = ActiveN, channel = Channel}) ->
Delivers = [Deliver|emqx_misc:drain_deliver(ActiveN)],
Ret = emqx_channel:handle_deliver(Delivers, Channel),
handle_chan_return(Ret, State);
websocket_info({timeout, TRef, limit_timeout}, State = #state{limit_timer = TRef}) ->
@ -300,8 +296,8 @@ websocket_info({timeout, TRef, limit_timeout}, State = #state{limit_timer = TRef
websocket_info({timeout, TRef, Msg}, State) when is_reference(TRef) ->
handle_timeout(TRef, Msg, State);
websocket_info({close, Reason}, State) ->
stop({shutdown, Reason}, State);
websocket_info(Close = {close, _Reason}, State) ->
handle_info(Close, State);
websocket_info({shutdown, Reason}, State) ->
stop({shutdown, Reason}, State);
@ -349,16 +345,26 @@ handle_call(From, Req, State = #state{channel = Channel}) ->
%%--------------------------------------------------------------------
%% Handle Info
handle_info({connack, ConnAck}, State = #state{channel = Channel}) ->
#{clientid := ClientId} = emqx_channel:info(clientinfo, Channel),
ok = emqx_cm:register_channel(ClientId),
ok = emqx_cm:set_chan_attrs(ClientId, attrs(State)),
ok = emqx_cm:set_chan_stats(ClientId, stats(State)),
handle_info({connack, ConnAck}, State) ->
reply(enqueue(ConnAck, State));
handle_info({enter, disconnected}, State = #state{channel = Channel}) ->
#{clientid := ClientId} = emqx_channel:info(clientinfo, Channel),
emqx_cm:set_chan_attrs(ClientId, attrs(State)),
handle_info({close, Reason}, State) ->
stop({shutdown, Reason}, State);
handle_info({event, connected}, State = #state{channel = Channel}) ->
ClientId = emqx_channel:info(clientid, Channel),
emqx_cm:register_channel(ClientId, info(State), stats(State)),
reply(State);
handle_info({event, disconnected}, State = #state{channel = Channel}) ->
ClientId = emqx_channel:info(clientid, Channel),
emqx_cm:set_chan_info(ClientId, info(State)),
emqx_cm:connection_closed(ClientId),
reply(State);
handle_info({event, _Other}, State = #state{channel = Channel}) ->
ClientId = emqx_channel:info(clientid, Channel),
emqx_cm:set_chan_info(ClientId, info(State)),
emqx_cm:set_chan_stats(ClientId, stats(State)),
reply(State);
@ -376,10 +382,10 @@ handle_timeout(TRef, keepalive, State) when is_reference(TRef) ->
RecvOct = emqx_pd:get_counter(recv_oct),
handle_timeout(TRef, {keepalive, RecvOct}, State);
handle_timeout(TRef, emit_stats, State = #state{channel = Channel,
stats_timer = TRef}) ->
#{clientid := ClientId} = emqx_channel:info(clientinfo, Channel),
(ClientId =/= undefined) andalso emqx_cm:set_chan_stats(ClientId, stats(State)),
handle_timeout(TRef, emit_stats, State =
#state{channel = Channel, stats_timer = TRef}) ->
ClientId = emqx_channel:info(clientid, Channel),
emqx_cm:set_chan_stats(ClientId, stats(State)),
reply(State#state{stats_timer = undefined});
handle_timeout(TRef, TMsg, State = #state{channel = Channel}) ->
@ -561,8 +567,8 @@ reply(Packet, State) when is_record(Packet, mqtt_packet) ->
reply(enqueue(Packet, State));
reply({outgoing, Packets}, State) ->
reply(enqueue(Packets, State));
reply(Close = {close, _Reason}, State) ->
self() ! Close,
reply(Other, State) when is_tuple(Other) ->
self() ! Other,
reply(State);
reply([], State) ->

View File

@ -141,7 +141,7 @@ mqtt_frame_options(Zone) ->
-spec(mqtt_strict_mode(zone()) -> boolean()).
mqtt_strict_mode(Zone) ->
get_env(Zone, mqtt_strict_mode, false).
get_env(Zone, strict_mode, false).
-spec(max_packet_size(zone()) -> integer()).
max_packet_size(Zone) ->

View File

@ -61,7 +61,7 @@ t_subopts(_) ->
?assertEqual(#{qos => 1, subid => <<"clientid">>}, emqx_broker:get_subopts(self(), <<"topic">>)),
?assertEqual(#{qos => 1, subid => <<"clientid">>}, emqx_broker:get_subopts(<<"clientid">>,<<"topic">>)),
emqx_broker:subscribe(<<"topic">>, <<"clientid">>, #{qos => 2}),
?assertEqual(#{qos => 1, subid => <<"clientid">>}, emqx_broker:get_subopts(self(), <<"topic">>)),
?assertEqual(#{qos => 2, subid => <<"clientid">>}, emqx_broker:get_subopts(self(), <<"topic">>)),
?assertEqual(true, emqx_broker:set_subopts(<<"topic">>, #{qos => 2})),
?assertEqual(#{qos => 2, subid => <<"clientid">>}, emqx_broker:get_subopts(self(), <<"topic">>)),
emqx_broker:unsubscribe(<<"topic">>).

View File

@ -91,11 +91,10 @@ t_chan_info(_) ->
} = emqx_channel:info(channel()),
?assertEqual(clientinfo(), ClientInfo).
t_chan_attrs(_) ->
#{conn_state := connected} = emqx_channel:attrs(channel()).
t_chan_caps(_) ->
Caps = emqx_channel:caps(channel()).
Caps = emqx_mqtt_caps:default(),
?assertEqual(Caps#{max_packet_size => 1048576},
emqx_channel:caps(channel())).
%%--------------------------------------------------------------------
%% Test cases for channel init
@ -103,7 +102,7 @@ t_chan_caps(_) ->
%% TODO:
t_chan_init(_) ->
Channel = channel().
_Channel = channel().
%%--------------------------------------------------------------------
%% Test cases for channel handle_in
@ -114,8 +113,9 @@ t_handle_in_connect_packet_sucess(_) ->
fun(true, _ClientInfo, _ConnInfo) ->
{ok, #{session => session(), present => false}}
end),
{ok, [{connack, ?CONNACK_PACKET(?RC_SUCCESS, 0)}], Channel}
= emqx_channel:handle_in(?CONNECT_PACKET(connpkt()), channel(#{conn_state => idle})),
IdleChannel = channel(#{conn_state => idle}),
{ok, [{event, connected}, {connack, ?CONNACK_PACKET(?RC_SUCCESS, 0, _)}], Channel}
= emqx_channel:handle_in(?CONNECT_PACKET(connpkt()), IdleChannel),
ClientInfo = emqx_channel:info(clientinfo, Channel),
?assertMatch(#{clientid := <<"clientid">>,
username := <<"username">>
@ -124,7 +124,8 @@ t_handle_in_connect_packet_sucess(_) ->
t_handle_in_unexpected_connect_packet(_) ->
Channel = emqx_channel:set_field(conn_state, connected, channel()),
{shutdown, protocol_error, ?DISCONNECT_PACKET(?RC_PROTOCOL_ERROR), Channel}
Packet = ?DISCONNECT_PACKET(?RC_PROTOCOL_ERROR),
{ok, [{outgoing, Packet}, {close, protocol_error}], Channel}
= emqx_channel:handle_in(?CONNECT_PACKET(connpkt()), Channel).
t_handle_in_qos0_publish(_) ->
@ -144,7 +145,7 @@ t_handle_in_qos1_publish(_) ->
t_handle_in_qos2_publish(_) ->
ok = meck:expect(emqx_session, publish, fun(_, _Msg, Session) -> {ok, [], Session} end),
ok = meck:expect(emqx_session, info, fun(awaiting_rel_timeout, _Session) -> 300000 end),
ok = meck:expect(emqx_session, info, fun(await_rel_timeout, _Session) -> 300 end),
Channel = channel(#{conn_state => connected}),
Publish = ?PUBLISH_PACKET(?QOS_2, <<"topic">>, 1, <<"payload">>),
{ok, ?PUBREC_PACKET(1, RC), _NChannel} = emqx_channel:handle_in(Publish, Channel),
@ -154,7 +155,7 @@ t_handle_in_qos2_publish(_) ->
t_handle_in_puback_ok(_) ->
Msg = emqx_message:make(<<"t">>, <<"payload">>),
ok = meck:expect(emqx_session, puback,
fun(PacketId, Session) -> {ok, Msg, Session} end),
fun(_PacketId, Session) -> {ok, Msg, Session} end),
Channel = channel(#{conn_state => connected}),
{ok, _NChannel} = emqx_channel:handle_in(?PUBACK_PACKET(1, ?RC_SUCCESS), Channel).
% ?assertEqual(#{puback_in => 1}, emqx_channel:info(pub_stats, NChannel)).
@ -186,7 +187,7 @@ t_handle_in_pubrec_ok(_) ->
t_handle_in_pubrec_id_in_use(_) ->
ok = meck:expect(emqx_session, pubrec,
fun(_, Session) ->
fun(_, _Session) ->
{error, ?RC_PACKET_IDENTIFIER_IN_USE}
end),
{ok, ?PUBREL_PACKET(1, ?RC_PACKET_IDENTIFIER_IN_USE), _Channel}
@ -196,7 +197,7 @@ t_handle_in_pubrec_id_in_use(_) ->
t_handle_in_pubrec_id_not_found(_) ->
ok = meck:expect(emqx_session, pubrec,
fun(_, Session) ->
fun(_, _Session) ->
{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}
end),
{ok, ?PUBREL_PACKET(1, ?RC_PACKET_IDENTIFIER_NOT_FOUND), _Channel}
@ -236,13 +237,12 @@ t_handle_in_pubcomp_not_found_error(_) ->
t_handle_in_subscribe(_) ->
ok = meck:expect(emqx_session, subscribe,
fun(_, _, _, Session) ->
{ok, Session}
end),
fun(_, _, _, Session) -> {ok, Session} end),
Channel = channel(#{conn_state => connected}),
TopicFilters = [{<<"+">>, ?DEFAULT_SUBOPTS}],
Subscribe = ?SUBSCRIBE_PACKET(1, #{}, TopicFilters),
{ok, ?SUBACK_PACKET(1, [?QOS_0]), _} = emqx_channel:handle_in(Subscribe, Channel).
Replies = [{outgoing, ?SUBACK_PACKET(1, [?QOS_0])}, {event, updated}],
{ok, Replies, _Chan} = emqx_channel:handle_in(Subscribe, Channel).
t_handle_in_unsubscribe(_) ->
ok = meck:expect(emqx_session, unsubscribe,
@ -250,22 +250,24 @@ t_handle_in_unsubscribe(_) ->
{ok, Session}
end),
Channel = channel(#{conn_state => connected}),
UnsubPkt = ?UNSUBSCRIBE_PACKET(1, #{}, [<<"+">>]),
{ok, ?UNSUBACK_PACKET(1), _} = emqx_channel:handle_in(UnsubPkt, Channel).
{ok, [{outgoing, ?UNSUBACK_PACKET(1)}, {event, updated}], _Chan}
= emqx_channel:handle_in(?UNSUBSCRIBE_PACKET(1, #{}, [<<"+">>]), Channel).
t_handle_in_pingreq(_) ->
{ok, ?PACKET(?PINGRESP), _Channel}
= emqx_channel:handle_in(?PACKET(?PINGREQ), channel()).
t_handle_in_disconnect(_) ->
Packet = ?DISCONNECT_PACKET(?RC_SUCCESS),
Channel = channel(#{conn_state => connected}),
{shutdown, normal, Channel1} = emqx_channel:handle_in(?DISCONNECT_PACKET(?RC_SUCCESS), Channel),
{ok, {close, normal}, Channel1} = emqx_channel:handle_in(Packet, Channel),
?assertEqual(undefined, emqx_channel:info(will_msg, Channel1)).
t_handle_in_auth(_) ->
Channel = channel(#{conn_state => connected}),
Packet = ?DISCONNECT_PACKET(?RC_IMPLEMENTATION_SPECIFIC_ERROR),
{shutdown, implementation_specific_error, Packet, Channel}
{ok, [{outgoing, Packet},
{close, implementation_specific_error}], Channel}
= emqx_channel:handle_in(?AUTH_PACKET(), Channel).
t_handle_in_frame_error(_) ->
@ -273,18 +275,20 @@ t_handle_in_frame_error(_) ->
{shutdown, frame_too_large, _}
= emqx_channel:handle_in({frame_error, frame_too_large}, IdleChannel),
ConnectingChan = channel(#{conn_state => connecting}),
{shutdown, frame_too_large, ?CONNACK_PACKET(?RC_MALFORMED_PACKET), _}
ConnackPacket = ?CONNACK_PACKET(?RC_MALFORMED_PACKET),
{shutdown, frame_too_large, ConnackPacket, _}
= emqx_channel:handle_in({frame_error, frame_too_large}, ConnectingChan),
DisconnectPacket = ?DISCONNECT_PACKET(?RC_MALFORMED_PACKET),
ConnectedChan = channel(#{conn_state => connected}),
{shutdown, malformed_Packet, ?DISCONNECT_PACKET(?RC_MALFORMED_PACKET), _}
{ok, [{outgoing, DisconnectPacket}, {close, frame_too_large}], _}
= emqx_channel:handle_in({frame_error, frame_too_large}, ConnectedChan),
DisconnectedChan = channel(#{conn_state => disconnected}),
{ok, DisconnectedChan}
= emqx_channel:handle_in({frame_error, frame_too_large}, DisconnectedChan).
%% TODO:
t_handle_in_expected_packet(_) ->
{shutdown, protocol_error, ?DISCONNECT_PACKET(?RC_PROTOCOL_ERROR), _Chan}
Packet = ?DISCONNECT_PACKET(?RC_PROTOCOL_ERROR),
{ok, [{outgoing, Packet}, {close, protocol_error}], _Chan}
= emqx_channel:handle_in(packet, channel()).
t_process_connect(_) ->
@ -292,19 +296,19 @@ t_process_connect(_) ->
fun(true, _ClientInfo, _ConnInfo) ->
{ok, #{session => session(), present => false}}
end),
{ok, [{connack, ?CONNACK_PACKET(?RC_SUCCESS)}], _Channel}
{ok, [{event, connected}, {connack, ?CONNACK_PACKET(?RC_SUCCESS)}], _Chan}
= emqx_channel:process_connect(connpkt(), channel(#{conn_state => idle})).
t_handle_publish_qos0(_) ->
t_process_publish_qos0(_) ->
ok = meck:expect(emqx_broker, publish, fun(_) -> [] end),
Publish = ?PUBLISH_PACKET(?QOS_0, <<"t">>, 1, <<"payload">>),
{ok, _Channel} = emqx_channel:handle_publish(Publish, channel()).
{ok, _Channel} = emqx_channel:process_publish(Publish, channel()).
t_process_publish_qos1(_) ->
ok = meck:expect(emqx_broker, publish, fun(_) -> [] end),
Msg = emqx_message:make(test, ?QOS_1, <<"t">>, <<"payload">>),
Publish = ?PUBLISH_PACKET(?QOS_1, <<"t">>, 1, <<"payload">>),
{ok, ?PUBACK_PACKET(1, ?RC_NO_MATCHING_SUBSCRIBERS), _Channel}
= emqx_channel:process_publish(1, Msg, channel()).
= emqx_channel:process_publish(Publish, channel()).
t_process_subscribe(_) ->
ok = meck:expect(emqx_session, subscribe, fun(_, _, _, Session) -> {ok, Session} end),
@ -317,51 +321,53 @@ t_process_unsubscribe(_) ->
{[?RC_SUCCESS], _Channel} = emqx_channel:process_unsubscribe(TopicFilters, channel()).
%%--------------------------------------------------------------------
%% Test cases for handle_out
%% Test cases for handle_deliver
%%--------------------------------------------------------------------
t_handle_out_delivers(_) ->
t_handle_deliver(_) ->
WithPacketId = fun(Msgs) ->
lists:zip(lists:seq(1, length(Msgs)), Msgs)
end,
ok = meck:expect(emqx_session, deliver,
fun(Delivers, Session) ->
Msgs = [Msg || {deliver, _, Msg} <- Delivers],
Publishes = [{publish, PacketId, Msg}
|| {PacketId, Msg} <- WithPacketId(Msgs)],
Publishes = WithPacketId([Msg || {deliver, _, Msg} <- Delivers]),
{ok, Publishes, Session}
end),
ok = meck:expect(emqx_session, info, fun(retry_interval, _Session) -> 20000 end),
ok = meck:expect(emqx_session, info, fun(retry_interval, _Session) -> 20 end),
Msg0 = emqx_message:make(test, ?QOS_1, <<"t1">>, <<"qos1">>),
Msg1 = emqx_message:make(test, ?QOS_2, <<"t2">>, <<"qos2">>),
Delivers = [{deliver, <<"+">>, Msg0}, {deliver, <<"+">>, Msg1}],
{ok, {outgoing, Packets}, _Ch} = emqx_channel:handle_out(Delivers, channel()),
{ok, {outgoing, Packets}, _Ch} = emqx_channel:handle_deliver(Delivers, channel()),
?assertEqual([?QOS_1, ?QOS_2], [emqx_packet:qos(Pkt)|| Pkt <- Packets]).
t_handle_out_publishes(_) ->
Channel = channel(#{conn_state => connected}),
Pub0 = {publish, undefined, emqx_message:make(<<"t">>, <<"qos0">>)},
Pub1 = {publish, 1, emqx_message:make(<<"c">>, ?QOS_1, <<"t">>, <<"qos1">>)},
{ok, {outgoing, Packets}, _NChannel}
= emqx_channel:handle_out({publish, [Pub0, Pub1]}, Channel),
?assertEqual(2, length(Packets)).
% ?assertEqual(#{publish_out => 2}, emqx_channel:info(pub_stats, NChannel)).
%%--------------------------------------------------------------------
%% Test cases for handle_out
%%--------------------------------------------------------------------
t_handle_out_publish(_) ->
Channel = channel(#{conn_state => connected}),
Pub0 = {undefined, emqx_message:make(<<"t">>, <<"qos0">>)},
Pub1 = {1, emqx_message:make(<<"c">>, ?QOS_1, <<"t">>, <<"qos1">>)},
{ok, {outgoing, Packets}, _NChannel}
= emqx_channel:handle_out(publish, [Pub0, Pub1], Channel),
?assertEqual(2, length(Packets)).
t_handle_out_publish_1(_) ->
Msg = emqx_message:make(<<"clientid">>, ?QOS_1, <<"t">>, <<"payload">>),
{ok, ?PUBLISH_PACKET(?QOS_1, <<"t">>, 1, <<"payload">>), _Chan}
= emqx_channel:handle_out({publish, 1, Msg}, channel()).
= emqx_channel:handle_out(publish, [{1, Msg}], channel()).
t_handle_out_publish_nl(_) ->
ClientInfo = clientinfo(#{clientid => <<"clientid">>}),
Channel = channel(#{clientinfo => ClientInfo}),
Msg = emqx_message:make(<<"clientid">>, ?QOS_1, <<"t1">>, <<"qos1">>),
Publish = {publish, 1, emqx_message:set_flag(nl, Msg)},
{ok, Channel} = emqx_channel:handle_out(Publish, Channel).
Pubs = [{1, emqx_message:set_flag(nl, Msg)}],
{ok, Channel} = emqx_channel:handle_out(publish, Pubs, Channel).
t_handle_out_connack_sucess(_) ->
{ok, [{connack, ?CONNACK_PACKET(?RC_SUCCESS, SP, _)}], _Chan}
= emqx_channel:handle_out(connack, {?RC_SUCCESS, 0, connpkt()}, channel()).
{ok, [{event, connected}, {connack, ?CONNACK_PACKET(?RC_SUCCESS, 0, _)}], Channel}
= emqx_channel:handle_out(connack, {?RC_SUCCESS, 0, connpkt()}, channel()),
?assertEqual(connected, emqx_channel:info(conn_state, Channel)).
t_handle_out_connack_failure(_) ->
{shutdown, not_authorized, ?CONNACK_PACKET(?RC_NOT_AUTHORIZED), _Chan}
@ -377,7 +383,6 @@ t_handle_out_pubrec(_) ->
Channel = channel(#{conn_state => connected}),
{ok, ?PUBREC_PACKET(1, ?RC_SUCCESS), _NChannel}
= emqx_channel:handle_out(pubrec, {1, ?RC_SUCCESS}, Channel).
% ?assertEqual(#{pubrec_out => 1}, emqx_channel:info(pub_stats, NChannel)).
t_handle_out_pubrel(_) ->
Channel = channel(#{conn_state => connected}),
@ -385,23 +390,24 @@ t_handle_out_pubrel(_) ->
= emqx_channel:handle_out(pubrel, {1, ?RC_SUCCESS}, Channel),
{ok, ?PUBREL_PACKET(2, ?RC_SUCCESS), _Channel2}
= emqx_channel:handle_out(pubrel, {2, ?RC_SUCCESS}, Channel1).
% ?assertEqual(#{pubrel_out => 2}, emqx_channel:info(pub_stats, Channel2)).
t_handle_out_pubcomp(_) ->
{ok, ?PUBCOMP_PACKET(1, ?RC_SUCCESS), _Channel}
= emqx_channel:handle_out(pubcomp, {1, ?RC_SUCCESS}, channel()).
% ?assertEqual(#{pubcomp_out => 1}, emqx_channel:info(pub_stats, Channel)).
t_handle_out_suback(_) ->
{ok, ?SUBACK_PACKET(1, [?QOS_2]), _Channel}
Replies = [{outgoing, ?SUBACK_PACKET(1, [?QOS_2])}, {event, updated}],
{ok, Replies, _Channel}
= emqx_channel:handle_out(suback, {1, [?QOS_2]}, channel()).
t_handle_out_unsuback(_) ->
{ok, ?UNSUBACK_PACKET(1, [?RC_SUCCESS]), _Channel}
Replies = [{outgoing, ?UNSUBACK_PACKET(1, [?RC_SUCCESS])}, {event, updated}],
{ok, Replies, _Channel}
= emqx_channel:handle_out(unsuback, {1, [?RC_SUCCESS]}, channel()).
t_handle_out_disconnect(_) ->
{shutdown, normal, ?DISCONNECT_PACKET(?RC_SUCCESS), _Chan}
Packet = ?DISCONNECT_PACKET(?RC_SUCCESS),
{ok, [{outgoing, Packet}, {close, normal}], _Chan}
= emqx_channel:handle_out(disconnect, ?RC_SUCCESS, channel()).
t_handle_out_unexpected(_) ->
@ -444,33 +450,33 @@ t_handle_info_unsubscribe(_) ->
{ok, _Chan} = emqx_channel:handle_info({unsubscribe, topic_filters()}, channel()).
t_handle_info_sock_closed(_) ->
{ok, _Chan} = emqx_channel:handle_out({sock_closed, reason},
channel(#{conn_state => disconnected})).
Channel = channel(#{conn_state => disconnected}),
{ok, Channel} = emqx_channel:handle_info({sock_closed, reason}, Channel).
%%--------------------------------------------------------------------
%% Test cases for handle_timeout
%%--------------------------------------------------------------------
t_handle_timeout_emit_stats(_) ->
ok = meck:expect(emqx_cm, set_chan_stats, fun(_, _) -> ok end),
TRef = make_ref(),
ok = meck:expect(emqx_cm, set_chan_stats, fun(_, _) -> ok end),
Channel = emqx_channel:set_field(timers, #{stats_timer => TRef}, channel()),
{ok, _Chan} = emqx_channel:handle_timeout(TRef, {emit_stats, []}, Channel).
t_handle_timeout_keepalive(_) ->
TRef = make_ref(),
Channel = emqx_channel:set_field(timers, #{alive_timer => TRef}, channel()),
{ok, _Chan} = emqx_channel:handle_timeout(make_ref(), {keepalive, 10}, channel()).
{ok, _Chan} = emqx_channel:handle_timeout(make_ref(), {keepalive, 10}, Channel).
t_handle_timeout_retry_delivery(_) ->
ok = meck:expect(emqx_session, retry, fun(Session) -> {ok, Session} end),
TRef = make_ref(),
ok = meck:expect(emqx_session, retry, fun(Session) -> {ok, Session} end),
Channel = emqx_channel:set_field(timers, #{retry_timer => TRef}, channel()),
{ok, _Chan} = emqx_channel:handle_timeout(TRef, retry_delivery, channel()).
{ok, _Chan} = emqx_channel:handle_timeout(TRef, retry_delivery, Channel).
t_handle_timeout_expire_awaiting_rel(_) ->
ok = meck:expect(emqx_session, expire, fun(_, Session) -> {ok, Session} end),
TRef = make_ref(),
ok = meck:expect(emqx_session, expire, fun(_, Session) -> {ok, Session} end),
Channel = emqx_channel:set_field(timers, #{await_timer => TRef}, channel()),
{ok, _Chan} = emqx_channel:handle_timeout(TRef, expire_awaiting_rel, Channel).
@ -522,7 +528,7 @@ t_check_subscribe(_) ->
ok = emqx_channel:check_subscribe(<<"t">>, ?DEFAULT_SUBOPTS, channel()),
ok = meck:unload(emqx_zone).
t_enrich_caps(_) ->
t_enrich_connack_caps(_) ->
ok = meck:new(emqx_mqtt_caps, [passthrough, no_history]),
ok = meck:expect(emqx_mqtt_caps, get_caps,
fun(_Zone) ->
@ -534,7 +540,7 @@ t_enrich_caps(_) ->
wildcard_subscription => true
}
end),
AckProps = emqx_channel:enrich_caps(#{}, channel()),
AckProps = emqx_channel:enrich_connack_caps(#{}, channel()),
?assertMatch(#{'Retain-Available' := 1,
'Maximum-Packet-Size' := 1024,
'Topic-Alias-Maximum' := 10,

View File

@ -98,7 +98,7 @@ t_cm(_) ->
{ok, C} = emqtt:start_link([{clientid, ClientId}]),
{ok, _} = emqtt:connect(C),
ct:sleep(500),
#{clientinfo := #{clientid := ClientId}} = emqx_cm:get_chan_attrs(ClientId),
#{clientinfo := #{clientid := ClientId}} = emqx_cm:get_chan_info(ClientId),
emqtt:subscribe(C, <<"mytopic">>, 0),
ct:sleep(1200),
Stats = emqx_cm:get_chan_stats(ClientId),
@ -251,7 +251,7 @@ t_basic_with_props_v5(_) ->
%% General test cases.
%%--------------------------------------------------------------------
t_basic(Opts) ->
t_basic(_Opts) ->
Topic = nth(1, ?TOPICS),
{ok, C} = emqtt:start_link([{proto_ver, v4}]),
{ok, _} = emqtt:connect(C),

View File

@ -46,19 +46,23 @@ t_reg_unreg_channel(_) ->
ok = emqx_cm:unregister_channel(<<"clientid">>),
?assertEqual([], emqx_cm:lookup_channels(<<"clientid">>)).
t_get_set_chan_attrs(_) ->
Attrs = #{proto_ver => 4, proto_name => <<"MQTT">>},
ok = emqx_cm:register_channel(<<"clientid">>),
ok = emqx_cm:set_chan_attrs(<<"clientid">>, Attrs),
?assertEqual(Attrs, emqx_cm:get_chan_attrs(<<"clientid">>)),
t_get_set_chan_info(_) ->
Info = #{proto_ver => 4, proto_name => <<"MQTT">>},
ok = emqx_cm:register_channel(<<"clientid">>, Info, []),
?assertEqual(Info, emqx_cm:get_chan_info(<<"clientid">>)),
Info1 = Info#{proto_ver => 5},
true = emqx_cm:set_chan_info(<<"clientid">>, Info1),
?assertEqual(Info1, emqx_cm:get_chan_info(<<"clientid">>)),
ok = emqx_cm:unregister_channel(<<"clientid">>),
?assertEqual(undefined, emqx_cm:get_chan_attrs(<<"clientid">>)).
?assertEqual(undefined, emqx_cm:get_chan_info(<<"clientid">>)).
t_get_set_chan_stats(_) ->
Stats = [{recv_oct, 10}, {send_oct, 8}],
ok = emqx_cm:register_channel(<<"clientid">>),
ok = emqx_cm:set_chan_stats(<<"clientid">>, Stats),
ok = emqx_cm:register_channel(<<"clientid">>, #{}, Stats),
?assertEqual(Stats, emqx_cm:get_chan_stats(<<"clientid">>)),
Stats1 = [{recv_oct, 10}|Stats],
true = emqx_cm:set_chan_stats(<<"clientid">>, Stats1),
?assertEqual(Stats1, emqx_cm:get_chan_stats(<<"clientid">>)),
ok = emqx_cm:unregister_channel(<<"clientid">>),
?assertEqual(undefined, emqx_cm:get_chan_stats(<<"clientid">>)).

View File

@ -112,7 +112,7 @@ t_start_link_exit_on_activate(_) ->
t_get_conn_info(_) ->
with_connection(fun(CPid) ->
#{sockinfo := SockInfo} = emqx_connection:info(CPid),
?assertEqual(#{active_n => 100,limiter => undefined,
?assertEqual(#{active_n => 100,
peername => {{127,0,0,1},3456},
sockname => {{127,0,0,1},1883},
sockstate => running,
@ -218,8 +218,7 @@ t_handle_sock_closed(_) ->
end),
CPid ! {tcp_closed, sock},
timer:sleep(100),
%%TODO: closed?
trap_exit(CPid, {shutdown, closed})
trap_exit(CPid, {shutdown, tcp_closed})
end, #{trap_exit => true}).
t_handle_outgoing(_) ->

View File

@ -102,7 +102,7 @@ t_drain_deliver(_) ->
self() ! {deliver, t2, m2},
?assertEqual([{deliver, t1, m1},
{deliver, t2, m2}
], emqx_misc:drain_deliver()).
], emqx_misc:drain_deliver(2)).
t_drain_down(_) ->
{Pid1, _Ref1} = erlang:spawn_monitor(fun() -> ok end),

View File

@ -19,26 +19,70 @@
-compile(export_all).
-compile(nowarn_export_all).
-include("emqx_mqtt.hrl").
-include_lib("eunit/include/eunit.hrl").
all() -> emqx_ct:all(?MODULE).
init_per_testcase(_TestCase, Config) ->
init_per_suite(Config) ->
emqx_ct_helpers:boot_modules(all),
emqx_ct_helpers:start_apps([emqx]),
%% Ensure all the modules unloaded.
ok = emqx_modules:unload(),
Config.
end_per_testcase(_TestCase, Config) ->
Config.
end_per_suite(_Config) ->
emqx_ct_helpers:stop_apps([emqx]).
% t_load(_) ->
% error('TODO').
%% Test case for emqx_mod_presence
t_mod_presence(_) ->
ok = emqx_mod_presence:load([{qos, ?QOS_1}]),
{ok, C1} = emqtt:start_link([{clientid, <<"monsys">>}]),
{ok, _} = emqtt:connect(C1),
{ok, _Props, [?QOS_1]} = emqtt:subscribe(C1, <<"$SYS/brokers/+/clients/#">>, qos1),
%% Connected Presence
{ok, C2} = emqtt:start_link([{clientid, <<"clientid">>},
{username, <<"username">>}]),
{ok, _} = emqtt:connect(C2),
ok = recv_and_check_presence(<<"clientid">>, <<"connected">>),
%% Disconnected Presence
ok = emqtt:disconnect(C2),
ok = recv_and_check_presence(<<"clientid">>, <<"disconnected">>),
ok = emqtt:disconnect(C1),
ok = emqx_mod_presence:unload([{qos, ?QOS_1}]).
% t_unload(_) ->
% error('TODO').
t_mod_presence_reason(_) ->
?assertEqual(normal, emqx_mod_presence:reason(normal)),
?assertEqual(discarded, emqx_mod_presence:reason({shutdown, discarded})),
?assertEqual(tcp_error, emqx_mod_presence:reason({tcp_error, einval})),
?assertEqual(internal_error, emqx_mod_presence:reason(<<"unknown error">>)).
% t_on_client_connected(_) ->
% error('TODO').
% t_on_client_disconnected(_) ->
% error('TODO').
recv_and_check_presence(ClientId, Presence) ->
{ok, #{qos := ?QOS_1, topic := Topic, payload := Payload}} = receive_publish(100),
?assertMatch([<<"$SYS">>, <<"brokers">>, _Node, <<"clients">>, ClientId, Presence],
binary:split(Topic, <<"/">>, [global])),
case Presence of
<<"connected">> ->
?assertMatch(#{clientid := <<"clientid">>,
username := <<"username">>,
ipaddress := <<"127.0.0.1">>,
proto_name := <<"MQTT">>,
proto_ver := ?MQTT_PROTO_V4,
connack := ?RC_SUCCESS,
clean_start := true}, emqx_json:decode(Payload, [{labels, atom}, return_maps]));
<<"disconnected">> ->
?assertMatch(#{clientid := <<"clientid">>,
username := <<"username">>,
reason := <<"normal">>}, emqx_json:decode(Payload, [{labels, atom}, return_maps]))
end.
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
receive_publish(Timeout) ->
receive
{publish, Publish} -> {ok, Publish}
after
Timeout -> {error, timeout}
end.

View File

@ -19,28 +19,65 @@
-compile(export_all).
-compile(nowarn_export_all).
-include("emqx_mqtt.hrl").
-include_lib("eunit/include/eunit.hrl").
-define(RULES, [{rewrite,<<"x/#">>,<<"^x/y/(.+)$">>,<<"z/y/$1">>},
{rewrite,<<"y/+/z/#">>,<<"^y/(.+)/z/(.+)$">>,<<"y/z/$2">>}
]).
all() -> emqx_ct:all(?MODULE).
init_per_testcase(_TestCase, Config) ->
init_per_suite(Config) ->
emqx_ct_helpers:boot_modules(all),
emqx_ct_helpers:start_apps([emqx]),
%% Ensure all the modules unloaded.
ok = emqx_modules:unload(),
Config.
end_per_testcase(_TestCase, Config) ->
Config.
end_per_suite(_Config) ->
emqx_ct_helpers:stop_apps([emqx]).
% t_load(_) ->
% error('TODO').
%% Test case for emqx_mod_write
t_mod_rewrite(_Config) ->
ok = emqx_mod_rewrite:load(?RULES),
{ok, C} = emqtt:start_link([{clientid, <<"rewrite_client">>}]),
{ok, _} = emqtt:connect(C),
OrigTopics = [<<"x/y/2">>, <<"x/1/2">>, <<"y/a/z/b">>, <<"y/def">>],
DestTopics = [<<"z/y/2">>, <<"x/1/2">>, <<"y/z/b">>, <<"y/def">>],
%% Subscribe
{ok, _Props, _} = emqtt:subscribe(C, [{Topic, ?QOS_1} || Topic <- OrigTopics]),
timer:sleep(100),
Subscriptions = emqx_broker:subscriptions(<<"rewrite_client">>),
?assertEqual(DestTopics, [Topic || {Topic, _SubOpts} <- Subscriptions]),
%% Publish
RecvTopics = [begin
ok = emqtt:publish(C, Topic, <<"payload">>),
{ok, #{topic := RecvTopic}} = receive_publish(100),
RecvTopic
end || Topic <- OrigTopics],
?assertEqual(DestTopics, RecvTopics),
%% Unsubscribe
{ok, _, _} = emqtt:unsubscribe(C, OrigTopics),
timer:sleep(100),
?assertEqual([], emqx_broker:subscriptions(<<"rewrite_client">>)),
ok = emqtt:disconnect(C),
ok = emqx_mod_rewrite:unload(?RULES).
% t_rewrite_subscribe(_) ->
% error('TODO').
t_rewrite_rule(_Config) ->
Rules = emqx_mod_rewrite:compile(?RULES),
?assertEqual(<<"z/y/2">>, emqx_mod_rewrite:match_and_rewrite(<<"x/y/2">>, Rules)),
?assertEqual(<<"x/1/2">>, emqx_mod_rewrite:match_and_rewrite(<<"x/1/2">>, Rules)),
?assertEqual(<<"y/z/b">>, emqx_mod_rewrite:match_and_rewrite(<<"y/a/z/b">>, Rules)),
?assertEqual(<<"y/def">>, emqx_mod_rewrite:match_and_rewrite(<<"y/def">>, Rules)).
% t_rewrite_unsubscribe(_) ->
% error('TODO').
% t_rewrite_publish(_) ->
% error('TODO').
% t_unload(_) ->
% error('TODO').
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
receive_publish(Timeout) ->
receive
{publish, Publish} -> {ok, Publish}
after
Timeout -> {error, timeout}
end.

View File

@ -1,160 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_modules_SUITE).
%% API
-compile(export_all).
-compile(nowarn_export_all).
-include("emqx.hrl").
-include("emqx_mqtt.hrl").
%%-include_lib("proper/include/proper.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").
%%-define(PROPTEST(M,F), true = proper:quickcheck(M:F())).
-define(RULES, [{rewrite,<<"x/#">>,<<"^x/y/(.+)$">>,<<"z/y/$1">>},
{rewrite,<<"y/+/z/#">>,<<"^y/(.+)/z/(.+)$">>,<<"y/z/$2">>}
]).
all() -> emqx_ct:all(?MODULE).
suite() ->
[{ct_hooks,[cth_surefire]}, {timetrap, {seconds, 30}}].
init_per_suite(Config) ->
emqx_ct_helpers:boot_modules(all),
emqx_ct_helpers:start_apps([emqx]),
%% Ensure all the modules unloaded.
ok = emqx_modules:unload(),
Config.
end_per_suite(_Config) ->
emqx_ct_helpers:stop_apps([emqx]).
%%--------------------------------------------------------------------
%% Test cases
%%--------------------------------------------------------------------
% t_unload(_) ->
% error('TODO').
% t_load(_) ->
% error('TODO').
%% Test case for emqx_mod_presence
t_mod_presence(_) ->
ok = emqx_mod_presence:load([{qos, ?QOS_1}]),
{ok, C1} = emqtt:start_link([{clientid, <<"monsys">>}]),
{ok, _} = emqtt:connect(C1),
{ok, _Props, [?QOS_1]} = emqtt:subscribe(C1, <<"$SYS/brokers/+/clients/#">>, qos1),
%% Connected Presence
{ok, C2} = emqtt:start_link([{clientid, <<"clientid">>},
{username, <<"username">>}]),
{ok, _} = emqtt:connect(C2),
ok = recv_and_check_presence(<<"clientid">>, <<"connected">>),
%% Disconnected Presence
ok = emqtt:disconnect(C2),
ok = recv_and_check_presence(<<"clientid">>, <<"disconnected">>),
ok = emqtt:disconnect(C1),
ok = emqx_mod_presence:unload([{qos, ?QOS_1}]).
t_mod_presence_reason(_) ->
?assertEqual(normal, emqx_mod_presence:reason(normal)),
?assertEqual(discarded, emqx_mod_presence:reason({shutdown, discarded})),
?assertEqual(tcp_error, emqx_mod_presence:reason({tcp_error, einval})),
?assertEqual(internal_error, emqx_mod_presence:reason(<<"unknown error">>)).
recv_and_check_presence(ClientId, Presence) ->
{ok, #{qos := ?QOS_1, topic := Topic, payload := Payload}} = receive_publish(100),
?assertMatch([<<"$SYS">>, <<"brokers">>, _Node, <<"clients">>, ClientId, Presence],
binary:split(Topic, <<"/">>, [global])),
case Presence of
<<"connected">> ->
?assertMatch(#{clientid := <<"clientid">>,
username := <<"username">>,
ipaddress := <<"127.0.0.1">>,
proto_name := <<"MQTT">>,
proto_ver := ?MQTT_PROTO_V4,
connack := ?RC_SUCCESS,
clean_start := true}, emqx_json:decode(Payload, [{labels, atom}, return_maps]));
<<"disconnected">> ->
?assertMatch(#{clientid := <<"clientid">>,
username := <<"username">>,
reason := <<"normal">>}, emqx_json:decode(Payload, [{labels, atom}, return_maps]))
end.
%% Test case for emqx_mod_subscription
t_mod_subscription(_) ->
emqx_mod_subscription:load([{<<"connected/%c/%u">>, ?QOS_0}]),
{ok, C} = emqtt:start_link([{host, "localhost"},
{clientid, "myclient"},
{username, "admin"}]),
{ok, _} = emqtt:connect(C),
emqtt:publish(C, <<"connected/myclient/admin">>, <<"Hello world">>, ?QOS_0),
{ok, #{topic := Topic, payload := Payload}} = receive_publish(100),
?assertEqual(<<"connected/myclient/admin">>, Topic),
?assertEqual(<<"Hello world">>, Payload),
ok = emqtt:disconnect(C),
emqx_mod_subscription:unload([]).
%% Test case for emqx_mod_write
t_mod_rewrite(_Config) ->
ok = emqx_mod_rewrite:load(?RULES),
{ok, C} = emqtt:start_link([{clientid, <<"rewrite_client">>}]),
{ok, _} = emqtt:connect(C),
OrigTopics = [<<"x/y/2">>, <<"x/1/2">>, <<"y/a/z/b">>, <<"y/def">>],
DestTopics = [<<"z/y/2">>, <<"x/1/2">>, <<"y/z/b">>, <<"y/def">>],
%% Subscribe
{ok, _Props, _} = emqtt:subscribe(C, [{Topic, ?QOS_1} || Topic <- OrigTopics]),
timer:sleep(100),
Subscriptions = emqx_broker:subscriptions(<<"rewrite_client">>),
?assertEqual(DestTopics, [Topic || {Topic, _SubOpts} <- Subscriptions]),
%% Publish
RecvTopics = [begin
ok = emqtt:publish(C, Topic, <<"payload">>),
{ok, #{topic := RecvTopic}} = receive_publish(100),
RecvTopic
end || Topic <- OrigTopics],
?assertEqual(DestTopics, RecvTopics),
%% Unsubscribe
{ok, _, _} = emqtt:unsubscribe(C, OrigTopics),
timer:sleep(100),
?assertEqual([], emqx_broker:subscriptions(<<"rewrite_client">>)),
ok = emqtt:disconnect(C),
ok = emqx_mod_rewrite:unload(?RULES).
t_rewrite_rule(_Config) ->
Rules = emqx_mod_rewrite:compile(?RULES),
?assertEqual(<<"z/y/2">>, emqx_mod_rewrite:match_and_rewrite(<<"x/y/2">>, Rules)),
?assertEqual(<<"x/1/2">>, emqx_mod_rewrite:match_and_rewrite(<<"x/1/2">>, Rules)),
?assertEqual(<<"y/z/b">>, emqx_mod_rewrite:match_and_rewrite(<<"y/a/z/b">>, Rules)),
?assertEqual(<<"y/def">>, emqx_mod_rewrite:match_and_rewrite(<<"y/def">>, Rules)).
%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------
receive_publish(Timeout) ->
receive
{publish, Publish} -> {ok, Publish}
after
Timeout -> {error, timeout}
end.

View File

@ -82,7 +82,10 @@ t_check_publish(_) ->
Props = #{'Response-Topic' => <<"responsetopic">>, 'Topic-Alias' => 1},
ok = emqx_packet:check(?PUBLISH_PACKET(?QOS_1, <<"topic">>, 1, Props, <<"payload">>)),
ok = emqx_packet:check(#mqtt_packet_publish{packet_id = 1, topic_name = <<"t">>}),
ok = emqx_packet:check(#mqtt_packet_publish{topic_name = <<>>, properties = #{'Topic-Alias'=> 0}}),
{error, ?RC_PROTOCOL_ERROR} = emqx_packet:check(#mqtt_packet_publish{topic_name = <<>>,
properties = #{'Topic-Alias'=> 0}
}),
{error, ?RC_PROTOCOL_ERROR} = emqx_packet:check(?PUBLISH_PACKET(?QOS_1, <<>>, 1, #{}, <<"payload">>)),
{error, ?RC_TOPIC_NAME_INVALID} = emqx_packet:check(?PUBLISH_PACKET(?QOS_1, <<"+/+">>, 1, #{}, <<"payload">>)),
{error, ?RC_TOPIC_ALIAS_INVALID} = emqx_packet:check(?PUBLISH_PACKET(1, <<"topic">>, 1, #{'Topic-Alias' => 0}, <<"payload">>)),
@ -152,7 +155,9 @@ t_from_to_message(_) ->
ExpectedMsg = emqx_message:make(<<"clientid">>, ?QOS_0, <<"topic">>, <<"payload">>),
ExpectedMsg1 = emqx_message:set_flag(retain, false, ExpectedMsg),
ExpectedMsg2 = emqx_message:set_headers(#{peerhost => {127,0,0,1},
username => <<"test">>}, ExpectedMsg1),
protocol => mqtt,
username => <<"test">>
}, ExpectedMsg1),
Pkt = #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH,
qos = ?QOS_0,
retain = false,
@ -161,7 +166,8 @@ t_from_to_message(_) ->
packet_id = 10,
properties = #{}},
payload = <<"payload">>},
MsgFromPkt = emqx_packet:to_message(#{clientid => <<"clientid">>,
MsgFromPkt = emqx_packet:to_message(#{protocol => mqtt,
clientid => <<"clientid">>,
username => <<"test">>,
peerhost => {127,0,0,1}}, Pkt),
?assertEqual(ExpectedMsg2, MsgFromPkt#message{id = emqx_message:id(ExpectedMsg),

View File

@ -25,23 +25,10 @@
all() -> emqx_ct:all(?MODULE).
% t_name(_) ->
% error('TODO').
% t_text(_) ->
% error('TODO').
% t_mqtt_frame_error(_) ->
% error('TODO').
% t_connack_error(_) ->
% error('TODO').
% t_compat(_) ->
% error('TODO').
% t_formalized(_) ->
% error('TODO').
t_frame_error(_) ->
?assertEqual(?RC_PACKET_TOO_LARGE, emqx_reason_codes:frame_error(frame_too_large)),
?assertEqual(?RC_MALFORMED_PACKET, emqx_reason_codes:frame_error(bad_packet_id)),
?assertEqual(?RC_MALFORMED_PACKET, emqx_reason_codes:frame_error(bad_qos)).
t_prop_name_text(_) ->
?assert(proper:quickcheck(prop_name_text(), prop_name_text(opts))).
@ -77,6 +64,7 @@ prop_connack_error() ->
%%--------------------------------------------------------------------
%% Helper
%%--------------------------------------------------------------------
default_opts() ->
default_opts([]).
@ -161,3 +149,4 @@ mqttv5_version() ->
mqttv3_version() ->
oneof([?MQTT_PROTO_V3, ?MQTT_PROTO_V4]).

View File

@ -59,7 +59,7 @@ t_session_init(_) ->
?assertEqual(0, emqx_session:info(retry_interval, Session)),
?assertEqual(0, emqx_session:info(awaiting_rel_cnt, Session)),
?assertEqual(100, emqx_session:info(awaiting_rel_max, Session)),
?assertEqual(3600000, emqx_session:info(awaiting_rel_timeout, Session)),
?assertEqual(300, emqx_session:info(await_rel_timeout, Session)),
?assert(is_integer(emqx_session:info(created_at, Session))).
%%--------------------------------------------------------------------
@ -67,28 +67,23 @@ t_session_init(_) ->
%%--------------------------------------------------------------------
t_session_info(_) ->
Info = emqx_session:info(session()),
?assertMatch(#{subscriptions := #{},
subscriptions_max := 0,
upgrade_qos := false,
inflight_max := 0,
retry_interval := 0,
await_rel_timeout := 300
}, emqx_session:info(session())).
t_session_stats(_) ->
Stats = emqx_session:stats(session()),
?assertMatch(#{subscriptions_max := 0,
inflight_max := 0,
mqueue_len := 0,
mqueue_max := 1000,
mqueue_dropped := 0,
next_pkt_id := 1,
awaiting_rel := #{},
awaiting_rel_max := 100,
awaiting_rel_timeout := 3600000
}, Info).
t_session_attrs(_) ->
Attrs = emqx_session:attrs(session()),
io:format("~p~n", [Attrs]).
t_session_stats(_) ->
Stats = emqx_session:stats(session()),
io:format("~p~n", [Stats]).
awaiting_rel_cnt := 0,
awaiting_rel_max := 100
}, maps:from_list(Stats)).
%%--------------------------------------------------------------------
%% Test cases for pub/sub
@ -128,12 +123,12 @@ t_publish_qos2(_) ->
t_publish_qos1(_) ->
ok = meck:expect(emqx_broker, publish, fun(_) -> [] end),
Msg = emqx_message:make(test, ?QOS_1, <<"t">>, <<"payload">>),
{ok, [], Session} = emqx_session:publish(1, Msg, session()).
{ok, [], _Session} = emqx_session:publish(1, Msg, session()).
t_publish_qos0(_) ->
ok = meck:expect(emqx_broker, publish, fun(_) -> [] end),
Msg = emqx_message:make(test, ?QOS_1, <<"t">>, <<"payload">>),
{ok, [], Session} = emqx_session:publish(0, Msg, session()).
{ok, [], _Session} = emqx_session:publish(0, Msg, session()).
t_is_awaiting_full_false(_) ->
?assertNot(emqx_session:is_awaiting_full(session(#{max_awaiting_rel => 0}))).
@ -196,7 +191,7 @@ t_pubcomp_id_not_found(_) ->
%%--------------------------------------------------------------------
t_dequeue(_) ->
{ok, Session} = emqx_session:dequeue(session()).
{ok, _Session} = emqx_session:dequeue(session()).
t_deliver(_) ->
Delivers = [delivery(?QOS_1, <<"t1">>), delivery(?QOS_2, <<"t2">>)],
@ -222,7 +217,6 @@ t_takeover(_) ->
t_resume(_) ->
ok = meck:expect(emqx_broker, subscribe, fun(_, _, _) -> ok end),
Subs = #{<<"t">> => ?DEFAULT_SUBOPTS},
Session = session(#{subscriptions => #{<<"t">> => ?DEFAULT_SUBOPTS}}),
ok = emqx_session:resume(<<"clientid">>, Session).

View File

@ -138,7 +138,7 @@ t_websocket_info_incoming(_) ->
t_websocket_info_deliver(_) ->
with_ws_conn(fun(WsConn) ->
ok = meck:expect(emqx_channel, handle_out,
ok = meck:expect(emqx_channel, handle_deliver,
fun(Delivers, Channel) ->
Packets = [emqx_message:to_packet(1, Msg) || {deliver, _, Msg} <- Delivers],
{ok, {outgoing, Packets}, Channel}

View File

@ -25,7 +25,7 @@
{server_keepalive, 60},
{upgrade_qos, false},
{session_expiry_interval, 7200},
{retry_interval, 20000},
{retry_interval, 20},
{mqueue_store_qos0, true},
{mqueue_priorities, none},
{mqueue_default_priority, highest},
@ -43,7 +43,7 @@
{enable_flapping_detect, false},
{enable_ban, true},
{enable_acl, true},
{await_rel_timeout, 300000},
{await_rel_timeout, 300},
{acl_deny_action, ignore}
]).