Merge branch 'emqx30' into emqx30-feng

This commit is contained in:
Feng Lee 2018-08-30 18:42:29 +08:00 committed by GitHub
commit c49e5dfddc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 267 additions and 174 deletions

View File

@ -35,10 +35,10 @@ EUNIT_OPTS = verbose
# CT_SUITES = emqx_stats
## emqx_trie emqx_router emqx_frame emqx_mqtt_compat
CT_SUITES = emqx emqx_access emqx_base62 emqx_broker emqx_client emqx_cm emqx_frame emqx_guid emqx_inflight \
CT_SUITES = emqx emqx_session emqx_access emqx_base62 emqx_broker emqx_client emqx_cm emqx_frame emqx_guid emqx_inflight \
emqx_json emqx_keepalive emqx_lib emqx_metrics emqx_misc emqx_mod emqx_mqtt_caps \
emqx_mqtt_compat emqx_mqtt_properties emqx_mqueue emqx_net emqx_pqueue emqx_router emqx_sm \
emqx_stats emqx_tables emqx_time emqx_topic emqx_trie emqx_vm emqx_zone
emqx_stats emqx_tables emqx_time emqx_topic emqx_trie emqx_vm emqx_zone emqx_mountpoint
CT_OPTS = -cover test/ct.cover.spec -erl_args -name emqxct@127.0.0.1

130
README.md
View File

