Merge pull request #3514 from emqx/develop

Auto-pull-request-by-2020-06-04
tigercl 2020-06-04 15:58:22 +08:00 committed by GitHub
commit 6093432834
11 changed files with 390 additions and 60 deletions

View File

@@ -414,6 +414,9 @@ log.dir = {{ platform_log_dir }}
## The log filename for logs of level specified in "log.level".
##
## If `log.rotation` is enabled, this is the base name of the
## files. Each file in a rotated log is named <base_name>.N, where N is an integer.
##
## Value: String
## Default: emqx.log
log.file = emqx.log
@@ -424,6 +427,14 @@ log.file = emqx.log
## Default: No Limit
#log.chars_limit = 8192
## Enables the log rotation.
## With this enabled, a new log file is created when the current one is
## full (reaches `log.rotation.size`); at most `log.rotation.count` files are kept.
##
## Value: on | off
## Default: on
log.rotation = on
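
To illustrate the rotation scheme described above (a sketch only; the exact file set is managed by OTP's disk_log, and the directory comes from `log.dir`), a wrapped log with the defaults `log.file = emqx.log` and `log.rotation.count = 5` ends up on disk roughly as:

    emqx.log.1
    emqx.log.2
    ...
    emqx.log.5
    emqx.log.idx
    emqx.log.siz

where the `.idx` and `.siz` files are disk_log's own bookkeeping.
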
## Maximum size of each log file.
##
## Value: Number
@@ -446,9 +457,103 @@ log.rotation.count = 5
## Note: Log files for a specific log level will only contain the logs
## that are at or above that level
##
#log.info.file = info.log
#log.error.file = error.log
## The max allowed queue length before switching to sync mode.
##
## Log overload protection parameter. If the message queue grows
## larger than this value, the handler switches from async to sync mode.
##
## Default: 100
##
#log.sync_mode_qlen = 100
## The max allowed queue length before switching to drop mode.
##
## Log overload protection parameter. When the message queue grows
## larger than this threshold, the handler switches to a mode in which
## it drops all new events that senders want to log.
##
## Default: 3000
##
#log.drop_mode_qlen = 3000
## The max allowed queue length before switching to flush mode.
##
## Log overload protection parameter. If the length of the message queue
## grows larger than this threshold, a flush (delete) operation takes place.
## To flush events, the handler discards the messages in the message queue
## by receiving them in a loop without logging.
##
## Default: 8000
##
#log.flush_qlen = 8000
## Kill the log handler when it gets overloaded.
##
## Log overload protection parameter. It is possible that a handler,
## even if it can successfully manage peaks of high load without crashing,
## can build up a large message queue, or use a large amount of memory.
## We could kill the log handler in these cases and restart it after a
## few seconds.
##
## Default: on
##
#log.overload_kill = on
## The max allowed queue length before killing the log handler.
##
## Log overload protection parameter. This is the maximum allowed queue
## length. If the message queue grows larger than this, the handler
## process is terminated.
##
## Default: 20000
##
#log.overload_kill_qlen = 20000
## The max allowed memory size before killing the log handler.
##
## Log overload protection parameter. This is the maximum memory size
## that the handler process is allowed to use. If the handler grows
## larger than this, the process is terminated.
##
## Default: 30MB
##
#log.overload_kill_mem_size = 30MB
## Restart the log handler after a delay.
##
## Log overload protection parameter. If the handler is terminated,
## it restarts automatically after a delay specified in seconds.
## The value "infinity" prevents restarts.
##
## Default: 5s
##
#log.overload_kill_restart_after = 5s
## Max burst count and time window for burst control.
##
## Log overload protection parameter. Large bursts of log events - many
## events received by the handler under a short period of time - can
## potentially cause problems. By specifying the maximum number of events
## to be handled within a certain time frame, the handler can avoid
## choking the log with massive amounts of printouts.
##
## This config controls the maximum number of events to handle within
## a time frame. After the limit is reached, successive events are
## dropped until the end of the time frame.
##
## Note that no warning is logged when messages are dropped by burst control.
##
## Comment this config out to disable the burst control feature.
##
## Value: MaxBurstCount,TimeWindow
## Default: disabled
##
#log.burst_limit = 20000, 1s
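
Taken together, the settings above map onto the overload-protection options that OTP's logger handlers accept. A minimal hand-written sketch, assuming OTP 21+ and the logger_disk_log_h handler (the handler id emqx_file_log, the level, and the literal byte/millisecond values are illustrative and simply mirror the defaults documented above):

    HandlerConfig = #{level => warning,
                      config => #{type => wrap,                        %% log.rotation = on
                                  file => "log/emqx.log",              %% log.dir / log.file
                                  max_no_files => 5,                   %% log.rotation.count
                                  max_no_bytes => 10485760,            %% log.rotation.size (10MB)
                                  sync_mode_qlen => 100,               %% log.sync_mode_qlen
                                  drop_mode_qlen => 3000,              %% log.drop_mode_qlen
                                  flush_qlen => 8000,                  %% log.flush_qlen
                                  overload_kill_enable => true,        %% log.overload_kill
                                  overload_kill_qlen => 20000,         %% log.overload_kill_qlen
                                  overload_kill_mem_size => 31457280,  %% log.overload_kill_mem_size (30MB)
                                  overload_kill_restart_after => 5000, %% log.overload_kill_restart_after (5s)
                                  burst_limit_enable => true,          %% log.burst_limit
                                  burst_limit_max_count => 20000,
                                  burst_limit_window_time => 1000}},
    ok = logger:add_handler(emqx_file_log, logger_disk_log_h, HandlerConfig).
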
##--------------------------------------------------------------------
## Authentication/Access Control
##--------------------------------------------------------------------

View File

@@ -477,11 +477,21 @@ end}.
{datatype, integer}
]}.
{mapping, "log.rotation", "kernel.logger", [
{default, on},
{datatype, flag}
]}.
{mapping, "log.rotation.size", "kernel.logger", [ {mapping, "log.rotation.size", "kernel.logger", [
{default, "10MB"}, {default, "10MB"},
{datatype, bytesize} {datatype, bytesize}
]}. ]}.
{mapping, "log.size", "kernel.logger", [
{default, infinity},
{datatype, [bytesize, atom]}
]}.
{mapping, "log.rotation.count", "kernel.logger", [ {mapping, "log.rotation.count", "kernel.logger", [
{default, 5}, {default, 5},
{datatype, integer} {datatype, integer}
@ -491,6 +501,46 @@ end}.
{datatype, file} {datatype, file}
]}. ]}.
{mapping, "log.sync_mode_qlen", "kernel.logger", [
{default, 100},
{datatype, integer}
]}.
{mapping, "log.drop_mode_qlen", "kernel.logger", [
{default, 3000},
{datatype, integer}
]}.
{mapping, "log.flush_qlen", "kernel.logger", [
{default, 8000},
{datatype, integer}
]}.
{mapping, "log.overload_kill", "kernel.logger", [
{default, on},
{datatype, flag}
]}.
{mapping, "log.overload_kill_mem_size", "kernel.logger", [
{default, "30MB"},
{datatype, bytesize}
]}.
{mapping, "log.overload_kill_qlen", "kernel.logger", [
{default, 20000},
{datatype, integer}
]}.
{mapping, "log.overload_kill_restart_after", "kernel.logger", [
{default, "5s"},
{datatype, [{duration, ms}, atom]}
]}.
{mapping, "log.burst_limit", "kernel.logger", [
{default, "disabled"},
{datatype, string}
]}.
{mapping, "log.sasl", "sasl.sasl_error_logger", [ {mapping, "log.sasl", "sasl.sasl_error_logger", [
{default, off}, {default, off},
{datatype, flag}, {datatype, flag},
@ -521,6 +571,10 @@ end}.
{translation, "kernel.logger", fun(Conf) -> {translation, "kernel.logger", fun(Conf) ->
LogTo = cuttlefish:conf_get("log.to", Conf), LogTo = cuttlefish:conf_get("log.to", Conf),
LogLevel = cuttlefish:conf_get("log.level", Conf), LogLevel = cuttlefish:conf_get("log.level", Conf),
LogType = case cuttlefish:conf_get("log.rotation", Conf) of
true -> wrap;
false -> halt
end,
CharsLimit = case cuttlefish:conf_get("log.chars_limit", Conf) of
-1 -> unlimited;
V -> V
@@ -537,11 +591,37 @@ end}.
[]}]},
msg,"\n"],
chars_limit => CharsLimit}},
{BurstLimitOn, {MaxBurstCount, TimeWindow}} =
case string:tokens(cuttlefish:conf_get("log.burst_limit", Conf), ", ") of
["disabled"] -> {false, {20000, 1000}};
[Count, Window] ->
{true, {list_to_integer(Count),
case cuttlefish_duration:parse(Window, ms) of
Secs when is_integer(Secs) -> Secs;
{error, Reason1} -> error(Reason1)
end}}
end,
FileConf = fun(Filename) ->
-#{type => wrap,
+BasicConf =
#{type => LogType,
file => filename:join(cuttlefish:conf_get("log.dir", Conf), Filename),
max_no_files => cuttlefish:conf_get("log.rotation.count", Conf),
-max_no_bytes => cuttlefish:conf_get("log.rotation.size", Conf)}
+sync_mode_qlen => cuttlefish:conf_get("log.sync_mode_qlen", Conf),
drop_mode_qlen => cuttlefish:conf_get("log.drop_mode_qlen", Conf),
flush_qlen => cuttlefish:conf_get("log.flush_qlen", Conf),
overload_kill_enable => cuttlefish:conf_get("log.overload_kill", Conf),
overload_kill_qlen => cuttlefish:conf_get("log.overload_kill_qlen", Conf),
overload_kill_mem_size => cuttlefish:conf_get("log.overload_kill_mem_size", Conf),
overload_kill_restart_after => cuttlefish:conf_get("log.overload_kill_restart_after", Conf),
burst_limit_enable => BurstLimitOn,
burst_limit_max_count => MaxBurstCount,
burst_limit_window_time => TimeWindow
},
MaxNoBytes = case LogType of
wrap -> cuttlefish:conf_get("log.rotation.size", Conf);
halt -> cuttlefish:conf_get("log.size", Conf)
end,
BasicConf#{max_no_bytes => MaxNoBytes}
end,
%% For the default logger that outputs to console
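
For reference, a hedged reading of the burst-limit translation above: the value is split on ", " and the window is parsed as a duration in milliseconds, so

    %% "20000, 1s" -> burst_limit_enable => true,  burst_limit_max_count => 20000, burst_limit_window_time => 1000
    %% "disabled"  -> burst_limit_enable => false (the fallback 20000 / 1000 ms values are effectively unused)
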

View File

@@ -4,7 +4,7 @@
[{gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}},
{jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.4"}}},
{cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.7.1"}}},
-{esockd, {git, "https://github.com/emqx/esockd", {tag, "5.6.1"}}},
+{esockd, {git, "https://github.com/emqx/esockd", {tag, "5.6.2"}}},
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.7.3"}}},
{gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.4.1"}}},
{cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.0.0"}}}

View File

@@ -538,6 +538,21 @@ do_subscribe(TopicFilter, SubOpts = #{qos := QoS}, Channel =
{error, RC} -> {RC, Channel}
end.
-compile({inline, [process_force_subscribe/2]}).
process_force_subscribe(Subscriptions, Channel =
#channel{clientinfo = ClientInfo = #{mountpoint := MountPoint},
session = Session}) ->
lists:foldl(fun({TopicFilter, SubOpts = #{qos := QoS}}, {ReasonCodes, ChannelAcc}) ->
NTopicFilter = emqx_mountpoint:mount(MountPoint, TopicFilter),
NSubOpts = enrich_subopts(maps:merge(?DEFAULT_SUBOPTS, SubOpts), ChannelAcc),
case emqx_session:subscribe(ClientInfo, NTopicFilter, NSubOpts, Session) of
{ok, NSession} ->
{ReasonCodes ++ [QoS], ChannelAcc#channel{session = NSession}};
{error, ReasonCode} ->
{ReasonCodes ++ [ReasonCode], ChannelAcc}
end
end, {[], Channel}, Subscriptions).
%%--------------------------------------------------------------------
%% Process Unsubscribe
%%--------------------------------------------------------------------
@@ -563,6 +578,20 @@ do_unsubscribe(TopicFilter, _SubOpts, Channel =
{error, RC} -> {RC, Channel}
end.
-compile({inline, [process_force_unsubscribe/2]}).
process_force_unsubscribe(Subscriptions, Channel =
#channel{clientinfo = ClientInfo = #{mountpoint := MountPoint},
session = Session}) ->
lists:foldl(fun({TopicFilter, _SubOpts}, {ReasonCodes, ChannelAcc}) ->
NTopicFilter = emqx_mountpoint:mount(MountPoint, TopicFilter),
case emqx_session:unsubscribe(ClientInfo, NTopicFilter, Session) of
{ok, NSession} ->
{ReasonCodes ++ [?RC_SUCCESS], ChannelAcc#channel{session = NSession}};
{error, ReasonCode} ->
{ReasonCodes ++ [ReasonCode], ChannelAcc}
end
end, {[], Channel}, Subscriptions).
%%--------------------------------------------------------------------
%% Process Disconnect
%%--------------------------------------------------------------------
@@ -818,6 +847,10 @@ handle_info({subscribe, TopicFilters}, Channel = #channel{clientinfo = ClientInf
{_ReasonCodes, NChannel} = process_subscribe(TopicFilters1, Channel),
{ok, NChannel};
handle_info({force_subscribe, TopicFilters}, Channel) ->
{_ReasonCodes, NChannel} = process_force_subscribe(parse_topic_filters(TopicFilters), Channel),
{ok, NChannel};
handle_info({unsubscribe, TopicFilters}, Channel = #channel{clientinfo = ClientInfo}) ->
TopicFilters1 = run_hooks('client.unsubscribe',
[ClientInfo, #{'Internal' => true}],
@@ -826,6 +859,10 @@ handle_info({unsubscribe, TopicFilters}, Channel = #channel{clientinfo = ClientI
{_ReasonCodes, NChannel} = process_unsubscribe(TopicFilters1, Channel),
{ok, NChannel};
handle_info({force_unsubscribe, TopicFilters}, Channel) ->
{_ReasonCodes, NChannel} = process_force_unsubscribe(parse_topic_filters(TopicFilters), Channel),
{ok, NChannel};
handle_info({sock_closed, Reason}, Channel = #channel{conn_state = idle}) ->
shutdown(Reason, Channel);
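
A hedged usage sketch for the new clauses above: another process can drive them by sending the corresponding info messages to a live connection process, assuming (as with the existing {subscribe, ...} message) that the connection process forwards unrecognized messages to emqx_channel:handle_info/2; the clientid and topic filter are illustrative:

    [ChanPid | _] = emqx_cm:lookup_channels(<<"clientid">>),
    ChanPid ! {force_subscribe, [{<<"t/forced/#">>, #{qos => 1}}]},
    ChanPid ! {force_unsubscribe, [<<"t/forced/#">>]}.

Note that, unlike the plain subscribe/unsubscribe messages, the force_* variants bypass the 'client.subscribe' and 'client.unsubscribe' hooks, as the handle_info clauses show.
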

View File

@@ -27,9 +27,7 @@
-export([start_link/0]).
--export([ register_channel/1
+-export([ register_channel/3
-, register_channel/2
-, register_channel/3
, unregister_channel/1
]).
@@ -45,6 +43,8 @@
, set_chan_stats/2
]).
-export([get_chann_conn_mod/2]).
-export([ open_session/3
, discard_session/1
, discard_session/2
@@ -100,28 +100,29 @@ start_link() ->
%% API
%%--------------------------------------------------------------------
-%% @doc Register a channel.
--spec(register_channel(emqx_types:clientid()) -> ok).
-register_channel(ClientId) ->
-register_channel(ClientId, self()).
-%% @doc Register a channel with pid.
--spec(register_channel(emqx_types:clientid(), chan_pid()) -> ok).
-register_channel(ClientId, ChanPid) when is_pid(ChanPid) ->
-Chan = {ClientId, ChanPid},
-true = ets:insert(?CHAN_TAB, Chan),
-true = ets:insert(?CHAN_CONN_TAB, Chan),
-ok = emqx_cm_registry:register_channel(Chan),
-cast({registered, Chan}).
%% @doc Register a channel with info and stats.
-spec(register_channel(emqx_types:clientid(),
emqx_types:infos(),
emqx_types:stats()) -> ok).
-register_channel(ClientId, Info, Stats) ->
+register_channel(ClientId, Info = #{conninfo := ConnInfo}, Stats) ->
Chan = {ClientId, ChanPid = self()},
true = ets:insert(?CHAN_INFO_TAB, {Chan, Info, Stats}),
-register_channel(ClientId, ChanPid).
+register_channel(ClientId, ChanPid, ConnInfo);
%% @private
%% @doc Register a channel with pid and conn_mod.
%%
%% There is a race condition, on a single node or across the cluster, when
%% many connections log in to the broker with the same clientid. We register
%% the channel and save its conn_mod first to take ownership of the clientid.
%%
%% Note: this should be called inside a lock transaction (emqx_cm_locker).
register_channel(ClientId, ChanPid, #{conn_mod := ConnMod}) when is_pid(ChanPid) ->
Chan = {ClientId, ChanPid},
true = ets:insert(?CHAN_TAB, Chan),
true = ets:insert(?CHAN_CONN_TAB, {Chan, ConnMod}),
ok = emqx_cm_registry:register_channel(Chan),
cast({registered, Chan}).
%% @doc Unregister a channel. %% @doc Unregister a channel.
-spec(unregister_channel(emqx_types:clientid()) -> ok). -spec(unregister_channel(emqx_types:clientid()) -> ok).
@@ -132,7 +133,7 @@ unregister_channel(ClientId) when is_binary(ClientId) ->
%% @private
do_unregister_channel(Chan) ->
ok = emqx_cm_registry:unregister_channel(Chan),
-true = ets:delete_object(?CHAN_CONN_TAB, Chan),
+true = ets:delete(?CHAN_CONN_TAB, Chan),
true = ets:delete(?CHAN_INFO_TAB, Chan),
ets:delete_object(?CHAN_TAB, Chan).
@@ -206,24 +207,29 @@ set_chan_stats(ClientId, ChanPid, Stats) ->
pendings => list()}}
| {error, Reason :: term()}).
open_session(true, ClientInfo = #{clientid := ClientId}, ConnInfo) ->
Self = self(),
CleanStart = fun(_) ->
ok = discard_session(ClientId),
Session = create_session(ClientInfo, ConnInfo),
register_channel(ClientId, Self, ConnInfo),
{ok, #{session => Session, present => false}}
end,
emqx_cm_locker:trans(ClientId, CleanStart);
open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) ->
Self = self(),
ResumeStart = fun(_) ->
case takeover_session(ClientId) of
{ok, ConnMod, ChanPid, Session} ->
ok = emqx_session:resume(ClientInfo, Session),
Pendings = ConnMod:call(ChanPid, {takeover, 'end'}),
register_channel(ClientId, Self, ConnInfo),
{ok, #{session => Session,
present => true,
pendings => Pendings}};
{error, not_found} ->
Session = create_session(ClientInfo, ConnInfo),
register_channel(ClientId, Self, ConnInfo),
{ok, #{session => Session, present => false}}
end
end,
@@ -253,8 +259,8 @@ takeover_session(ClientId) ->
end.
takeover_session(ClientId, ChanPid) when node(ChanPid) == node() ->
-case get_chan_info(ClientId, ChanPid) of
+case get_chann_conn_mod(ClientId, ChanPid) of
-#{conninfo := #{conn_mod := ConnMod}} ->
+ConnMod when is_atom(ConnMod) ->
Session = ConnMod:call(ChanPid, {takeover, 'begin'}),
{ok, ConnMod, ChanPid, Session};
undefined ->
@@ -284,8 +290,8 @@ discard_session(ClientId) when is_binary(ClientId) ->
end.
discard_session(ClientId, ChanPid) when node(ChanPid) == node() ->
-case get_chan_info(ClientId, ChanPid) of
+case get_chann_conn_mod(ClientId, ChanPid) of
-#{conninfo := #{conn_mod := ConnMod}} ->
+ConnMod when is_atom(ConnMod) ->
ConnMod:call(ChanPid, discard);
undefined -> ok
end;
@@ -418,3 +424,12 @@ update_stats({Tab, Stat, MaxStat}) ->
Size -> emqx_stats:setstat(Stat, MaxStat, Size)
end.
get_chann_conn_mod(ClientId, ChanPid) when node(ChanPid) == node() ->
Chan = {ClientId, ChanPid},
try [ConnMod] = ets:lookup_element(?CHAN_CONN_TAB, Chan, 2), ConnMod
catch
error:badarg -> undefined
end;
get_chann_conn_mod(ClientId, ChanPid) ->
rpc_call(node(ChanPid), get_chann_conn_mod, [ClientId, ChanPid]).
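
A hedged usage sketch of the new lookup, mirroring what takeover_session/2 and discard_session/2 now do (ClientId and ChanPid refer to whatever channel is being targeted):

    case emqx_cm:get_chann_conn_mod(ClientId, ChanPid) of
        undefined -> ok;                          %% channel already gone
        ConnMod   -> ConnMod:call(ChanPid, discard)
    end.
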

View File

@@ -30,6 +30,8 @@
-export([ new/1
, new/2
, ensure/1
, ensure/2
, all/0
]).
@@ -204,6 +206,17 @@
new(counter, Name) ->
create(counter, Name).
-spec(ensure(metric_name()) -> ok).
ensure(Name) ->
ensure(counter, Name).
-spec(ensure(gauge|counter, metric_name()) -> ok).
ensure(Type, Name) when Type =:= gauge; Type =:= counter ->
case ets:lookup(?TAB, Name) of
[] -> create(Type, Name);
_ -> ok
end.
%% @private
create(Type, Name) ->
case gen_server:call(?SERVER, {create, Type, Name}) of
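
A hedged usage sketch of ensure/1,2: unlike new/1,2, ensure only creates the metric when it is not already present (see the ets lookup above), so it is safe to call repeatedly, e.g. from plugin initialization code (the metric name below is made up for illustration):

    ok = emqx_metrics:ensure('myplugin.messages.handled'),
    ok = emqx_metrics:ensure('myplugin.messages.handled'),   %% idempotent
    ok = emqx_metrics:inc('myplugin.messages.handled').
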

View File

@@ -39,7 +39,7 @@ list() ->
-spec(load() -> ok).
load() ->
case emqx:get_env(modules_loaded_file) of
-undefined -> ignore;
+undefined -> ok;
File ->
load_modules(File)
end.

View File

@@ -50,7 +50,7 @@ name(16#11) -> no_subscription_existed;
name(16#18) -> continue_authentication;
name(16#19) -> re_authenticate;
name(16#80) -> unspecified_error;
-name(16#81) -> malformed_Packet;
+name(16#81) -> malformed_packet;
name(16#82) -> protocol_error;
name(16#83) -> implementation_specific_error;
name(16#84) -> unsupported_protocol_version;

View File

@@ -91,7 +91,9 @@
-type(ver() :: ?MQTT_PROTO_V3
| ?MQTT_PROTO_V4
-| ?MQTT_PROTO_V5).
+| ?MQTT_PROTO_V5
| non_neg_integer()).
-type(qos() :: ?QOS_0 | ?QOS_1 | ?QOS_2).
-type(qos_name() :: qos0 | at_most_once |
qos1 | at_least_once |
@@ -107,7 +109,7 @@
-type(conninfo() :: #{socktype := socktype(),
sockname := peername(),
peername := peername(),
-peercert := esockd_peercert:peercert(),
+peercert := nossl | undefined | esockd_peercert:peercert(),
conn_mod := module(),
proto_name := binary(),
proto_ver := ver(),
@@ -116,7 +118,7 @@
username := username(),
conn_props := properties(),
connected := boolean(),
-connected_at := erlang:timestamp(),
+connected_at := non_neg_integer(),
keepalive := 0..16#FFFF,
receive_maximum := non_neg_integer(),
expiry_interval := non_neg_integer(),
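
A hedged, partial example of a conninfo() map consistent with the updated types (only illustrative fields are shown): peercert may now be nossl or undefined for non-TLS connections, connected_at is an integer timestamp (e.g. from erlang:system_time/1) rather than an erlang:timestamp() tuple, and a bare integer such as 4 is now admissible for proto_ver since ver() accepts any non_neg_integer():

    ConnInfo = #{socktype => tcp,
                 sockname => {{127,0,0,1}, 1883},
                 peername => {{127,0,0,1}, 52018},
                 peercert => nossl,
                 conn_mod => emqx_connection,
                 proto_name => <<"MQTT">>,
                 proto_ver => 4,
                 connected => true,
                 connected_at => erlang:system_time(millisecond)}.
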

View File

@@ -23,6 +23,13 @@
-include_lib("eunit/include/eunit.hrl").
-define(CM, emqx_cm).
-define(ChanInfo,#{conninfo =>
#{socktype => tcp,
peername => {{127,0,0,1}, 5000},
sockname => {{127,0,0,1}, 1883},
peercert => nossl,
conn_mod => emqx_connection,
receive_maximum => 100}}).
%%--------------------------------------------------------------------
%% CT callbacks
@@ -43,13 +50,13 @@ end_per_suite(_Config) ->
%%--------------------------------------------------------------------
t_reg_unreg_channel(_) ->
-ok = emqx_cm:register_channel(<<"clientid">>),
+ok = emqx_cm:register_channel(<<"clientid">>, ?ChanInfo, []),
?assertEqual([self()], emqx_cm:lookup_channels(<<"clientid">>)),
ok = emqx_cm:unregister_channel(<<"clientid">>),
?assertEqual([], emqx_cm:lookup_channels(<<"clientid">>)).
t_get_set_chan_info(_) ->
-Info = #{proto_ver => 4, proto_name => <<"MQTT">>},
+Info = ?ChanInfo,
ok = emqx_cm:register_channel(<<"clientid">>, Info, []),
?assertEqual(Info, emqx_cm:get_chan_info(<<"clientid">>)),
Info1 = Info#{proto_ver => 5},
@@ -60,7 +67,7 @@ t_get_set_chan_info(_) ->
t_get_set_chan_stats(_) ->
Stats = [{recv_oct, 10}, {send_oct, 8}],
-ok = emqx_cm:register_channel(<<"clientid">>, #{}, Stats),
+ok = emqx_cm:register_channel(<<"clientid">>, ?ChanInfo, Stats),
?assertEqual(Stats, emqx_cm:get_chan_stats(<<"clientid">>)),
Stats1 = [{recv_oct, 10}|Stats],
true = emqx_cm:set_chan_stats(<<"clientid">>, Stats1),
@@ -69,27 +76,89 @@ t_get_set_chan_stats(_) ->
?assertEqual(undefined, emqx_cm:get_chan_stats(<<"clientid">>)).
t_open_session(_) ->
ok = meck:new(emqx_connection, [passthrough, no_history]),
ok = meck:expect(emqx_connection, call, fun(_, _) -> ok end),
ClientInfo = #{zone => external,
clientid => <<"clientid">>,
username => <<"username">>,
peerhost => {127,0,0,1}},
-ConnInfo = #{peername => {{127,0,0,1}, 5000},
+ConnInfo = #{socktype => tcp,
peername => {{127,0,0,1}, 5000},
sockname => {{127,0,0,1}, 1883},
peercert => nossl,
conn_mod => emqx_connection,
receive_maximum => 100},
{ok, #{session := Session1, present := false}}
= emqx_cm:open_session(true, ClientInfo, ConnInfo),
?assertEqual(100, emqx_session:info(inflight_max, Session1)),
{ok, #{session := Session2, present := false}}
-= emqx_cm:open_session(false, ClientInfo, ConnInfo),
+= emqx_cm:open_session(true, ClientInfo, ConnInfo),
-?assertEqual(100, emqx_session:info(inflight_max, Session2)).
+?assertEqual(100, emqx_session:info(inflight_max, Session2)),
emqx_cm:unregister_channel(<<"clientid">>),
ok = meck:unload(emqx_connection).
t_open_session_race_condition(_) ->
ClientInfo = #{zone => external,
clientid => <<"clientid">>,
username => <<"username">>,
peerhost => {127,0,0,1}},
ConnInfo = #{socktype => tcp,
peername => {{127,0,0,1}, 5000},
sockname => {{127,0,0,1}, 1883},
peercert => nossl,
conn_mod => emqx_connection,
receive_maximum => 100},
Parent = self(),
OpenASession = fun() ->
timer:sleep(rand:uniform(100)),
OpenR = (emqx_cm:open_session(true, ClientInfo, ConnInfo)),
Parent ! OpenR,
case OpenR of
{ok, _} ->
receive
{'$gen_call', From, discard} ->
gen_server:reply(From, ok), ok
end;
{error, Reason} ->
exit(Reason)
end
end,
[spawn(
fun() ->
spawn(OpenASession),
spawn(OpenASession)
end) || _ <- lists:seq(1, 1000)],
WaitingRecv = fun _Wr(N1, N2, 0) ->
{N1, N2};
_Wr(N1, N2, Rest) ->
receive
{ok, _} -> _Wr(N1+1, N2, Rest-1);
{error, _} -> _Wr(N1, N2+1, Rest-1)
end
end,
ct:pal("Race condition status: ~p~n", [WaitingRecv(0, 0, 2000)]),
?assertEqual(1, ets:info(emqx_channel, size)),
?assertEqual(1, ets:info(emqx_channel_conn, size)),
?assertEqual(1, ets:info(emqx_channel_registry, size)),
[Pid] = emqx_cm:lookup_channels(<<"clientid">>),
exit(Pid, kill), timer:sleep(100),
?assertEqual([], emqx_cm:lookup_channels(<<"clientid">>)).
t_discard_session(_) ->
ok = meck:new(emqx_connection, [passthrough, no_history]),
ok = meck:expect(emqx_connection, call, fun(_, _) -> ok end),
ok = emqx_cm:discard_session(<<"clientid">>),
-ok = emqx_cm:register_channel(<<"clientid">>),
+ok = emqx_cm:register_channel(<<"clientid">>, ?ChanInfo, []),
ok = emqx_cm:discard_session(<<"clientid">>),
ok = emqx_cm:unregister_channel(<<"clientid">>),
-ok = emqx_cm:register_channel(<<"clientid">>, #{conninfo => #{conn_mod => emqx_connection}}, []),
+ok = emqx_cm:register_channel(<<"clientid">>, ?ChanInfo, []),
ok = emqx_cm:discard_session(<<"clientid">>),
ok = meck:expect(emqx_connection, call, fun(_, _) -> error(testing) end),
ok = emqx_cm:discard_session(<<"clientid">>),
@@ -97,35 +166,26 @@ t_discard_session(_) ->
ok = meck:unload(emqx_connection).
t_takeover_session(_) ->
-ok = meck:new(emqx_connection, [passthrough, no_history]),
-ok = meck:expect(emqx_connection, call, fun(_, _) -> test end),
{error, not_found} = emqx_cm:takeover_session(<<"clientid">>),
-ok = emqx_cm:register_channel(<<"clientid">>),
-{error, not_found} = emqx_cm:takeover_session(<<"clientid">>),
-ok = emqx_cm:unregister_channel(<<"clientid">>),
-ok = emqx_cm:register_channel(<<"clientid">>, #{conninfo => #{conn_mod => emqx_connection}}, []),
-Pid = self(),
-{ok, emqx_connection, Pid, test} = emqx_cm:takeover_session(<<"clientid">>),
erlang:spawn(fun() ->
-ok = emqx_cm:register_channel(<<"clientid">>, #{conninfo => #{conn_mod => emqx_connection}}, []),
+ok = emqx_cm:register_channel(<<"clientid">>, ?ChanInfo, []),
-timer:sleep(1000)
+receive
{'$gen_call', From, {takeover, 'begin'}} ->
gen_server:reply(From, test), ok
end
end),
-ct:sleep(100),
+timer:sleep(100),
{ok, emqx_connection, _, test} = emqx_cm:takeover_session(<<"clientid">>),
-ok = emqx_cm:unregister_channel(<<"clientid">>),
+emqx_cm:unregister_channel(<<"clientid">>).
-ok = meck:unload(emqx_connection).
t_kick_session(_) ->
ok = meck:new(emqx_connection, [passthrough, no_history]),
ok = meck:expect(emqx_connection, call, fun(_, _) -> test end),
{error, not_found} = emqx_cm:kick_session(<<"clientid">>),
-ok = emqx_cm:register_channel(<<"clientid">>),
+ok = emqx_cm:register_channel(<<"clientid">>, ?ChanInfo, []),
-{error, not_found} = emqx_cm:kick_session(<<"clientid">>),
-ok = emqx_cm:unregister_channel(<<"clientid">>),
-ok = emqx_cm:register_channel(<<"clientid">>, #{conninfo => #{conn_mod => emqx_connection}}, []),
test = emqx_cm:kick_session(<<"clientid">>),
erlang:spawn(fun() ->
-ok = emqx_cm:register_channel(<<"clientid">>, #{conninfo => #{conn_mod => emqx_connection}}, []),
+ok = emqx_cm:register_channel(<<"clientid">>, ?ChanInfo, []),
timer:sleep(1000)
end),
ct:sleep(100),

View File

@@ -42,6 +42,24 @@ t_new(_) ->
1 = emqx_metrics:val('metrics.test.total')
end).
t_ensure(_) ->
with_metrics_server(
fun() ->
ok = emqx_metrics:ensure('metrics.test'),
ok = emqx_metrics:ensure('metrics.test'),
0 = emqx_metrics:val('metrics.test'),
ok = emqx_metrics:inc('metrics.test'),
1 = emqx_metrics:val('metrics.test'),
ok = emqx_metrics:ensure(counter, 'metrics.test.cnt'),
0 = emqx_metrics:val('metrics.test.cnt'),
ok = emqx_metrics:inc('metrics.test.cnt'),
1 = emqx_metrics:val('metrics.test.cnt'),
ok = emqx_metrics:ensure(gauge, 'metrics.test.total'),
0 = emqx_metrics:val('metrics.test.total'),
ok = emqx_metrics:inc('metrics.test.total'),
1 = emqx_metrics:val('metrics.test.total')
end).
t_all(_) ->
with_metrics_server(
fun() ->