Merge remote-tracking branch 'origin/master' into release-4.1

This commit is contained in:
zhanghongtong 2020-06-04 09:00:12 +00:00
commit e81b8d6a4a
13 changed files with 418 additions and 83 deletions

View File

@ -7,8 +7,7 @@
[![Slack Invite](<https://slack-invite.emqx.io/badge.svg>)](https://slack-invite.emqx.io)
[![Twitter](https://img.shields.io/badge/Twitter-EMQ%20X-1DA1F2?logo=twitter)](https://twitter.com/emqtt)
[![We are hiring](https://www.emqx.io/static/img/github_readme_cn_bg.png)](https://www.emqx.io/cn/careers)
[![最棒的物联网 MQTT 开源团队期待您的加入](https://www.emqx.io/static/img/github_readme_cn_bg.png)](https://www.emqx.io/cn/careers)
[English](./README.md) | 简体中文
@ -17,22 +16,24 @@
从 3.0 版本开始,*EMQ X* 完整支持 MQTT V5.0 协议规范,向下兼容 MQTT V3.1 和 V3.1.1,并支持 MQTT-SN、CoAP、LwM2M、WebSocket 和 STOMP 等通信协议。EMQ X 3.0 单集群可支持千万级别的 MQTT 并发连接。
- 新功能的完整列表,请参阅 [EMQ X Release Notes](https://github.com/emqx/emqx/releases)。
- 获取更多信息,请访问 [EMQ X](https://emqx.io)。
- 获取更多信息,请访问 [EMQ X 官网](https://www.emqx.io/cn/)。
## 安装
*EMQ X* 是跨平台的,支持 Linux、Unix、Mac OS 以及 Windows。这意味着 *EMQ X* 可以部署在 x86_64 架构的服务器上,也可以部署在 Raspberry Pi 这样的 ARM 设备上。
*EMQ X* 是跨平台的,支持 Linux、Unix、macOS 以及 Windows。这意味着 *EMQ X* 可以部署在 x86_64 架构的服务器上,也可以部署在 Raspberry Pi 这样的 ARM 设备上。
**使用 EMQ X Docker 镜像安装**
#### EMQ X Docker 镜像安装
```
docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8883:8883 -p 8084:8084 -p 18083:18083 emqx/emqx
```
**或者 [点此下载](https://emqx.io/downloads) 适合你的二进制软件包**
#### 二进制软件包安装
- [单节点安装](https://docs.emqx.io/broker/v3/cn/install.html)
- [集群安装](https://docs.emqx.io/broker/v3/cn/cluster.html)
需从 [EMQ X 下载](https://www.emqx.io/cn/downloads) 页面获取相应操作系统的二进制软件包。
- [单节点安装文档](https://docs.emqx.io/broker/latest/cn/getting-started/install.html)
- [集群配置文档](https://docs.emqx.io/broker/latest/cn/advanced/cluster.html)
## 从源码构建
@ -63,7 +64,7 @@ cd _rel/emqx && ./bin/emqx console
## FAQ
访问 [FAQ](https://docs.emqx.io/tutorial/v3/cn/faq/faq.html) 以获取常见问题的帮助。
访问 [EMQ X FAQ](https://docs.emqx.io/broker/latest/cn/faq/faq.html) 以获取常见问题的帮助。
## 产品路线
@ -73,11 +74,13 @@ cd _rel/emqx && ./bin/emqx console
你可通过以下途径与 EMQ 社区及开发者联系:
- [EMQX Slack](http://emqx.slack.com)
- [Slack](https://slack-invite.emqx.io)
- [Twitter](https://twitter.com/emqtt)
- [Forum](https://groups.google.com/d/forum/emqtt)
- [Blog](https://medium.com/@emqtt)
- [Facebook](https://www.facebook.com/emqxmqtt)
- [Reddit](https://www.reddit.com/r/emqx/)
- [Forum](https://groups.google.com/d/forum/emqtt)
- [Weibo](https://weibo.com/emqtt)
- [Blog](https://www.emqx.io/cn/blog)
欢迎你将任何 bug、问题和功能请求提交到 [emqx/emqx](https://github.com/emqx/emqx/issues)。

View File

@ -7,8 +7,7 @@
[![Slack Invite](<https://slack-invite.emqx.io/badge.svg>)](https://slack-invite.emqx.io)
[![Twitter](https://img.shields.io/badge/Twitter-EMQ%20X-1DA1F2?logo=twitter)](https://twitter.com/emqtt)
[![We are hiring](https://www.emqx.io/static/img/github_readme_en_bg.png)](https://www.emqx.io/careers)
[![The best IoT MQTT open source team looks forward to your joining](https://www.emqx.io/static/img/github_readme_en_bg.png)](https://www.emqx.io/careers)
English | [简体中文](./README-CN.md)
@ -17,22 +16,24 @@ English | [简体中文](./README-CN.md)
Starting from 3.0 release, *EMQ X* broker fully supports MQTT V5.0 protocol specifications and backward compatible with MQTT V3.1 and V3.1.1, as well as other communication protocols such as MQTT-SN, CoAP, LwM2M, WebSocket and STOMP. The 3.0 release of the *EMQ X* broker can scaled to 10+ million concurrent MQTT connections on one cluster.
- For full list of new features, please read [EMQ X Release Notes](https://github.com/emqx/emqx/releases).
- For more information, please visit [EMQ X homepage](https://emqx.io).
- For more information, please visit [EMQ X homepage](https://www.emqx.io).
## Installation
The *EMQ X* broker is cross-platform, which supports Linux, Unix, Mac OS and Windows. It means *EMQ X* can be deployed on x86_64 architecture servers and ARM devices like Raspberry Pi.
The *EMQ X* broker is cross-platform, which supports Linux, Unix, macOS and Windows. It means *EMQ X* can be deployed on x86_64 architecture servers and ARM devices like Raspberry Pi.
**Installing via EMQ X Docker Image**
#### Installing via EMQ X Docker Image
```
docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8883:8883 -p 8084:8084 -p 18083:18083 emqx/emqx
```
**Or download the binary package for your platform from [here](https://emqx.io/downloads).**
#### Installing via Binary Package
- [Single Node Install](https://docs.emqx.io/broker/v3/en/install.html)
- [Multi Node Install](https://docs.emqx.io/broker/v3/en/cluster.html)
Get the binary package of the corresponding OS from [EMQ X Download](https://www.emqx.io/downloads) page.
- [Single Node Install](https://docs.emqx.io/broker/latest/en/getting-started/installation.html)
- [Multi Node Install](https://docs.emqx.io/broker/latest/en/advanced/cluster.html)
## Build From Source
@ -65,7 +66,7 @@ To view the dashboard after running, use your browser to open: http://localhost:
## FAQ
Visiting [FAQ](https://docs.emqx.io/tutorial/v3/en/faq/faq.html) to get help of common problems.
Visiting [EMQ X FAQ](https://docs.emqx.io/broker/latest/en/faq/faq.html) to get help of common problems.
## Roadmap
@ -74,11 +75,12 @@ The [EMQ X Roadmap uses Github milestones](https://github.com/emqx/emqx/mileston
## Community, discussion, contribution, and support
You can reach the EMQ community and developers via the following channels:
- [EMQX Slack](https://slack-invite.emqx.io/)
- [Slack](https://slack-invite.emqx.io/)
- [Twitter](https://twitter.com/emqtt)
- [Facebook](https://www.facebook.com/emqxmqtt)
- [Reddit](https://www.reddit.com/r/emqx/)
- [Forum](https://groups.google.com/d/forum/emqtt)
- [Blog](https://medium.com/@emqtt)
- [Reddit](https://www.reddit.com/r/emqx/)
Please submit any bugs, issues, and feature requests to [emqx/emqx](https://github.com/emqx/emqx/issues).

View File

@ -414,6 +414,9 @@ log.dir = {{ platform_log_dir }}
## The log filename for logs of level specified in "log.level".
##
## If `log.rotation` is enabled, this is the base name of the
## files. Each file in a rotated log is named <base_name>.N, where N is an integer.
##
## Value: String
## Default: emqx.log
log.file = emqx.log
@ -424,6 +427,14 @@ log.file = emqx.log
## Default: No Limit
#log.chars_limit = 8192
## Enables the log rotation.
## With this enabled, new log files will be created when the current
## log file is full, max to `log.rotation.size` files will be created.
##
## Value: on | off
## Default: on
log.rotation = on
## Maximum size of each log file.
##
## Value: Number
@ -446,9 +457,103 @@ log.rotation.count = 5
## Note: Log files for a specific log level will only contain all the logs
## that higher than or equal to that level
##
#log.info.file = info.log
#log.info.file = info.log
#log.error.file = error.log
## The max allowed queue length before switching to sync mode.
##
## Log overload protection parameter. If the message queue grows
## larger than this value the handler switches from anync to sync mode.
##
## Default: 100
##
#log.sync_mode_qlen = 100
## The max allowed queue length before switching to drop mode.
##
## Log overload protection parameter. When the message queue grows
## larger than this threshold, the handler switches to a mode in which
## it drops all new events that senders want to log.
##
## Default: 3000
##
#log.drop_mode_qlen = 3000
## The max allowed queue length before switching to flush mode.
##
## Log overload protection parameter. If the length of the message queue
## grows larger than this threshold, a flush (delete) operation takes place.
## To flush events, the handler discards the messages in the message queue
## by receiving them in a loop without logging.
##
## Default: 8000
##
#log.flush_qlen = 8000
## Kill the log handler when it gets overloaded.
##
## Log overload protection parameter. It is possible that a handler,
## even if it can successfully manage peaks of high load without crashing,
## can build up a large message queue, or use a large amount of memory.
## We could kill the log handler in these cases and restart it after a
## few seconds.
##
## Default: on
##
#log.overload_kill = on
## The max allowed queue length before killing the log hanlder.
##
## Log overload protection parameter. This is the maximum allowed queue
## length. If the message queue grows larger than this, the handler
## process is terminated.
##
## Default: 20000
##
#log.overload_kill_qlen = 20000
## The max allowed memory size before killing the log hanlder.
##
## Log overload protection parameter. This is the maximum memory size
## that the handler process is allowed to use. If the handler grows
## larger than this, the process is terminated.
##
## Default: 30MB
##
#log.overload_kill_mem_size = 30MB
## Restart the log hanlder after some seconds.
##
## Log overload protection parameter. If the handler is terminated,
## it restarts automatically after a delay specified in seconds.
## The value "infinity" prevents restarts.
##
## Default: 5s
##
#log.overload_kill_restart_after = 5s
## Max burst count and time window for burst control.
##
## Log overload protection parameter. Large bursts of log events - many
## events received by the handler under a short period of time - can
## potentially cause problems. By specifying the maximum number of events
## to be handled within a certain time frame, the handler can avoid
## choking the log with massive amounts of printouts.
##
## This config controls the maximum number of events to handle within
## a time frame. After the limit is reached, successive events are
## dropped until the end of the time frame.
##
## Note that there would be no warning if any messages were
## dropped because of burst control.
##
## Comment this config out to disable the burst control feature.
##
## Value: MaxBurstCount,TimeWindow
## Default: disabled
##
#log.burst_limit = 20000, 1s
##--------------------------------------------------------------------
## Authentication/Access Control
##--------------------------------------------------------------------

View File

@ -477,11 +477,21 @@ end}.
{datatype, integer}
]}.
{mapping, "log.rotation", "kernel.logger", [
{default, on},
{datatype, flag}
]}.
{mapping, "log.rotation.size", "kernel.logger", [
{default, "10MB"},
{datatype, bytesize}
]}.
{mapping, "log.size", "kernel.logger", [
{default, infinity},
{datatype, [bytesize, atom]}
]}.
{mapping, "log.rotation.count", "kernel.logger", [
{default, 5},
{datatype, integer}
@ -491,6 +501,46 @@ end}.
{datatype, file}
]}.
{mapping, "log.sync_mode_qlen", "kernel.logger", [
{default, 100},
{datatype, integer}
]}.
{mapping, "log.drop_mode_qlen", "kernel.logger", [
{default, 3000},
{datatype, integer}
]}.
{mapping, "log.flush_qlen", "kernel.logger", [
{default, 8000},
{datatype, integer}
]}.
{mapping, "log.overload_kill", "kernel.logger", [
{default, on},
{datatype, flag}
]}.
{mapping, "log.overload_kill_mem_size", "kernel.logger", [
{default, "30MB"},
{datatype, bytesize}
]}.
{mapping, "log.overload_kill_qlen", "kernel.logger", [
{default, 20000},
{datatype, integer}
]}.
{mapping, "log.overload_kill_restart_after", "kernel.logger", [
{default, "5s"},
{datatype, [{duration, ms}, atom]}
]}.
{mapping, "log.burst_limit", "kernel.logger", [
{default, "disabled"},
{datatype, string}
]}.
{mapping, "log.sasl", "sasl.sasl_error_logger", [
{default, off},
{datatype, flag},
@ -521,6 +571,10 @@ end}.
{translation, "kernel.logger", fun(Conf) ->
LogTo = cuttlefish:conf_get("log.to", Conf),
LogLevel = cuttlefish:conf_get("log.level", Conf),
LogType = case cuttlefish:conf_get("log.rotation", Conf) of
true -> wrap;
false -> halt
end,
CharsLimit = case cuttlefish:conf_get("log.chars_limit", Conf) of
-1 -> unlimited;
V -> V
@ -537,11 +591,37 @@ end}.
[]}]},
msg,"\n"],
chars_limit => CharsLimit}},
{BustLimitOn, {MaxBurstCount, TimeWindow}} =
case string:tokens(cuttlefish:conf_get("log.burst_limit", Conf), ", ") of
["disabled"] -> {false, {20000, 1000}};
[Count, Window] ->
{true, {list_to_integer(Count),
case cuttlefish_duration:parse(Window, ms) of
Secs when is_integer(Secs) -> Secs;
{error, Reason1} -> error(Reason1)
end}}
end,
FileConf = fun(Filename) ->
#{type => wrap,
BasicConf =
#{type => LogType,
file => filename:join(cuttlefish:conf_get("log.dir", Conf), Filename),
max_no_files => cuttlefish:conf_get("log.rotation.count", Conf),
max_no_bytes => cuttlefish:conf_get("log.rotation.size", Conf)}
sync_mode_qlen => cuttlefish:conf_get("log.sync_mode_qlen", Conf),
drop_mode_qlen => cuttlefish:conf_get("log.drop_mode_qlen", Conf),
flush_qlen => cuttlefish:conf_get("log.flush_qlen", Conf),
overload_kill_enable => cuttlefish:conf_get("log.overload_kill", Conf),
overload_kill_qlen => cuttlefish:conf_get("log.overload_kill_qlen", Conf),
overload_kill_mem_size => cuttlefish:conf_get("log.overload_kill_mem_size", Conf),
overload_kill_restart_after => cuttlefish:conf_get("log.overload_kill_restart_after", Conf),
burst_limit_enable => BustLimitOn,
burst_limit_max_count => MaxBurstCount,
burst_limit_window_time => TimeWindow
},
MaxNoBytes = case LogType of
wrap -> cuttlefish:conf_get("log.rotation.size", Conf);
halt -> cuttlefish:conf_get("log.size", Conf)
end,
BasicConf#{max_no_bytes => MaxNoBytes}
end,
%% For the default logger that outputs to console

View File

@ -4,7 +4,7 @@
[{gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}},
{jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.4"}}},
{cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.7.1"}}},
{esockd, {git, "https://github.com/emqx/esockd", {tag, "5.6.1"}}},
{esockd, {git, "https://github.com/emqx/esockd", {tag, "5.6.2"}}},
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.7.3"}}},
{gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.4.1"}}},
{cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.0.0"}}}

View File

@ -538,6 +538,21 @@ do_subscribe(TopicFilter, SubOpts = #{qos := QoS}, Channel =
{error, RC} -> {RC, Channel}
end.
-compile({inline, [process_force_subscribe/2]}).
process_force_subscribe(Subscriptions, Channel =
#channel{clientinfo = ClientInfo = #{mountpoint := MountPoint},
session = Session}) ->
lists:foldl(fun({TopicFilter, SubOpts = #{qos := QoS}}, {ReasonCodes, ChannelAcc}) ->
NTopicFilter = emqx_mountpoint:mount(MountPoint, TopicFilter),
NSubOpts = enrich_subopts(maps:merge(?DEFAULT_SUBOPTS, SubOpts), ChannelAcc),
case emqx_session:subscribe(ClientInfo, NTopicFilter, NSubOpts, Session) of
{ok, NSession} ->
{ReasonCodes ++ [QoS], ChannelAcc#channel{session = NSession}};
{error, ReasonCode} ->
{ReasonCodes ++ [ReasonCode], ChannelAcc}
end
end, {[], Channel}, Subscriptions).
%%--------------------------------------------------------------------
%% Process Unsubscribe
%%--------------------------------------------------------------------
@ -563,6 +578,20 @@ do_unsubscribe(TopicFilter, _SubOpts, Channel =
{error, RC} -> {RC, Channel}
end.
-compile({inline, [process_force_unsubscribe/2]}).
process_force_unsubscribe(Subscriptions, Channel =
#channel{clientinfo = ClientInfo = #{mountpoint := MountPoint},
session = Session}) ->
lists:foldl(fun({TopicFilter, _SubOpts}, {ReasonCodes, ChannelAcc}) ->
NTopicFilter = emqx_mountpoint:mount(MountPoint, TopicFilter),
case emqx_session:unsubscribe(ClientInfo, NTopicFilter, Session) of
{ok, NSession} ->
{ReasonCodes ++ [?RC_SUCCESS], ChannelAcc#channel{session = NSession}};
{error, ReasonCode} ->
{ReasonCodes ++ [ReasonCode], ChannelAcc}
end
end, {[], Channel}, Subscriptions).
%%--------------------------------------------------------------------
%% Process Disconnect
%%--------------------------------------------------------------------
@ -818,6 +847,10 @@ handle_info({subscribe, TopicFilters}, Channel = #channel{clientinfo = ClientInf
{_ReasonCodes, NChannel} = process_subscribe(TopicFilters1, Channel),
{ok, NChannel};
handle_info({force_subscribe, TopicFilters}, Channel) ->
{_ReasonCodes, NChannel} = process_force_subscribe(parse_topic_filters(TopicFilters), Channel),
{ok, NChannel};
handle_info({unsubscribe, TopicFilters}, Channel = #channel{clientinfo = ClientInfo}) ->
TopicFilters1 = run_hooks('client.unsubscribe',
[ClientInfo, #{'Internal' => true}],
@ -826,6 +859,10 @@ handle_info({unsubscribe, TopicFilters}, Channel = #channel{clientinfo = ClientI
{_ReasonCodes, NChannel} = process_unsubscribe(TopicFilters1, Channel),
{ok, NChannel};
handle_info({force_unsubscribe, TopicFilters}, Channel) ->
{_ReasonCodes, NChannel} = process_force_unsubscribe(parse_topic_filters(TopicFilters), Channel),
{ok, NChannel};
handle_info({sock_closed, Reason}, Channel = #channel{conn_state = idle}) ->
shutdown(Reason, Channel);

View File

@ -27,9 +27,7 @@
-export([start_link/0]).
-export([ register_channel/1
, register_channel/2
, register_channel/3
-export([ register_channel/3
, unregister_channel/1
]).
@ -45,6 +43,8 @@
, set_chan_stats/2
]).
-export([get_chann_conn_mod/2]).
-export([ open_session/3
, discard_session/1
, discard_session/2
@ -100,28 +100,29 @@ start_link() ->
%% API
%%--------------------------------------------------------------------
%% @doc Register a channel.
-spec(register_channel(emqx_types:clientid()) -> ok).
register_channel(ClientId) ->
register_channel(ClientId, self()).
%% @doc Register a channel with pid.
-spec(register_channel(emqx_types:clientid(), chan_pid()) -> ok).
register_channel(ClientId, ChanPid) when is_pid(ChanPid) ->
Chan = {ClientId, ChanPid},
true = ets:insert(?CHAN_TAB, Chan),
true = ets:insert(?CHAN_CONN_TAB, Chan),
ok = emqx_cm_registry:register_channel(Chan),
cast({registered, Chan}).
%% @doc Register a channel with info and stats.
-spec(register_channel(emqx_types:clientid(),
emqx_types:infos(),
emqx_types:stats()) -> ok).
register_channel(ClientId, Info, Stats) ->
register_channel(ClientId, Info = #{conninfo := ConnInfo}, Stats) ->
Chan = {ClientId, ChanPid = self()},
true = ets:insert(?CHAN_INFO_TAB, {Chan, Info, Stats}),
register_channel(ClientId, ChanPid).
register_channel(ClientId, ChanPid, ConnInfo);
%% @private
%% @doc Register a channel with pid and conn_mod.
%%
%% There is a Race-Condition on one node or cluster when many connections
%% login to Broker with the same clientid. We should register it and save
%% the conn_mod first for taking up the clientid access right.
%%
%% Note that: It should be called on a lock transaction
register_channel(ClientId, ChanPid, #{conn_mod := ConnMod}) when is_pid(ChanPid) ->
Chan = {ClientId, ChanPid},
true = ets:insert(?CHAN_TAB, Chan),
true = ets:insert(?CHAN_CONN_TAB, {Chan, ConnMod}),
ok = emqx_cm_registry:register_channel(Chan),
cast({registered, Chan}).
%% @doc Unregister a channel.
-spec(unregister_channel(emqx_types:clientid()) -> ok).
@ -132,7 +133,7 @@ unregister_channel(ClientId) when is_binary(ClientId) ->
%% @private
do_unregister_channel(Chan) ->
ok = emqx_cm_registry:unregister_channel(Chan),
true = ets:delete_object(?CHAN_CONN_TAB, Chan),
true = ets:delete(?CHAN_CONN_TAB, Chan),
true = ets:delete(?CHAN_INFO_TAB, Chan),
ets:delete_object(?CHAN_TAB, Chan).
@ -206,24 +207,29 @@ set_chan_stats(ClientId, ChanPid, Stats) ->
pendings => list()}}
| {error, Reason :: term()}).
open_session(true, ClientInfo = #{clientid := ClientId}, ConnInfo) ->
Self = self(),
CleanStart = fun(_) ->
ok = discard_session(ClientId),
Session = create_session(ClientInfo, ConnInfo),
register_channel(ClientId, Self, ConnInfo),
{ok, #{session => Session, present => false}}
end,
emqx_cm_locker:trans(ClientId, CleanStart);
open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) ->
Self = self(),
ResumeStart = fun(_) ->
case takeover_session(ClientId) of
{ok, ConnMod, ChanPid, Session} ->
ok = emqx_session:resume(ClientInfo, Session),
Pendings = ConnMod:call(ChanPid, {takeover, 'end'}),
register_channel(ClientId, Self, ConnInfo),
{ok, #{session => Session,
present => true,
pendings => Pendings}};
{error, not_found} ->
Session = create_session(ClientInfo, ConnInfo),
register_channel(ClientId, Self, ConnInfo),
{ok, #{session => Session, present => false}}
end
end,
@ -253,8 +259,8 @@ takeover_session(ClientId) ->
end.
takeover_session(ClientId, ChanPid) when node(ChanPid) == node() ->
case get_chan_info(ClientId, ChanPid) of
#{conninfo := #{conn_mod := ConnMod}} ->
case get_chann_conn_mod(ClientId, ChanPid) of
ConnMod when is_atom(ConnMod) ->
Session = ConnMod:call(ChanPid, {takeover, 'begin'}),
{ok, ConnMod, ChanPid, Session};
undefined ->
@ -284,8 +290,8 @@ discard_session(ClientId) when is_binary(ClientId) ->
end.
discard_session(ClientId, ChanPid) when node(ChanPid) == node() ->
case get_chan_info(ClientId, ChanPid) of
#{conninfo := #{conn_mod := ConnMod}} ->
case get_chann_conn_mod(ClientId, ChanPid) of
ConnMod when is_atom(ConnMod) ->
ConnMod:call(ChanPid, discard);
undefined -> ok
end;
@ -418,3 +424,12 @@ update_stats({Tab, Stat, MaxStat}) ->
Size -> emqx_stats:setstat(Stat, MaxStat, Size)
end.
get_chann_conn_mod(ClientId, ChanPid) when node(ChanPid) == node() ->
Chan = {ClientId, ChanPid},
try [ConnMod] = ets:lookup_element(?CHAN_CONN_TAB, Chan, 2), ConnMod
catch
error:badarg -> undefined
end;
get_chann_conn_mod(ClientId, ChanPid) ->
rpc_call(node(ChanPid), get_chann_conn_mod, [ClientId, ChanPid]).

View File

@ -30,6 +30,8 @@
-export([ new/1
, new/2
, ensure/1
, ensure/2
, all/0
]).
@ -204,6 +206,17 @@ new(gauge, Name) ->
new(counter, Name) ->
create(counter, Name).
-spec(ensure(metric_name()) -> ok).
ensure(Name) ->
ensure(counter, Name).
-spec(ensure(gauge|counter, metric_name()) -> ok).
ensure(Type, Name) when Type =:= gauge; Type =:= counter ->
case ets:lookup(?TAB, Name) of
[] -> create(Type, Name);
_ -> ok
end.
%% @private
create(Type, Name) ->
case gen_server:call(?SERVER, {create, Type, Name}) of

View File

@ -39,7 +39,7 @@ list() ->
-spec(load() -> ok).
load() ->
case emqx:get_env(modules_loaded_file) of
undefined -> ignore;
undefined -> ok;
File ->
load_modules(File)
end.

View File

@ -50,7 +50,7 @@ name(16#11) -> no_subscription_existed;
name(16#18) -> continue_authentication;
name(16#19) -> re_authenticate;
name(16#80) -> unspecified_error;
name(16#81) -> malformed_Packet;
name(16#81) -> malformed_packet;
name(16#82) -> protocol_error;
name(16#83) -> implementation_specific_error;
name(16#84) -> unsupported_protocol_version;

View File

@ -91,7 +91,9 @@
-type(ver() :: ?MQTT_PROTO_V3
| ?MQTT_PROTO_V4
| ?MQTT_PROTO_V5).
| ?MQTT_PROTO_V5
| non_neg_integer()).
-type(qos() :: ?QOS_0 | ?QOS_1 | ?QOS_2).
-type(qos_name() :: qos0 | at_most_once |
qos1 | at_least_once |
@ -107,7 +109,7 @@
-type(conninfo() :: #{socktype := socktype(),
sockname := peername(),
peername := peername(),
peercert := esockd_peercert:peercert(),
peercert := nossl | undefined | esockd_peercert:peercert(),
conn_mod := module(),
proto_name := binary(),
proto_ver := ver(),
@ -116,7 +118,7 @@
username := username(),
conn_props := properties(),
connected := boolean(),
connected_at := erlang:timestamp(),
connected_at := non_neg_integer(),
keepalive := 0..16#FFFF,
receive_maximum := non_neg_integer(),
expiry_interval := non_neg_integer(),

View File

@ -23,6 +23,13 @@
-include_lib("eunit/include/eunit.hrl").
-define(CM, emqx_cm).
-define(ChanInfo,#{conninfo =>
#{socktype => tcp,
peername => {{127,0,0,1}, 5000},
sockname => {{127,0,0,1}, 1883},
peercert => nossl,
conn_mod => emqx_connection,
receive_maximum => 100}}).
%%--------------------------------------------------------------------
%% CT callbacks
@ -43,13 +50,13 @@ end_per_suite(_Config) ->
%%--------------------------------------------------------------------
t_reg_unreg_channel(_) ->
ok = emqx_cm:register_channel(<<"clientid">>),
ok = emqx_cm:register_channel(<<"clientid">>, ?ChanInfo, []),
?assertEqual([self()], emqx_cm:lookup_channels(<<"clientid">>)),
ok = emqx_cm:unregister_channel(<<"clientid">>),
?assertEqual([], emqx_cm:lookup_channels(<<"clientid">>)).
t_get_set_chan_info(_) ->
Info = #{proto_ver => 4, proto_name => <<"MQTT">>},
Info = ?ChanInfo,
ok = emqx_cm:register_channel(<<"clientid">>, Info, []),
?assertEqual(Info, emqx_cm:get_chan_info(<<"clientid">>)),
Info1 = Info#{proto_ver => 5},
@ -60,7 +67,7 @@ t_get_set_chan_info(_) ->
t_get_set_chan_stats(_) ->
Stats = [{recv_oct, 10}, {send_oct, 8}],
ok = emqx_cm:register_channel(<<"clientid">>, #{}, Stats),
ok = emqx_cm:register_channel(<<"clientid">>, ?ChanInfo, Stats),
?assertEqual(Stats, emqx_cm:get_chan_stats(<<"clientid">>)),
Stats1 = [{recv_oct, 10}|Stats],
true = emqx_cm:set_chan_stats(<<"clientid">>, Stats1),
@ -69,27 +76,89 @@ t_get_set_chan_stats(_) ->
?assertEqual(undefined, emqx_cm:get_chan_stats(<<"clientid">>)).
t_open_session(_) ->
ok = meck:new(emqx_connection, [passthrough, no_history]),
ok = meck:expect(emqx_connection, call, fun(_, _) -> ok end),
ClientInfo = #{zone => external,
clientid => <<"clientid">>,
username => <<"username">>,
peerhost => {127,0,0,1}},
ConnInfo = #{peername => {{127,0,0,1}, 5000},
ConnInfo = #{socktype => tcp,
peername => {{127,0,0,1}, 5000},
sockname => {{127,0,0,1}, 1883},
peercert => nossl,
conn_mod => emqx_connection,
receive_maximum => 100},
{ok, #{session := Session1, present := false}}
= emqx_cm:open_session(true, ClientInfo, ConnInfo),
?assertEqual(100, emqx_session:info(inflight_max, Session1)),
{ok, #{session := Session2, present := false}}
= emqx_cm:open_session(false, ClientInfo, ConnInfo),
?assertEqual(100, emqx_session:info(inflight_max, Session2)).
= emqx_cm:open_session(true, ClientInfo, ConnInfo),
?assertEqual(100, emqx_session:info(inflight_max, Session2)),
emqx_cm:unregister_channel(<<"clientid">>),
ok = meck:unload(emqx_connection).
t_open_session_race_condition(_) ->
ClientInfo = #{zone => external,
clientid => <<"clientid">>,
username => <<"username">>,
peerhost => {127,0,0,1}},
ConnInfo = #{socktype => tcp,
peername => {{127,0,0,1}, 5000},
sockname => {{127,0,0,1}, 1883},
peercert => nossl,
conn_mod => emqx_connection,
receive_maximum => 100},
Parent = self(),
OpenASession = fun() ->
timer:sleep(rand:uniform(100)),
OpenR = (emqx_cm:open_session(true, ClientInfo, ConnInfo)),
Parent ! OpenR,
case OpenR of
{ok, _} ->
receive
{'$gen_call', From, discard} ->
gen_server:reply(From, ok), ok
end;
{error, Reason} ->
exit(Reason)
end
end,
[spawn(
fun() ->
spawn(OpenASession),
spawn(OpenASession)
end) || _ <- lists:seq(1, 1000)],
WaitingRecv = fun _Wr(N1, N2, 0) ->
{N1, N2};
_Wr(N1, N2, Rest) ->
receive
{ok, _} -> _Wr(N1+1, N2, Rest-1);
{error, _} -> _Wr(N1, N2+1, Rest-1)
end
end,
ct:pal("Race condition status: ~p~n", [WaitingRecv(0, 0, 2000)]),
?assertEqual(1, ets:info(emqx_channel, size)),
?assertEqual(1, ets:info(emqx_channel_conn, size)),
?assertEqual(1, ets:info(emqx_channel_registry, size)),
[Pid] = emqx_cm:lookup_channels(<<"clientid">>),
exit(Pid, kill), timer:sleep(100),
?assertEqual([], emqx_cm:lookup_channels(<<"clientid">>)).
t_discard_session(_) ->
ok = meck:new(emqx_connection, [passthrough, no_history]),
ok = meck:expect(emqx_connection, call, fun(_, _) -> ok end),
ok = emqx_cm:discard_session(<<"clientid">>),
ok = emqx_cm:register_channel(<<"clientid">>),
ok = emqx_cm:register_channel(<<"clientid">>, ?ChanInfo, []),
ok = emqx_cm:discard_session(<<"clientid">>),
ok = emqx_cm:unregister_channel(<<"clientid">>),
ok = emqx_cm:register_channel(<<"clientid">>, #{conninfo => #{conn_mod => emqx_connection}}, []),
ok = emqx_cm:register_channel(<<"clientid">>, ?ChanInfo, []),
ok = emqx_cm:discard_session(<<"clientid">>),
ok = meck:expect(emqx_connection, call, fun(_, _) -> error(testing) end),
ok = emqx_cm:discard_session(<<"clientid">>),
@ -97,35 +166,26 @@ t_discard_session(_) ->
ok = meck:unload(emqx_connection).
t_takeover_session(_) ->
ok = meck:new(emqx_connection, [passthrough, no_history]),
ok = meck:expect(emqx_connection, call, fun(_, _) -> test end),
{error, not_found} = emqx_cm:takeover_session(<<"clientid">>),
ok = emqx_cm:register_channel(<<"clientid">>),
{error, not_found} = emqx_cm:takeover_session(<<"clientid">>),
ok = emqx_cm:unregister_channel(<<"clientid">>),
ok = emqx_cm:register_channel(<<"clientid">>, #{conninfo => #{conn_mod => emqx_connection}}, []),
Pid = self(),
{ok, emqx_connection, Pid, test} = emqx_cm:takeover_session(<<"clientid">>),
erlang:spawn(fun() ->
ok = emqx_cm:register_channel(<<"clientid">>, #{conninfo => #{conn_mod => emqx_connection}}, []),
timer:sleep(1000)
ok = emqx_cm:register_channel(<<"clientid">>, ?ChanInfo, []),
receive
{'$gen_call', From, {takeover, 'begin'}} ->
gen_server:reply(From, test), ok
end
end),
ct:sleep(100),
timer:sleep(100),
{ok, emqx_connection, _, test} = emqx_cm:takeover_session(<<"clientid">>),
ok = emqx_cm:unregister_channel(<<"clientid">>),
ok = meck:unload(emqx_connection).
emqx_cm:unregister_channel(<<"clientid">>).
t_kick_session(_) ->
ok = meck:new(emqx_connection, [passthrough, no_history]),
ok = meck:expect(emqx_connection, call, fun(_, _) -> test end),
{error, not_found} = emqx_cm:kick_session(<<"clientid">>),
ok = emqx_cm:register_channel(<<"clientid">>),
{error, not_found} = emqx_cm:kick_session(<<"clientid">>),
ok = emqx_cm:unregister_channel(<<"clientid">>),
ok = emqx_cm:register_channel(<<"clientid">>, #{conninfo => #{conn_mod => emqx_connection}}, []),
ok = emqx_cm:register_channel(<<"clientid">>, ?ChanInfo, []),
test = emqx_cm:kick_session(<<"clientid">>),
erlang:spawn(fun() ->
ok = emqx_cm:register_channel(<<"clientid">>, #{conninfo => #{conn_mod => emqx_connection}}, []),
ok = emqx_cm:register_channel(<<"clientid">>, ?ChanInfo, []),
timer:sleep(1000)
end),
ct:sleep(100),

View File

@ -42,6 +42,24 @@ t_new(_) ->
1 = emqx_metrics:val('metrics.test.total')
end).
t_ensure(_) ->
with_metrics_server(
fun() ->
ok = emqx_metrics:ensure('metrics.test'),
ok = emqx_metrics:ensure('metrics.test'),
0 = emqx_metrics:val('metrics.test'),
ok = emqx_metrics:inc('metrics.test'),
1 = emqx_metrics:val('metrics.test'),
ok = emqx_metrics:ensure(counter, 'metrics.test.cnt'),
0 = emqx_metrics:val('metrics.test.cnt'),
ok = emqx_metrics:inc('metrics.test.cnt'),
1 = emqx_metrics:val('metrics.test.cnt'),
ok = emqx_metrics:ensure(gauge, 'metrics.test.total'),
0 = emqx_metrics:val('metrics.test.total'),
ok = emqx_metrics:inc('metrics.test.total'),
1 = emqx_metrics:val('metrics.test.total')
end).
t_all(_) ->
with_metrics_server(
fun() ->