@ -1,65 +1,29 @@
# *EMQ X* - MQTT Broker
# *EMQ X* - EMQ X Broker
[![Build Status](https://travis-ci.org/emqtt/emqttd.svg?branch=master)](https://travis-ci.org/emqtt/emqttd)
*EMQ X* broker is fully a open source, highly scalable, highly available distributed message broker for IoT, M2M and Mobile applications that can handle tens of millions of concurrent clients.
*EMQ* (Erlang MQTT Broker) is a distributed, massively scalable, highly extensible MQTT message broker written in Erlang/OTP.
Starting from 3.0 release, *EMQ X* broker fully supports MQTT V5.0 protocol specifications and backward compatible with MQTT V3.1 and V3.1.1, as well as other communication protocols such as MQTT-SN, CoAP, LwM2M, WebSocket, STOMP and SockJS. The 3.0 release of the *EMQ X* broker can scaled to 10+ million concurrent MQTT connections on one cluster.
*EMQ* is fully open source and licensed under the Apache Version 2.0. *EMQ* implements both MQTT V3.1 and V3.1.1 protocol specifications, and supports MQTT-SN, CoAP, WebSocket, STOMP and SockJS at the same time.
*EMQ* provides a scalable, reliable, enterprise-grade MQTT message Hub for IoT, M2M, Smart Hardware and Mobile Messaging Applications.
- For full list of new features, please read *EMQ X* broker 3.0 [release notes](https://github.com/emqtt/emqttd/releases/).
- For more information, please visit [EMQ X homepage](http://emqtt.io).
The 1.0 release of the EMQ broker has scaled to 1.3 million concurrent MQTT connections on a 12 Core, 32G CentOS server.
Please visit [emqtt.io](http://emqtt.io) for more service. Follow us on Twitter: [@emqtt](https://twitter.com/emqtt)
## Features
* Full MQTT V3.1/V3.1.1 support
* QoS0, QoS1, QoS2 Publish/Subscribe
* Session Management and Offline Messages
* Retained Message
* Last Will Message
* TCP/SSL Connection
* MQTT Over WebSocket(SSL)
* HTTP Publish API
* MQTT-SN Protocol
* STOMP protocol
* STOMP over SockJS
* $SYS/# Topics
* ClientID Authentication
* IpAddress Authentication
* Username and Password Authentication
* Access control based on IpAddress, ClientID, Username
* JWT Authentication
* LDAP Authentication/ACL
* HTTP Authentication/ACL
* MySQL Authentication/ACL
* Redis Authentication/ACL
* PostgreSQL Authentication/ACL
* MongoDB Authentication/ACL
* Cluster brokers on several nodes
* Bridge brokers locally or remotely
* mosquitto, RSMB bridge
* Extensible architecture with Hooks and Plugins
* Passed eclipse paho interoperability tests
* Local Subscription
* Shared Subscription
* Proxy Protocol V1/2
* Lua Hook and Web Hook
* LWM2M Prototol Support
## Installation
The *EMQ* broker is cross-platform, which can be deployed on Linux, Unix, Mac, Windows and even Raspberry Pi.
Download the binary package for your platform from http://emqtt.io/downloads.
Download the binary package for your platform from [here](http://emqtt.io/downloads).
-[Single Node Install](http://emqtt.io/docs/v2/install.html)
-[Multi Node Install](http://emqtt.io/docs/v2/cluster.html)
Documentation on [emqtt.io/docs/v2/](http://emqtt.io/docs/v2/install.html), [docs.emqtt.com](http://docs.emqtt.com/en/latest/install.html) for installation and configuration guide.
## Build From Source
The *EMQ* broker requires Erlang/OTP R19+ to build since 2.1 release.
The *EMQ* broker requires Erlang/OTP R21+ to build since 3.0 release.
```
git clone https://github.com/emqtt/emq-relx.git
@ -67,55 +31,51 @@ git clone https://github.com/emqtt/emq-relx.git
cd emq-relx && make
cd _rel/emqttd && ./bin/emqttd console
```
## Plugins
## Quick Start
The *EMQ* broker is highly extensible, with many hooks and plugins for customizing the authentication/ACL and integrating with other systems:
# Start emqttd
./bin/emqttd start
# Check Status
./bin/emqttd_ctl status
# Stop emqttd
./bin/emqttd stop
Plugin | Description
-----------------------------------------------------------------------|--------------------------------------
[emq_plugin_template](https://github.com/emqtt/emq_plugin_template) | Plugin template and demo
[emq_dashboard](https://github.com/emqtt/emq_dashboard) | Web Dashboard
[emq_retainer](https://github.com/emqtt/emq-retainer) | Store MQTT Retained Messages
[emq_modules](https://github.com/emqtt/emq-modules) | Presence, Subscription and Rewrite Modules
[emq_auth_username](https://github.com/emqtt/emq_auth_username) | Username/Password Authentication Plugin
[emq_auth_clientid](https://github.com/emqtt/emq_auth_clientid) | ClientId Authentication Plugin
[emq_auth_mysql](https://github.com/emqtt/emq_auth_mysql) | MySQL Authentication/ACL Plugin
[emq_auth_pgsql](https://github.com/emqtt/emq_auth_pgsql) | PostgreSQL Authentication/ACL Plugin
[emq_auth_redis](https://github.com/emqtt/emq_auth_redis) | Redis Authentication/ACL Plugin
[emq_auth_mongo](https://github.com/emqtt/emq_auth_mongo) | MongoDB Authentication/ACL Plugin
[emq_auth_http](https://github.com/emqtt/emq_auth_http) | Authentication/ACL by HTTP API
[emq_auth_ldap](https://github.com/emqtt/emq_auth_ldap) | LDAP Authentication Plugin
[emq_auth_jwt](https://github.com/emqtt/emq-auth-jwt) | JWT Authentication Plugin
[emq_web_hook](https://github.com/emqtt/emq-web-hook) | Web Hook Plugin
[emq_lua_hook](https://github.com/emqtt/emq-lua-hook) | Lua Hook Plugin
[emq_sn](https://github.com/emqtt/emq_sn) | MQTT-SN Protocol Plugin
[emq_coap](https://github.com/emqtt/emq_coap) | CoAP Protocol Plugin
[emq_stomp](https://github.com/emqtt/emq_stomp) | Stomp Protocol Plugin
[emq_lwm2m](https://github.com/emqx/emqx-lwm2m) | LWM2M Prototol Plugin
[emq_recon](https://github.com/emqtt/emq_recon) | Recon Plugin
[emq_reloader](https://github.com/emqtt/emq_reloader) | Reloader Plugin
[emq_sockjs](https://github.com/emqtt/emq_sockjs) | SockJS(Stomp) Plugin
To view the dashboard after running, use your browser to open: http://localhost:18083
## Supports
* Twitter: [@emqtt](https://twitter.com/emqtt)
* Homepage: http://emqtt.io
* Downloads: http://emqtt.io/downloads
* Documentation: http://emqtt.io/docs/v2/
* Forum: https://groups.google.com/d/forum/emqtt
* Mailing List: <emqtt@googlegroups.com>
* Issues: https://github.com/emqtt/emqttd/issues
* QQ Group: 12222225
## Roadmap
## Test Servers
The [EMQX roadmap uses Github milestones](https://github.com/emqtt/emqttd/milestones) to track the progress of the project.
The **q.emqtt.com** hosts a public Four-Node *EMQ* cluster on [QingCloud](https://qingcloud.com):
## Community, discussion, contribution, and support
You can reach the EMQ community and developers via the following channels:
- [EMQX Slack](http://emqx.slack.com)
-[#emqx-users](https://emqx.slack.com/messages/CBUF2TTB8/)
-[#emqx-devs](https://emqx.slack.com/messages/CBSL57DUH/)
- [Mailing Lists](<emqtt@googlegroups.com>)
- [Twitter](https://twitter.com/emqtt)
- [Forum](https://groups.google.com/d/forum/emqtt)
- [Blog](https://medium.com/@emqtt)
Please submit any bugs, issues, and feature requests to [emqtt/emqttd](//github.com/emqtt/emqttd/issues).
![qing_cluster](http://emqtt.io/static/img/public_cluster.png)
## License
Copyright (c) 2014-2018 [EMQ X Tech, LLC](http://emqtt.io)
Licensed under the Apache License, Version 2.0 (the "License");you may not use this file except in compliance with the License.You may obtain a copy of the License at
[http://www.apache.org/licenses/LICENSE-2.0](http://www.apache.org/licenses/LICENSE-2.0)
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and limitations under the License.
Apache License Version 2.0

View File

@ -68,74 +68,75 @@
%% Clean Start Flag
clean_start = false :: boolean(),
%% Client Binding: local | remote
binding = local :: local | remote,
%% ClientId: Identifier of Session
client_id :: binary(),
%% Client Binding: local | remote
binding = local :: local | remote,
%% Username
username :: binary() | undefined,
%% ClientId: Identifier of Session
client_id :: binary(),
%% Connection pid binding with session
conn_pid :: pid(),
%% Username
username :: binary() | undefined,
%% Old Connection Pid that has been kickout
old_conn_pid :: pid(),
%% Connection pid binding with session
conn_pid :: pid(),
%% Next packet id of the session
next_pkt_id = 1 :: emqx_mqtt_types:packet_id(),
%% Old Connection Pid that has been kickout
old_conn_pid :: pid(),
%% Max subscriptions
max_subscriptions :: non_neg_integer(),
%% Next packet id of the session
next_pkt_id = 1 :: emqx_mqtt_types:packet_id(),
%% Clients Subscriptions.
subscriptions :: map(),
%% Max subscriptions
max_subscriptions :: non_neg_integer(),
%% Upgrade QoS?
upgrade_qos = false :: boolean(),
%% Clients Subscriptions.
subscriptions :: map(),
%% Client <- Broker: Inflight QoS1, QoS2 messages sent to the client but unacked.
inflight :: emqx_inflight:inflight(),
%% Upgrade QoS?
upgrade_qos = false :: boolean(),
%% Max Inflight Size. DEPRECATED: Get from inflight
%% max_inflight = 32 :: non_neg_integer(),
%% Client <- Broker: Inflight QoS1, QoS2 messages sent to the client but unacked.
inflight :: emqx_inflight:inflight(),
%% Retry interval for redelivering QoS1/2 messages
retry_interval = 20000 :: timeout(),
%% Max Inflight Size. DEPRECATED: Get from inflight
%% max_inflight = 32 :: non_neg_integer(),
%% Retry Timer
retry_timer :: reference() | undefined,
%% Retry interval for redelivering QoS1/2 messages
retry_interval = 20000 :: timeout(),
%% All QoS1, QoS2 messages published to when client is disconnected.
%% QoS 1 and QoS 2 messages pending transmission to the Client.
%%
%% Optionally, QoS 0 messages pending transmission to the Client.
mqueue :: emqx_mqueue:mqueue(),
%% Retry Timer
retry_timer :: reference() | undefined,
%% Client -> Broker: Inflight QoS2 messages received from client and waiting for pubrel.
awaiting_rel :: map(),
%% All QoS1, QoS2 messages published to when client is disconnected.
%% QoS 1 and QoS 2 messages pending transmission to the Client.
%%
%% Optionally, QoS 0 messages pending transmission to the Client.
mqueue :: emqx_mqueue:mqueue(),
%% Max Packets Awaiting PUBREL
max_awaiting_rel = 100 :: non_neg_integer(),
%% Client -> Broker: Inflight QoS2 messages received from client and waiting for pubrel.
awaiting_rel :: map(),
%% Awaiting PUBREL Timeout
await_rel_timeout = 20000 :: timeout(),
%% Max Packets Awaiting PUBREL
max_awaiting_rel = 100 :: non_neg_integer(),
%% Awaiting PUBREL Timer
await_rel_timer :: reference() | undefined,
%% Awaiting PUBREL Timeout
await_rel_timeout = 20000 :: timeout(),
%% Session Expiry Interval
expiry_interval = 7200000 :: timeout(),
%% Awaiting PUBREL Timer
await_rel_timer :: reference() | undefined,
%% Expired Timer
expiry_timer :: reference() | undefined,
%% Session Expiry Interval
expiry_interval = 7200000 :: timeout(),
%% Enable Stats
enable_stats :: boolean(),
%% Expired Timer
expiry_timer :: reference() | undefined,
%% Stats timer
stats_timer :: reference() | undefined,
%% Enable Stats
enable_stats :: boolean(),
%% Stats timer
stats_timer :: reference() | undefined,
%% Deliver stats
deliver_stats = 0,
@ -143,9 +144,10 @@
%% Enqueue stats
enqueue_stats = 0,
%% Created at
created_at :: erlang:timestamp()
}).
%% Created at
created_at :: erlang:timestamp()
}).
-define(TIMEOUT, 60000).
@ -286,7 +288,12 @@ pubcomp(SPid, PacketId, ReasonCode) ->
-spec(unsubscribe(pid(), emqx_types:topic_table()) -> ok).
unsubscribe(SPid, RawTopicFilters) when is_list(RawTopicFilters) ->
unsubscribe(SPid, undefined, #{}, lists:map(fun emqx_topic:parse/1, RawTopicFilters)).
TopicFilters = lists:map(fun({RawTopic, Opts}) ->
emqx_topic:parse(RawTopic, Opts);
(RawTopic) ->
emqx_topic:parse(RawTopic)
end, RawTopicFilters),
unsubscribe(SPid, undefined, #{}, TopicFilters).
-spec(unsubscribe(pid(), emqx_mqtt_types:packet_id(),
emqx_mqtt_types:properties(), emqx_mqtt_types:topic_filters()) -> ok).
@ -431,20 +438,20 @@ handle_call(Req, _From, State) ->
handle_cast({subscribe, FromPid, {PacketId, _Properties, TopicFilters}},
State = #state{client_id = ClientId, subscriptions = Subscriptions}) ->
{ReasonCodes, Subscriptions1} =
lists:foldr(fun({Topic, SubOpts = #{qos := QoS}}, {RcAcc, SubMap}) ->
{[QoS|RcAcc], case maps:find(Topic, SubMap) of
{ok, SubOpts} ->
SubMap;
{ok, _SubOpts} ->
emqx_broker:set_subopts(Topic, {self(), ClientId}, SubOpts),
emqx_hooks:run('session.subscribed', [#{client_id => ClientId}, Topic, SubOpts]),
maps:put(Topic, SubOpts, SubMap);
error ->
emqx_broker:subscribe(Topic, ClientId, SubOpts),
emqx_hooks:run('session.subscribed', [#{client_id => ClientId}, Topic, SubOpts]),
maps:put(Topic, SubOpts, SubMap)
end}
end, {[], Subscriptions}, TopicFilters),
lists:foldr(fun({Topic, SubOpts = #{qos := QoS}}, {RcAcc, SubMap}) ->
{[QoS|RcAcc], case maps:find(Topic, SubMap) of
{ok, SubOpts} ->
SubMap;
{ok, _SubOpts} ->
emqx_broker:set_subopts(Topic, {self(), ClientId}, SubOpts),
emqx_hooks:run('session.subscribed', [#{client_id => ClientId}, Topic, SubOpts]),
maps:put(Topic, SubOpts, SubMap);
error ->
emqx_broker:subscribe(Topic, ClientId, SubOpts),
emqx_hooks:run('session.subscribed', [#{client_id => ClientId}, Topic, SubOpts]),
maps:put(Topic, SubOpts, SubMap)
end}
end, {[], Subscriptions}, TopicFilters),
suback(FromPid, PacketId, ReasonCodes),
noreply(State#state{subscriptions = Subscriptions1});
@ -452,16 +459,16 @@ handle_cast({subscribe, FromPid, {PacketId, _Properties, TopicFilters}},
handle_cast({unsubscribe, From, {PacketId, _Properties, TopicFilters}},
State = #state{client_id = ClientId, subscriptions = Subscriptions}) ->
{ReasonCodes, Subscriptions1} =
lists:foldr(fun({Topic, _SubOpts}, {Acc, SubMap}) ->
case maps:find(Topic, SubMap) of
{ok, SubOpts} ->
ok = emqx_broker:unsubscribe(Topic, ClientId),
emqx_hooks:run('session.unsubscribed', [#{client_id => ClientId}, Topic, SubOpts]),
{[?RC_SUCCESS|Acc], maps:remove(Topic, SubMap)};
error ->
{[?RC_NO_SUBSCRIPTION_EXISTED|Acc], SubMap}
end
end, {[], Subscriptions}, TopicFilters),
lists:foldr(fun({Topic, _SubOpts}, {Acc, SubMap}) ->
case maps:find(Topic, SubMap) of
{ok, SubOpts} ->
ok = emqx_broker:unsubscribe(Topic, ClientId),
emqx_hooks:run('session.unsubscribed', [#{client_id => ClientId}, Topic, SubOpts]),
{[?RC_SUCCESS|Acc], maps:remove(Topic, SubMap)};
error ->
{[?RC_NO_SUBSCRIPTION_EXISTED|Acc], SubMap}
end
end, {[], Subscriptions}, TopicFilters),
unsuback(From, PacketId, ReasonCodes),
noreply(State#state{subscriptions = Subscriptions1});
@ -703,7 +710,7 @@ sortfun(inflight) ->
sortfun(awaiting_rel) ->
fun({_, #message{timestamp = Ts1}},
{_, #message{timestamp = Ts2}}) ->
Ts1 < Ts2
Ts1 < Ts2
end.
%%------------------------------------------------------------------------------
@ -745,7 +752,7 @@ dispatch(Msg = #message{qos = ?QOS0}, State) ->
inc_stats(deliver, State);
dispatch(Msg = #message{qos = QoS}, State = #state{next_pkt_id = PacketId, inflight = Inflight})
when QoS =:= ?QOS1 orelse QoS =:= ?QOS2 ->
when QoS =:= ?QOS1 orelse QoS =:= ?QOS2 ->
case emqx_inflight:is_full(Inflight) of
true -> enqueue_msg(Msg, State);
false ->
@ -836,6 +843,7 @@ dequeue2(State = #state{mqueue = Q}) ->
ensure_await_rel_timer(State = #state{await_rel_timer = undefined, await_rel_timeout = Timeout}) ->
ensure_await_rel_timer(Timeout, State);
ensure_await_rel_timer(State) ->
State.

View File

@ -17,7 +17,7 @@
-include("emqx.hrl").
-include("emqx_mqtt.hrl").
-export([info/1]).
-export([info/1, attrs/1]).
-export([stats/1]).
-export([kick/1]).
-export([session/1]).
@ -53,9 +53,14 @@
%% API
%%------------------------------------------------------------------------------
%% for debug
info(WSPid) ->
call(WSPid, info).
%% for dashboard
attrs(CPid) when is_pid(CPid) ->
call(CPid, attrs).
stats(WSPid) ->
call(WSPid, stats).
@ -170,6 +175,15 @@ websocket_info({call, From, info}, State = #state{peername = Peername,
gen_server:reply(From, lists:append([ConnInfo, ProtoInfo])),
{ok, State};
websocket_info({call, From, attrs}, State = #state{peername = Peername,
sockname = Sockname,
proto_state = ProtoState}) ->
SockAttrs = [{peername, Peername},
{sockname, Sockname}],
ProtoAttrs = emqx_protocol:attrs(ProtoState),
gen_server:reply(From, lists:usort(lists:append(SockAttrs, ProtoAttrs))),
{ok, State};
websocket_info({call, From, stats}, State = #state{proto_state = ProtoState}) ->
Stats = lists:append([wsock_stats(), emqx_misc:proc_stats(), emqx_protocol:stats(ProtoState)]),
gen_server:reply(From, Stats),
@ -262,4 +276,3 @@ shutdown(Reason, State) ->
wsock_stats() ->
[{Key, get(Key)} || Key <- ?SOCK_STATS].

View File

@ -330,15 +330,16 @@ serialize_parse_subscribe(_) ->
%% SUBSCRIBE(Q1, R0, D0, PacketId=2, TopicTable=[{<<"TopicA">>,2}])
Bin = <<130,11,0,2,0,6,84,111,112,105,99,65,2>>,
TopicOpts = #{ nl => 0 , rap => 0, rc => 0,
rh => 0, subid => 0 , qos => 2 },
rh => 0, qos => 2 },
TopicFilters = [{<<"TopicA">>, TopicOpts}],
Packet = ?SUBSCRIBE_PACKET(2, TopicFilters),
?assertEqual(Bin, iolist_to_binary(serialize(Packet))),
ct:log("Bin: ~p, Packet: ~p ~n", [Packet, parse(Bin)]),
?assertEqual({ok, Packet, <<>>}, parse(Bin)).
serialize_parse_subscribe_v5(_) ->
TopicFilters = [{<<"TopicQos0">>, #{rh => 1, qos => ?QOS_2, rap => 0, nl => 0, rc => 0, subid => 0}},
{<<"TopicQos1">>, #{rh => 1, qos => ?QOS_2, rap => 0, nl => 0, rc => 0, subid => 0}}],
TopicFilters = [{<<"TopicQos0">>, #{rh => 1, qos => ?QOS_2, rap => 0, nl => 0, rc => 0}},
{<<"TopicQos1">>, #{rh => 1, qos => ?QOS_2, rap => 0, nl => 0, rc => 0}}],
Packet = ?SUBSCRIBE_PACKET(3, #{'Subscription-Identifier' => 16#FFFFFFF},
TopicFilters),
?assertEqual({ok, Packet, <<>>},

View File

@ -16,13 +16,15 @@
-behaviour(gen_server).
-export([start_link/1, open_session/3, close_session/2, stop/1]).
-export([start_link/1, open_session/3, close_session/2, stop/1, get_last_message/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-record(state, {clean_start, client_id, client_pid}).
-define(TAB, messages).
start_link(ClientId) ->
gen_server:start_link(?MODULE, [ClientId], []).
@ -35,13 +37,25 @@ close_session(ClientPid, SessPid) ->
stop(CPid) ->
gen_server:call(CPid, stop).
get_last_message() ->
[{last_message, Msg}] = ets:lookup(?TAB, last_message),
Msg.
init([ClientId]) ->
{ok, #state{clean_start = true, client_id = ClientId}}.
Result = lists:member(?TAB, ets:all()),
if Result == false ->
ets:new(?TAB, [set, named_table]);
true -> ok
end,
{ok,
#state{clean_start = true,
client_id = ClientId}
}.
handle_call({start_session, ClientPid, ClientId, Zone}, _From, State) ->
Attrs = #{ zone => Zone,
client_id => ClientId,
conn_pid => ClientPid,
conn_pid => ClientPid,
clean_start => true,
username => undefined,
conn_props => undefined
@ -49,7 +63,7 @@ handle_call({start_session, ClientPid, ClientId, Zone}, _From, State) ->
{ok, SessPid} = emqx_sm:open_session(Attrs),
{reply, {ok, SessPid}, State#state{
clean_start = true,
client_id = ClientId,
client_id = ClientId,
client_pid = ClientPid
}};
@ -66,6 +80,9 @@ handle_call(_Request, _From, State) ->
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info({_, Msg}, State) ->
ets:insert(?TAB, {last_message, Msg}),
{noreply, State};
handle_info(_Info, State) ->
{noreply, State}.

View File

@ -0,0 +1,36 @@
%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
-module(emqx_mountpoint_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include("emqx.hrl").
-include("emqx_mqtt.hrl").
-include_lib("eunit/include/eunit.hrl").
all() -> [t_mount_unmount, t_replvar].
t_mount_unmount(_) ->
Msg = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>),
Msg2 = emqx_mountpoint:mount(<<"mount">>, Msg),
?assertEqual(<<"mounttopic">>, Msg2#message.topic),
TopicFilter = [{<<"mounttopic">>, #{qos => ?QOS2}}],
TopicFilter = emqx_mountpoint:mount(<<"mount">>, [{<<"topic">>, #{qos => ?QOS2}}]),
Msg = emqx_mountpoint:unmount(<<"mount">>, Msg2).
t_replvar(_) ->
<<"mount/test/clientid">> = emqx_mountpoint:replvar(<<"mount/%u/%c">>, #{client_id => <<"clientid">>, username => <<"test">>}).

View File

@ -39,6 +39,7 @@ end_per_suite(_Config) ->
emqx_ct_broker_helpers:run_teardown_steps().
init_per_testcase(_TestCase, Config) ->
clear_tables(),
Config.
end_per_testcase(_TestCase, _Config) ->

View File

@ -0,0 +1,57 @@
%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
-module(emqx_session_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("common_test/include/ct.hrl").
all() -> [t_session_all].
t_session_all(_) ->
emqx_ct_broker_helpers:run_setup_steps(),
ClientId = <<"ClientId">>,
{ok, ConnPid} = emqx_mock_client:start_link(ClientId),
{ok, SPid} = emqx_mock_client:open_session(ConnPid, ClientId, internal),
Message1 = emqx_message:make(<<"ClientId">>, 2, <<"topic">>, <<"hello">>),
emqx_session:subscribe(SPid, [{<<"topic">>, #{qos => 2}}]),
emqx_session:subscribe(SPid, [{<<"topic">>, #{qos => 1}}]),
timer:sleep(200),
[{<<"topic">>, _}] = emqx:subscriptions({SPid, <<"ClientId">>}),
emqx_session:publish(SPid, 1, Message1),
timer:sleep(200),
{publish, 1, _} = emqx_mock_client:get_last_message(),
emqx_session:puback(SPid, 2),
emqx_session:puback(SPid, 3, reasoncode),
emqx_session:pubrec(SPid, 4),
emqx_session:pubrec(SPid, 5, reasoncode),
emqx_session:pubrel(SPid, 6, reasoncode),
emqx_session:pubcomp(SPid, 7, reasoncode),
timer:sleep(200),
2 = emqx_metrics:val('packets/puback/missed'),
2 = emqx_metrics:val('packets/pubrec/missed'),
1 = emqx_metrics:val('packets/pubrel/missed'),
1 = emqx_metrics:val('packets/pubcomp/missed'),
Attrs = emqx_session:attrs(SPid),
Info = emqx_session:info(SPid),
Stats = emqx_session:stats(SPid),
ClientId = proplists:get_value(client_id, Attrs),
ClientId = proplists:get_value(client_id, Info),
1 = proplists:get_value(subscriptions_count, Stats),
emqx_session:unsubscribe(SPid, [<<"topic">>]),
timer:sleep(200),
[] = emqx:subscriptions({SPid, <<"clientId">>}),
emqx_mock_client:close_session(ConnPid, SPid).

View File

@ -26,7 +26,7 @@ t_open_close_session(_) ->
{ok, ClientPid} = emqx_mock_client:start_link(<<"client">>),
Attrs = #{clean_start => true, client_id => <<"client">>, conn_pid => ClientPid,
zone => internal, username => <<"zhou">>, conn_props => #{}},
{ok, _SPid} = emqx_sm:open_session(Attrs),
{ok, SPid} = emqx_sm:open_session(Attrs),
[{<<"client">>, SPid}] = emqx_sm:lookup_session(<<"client">>),
SPid = emqx_sm:lookup_session_pid(<<"client">>),
{ok, NewConnPid} = emqx_mock_client:start_link(<<"client">>),