Merge pull request #4 from emqtt/emqx30

emqx30
This commit is contained in:
tigercl 2018-09-04 09:05:21 +08:00 committed by GitHub
commit fe91484806
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
36 changed files with 813 additions and 537 deletions

27
.editorconfig Normal file
View File

@ -0,0 +1,27 @@
# EditorConfig is awesome: https://EditorConfig.org
# top-most EditorConfig file
root = true
# Unix-style newlines with a newline ending every file
[*]
charset = utf-8
end_of_line = lf
trim_trailing_whitespace = true
insert_final_newline = true
# Matches multiple files with brace expansion notation
# Set default charset
[*.{erl, src, hrl}]
indent_style = space
indent_size = 4
# Tab indentation (no size specified)
[Makefile]
indent_style = tab
# Matches the exact files either package.json or .travis.yml
[{.travis.yml}]
indent_style = space
indent_size = 2

View File

@ -10,8 +10,8 @@ dep_jsx = git https://github.com/talentdeficit/jsx 2.9.0
dep_gproc = git https://github.com/uwiger/gproc 0.8.0 dep_gproc = git https://github.com/uwiger/gproc 0.8.0
dep_gen_rpc = git https://github.com/emqx/gen_rpc 2.2.0 dep_gen_rpc = git https://github.com/emqx/gen_rpc 2.2.0
dep_lager = git https://github.com/erlang-lager/lager 3.6.4 dep_lager = git https://github.com/erlang-lager/lager 3.6.4
dep_esockd = git https://github.com/emqx/esockd emqx30 dep_esockd = git https://github.com/emqx/esockd v5.4
dep_ekka = git https://github.com/emqx/ekka emqx30 dep_ekka = git https://github.com/emqx/ekka v0.4.1
dep_cowboy = git https://github.com/ninenines/cowboy 2.4.0 dep_cowboy = git https://github.com/ninenines/cowboy 2.4.0
dep_clique = git https://github.com/emqx/clique dep_clique = git https://github.com/emqx/clique
dep_lager_syslog = git https://github.com/basho/lager_syslog 3.0.1 dep_lager_syslog = git https://github.com/basho/lager_syslog 3.0.1
@ -35,10 +35,11 @@ EUNIT_OPTS = verbose
# CT_SUITES = emqx_mqueue # CT_SUITES = emqx_mqueue
## emqx_trie emqx_router emqx_frame emqx_mqtt_compat ## emqx_trie emqx_router emqx_frame emqx_mqtt_compat
CT_SUITES = emqx emqx_banned emqx_connection emqx_session emqx_access emqx_base62 emqx_broker emqx_client emqx_cm emqx_frame emqx_guid emqx_inflight \ CT_SUITES = emqx emqx_banned emqx_connection emqx_session emqx_access emqx_broker emqx_cm emqx_frame emqx_guid emqx_inflight \
emqx_json emqx_keepalive emqx_lib emqx_metrics emqx_misc emqx_mod emqx_mqtt_caps \ emqx_json emqx_keepalive emqx_lib emqx_metrics emqx_misc emqx_mod emqx_mqtt_caps \
emqx_mqtt_compat emqx_mqtt_properties emqx_mqueue emqx_message emqx_net emqx_pqueue emqx_router emqx_sm \ emqx_mqtt_compat emqx_mqtt_properties emqx_mqueue emqx_net emqx_pqueue emqx_router emqx_sm \
emqx_stats emqx_tables emqx_time emqx_topic emqx_trie emqx_vm emqx_zone emqx_mountpoint emqx_stats emqx_tables emqx_time emqx_topic emqx_trie emqx_vm emqx_zone \
emqx_mountpoint emqx_listeners emqx_protocol
CT_OPTS = -cover test/ct.cover.spec -erl_args -name emqxct@127.0.0.1 CT_OPTS = -cover test/ct.cover.spec -erl_args -name emqxct@127.0.0.1

View File

@ -1,9 +1,9 @@
# *EMQ X* - MQTT Broker # *EMQ X* - MQTT Broker
*EMQ X* broker is fully a open source, highly scalable, highly available distributed message broker for IoT, M2M and Mobile applications that can handle tens of millions of concurrent clients. *EMQ X* broker is a fully open source, highly scalable, highly available distributed message broker for IoT, M2M and Mobile applications that can handle tens of millions of concurrent clients.
Starting from 3.0 release, *EMQ X* broker fully supports MQTT V5.0 protocol specifications and backward compatible with MQTT V3.1 and V3.1.1, as well as other communication protocols such as MQTT-SN, CoAP, LwM2M, WebSocket, STOMP and SockJS. The 3.0 release of the *EMQ X* broker can scaled to 10+ million concurrent MQTT connections on one cluster. Starting from 3.0 release, *EMQ X* broker fully supports MQTT V5.0 protocol specifications and backward compatible with MQTT V3.1 and V3.1.1, as well as other communication protocols such as MQTT-SN, CoAP, LwM2M, WebSocket and STOMP. The 3.0 release of the *EMQ X* broker can scaled to 10+ million concurrent MQTT connections on one cluster.
- For full list of new features, please read *EMQ X* broker 3.0 [release notes](https://github.com/emqtt/emqttd/releases/). - For full list of new features, please read *EMQ X* broker 3.0 [release notes](https://github.com/emqtt/emqttd/releases/).
@ -13,37 +13,37 @@ Starting from 3.0 release, *EMQ X* broker fully supports MQTT V5.0 protocol spec
## Installation ## Installation
The *EMQ* broker is cross-platform, which can be deployed on Linux, Unix, Mac, Windows and even Raspberry Pi. The *EMQ X* broker is cross-platform, which can be deployed on Linux, Unix, Mac, Windows and even Raspberry Pi.
Download the binary package for your platform from [here](http://emqtt.io/downloads). Download the binary package for your platform from [here](http://emqtt.io/downloads).
-[Single Node Install](http://emqtt.io/docs/v2/install.html) - [Single Node Install](http://emqtt.io/docs/v2/install.html)
-[Multi Node Install](http://emqtt.io/docs/v2/cluster.html) - [Multi Node Install](http://emqtt.io/docs/v2/cluster.html)
## Build From Source ## Build From Source
The *EMQ* broker requires Erlang/OTP R21+ to build since 3.0 release. The *EMQ X* broker requires Erlang/OTP R21+ to build since 3.0 release.
``` ```
git clone https://github.com/emqtt/emq-relx.git git clone https://github.com/emqx/emqx-rel.git
cd emq-relx && make cd emqx-rel && make
cd _rel/emqttd && ./bin/emqttd console cd _rel/emqx && ./bin/emqx console
``` ```
## Quick Start ## Quick Start
# Start emqttd # Start emqx
./bin/emqttd start ./bin/emqx start
# Check Status # Check Status
./bin/emqttd_ctl status ./bin/emqx_ctl status
# Stop emqttd # Stop emqx
./bin/emqttd stop ./bin/emqx stop
To view the dashboard after running, use your browser to open: http://localhost:18083 To view the dashboard after running, use your browser to open: http://localhost:18083
@ -67,7 +67,8 @@ Please submit any bugs, issues, and feature requests to [emqtt/emqttd](//github.
## License ## License
Copyright (c) 2014-2018 [EMQ X Tech, LLC](http://emqtt.io)
Copyright (c) 2018 [EMQ Technologies Co., Ltd](http://emqtt.io). All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");you may not use this file except in compliance with the License.You may obtain a copy of the License at Licensed under the Apache License, Version 2.0 (the "License");you may not use this file except in compliance with the License.You may obtain a copy of the License at
@ -76,6 +77,3 @@ Licensed under the Apache License, Version 2.0 (the "License");you may not use t
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and limitations under the License. See the License for the specific language governing permissions and limitations under the License.

View File

@ -529,6 +529,11 @@ zone.external.idle_timeout = 15s
## Default: 10 messages per second, and 100 messages burst. ## Default: 10 messages per second, and 100 messages burst.
## zone.external.publish_limit = 10,100 ## zone.external.publish_limit = 10,100
## Enable ban check.
##
## Value: Flag
zone.external.enable_ban = on
## Enable ACL check. ## Enable ACL check.
## ##
## Value: Flag ## Value: Flag
@ -580,6 +585,11 @@ zone.external.enable_stats = on
## Value: boolean ## Value: boolean
## zone.external.shared_subscription = false ## zone.external.shared_subscription = false
## Server Keep Alive
##
## Value: Number
## zone.external.server_keepalive = 0
## The backoff for MQTT keepalive timeout. The broker will kick a connection out ## The backoff for MQTT keepalive timeout. The broker will kick a connection out
## until 'Keepalive * backoff * 2' timeout. ## until 'Keepalive * backoff * 2' timeout.
## ##
@ -614,7 +624,7 @@ zone.external.max_awaiting_rel = 100
## The QoS2 messages (Client -> Broker) will be dropped if awaiting PUBREL timeout. ## The QoS2 messages (Client -> Broker) will be dropped if awaiting PUBREL timeout.
## ##
## Value: Duration ## Value: Duration
zone.external.await_rel_timeout = 60s zone.external.await_rel_timeout = 300s
## Default session expiry interval for MQTT V3.1.1 connections. ## Default session expiry interval for MQTT V3.1.1 connections.
## ##

View File

@ -676,6 +676,12 @@ end}.
{datatype, {enum, [allow, deny]}} {datatype, {enum, [allow, deny]}}
]}. ]}.
%% @doc Enable Ban.
{mapping, "zone.$name.enable_ban", "emqx.zones", [
{default, off},
{datatype, flag}
]}.
%% @doc Enable ACL check. %% @doc Enable ACL check.
{mapping, "zone.$name.enable_acl", "emqx.zones", [ {mapping, "zone.$name.enable_acl", "emqx.zones", [
{default, off}, {default, off},
@ -735,6 +741,11 @@ end}.
{datatype, {enum, [true, false]}} {datatype, {enum, [true, false]}}
]}. ]}.
%% @doc Server Keepalive
{mapping, "zone.$name.server_keepalive", "emqx.zones", [
{datatype, integer}
]}.
%% @doc Keepalive backoff %% @doc Keepalive backoff
{mapping, "zone.$name.keepalive_backoff", "emqx.zones", [ {mapping, "zone.$name.keepalive_backoff", "emqx.zones", [
{default, 0.75}, {default, 0.75},
@ -774,7 +785,7 @@ end}.
%% @doc Awaiting PUBREL timeout %% @doc Awaiting PUBREL timeout
{mapping, "zone.$name.await_rel_timeout", "emqx.zones", [ {mapping, "zone.$name.await_rel_timeout", "emqx.zones", [
{default, "60s"}, {default, "300s"},
{datatype, {duration, ms}} {datatype, {duration, ms}}
]}. ]}.

View File

@ -66,41 +66,36 @@ is_running(Node) ->
%% PubSub API %% PubSub API
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-spec(subscribe(emqx_topic:topic() | string()) -> ok | {error, term()}). -spec(subscribe(emqx_topic:topic() | string()) -> ok).
subscribe(Topic) -> subscribe(Topic) ->
emqx_broker:subscribe(iolist_to_binary(Topic)). emqx_broker:subscribe(iolist_to_binary(Topic)).
-spec(subscribe(emqx_topic:topic() | string(), emqx_types:subscriber() | string()) -spec(subscribe(emqx_topic:topic() | string(), emqx_types:subid() | pid()) -> ok).
-> ok | {error, term()}). subscribe(Topic, SubId) when is_atom(SubId); is_binary(SubId)->
subscribe(Topic, Sub) when is_list(Sub)-> emqx_broker:subscribe(iolist_to_binary(Topic), SubId);
emqx_broker:subscribe(iolist_to_binary(Topic), list_to_subid(Sub)); subscribe(Topic, SubPid) when is_pid(SubPid) ->
subscribe(Topic, Subscriber) when is_tuple(Subscriber) -> emqx_broker:subscribe(iolist_to_binary(Topic), SubPid).
{SubPid, SubId} = Subscriber,
emqx_broker:subscribe(iolist_to_binary(Topic), SubPid, SubId).
-spec(subscribe(emqx_topic:topic() | string(), emqx_types:subscriber() | string(), -spec(subscribe(emqx_topic:topic() | string(), emqx_types:subid() | pid(),
emqx_topic:subopts()) -> ok | {error, term()}). emqx_types:subopts()) -> ok).
subscribe(Topic, Sub, Options) when is_list(Sub)-> subscribe(Topic, SubId, Options) when is_atom(SubId); is_binary(SubId)->
emqx_broker:subscribe(iolist_to_binary(Topic), list_to_subid(Sub), Options); emqx_broker:subscribe(iolist_to_binary(Topic), SubId, Options);
subscribe(Topic, Subscriber, Options) when is_tuple(Subscriber)-> subscribe(Topic, SubPid, Options) when is_pid(SubPid)->
{SubPid, SubId} = Subscriber, emqx_broker:subscribe(iolist_to_binary(Topic), SubPid, Options).
emqx_broker:subscribe(iolist_to_binary(Topic), SubPid, SubId, Options).
-spec(publish(emqx_types:message()) -> {ok, emqx_types:deliver_results()}). -spec(publish(emqx_types:message()) -> {ok, emqx_types:deliver_results()}).
publish(Msg) -> publish(Msg) ->
emqx_broker:publish(Msg). emqx_broker:publish(Msg).
-spec(unsubscribe(emqx_topic:topic() | string()) -> ok | {error, term()}). -spec(unsubscribe(emqx_topic:topic() | string()) -> ok).
unsubscribe(Topic) -> unsubscribe(Topic) ->
emqx_broker:unsubscribe(iolist_to_binary(Topic)). emqx_broker:unsubscribe(iolist_to_binary(Topic)).
-spec(unsubscribe(emqx_topic:topic() | string(), emqx_types:subscriber() | string()) -spec(unsubscribe(emqx_topic:topic() | string(), emqx_types:subid() | pid()) -> ok).
-> ok | {error, term()}). unsubscribe(Topic, SubId) when is_atom(SubId); is_binary(SubId) ->
unsubscribe(Topic, Sub) when is_list(Sub) -> emqx_broker:unsubscribe(iolist_to_binary(Topic), SubId);
emqx_broker:unsubscribe(iolist_to_binary(Topic), list_to_subid(Sub)); unsubscribe(Topic, SubPid) when is_pid(SubPid) ->
unsubscribe(Topic, Subscriber) when is_tuple(Subscriber) -> emqx_broker:unsubscribe(iolist_to_binary(Topic), SubPid).
{SubPid, SubId} = Subscriber,
emqx_broker:unsubscribe(iolist_to_binary(Topic), SubPid, SubId).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% PubSub management API %% PubSub management API
@ -109,12 +104,12 @@ unsubscribe(Topic, Subscriber) when is_tuple(Subscriber) ->
-spec(get_subopts(emqx_topic:topic() | string(), emqx_types:subscriber()) -spec(get_subopts(emqx_topic:topic() | string(), emqx_types:subscriber())
-> emqx_types:subopts()). -> emqx_types:subopts()).
get_subopts(Topic, Subscriber) -> get_subopts(Topic, Subscriber) ->
emqx_broker:get_subopts(iolist_to_binary(Topic), list_to_subid(Subscriber)). emqx_broker:get_subopts(iolist_to_binary(Topic), Subscriber).
-spec(set_subopts(emqx_topic:topic() | string(), emqx_types:subscriber(), -spec(set_subopts(emqx_topic:topic() | string(), emqx_types:subscriber(),
emqx_types:subopts()) -> ok). emqx_types:subopts()) -> boolean()).
set_subopts(Topic, Subscriber, Options) when is_list(Options) -> set_subopts(Topic, Subscriber, Options) when is_map(Options) ->
emqx_broker:set_subopts(iolist_to_binary(Topic), list_to_subid(Subscriber), Options). emqx_broker:set_subopts(iolist_to_binary(Topic), Subscriber, Options).
-spec(topics() -> list(emqx_topic:topic())). -spec(topics() -> list(emqx_topic:topic())).
topics() -> emqx_router:topics(). topics() -> emqx_router:topics().
@ -127,16 +122,11 @@ subscribers(Topic) ->
subscriptions(Subscriber) -> subscriptions(Subscriber) ->
emqx_broker:subscriptions(Subscriber). emqx_broker:subscriptions(Subscriber).
-spec(subscribed(emqx_topic:topic() | string(), emqx_types:subscriber()) -> boolean()). -spec(subscribed(emqx_topic:topic() | string(), pid() | emqx_types:subid()) -> boolean()).
subscribed(Topic, Subscriber) -> subscribed(Topic, SubPid) when is_pid(SubPid) ->
emqx_broker:subscribed(iolist_to_binary(Topic), list_to_subid(Subscriber)). emqx_broker:subscribed(iolist_to_binary(Topic), SubPid);
subscribed(Topic, SubId) when is_atom(SubId); is_binary(SubId) ->
list_to_subid(SubId) when is_binary(SubId) -> emqx_broker:subscribed(iolist_to_binary(Topic), SubId).
SubId;
list_to_subid(SubId) when is_list(SubId) ->
iolist_to_binary(SubId);
list_to_subid(SubPid) when is_pid(SubPid) ->
SubPid.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Hooks API %% Hooks API

View File

@ -153,9 +153,8 @@ init([]) ->
handle_call({register_mod, Type, Mod, Opts, Seq}, _From, State) -> handle_call({register_mod, Type, Mod, Opts, Seq}, _From, State) ->
Mods = lookup_mods(Type), Mods = lookup_mods(Type),
reply(case lists:keyfind(Mod, 1, Mods) of reply(case lists:keymember(Mod, 1, Mods) of
true -> true -> {error, already_existed};
{error, already_existed};
false -> false ->
case catch Mod:init(Opts) of case catch Mod:init(Opts) of
{ok, ModState} -> {ok, ModState} ->

View File

@ -25,6 +25,8 @@
-define(ACL_RULE_TAB, emqx_acl_rule). -define(ACL_RULE_TAB, emqx_acl_rule).
-type(state() :: #{acl_file := string()}).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% API %% API
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
@ -95,7 +97,7 @@ match(Credentials, Topic, [Rule|Rules]) ->
{matched, AllowDeny} {matched, AllowDeny}
end. end.
-spec(reload_acl(#{}) -> ok | {error, term()}). -spec(reload_acl(state()) -> ok | {error, term()}).
reload_acl(#{acl_file := AclFile}) -> reload_acl(#{acl_file := AclFile}) ->
case catch load_rules_from_file(AclFile) of case catch load_rules_from_file(AclFile) of
true -> true ->

View File

@ -28,10 +28,11 @@
-define(ALARM_MGR, ?MODULE). -define(ALARM_MGR, ?MODULE).
-record(state, {alarms}).
start_link() -> start_link() ->
start_with(fun(Pid) -> gen_event:add_handler(Pid, ?MODULE, []) end). start_with(
fun(Pid) ->
gen_event:add_handler(Pid, ?MODULE, [])
end).
start_with(Fun) -> start_with(Fun) ->
case gen_event:start_link({local, ?ALARM_MGR}) of case gen_event:start_link({local, ?ALARM_MGR}) of
@ -73,42 +74,42 @@ delete_alarm_handler(Module) when is_atom(Module) ->
%% Default Alarm handler %% Default Alarm handler
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
init(_) -> {ok, #state{alarms = []}}. init(_) -> {ok, #{alarms => []}}.
handle_event({set_alarm, Alarm = #alarm{timestamp = undefined}}, State)-> handle_event({set_alarm, Alarm = #alarm{timestamp = undefined}}, State)->
handle_event({set_alarm, Alarm#alarm{timestamp = os:timestamp()}}, State); handle_event({set_alarm, Alarm#alarm{timestamp = os:timestamp()}}, State);
handle_event({set_alarm, Alarm = #alarm{id = AlarmId}}, State = #state{alarms = Alarms}) -> handle_event({set_alarm, Alarm = #alarm{id = AlarmId}}, State = #{alarms := Alarms}) ->
case encode_alarm(Alarm) of case encode_alarm(Alarm) of
{ok, Json} -> {ok, Json} ->
emqx_broker:safe_publish(alarm_msg(alert, AlarmId, Json)); emqx_broker:safe_publish(alarm_msg(alert, AlarmId, Json));
{error, Reason} -> {error, Reason} ->
emqx_logger:error("[AlarmMgr] Failed to encode alarm: ~p", [Reason]) emqx_logger:error("[AlarmMgr] Failed to encode alarm: ~p", [Reason])
end, end,
{ok, State#state{alarms = [Alarm|Alarms]}}; {ok, State#{alarms := [Alarm|Alarms]}};
handle_event({clear_alarm, AlarmId}, State = #state{alarms = Alarms}) -> handle_event({clear_alarm, AlarmId}, State = #{alarms := Alarms}) ->
case emqx_json:safe_encode([{id, AlarmId}, {ts, emqx_time:now_secs()}]) of case emqx_json:safe_encode([{id, AlarmId}, {ts, os:system_time(second)}]) of
{ok, Json} -> {ok, Json} ->
emqx_broker:safe_publish(alarm_msg(clear, AlarmId, Json)); emqx_broker:safe_publish(alarm_msg(clear, AlarmId, Json));
{error, Reason} -> {error, Reason} ->
emqx_logger:error("[AlarmMgr] Failed to encode clear: ~p", [Reason]) emqx_logger:error("[AlarmMgr] Failed to encode clear: ~p", [Reason])
end, end,
{ok, State#state{alarms = lists:keydelete(AlarmId, 2, Alarms)}, hibernate}; {ok, State#{alarms := lists:keydelete(AlarmId, 2, Alarms)}, hibernate};
handle_event(Event, State)-> handle_event(Event, State)->
error_logger:error("[AlarmMgr] unexpected event: ~p", [Event]), emqx_logger:error("[AlarmMgr] unexpected event: ~p", [Event]),
{ok, State}. {ok, State}.
handle_info(Info, State) -> handle_info(Info, State) ->
error_logger:error("[AlarmMgr] unexpected info: ~p", [Info]), emqx_logger:error("[AlarmMgr] unexpected info: ~p", [Info]),
{ok, State}. {ok, State}.
handle_call(get_alarms, State = #state{alarms = Alarms}) -> handle_call(get_alarms, State = #{alarms := Alarms}) ->
{ok, Alarms, State}; {ok, Alarms, State};
handle_call(Req, State) -> handle_call(Req, State) ->
error_logger:error("[AlarmMgr] unexpected call: ~p", [Req]), emqx_logger:error("[AlarmMgr] unexpected call: ~p", [Req]),
{ok, ignored, State}. {ok, ignored, State}.
terminate(swap, State) -> terminate(swap, State) ->
@ -132,8 +133,8 @@ encode_alarm(#alarm{id = AlarmId, severity = Severity, title = Title,
alarm_msg(Type, AlarmId, Json) -> alarm_msg(Type, AlarmId, Json) ->
Msg = emqx_message:make(?ALARM_MGR, topic(Type, AlarmId), Json), Msg = emqx_message:make(?ALARM_MGR, topic(Type, AlarmId), Json),
emqx_message:set_headers(#{'Content-Type' => <<"application/json">>}, emqx_message:set_headers( #{'Content-Type' => <<"application/json">>},
emqx_message:set_flags(#{sys => true}, Msg)). emqx_message:set_flag(sys, Msg)).
topic(alert, AlarmId) -> topic(alert, AlarmId) ->
emqx_topic:systop(<<"alarms/", AlarmId/binary, "/alert">>); emqx_topic:systop(<<"alarms/", AlarmId/binary, "/alert">>);

View File

@ -24,27 +24,23 @@
-boot_mnesia({mnesia, [boot]}). -boot_mnesia({mnesia, [boot]}).
-copy_mnesia({mnesia, [copy]}). -copy_mnesia({mnesia, [copy]}).
%% API
-export([start_link/0]). -export([start_link/0]).
-export([check/1]). -export([check/1]).
-export([add/1, del/1]). -export([add/1, del/1]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]). code_change/3]).
-define(TAB, ?MODULE). -define(TAB, ?MODULE).
-define(SERVER, ?MODULE). -define(SERVER, ?MODULE).
-record(state, {expiry_timer}).
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
%% Mnesia bootstrap %% Mnesia bootstrap
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
mnesia(boot) -> mnesia(boot) ->
ok = ekka_mnesia:create_table(?TAB, [ ok = ekka_mnesia:create_table(?TAB, [
{type, ordered_set}, {type, set},
{disc_copies, [node()]}, {disc_copies, [node()]},
{record_name, banned}, {record_name, banned},
{attributes, record_info(fields, banned)}]); {attributes, record_info(fields, banned)}]);
@ -52,11 +48,7 @@ mnesia(boot) ->
mnesia(copy) -> mnesia(copy) ->
ok = ekka_mnesia:copy_table(?TAB). ok = ekka_mnesia:copy_table(?TAB).
%%-------------------------------------------------------------------- %% @doc Start the banned server.
%% API
%%--------------------------------------------------------------------
%% @doc Start the banned server
-spec(start_link() -> emqx_types:startlink_ret()). -spec(start_link() -> emqx_types:startlink_ret()).
start_link() -> start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
@ -67,9 +59,13 @@ check(#{client_id := ClientId, username := Username, peername := {IPAddr, _}}) -
orelse ets:member(?TAB, {username, Username}) orelse ets:member(?TAB, {username, Username})
orelse ets:member(?TAB, {ipaddr, IPAddr}). orelse ets:member(?TAB, {ipaddr, IPAddr}).
add(Record) when is_record(Record, banned) -> -spec(add(#banned{}) -> ok).
mnesia:dirty_write(?TAB, Record). add(Banned) when is_record(Banned, banned) ->
mnesia:dirty_write(?TAB, Banned).
-spec(del({client_id, emqx_types:client_id()} |
{username, emqx_types:username()} |
{peername, emqx_types:peername()}) -> ok).
del(Key) -> del(Key) ->
mnesia:dirty_delete(?TAB, Key). mnesia:dirty_delete(?TAB, Key).
@ -78,27 +74,26 @@ del(Key) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
init([]) -> init([]) ->
emqx_time:seed(), {ok, ensure_expiry_timer(#{expiry_timer => undefined})}.
{ok, ensure_expiry_timer(#state{})}.
handle_call(Req, _From, State) -> handle_call(Req, _From, State) ->
emqx_logger:error("[BANNED] Unexpected request: ~p", [Req]), emqx_logger:error("[BANNED] unexpected call: ~p", [Req]),
{reply, ignore, State}. {reply, ignored, State}.
handle_cast(Msg, State) -> handle_cast(Msg, State) ->
emqx_logger:error("[BANNED] Unexpected msg: ~p", [Msg]), emqx_logger:error("[BANNED] unexpected msg: ~p", [Msg]),
{noreply, State}. {noreply, State}.
handle_info({timeout, Ref, expire}, State = #state{expiry_timer = Ref}) -> handle_info({timeout, TRef, expire}, State = #{expiry_timer := TRef}) ->
mnesia:async_dirty(fun expire_banned_items/1, [erlang:timestamp()]), mnesia:async_dirty(fun expire_banned_items/1, [erlang:timestamp()]),
{noreply, ensure_expiry_timer(State), hibernate}; {noreply, ensure_expiry_timer(State), hibernate};
handle_info(Info, State) -> handle_info(Info, State) ->
emqx_logger:error("[BANNED] Unexpected info: ~p", [Info]), emqx_logger:error("[BANNED] unexpected info: ~p", [Info]),
{noreply, State}. {noreply, State}.
terminate(_Reason, #state{expiry_timer = Timer}) -> terminate(_Reason, #{expiry_timer := TRef}) ->
emqx_misc:cancel_timer(Timer). emqx_misc:cancel_timer(TRef).
code_change(_OldVsn, State, _Extra) -> code_change(_OldVsn, State, _Extra) ->
{ok, State}. {ok, State}.
@ -108,9 +103,7 @@ code_change(_OldVsn, State, _Extra) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
ensure_expiry_timer(State) -> ensure_expiry_timer(State) ->
Interval = emqx_config:get_env(banned_expiry_interval, timer:minutes(5)), State#{expiry_timer := emqx_misc:start_timer(timer:minutes(5), expire)}.
State#state{expiry_timer = emqx_misc:start_timer(
Interval + rand:uniform(Interval), expire)}.
expire_banned_items(Now) -> expire_banned_items(Now) ->
expire_banned_item(mnesia:first(?TAB), Now). expire_banned_item(mnesia:first(?TAB), Now).
@ -119,11 +112,11 @@ expire_banned_item('$end_of_table', _Now) ->
ok; ok;
expire_banned_item(Key, Now) -> expire_banned_item(Key, Now) ->
case mnesia:read(?TAB, Key) of case mnesia:read(?TAB, Key) of
[#banned{until = undefined}] -> ok; [#banned{until = undefined}] ->
ok;
[B = #banned{until = Until}] when Until < Now -> [B = #banned{until = Until}] when Until < Now ->
mnesia:delete_object(?TAB, B, sticky_write); mnesia:delete_object(?TAB, B, sticky_write);
[_] -> ok; _ -> ok
[] -> ok
end, end,
expire_banned_item(mnesia:next(?TAB, Key), Now). expire_banned_item(mnesia:next(?TAB, Key), Now).

View File

@ -22,7 +22,7 @@
%% @doc Encode any data to base62 binary %% @doc Encode any data to base62 binary
-spec encode(string() -spec encode(string()
| integer() | integer()
| binary()) -> float(). | binary()) -> binary().
encode(I) when is_integer(I) -> encode(I) when is_integer(I) ->
encode(integer_to_binary(I)); encode(integer_to_binary(I));
encode(S) when is_list(S)-> encode(S) when is_list(S)->
@ -110,4 +110,3 @@ decode_char(I) when I >= $A andalso I =< $Z->
decode_char(9, I) -> decode_char(9, I) ->
I + 61 - $A. I + 61 - $A.

View File

@ -117,7 +117,7 @@ handle_info(start, State = #state{options = Options,
{noreply, State#state{client_pid = ClientPid}}; {noreply, State#state{client_pid = ClientPid}};
{error,_} -> {error,_} ->
erlang:send_after(ReconnectTime, self(), start), erlang:send_after(ReconnectTime, self(), start),
{noreply, State = #state{reconnect_count = ReconnectCount-1}} {noreply, State#state{reconnect_count = ReconnectCount-1}}
end; end;
%%---------------------------------------------------------------- %%----------------------------------------------------------------
@ -133,11 +133,12 @@ handle_info(start, State = #state{options = Options,
Subs = get_value(subscriptions, Options, []), Subs = get_value(subscriptions, Options, []),
[emqx_client:subscribe(ClientPid, {i2b(Topic), Qos}) || {Topic, Qos} <- Subs], [emqx_client:subscribe(ClientPid, {i2b(Topic), Qos}) || {Topic, Qos} <- Subs],
ForwardRules = string:tokens(get_value(forward_rule, Options, ""), ","), ForwardRules = string:tokens(get_value(forward_rule, Options, ""), ","),
[emqx_broker:subscribe(i2b(Topic)) || Topic <- ForwardRules, emqx_topic:validate({filter, i2b(Topic)})], [emqx_broker:subscribe(i2b(Topic)) || Topic <- ForwardRules,
emqx_topic:validate({filter, i2b(Topic)})],
{noreply, State#state{client_pid = ClientPid}}; {noreply, State#state{client_pid = ClientPid}};
{error,_} -> {error,_} ->
erlang:send_after(ReconnectTime, self(), start), erlang:send_after(ReconnectTime, self(), start),
{noreply, State = #state{reconnect_count = ReconnectCount-1}} {noreply, State#state{reconnect_count = ReconnectCount-1}}
end; end;
%%---------------------------------------------------------------- %%----------------------------------------------------------------

View File

@ -27,7 +27,7 @@ start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []). supervisor:start_link({local, ?MODULE}, ?MODULE, []).
%% @doc List all bridges %% @doc List all bridges
-spec(bridges() -> [{node(), emqx_topic:topic(), pid()}]). -spec(bridges() -> [{node(), Status :: binary()}]).
bridges() -> bridges() ->
[{Name, emqx_bridge:status(Pid)} || {Name, Pid, _, _} <- supervisor:which_children(?MODULE)]. [{Name, emqx_bridge:status(Pid)} || {Name, Pid, _, _} <- supervisor:which_children(?MODULE)].

View File

@ -260,9 +260,9 @@ subscription(Topic, Subscriber) ->
-spec(subscribed(emqx_topic:topic(), pid() | emqx_types:subid() | emqx_types:subscriber()) -> boolean()). -spec(subscribed(emqx_topic:topic(), pid() | emqx_types:subid() | emqx_types:subscriber()) -> boolean()).
subscribed(Topic, SubPid) when is_binary(Topic), is_pid(SubPid) -> subscribed(Topic, SubPid) when is_binary(Topic), is_pid(SubPid) ->
length(ets:match_object(?SUBOPTION, {{Topic, {SubPid, '_'}}, '_'}, 1)) == 1; length(ets:match_object(?SUBOPTION, {{Topic, {SubPid, '_'}}, '_'}, 1)) >= 1;
subscribed(Topic, SubId) when is_binary(Topic), ?is_subid(SubId) -> subscribed(Topic, SubId) when is_binary(Topic), ?is_subid(SubId) ->
length(ets:match_object(?SUBOPTION, {{Topic, {'_', SubId}}, '_'}, 1)) == 1; length(ets:match_object(?SUBOPTION, {{Topic, {'_', SubId}}, '_'}, 1)) >= 1;
subscribed(Topic, {SubPid, SubId}) when is_binary(Topic), is_pid(SubPid), ?is_subid(SubId) -> subscribed(Topic, {SubPid, SubId}) when is_binary(Topic), is_pid(SubPid), ?is_subid(SubId) ->
ets:member(?SUBOPTION, {Topic, {SubPid, SubId}}). ets:member(?SUBOPTION, {Topic, {SubPid, SubId}}).

View File

@ -186,7 +186,7 @@ with_owner(Options) ->
connect(Client) -> connect(Client) ->
gen_statem:call(Client, connect, infinity). gen_statem:call(Client, connect, infinity).
-spec(subscribe(client(), topic() | {topic(), qos() | [subopt()]}) -spec(subscribe(client(), topic() | {topic(), qos() | [subopt()]} | [{topic(), qos()}])
-> subscribe_ret()). -> subscribe_ret()).
subscribe(Client, Topic) when is_binary(Topic) -> subscribe(Client, Topic) when is_binary(Topic) ->
subscribe(Client, {Topic, ?QOS_0}); subscribe(Client, {Topic, ?QOS_0});
@ -373,22 +373,12 @@ init([Options]) ->
{_ver, undefined} -> random_client_id(); {_ver, undefined} -> random_client_id();
{_ver, Id} -> iolist_to_binary(Id) {_ver, Id} -> iolist_to_binary(Id)
end, end,
Username = case proplists:get_value(username, Options) of
undefined -> <<>>;
Name -> Name
end,
Password = case proplists:get_value(password, Options) of
undefined -> <<>>;
Passw -> Passw
end,
State = init(Options, #state{host = {127,0,0,1}, State = init(Options, #state{host = {127,0,0,1},
port = 1883, port = 1883,
hosts = [], hosts = [],
sock_opts = [], sock_opts = [],
bridge_mode = false, bridge_mode = false,
client_id = ClientId, client_id = ClientId,
username = Username,
password = Password,
clean_start = true, clean_start = true,
proto_ver = ?MQTT_PROTO_V4, proto_ver = ?MQTT_PROTO_V4,
proto_name = <<"MQTT">>, proto_name = <<"MQTT">>,
@ -450,9 +440,9 @@ init([{client_id, ClientId} | Opts], State) ->
init(Opts, State#state{client_id = iolist_to_binary(ClientId)}); init(Opts, State#state{client_id = iolist_to_binary(ClientId)});
init([{clean_start, CleanStart} | Opts], State) when is_boolean(CleanStart) -> init([{clean_start, CleanStart} | Opts], State) when is_boolean(CleanStart) ->
init(Opts, State#state{clean_start = CleanStart}); init(Opts, State#state{clean_start = CleanStart});
init([{useranme, Username} | Opts], State) -> init([{username, Username} | Opts], State) ->
init(Opts, State#state{username = iolist_to_binary(Username)}); init(Opts, State#state{username = iolist_to_binary(Username)});
init([{passwrod, Password} | Opts], State) -> init([{password, Password} | Opts], State) ->
init(Opts, State#state{password = iolist_to_binary(Password)}); init(Opts, State#state{password = iolist_to_binary(Password)});
init([{keepalive, Secs} | Opts], State) -> init([{keepalive, Secs} | Opts], State) ->
init(Opts, State#state{keepalive = timer:seconds(Secs)}); init(Opts, State#state{keepalive = timer:seconds(Secs)});
@ -552,8 +542,6 @@ mqtt_connect(State = #state{client_id = ClientId,
properties = Properties}) -> properties = Properties}) ->
?WILL_MSG(WillQoS, WillRetain, WillTopic, WillProps, WillPayload) = WillMsg, ?WILL_MSG(WillQoS, WillRetain, WillTopic, WillProps, WillPayload) = WillMsg,
ConnProps = emqx_mqtt_properties:filter(?CONNECT, Properties), ConnProps = emqx_mqtt_properties:filter(?CONNECT, Properties),
io:format("ConnProps: ~p, ClientID: ~p, Username: ~p, Password: ~p~n",
[ConnProps, ClientId, Username, Password]),
send(?CONNECT_PACKET( send(?CONNECT_PACKET(
#mqtt_packet_connect{proto_ver = ProtoVer, #mqtt_packet_connect{proto_ver = ProtoVer,
proto_name = ProtoName, proto_name = ProtoName,
@ -592,8 +580,8 @@ waiting_for_connack(cast, ?CONNACK_PACKET(?RC_SUCCESS,
waiting_for_connack(cast, ?CONNACK_PACKET(ReasonCode, waiting_for_connack(cast, ?CONNACK_PACKET(ReasonCode,
_SessPresent, _SessPresent,
Properties), State) -> Properties), State = #state{ proto_ver = ProtoVer}) ->
Reason = emqx_reason_codes:name(ReasonCode), Reason = emqx_reason_codes:name(ReasonCode, ProtoVer),
case take_call(connect, State) of case take_call(connect, State) of
{value, #call{from = From}, _State} -> {value, #call{from = From}, _State} ->
Reply = {error, {Reason, Properties}}, Reply = {error, {Reason, Properties}},
@ -1082,6 +1070,7 @@ receive_loop(Bytes, State = #state{parse_state = ParseState}) ->
{error, Reason} -> {error, Reason} ->
{stop, Reason}; {stop, Reason};
{'EXIT', Error} -> {'EXIT', Error} ->
io:format("client stop"),
{stop, Error} {stop, Error}
end. end.

View File

@ -25,11 +25,17 @@ start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []). supervisor:start_link({local, ?MODULE}, ?MODULE, []).
init([]) -> init([]) ->
{ok, {{one_for_all, 10, 3600}, Banned = #{id => banned,
[#{id => manager, start => {emqx_banned, start_link, []},
restart => permanent,
shutdown => 5000,
type => worker,
modules => [emqx_banned]},
Manager = #{id => manager,
start => {emqx_cm, start_link, []}, start => {emqx_cm, start_link, []},
restart => permanent, restart => permanent,
shutdown => 5000, shutdown => 5000,
type => worker, type => worker,
modules => [emqx_cm]}]}}. modules => [emqx_cm]},
{ok, {{one_for_one, 10, 100}, [Banned, Manager]}}.

View File

@ -202,20 +202,23 @@ handle_info({deliver, PubOrAck}, State = #state{proto_state = ProtoState}) ->
{ok, ProtoState1} -> {ok, ProtoState1} ->
{noreply, maybe_gc(ensure_stats_timer(State#state{proto_state = ProtoState1}))}; {noreply, maybe_gc(ensure_stats_timer(State#state{proto_state = ProtoState1}))};
{error, Reason} -> {error, Reason} ->
shutdown(Reason, State); shutdown(Reason, State)
{error, Reason, ProtoState1} ->
shutdown(Reason, State#state{proto_state = ProtoState1})
end; end;
handle_info(emit_stats, State = #state{proto_state = ProtoState}) -> handle_info({timeout, Timer, emit_stats},
State = #state{stats_timer = Timer, proto_state = ProtoState}) ->
emqx_cm:set_conn_stats(emqx_protocol:client_id(ProtoState), stats(State)), emqx_cm:set_conn_stats(emqx_protocol:client_id(ProtoState), stats(State)),
{noreply, State#state{stats_timer = undefined}, hibernate}; {noreply, State#state{stats_timer = undefined}, hibernate};
handle_info(timeout, State) -> handle_info(timeout, State) ->
shutdown(idle_timeout, State); shutdown(idle_timeout, State);
handle_info({shutdown, Error}, State) -> handle_info({shutdown, Reason}, State) ->
shutdown(Error, State); shutdown(Reason, State);
handle_info({shutdown, discard, {ClientId, ByPid}}, State) ->
?LOG(warning, "discarded by ~s:~p", [ClientId, ByPid], State),
shutdown(discard, State);
handle_info({shutdown, conflict, {ClientId, NewPid}}, State) -> handle_info({shutdown, conflict, {ClientId, NewPid}}, State) ->
?LOG(warning, "clientid '~s' conflict with ~p", [ClientId, NewPid], State), ?LOG(warning, "clientid '~s' conflict with ~p", [ClientId, NewPid], State),
@ -240,10 +243,10 @@ handle_info({inet_reply, _Sock, ok}, State) ->
handle_info({inet_reply, _Sock, {error, Reason}}, State) -> handle_info({inet_reply, _Sock, {error, Reason}}, State) ->
shutdown(Reason, State); shutdown(Reason, State);
handle_info({keepalive, start, Interval}, State = #state{transport = Transport, socket = Sock}) -> handle_info({keepalive, start, Interval}, State = #state{transport = Transport, socket = Socket}) ->
?LOG(debug, "Keepalive at the interval of ~p", [Interval], State), ?LOG(debug, "Keepalive at the interval of ~p", [Interval], State),
StatFun = fun() -> StatFun = fun() ->
case Transport:getstat(Sock, [recv_oct]) of case Transport:getstat(Socket, [recv_oct]) of
{ok, [{recv_oct, RecvOct}]} -> {ok, RecvOct}; {ok, [{recv_oct, RecvOct}]} -> {ok, RecvOct};
Error -> Error Error -> Error
end end
@ -270,11 +273,11 @@ handle_info(Info, State) ->
{noreply, State}. {noreply, State}.
terminate(Reason, State = #state{transport = Transport, terminate(Reason, State = #state{transport = Transport,
socket = Sock, socket = Socket,
keepalive = KeepAlive, keepalive = KeepAlive,
proto_state = ProtoState}) -> proto_state = ProtoState}) ->
?LOG(debug, "Terminated for ~p", [Reason], State), ?LOG(debug, "Terminated for ~p", [Reason], State),
Transport:fast_close(Sock), Transport:fast_close(Socket),
emqx_keepalive:cancel(KeepAlive), emqx_keepalive:cancel(KeepAlive),
case {ProtoState, Reason} of case {ProtoState, Reason} of
{undefined, _} -> ok; {undefined, _} -> ok;
@ -307,13 +310,13 @@ handle_packet(Data, State = #state{proto_state = ProtoState,
{ok, ProtoState1} -> {ok, ProtoState1} ->
NewState = State#state{proto_state = ProtoState1}, NewState = State#state{proto_state = ProtoState1},
handle_packet(Rest, inc_publish_cnt(Type, reset_parser(NewState))); handle_packet(Rest, inc_publish_cnt(Type, reset_parser(NewState)));
{error, Error} -> {error, Reason} ->
?LOG(error, "Protocol error - ~p", [Error], State), ?LOG(error, "Process packet error - ~p", [Reason], State),
shutdown(Error, State); shutdown(Reason, State);
{error, Error, ProtoState1} -> {error, Reason, ProtoState1} ->
shutdown(Error, State#state{proto_state = ProtoState1}); shutdown(Reason, State#state{proto_state = ProtoState1});
{stop, Reason, ProtoState1} -> {stop, Error, ProtoState1} ->
stop(Reason, State#state{proto_state = ProtoState1}) stop(Error, State#state{proto_state = ProtoState1})
end; end;
{error, Error} -> {error, Error} ->
?LOG(error, "Framing error - ~p", [Error], State), ?LOG(error, "Framing error - ~p", [Error], State),
@ -358,8 +361,8 @@ run_socket(State = #state{conn_state = blocked}) ->
State; State;
run_socket(State = #state{await_recv = true}) -> run_socket(State = #state{await_recv = true}) ->
State; State;
run_socket(State = #state{transport = Transport, socket = Sock}) -> run_socket(State = #state{transport = Transport, socket = Socket}) ->
Transport:async_recv(Sock, 0, infinity), Transport:async_recv(Socket, 0, infinity),
State#state{await_recv = true}. State#state{await_recv = true}.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
@ -369,7 +372,7 @@ run_socket(State = #state{transport = Transport, socket = Sock}) ->
ensure_stats_timer(State = #state{enable_stats = true, ensure_stats_timer(State = #state{enable_stats = true,
stats_timer = undefined, stats_timer = undefined,
idle_timeout = IdleTimeout}) -> idle_timeout = IdleTimeout}) ->
State#state{stats_timer = erlang:send_after(IdleTimeout, self(), emit_stats)}; State#state{stats_timer = emqx_misc:start_timer(IdleTimeout, emit_stats)};
ensure_stats_timer(State) -> State. ensure_stats_timer(State) -> State.
shutdown(Reason, State) -> shutdown(Reason, State) ->

View File

@ -60,11 +60,12 @@ init([Pool, Id, Node, Topic, Options]) ->
case net_kernel:connect_node(Node) of case net_kernel:connect_node(Node) of
true -> true ->
true = erlang:monitor_node(Node, true), true = erlang:monitor_node(Node, true),
Share = iolist_to_binary(["$bridge:", atom_to_list(Node), ":", Topic]), Group = iolist_to_binary(["$bridge:", atom_to_list(Node), ":", Topic]),
emqx_broker:subscribe(Topic, self(), [{share, Share}, {qos, ?QOS_0}]), emqx_broker:subscribe(Topic, self(), #{share => Group, qos => ?QOS_0}),
State = parse_opts(Options, #state{node = Node, subtopic = Topic}), State = parse_opts(Options, #state{node = Node, subtopic = Topic}),
%%TODO: queue.... MQueue = emqx_mqueue:init(#{type => simple,
MQueue = emqx_mqueue:new(qname(Node, Topic), [{max_len, State#state.max_queue_len}]), max_len => State#state.max_queue_len,
store_qos0 => true}),
{ok, State#state{pool = Pool, id = Id, mqueue = MQueue}}; {ok, State#state{pool = Pool, id = Id, mqueue = MQueue}};
false -> false ->
{stop, {cannot_connect_node, Node}} {stop, {cannot_connect_node, Node}}
@ -85,11 +86,6 @@ parse_opts([{ping_down_interval, Interval} | Opts], State) ->
parse_opts([_Opt | Opts], State) -> parse_opts([_Opt | Opts], State) ->
parse_opts(Opts, State). parse_opts(Opts, State).
qname(Node, Topic) when is_atom(Node) ->
qname(atom_to_list(Node), Topic);
qname(Node, Topic) ->
iolist_to_binary(["Bridge:", Node, ":", Topic]).
handle_call(Req, _From, State) -> handle_call(Req, _From, State) ->
emqx_logger:error("[Bridge] unexpected call: ~p", [Req]), emqx_logger:error("[Bridge] unexpected call: ~p", [Req]),
{reply, ignored, State}. {reply, ignored, State}.
@ -103,7 +99,7 @@ handle_info({dispatch, _Topic, Msg}, State = #state{mqueue = Q, status = down})
{noreply, State#state{mqueue = emqx_mqueue:in(Msg, Q)}}; {noreply, State#state{mqueue = emqx_mqueue:in(Msg, Q)}};
handle_info({dispatch, _Topic, Msg}, State = #state{node = Node, status = up}) -> handle_info({dispatch, _Topic, Msg}, State = #state{node = Node, status = up}) ->
ok = emqx_rpc:cast(Node, emqx_broker, publish, [transform(Msg, State)]), emqx_rpc:cast(Node, emqx_broker, publish, [transform(Msg, State)]),
{noreply, State}; {noreply, State};
handle_info({nodedown, Node}, State = #state{node = Node, ping_down_interval = Interval}) -> handle_info({nodedown, Node}, State = #state{node = Node, ping_down_interval = Interval}) ->
@ -156,7 +152,6 @@ dequeue(State = #state{mqueue = MQ}) ->
dequeue(State#state{mqueue = MQ1}) dequeue(State#state{mqueue = MQ1})
end. end.
transform(Msg = #message{topic = Topic}, #state{topic_prefix = Prefix, transform(Msg = #message{topic = Topic}, #state{topic_prefix = Prefix, topic_suffix = Suffix}) ->
topic_suffix = Suffix}) ->
Msg#message{topic = <<Prefix/binary, Topic/binary, Suffix/binary>>}. Msg#message{topic = <<Prefix/binary, Topic/binary, Suffix/binary>>}.

View File

@ -26,8 +26,6 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]). code_change/3]).
-record(state, {}).
%% Bytes sent and received of Broker %% Bytes sent and received of Broker
-define(BYTES_METRICS, [ -define(BYTES_METRICS, [
{counter, 'bytes/received'}, % Total bytes received {counter, 'bytes/received'}, % Total bytes received
@ -73,6 +71,7 @@
{counter, 'messages/qos1/received'}, % QoS1 Messages received {counter, 'messages/qos1/received'}, % QoS1 Messages received
{counter, 'messages/qos1/sent'}, % QoS1 Messages sent {counter, 'messages/qos1/sent'}, % QoS1 Messages sent
{counter, 'messages/qos2/received'}, % QoS2 Messages received {counter, 'messages/qos2/received'}, % QoS2 Messages received
{counter, 'messages/qos2/expired'}, % QoS2 Messages expired
{counter, 'messages/qos2/sent'}, % QoS2 Messages sent {counter, 'messages/qos2/sent'}, % QoS2 Messages sent
{counter, 'messages/qos2/dropped'}, % QoS2 Messages dropped {counter, 'messages/qos2/dropped'}, % QoS2 Messages dropped
{gauge, 'messages/retained'}, % Messagea retained {gauge, 'messages/retained'}, % Messagea retained
@ -84,8 +83,8 @@
-define(TAB, ?MODULE). -define(TAB, ?MODULE).
-define(SERVER, ?MODULE). -define(SERVER, ?MODULE).
%% @doc Start the metrics server %% @doc Start the metrics server.
-spec(start_link() -> {ok, pid()} | ignore | {error, term()}). -spec(start_link() -> emqx_types:startlink_ret()).
start_link() -> start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
@ -251,7 +250,7 @@ init([]) ->
% Create metrics table % Create metrics table
_ = emqx_tables:new(?TAB, [set, public, {write_concurrency, true}]), _ = emqx_tables:new(?TAB, [set, public, {write_concurrency, true}]),
lists:foreach(fun new/1, ?BYTES_METRICS ++ ?PACKET_METRICS ++ ?MESSAGE_METRICS), lists:foreach(fun new/1, ?BYTES_METRICS ++ ?PACKET_METRICS ++ ?MESSAGE_METRICS),
{ok, #state{}, hibernate}. {ok, #{}, hibernate}.
handle_call(Req, _From, State) -> handle_call(Req, _From, State) ->
emqx_logger:error("[Metrics] unexpected call: ~p", [Req]), emqx_logger:error("[Metrics] unexpected call: ~p", [Req]),
@ -265,7 +264,7 @@ handle_info(Info, State) ->
emqx_logger:error("[Metrics] unexpected info: ~p", [Info]), emqx_logger:error("[Metrics] unexpected info: ~p", [Info]),
{noreply, State}. {noreply, State}.
terminate(_Reason, #state{}) -> terminate(_Reason, #{}) ->
ok. ok.
code_change(_OldVsn, State, _Extra) -> code_change(_OldVsn, State, _Extra) ->

View File

@ -23,8 +23,8 @@
load(Rules0) -> load(Rules0) ->
Rules = compile(Rules0), Rules = compile(Rules0),
emqx_hooks:add('client.subscribe', fun ?MODULE:rewrite_subscribe/4, [Rules]), emqx_hooks:add('client.subscribe', fun ?MODULE:rewrite_subscribe/3, [Rules]),
emqx_hooks:add('client.unsubscribe',fun ?MODULE:rewrite_unsubscribe/4, [Rules]), emqx_hooks:add('client.unsubscribe',fun ?MODULE:rewrite_unsubscribe/3, [Rules]),
emqx_hooks:add('message.publish', fun ?MODULE:rewrite_publish/2, [Rules]). emqx_hooks:add('message.publish', fun ?MODULE:rewrite_publish/2, [Rules]).
rewrite_subscribe(_Credentials, TopicTable, Rules) -> rewrite_subscribe(_Credentials, TopicTable, Rules) ->

View File

@ -55,10 +55,11 @@ validate(?UNSUBSCRIBE_PACKET(PacketId, TopicFilters)) ->
validate_packet_id(PacketId) validate_packet_id(PacketId)
andalso ok == lists:foreach(fun emqx_topic:validate/1, TopicFilters); andalso ok == lists:foreach(fun emqx_topic:validate/1, TopicFilters);
validate(?PUBLISH_PACKET(_QoS, <<>>, _, _)) -> validate(?PUBLISH_PACKET(_QoS, <<>>, _, _, _)) ->
error(topic_name_invalid); error(topic_name_invalid);
validate(?PUBLISH_PACKET(_QoS, Topic, _, _)) -> validate(?PUBLISH_PACKET(_QoS, Topic, _, Properties, _)) ->
(not emqx_topic:wildcard(Topic)) orelse error(topic_name_invalid); ((not emqx_topic:wildcard(Topic)) orelse error(topic_name_invalid))
andalso validate_properties(?PUBLISH, Properties);
validate(_Packet) -> validate(_Packet) ->
true. true.
@ -71,9 +72,14 @@ validate_packet_id(_) ->
validate_properties(?SUBSCRIBE, #{'Subscription-Identifier' := I}) validate_properties(?SUBSCRIBE, #{'Subscription-Identifier' := I})
when I =< 0; I >= 16#FFFFFFF -> when I =< 0; I >= 16#FFFFFFF ->
error(subscription_identifier_invalid); error(subscription_identifier_invalid);
validate_properties(?PUBLISH, # {'Topic-Alias':= I})
when I =:= 0 ->
error(topic_alias_invalid);
validate_properties(_, _) -> validate_properties(_, _) ->
true. true.
validate_subscription({Topic, #{qos := QoS}}) -> validate_subscription({Topic, #{qos := QoS}}) ->
emqx_topic:validate(filter, Topic) andalso validate_qos(QoS). emqx_topic:validate(filter, Topic) andalso validate_qos(QoS).
@ -189,6 +195,10 @@ format_variable(#mqtt_packet_connect{
end, end,
io_lib:format(Format1, Args1); io_lib:format(Format1, Args1);
format_variable(#mqtt_packet_disconnect
{reason_code = ReasonCode}) ->
io_lib:format("ReasonCode=~p", [ReasonCode]);
format_variable(#mqtt_packet_connack{ack_flags = AckFlags, format_variable(#mqtt_packet_connack{ack_flags = AckFlags,
reason_code = ReasonCode}) -> reason_code = ReasonCode}) ->
io_lib:format("AckFlags=~p, ReasonCode=~p", [AckFlags, ReasonCode]); io_lib:format("AckFlags=~p, ReasonCode=~p", [AckFlags, ReasonCode]);

View File

@ -41,6 +41,7 @@
proto_name, proto_name,
ackprops, ackprops,
client_id, client_id,
is_assigned,
conn_pid, conn_pid,
conn_props, conn_props,
ack_props, ack_props,
@ -55,6 +56,7 @@
mountpoint, mountpoint,
is_super, is_super,
is_bridge, is_bridge,
enable_ban,
enable_acl, enable_acl,
recv_stats, recv_stats,
send_stats, send_stats,
@ -87,6 +89,7 @@ init(#{peername := Peername, peercert := Peercert, sendfun := SendFun}, Options)
proto_ver = ?MQTT_PROTO_V4, proto_ver = ?MQTT_PROTO_V4,
proto_name = <<"MQTT">>, proto_name = <<"MQTT">>,
client_id = <<>>, client_id = <<>>,
is_assigned = false,
conn_pid = self(), conn_pid = self(),
username = init_username(Peercert, Options), username = init_username(Peercert, Options),
is_super = false, is_super = false,
@ -95,10 +98,11 @@ init(#{peername := Peername, peercert := Peercert, sendfun := SendFun}, Options)
packet_size = emqx_zone:get_env(Zone, max_packet_size), packet_size = emqx_zone:get_env(Zone, max_packet_size),
mountpoint = emqx_zone:get_env(Zone, mountpoint), mountpoint = emqx_zone:get_env(Zone, mountpoint),
is_bridge = false, is_bridge = false,
enable_ban = emqx_zone:get_env(Zone, enable_ban, false),
enable_acl = emqx_zone:get_env(Zone, enable_acl), enable_acl = emqx_zone:get_env(Zone, enable_acl),
recv_stats = #{msg => 0, pkt => 0}, recv_stats = #{msg => 0, pkt => 0},
send_stats = #{msg => 0, pkt => 0}, send_stats = #{msg => 0, pkt => 0},
connected = fasle}. connected = false}.
init_username(Peercert, Options) -> init_username(Peercert, Options) ->
case proplists:get_value(peer_cert_as_username, Options) of case proplists:get_value(peer_cert_as_username, Options) of
@ -117,13 +121,13 @@ set_username(_Username, PState) ->
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
info(PState = #pstate{conn_props = ConnProps, info(PState = #pstate{conn_props = ConnProps,
ack_props = AclProps, ack_props = AckProps,
session = Session, session = Session,
topic_aliases = Aliases, topic_aliases = Aliases,
will_msg = WillMsg, will_msg = WillMsg,
enable_acl = EnableAcl}) -> enable_acl = EnableAcl}) ->
attrs(PState) ++ [{conn_props, ConnProps}, attrs(PState) ++ [{conn_props, ConnProps},
{ack_props, AclProps}, {ack_props, AckProps},
{session, Session}, {session, Session},
{topic_aliases, Aliases}, {topic_aliases, Aliases},
{will_msg, WillMsg}, {will_msg, WillMsg},
@ -184,14 +188,14 @@ session(#pstate{session = SPid}) ->
SPid. SPid.
parser(#pstate{packet_size = Size, proto_ver = Ver}) -> parser(#pstate{packet_size = Size, proto_ver = Ver}) ->
emqx_frame:initial_state(#{packet_size => Size, version => Ver}). emqx_frame:initial_state(#{max_packet_size => Size, version => Ver}).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Packet Received %% Packet Received
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec(received(emqx_mqtt_types:packet(), state()) -spec(received(emqx_mqtt_types:packet(), state()) ->
-> {ok, state()} | {error, term()} | {error, term(), state()}). {ok, state()} | {error, term()} | {error, term(), state()} | {stop, term(), state()}).
received(?PACKET(Type), PState = #pstate{connected = false}) when Type =/= ?CONNECT -> received(?PACKET(Type), PState = #pstate{connected = false}) when Type =/= ?CONNECT ->
{error, proto_not_connected, PState}; {error, proto_not_connected, PState};
@ -276,7 +280,6 @@ process_packet(?CONNECT_PACKET(
will_msg = WillMsg, will_msg = WillMsg,
is_bridge = IsBridge, is_bridge = IsBridge,
connected_at = os:timestamp()}), connected_at = os:timestamp()}),
connack( connack(
case check_connect(Connect, PState1) of case check_connect(Connect, PState1) of
{ok, PState2} -> {ok, PState2} ->
@ -402,17 +405,18 @@ process_packet(?PACKET(?DISCONNECT), PState) ->
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
connack({?RC_SUCCESS, SP, PState}) -> connack({?RC_SUCCESS, SP, PState}) ->
emqx_hooks:run('client.connected', [credentials(PState), ?RC_SUCCESS, info(PState)]), emqx_hooks:run('client.connected', [credentials(PState), ?RC_SUCCESS, attrs(PState)]),
deliver({connack, ?RC_SUCCESS, sp(SP)}, update_mountpoint(PState)); deliver({connack, ?RC_SUCCESS, sp(SP)}, update_mountpoint(PState));
connack({ReasonCode, PState = #pstate{proto_ver = ProtoVer}}) -> connack({ReasonCode, PState = #pstate{proto_ver = ProtoVer}}) ->
emqx_hooks:run('client.connected', [credentials(PState), ?RC_SUCCESS, info(PState)]), emqx_hooks:run('client.connected', [credentials(PState), ReasonCode, attrs(PState)]),
_ = deliver({connack, if ProtoVer =:= ?MQTT_PROTO_V5 -> ReasonCode1 = if ProtoVer =:= ?MQTT_PROTO_V5 ->
ReasonCode; ReasonCode;
true -> true ->
emqx_reason_codes:compat(connack, ReasonCode) emqx_reason_codes:compat(connack, ReasonCode)
end}, PState), end,
{error, emqx_reason_codes:name(ReasonCode), PState}. _ = deliver({connack, ReasonCode1}, PState),
{error, emqx_reason_codes:name(ReasonCode1, ProtoVer), PState}.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Publish Message -> Broker %% Publish Message -> Broker
@ -447,9 +451,37 @@ puback(?QOS_2, PacketId, {ok, _}, PState) ->
%% Deliver Packet -> Client %% Deliver Packet -> Client
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec(deliver(tuple(), state()) -> {ok, state()} | {error, term()}).
deliver({connack, ReasonCode}, PState) -> deliver({connack, ReasonCode}, PState) ->
send(?CONNACK_PACKET(ReasonCode), PState); send(?CONNACK_PACKET(ReasonCode), PState);
deliver({connack, ?RC_SUCCESS, SP}, PState = #pstate{zone = Zone,
proto_ver = ?MQTT_PROTO_V5,
client_id = ClientId,
is_assigned = IsAssigned}) ->
#{max_packet_size := MaxPktSize,
max_qos_allowed := MaxQoS,
mqtt_retain_available := Retain,
max_topic_alias := MaxAlias,
mqtt_shared_subscription := Shared,
mqtt_wildcard_subscription := Wildcard} = caps(PState),
Props = #{'Maximum-QoS' => MaxQoS,
'Retain-Available' => flag(Retain),
'Maximum-Packet-Size' => MaxPktSize,
'Topic-Alias-Maximum' => MaxAlias,
'Wildcard-Subscription-Available' => flag(Wildcard),
'Subscription-Identifier-Available' => 1,
'Shared-Subscription-Available' => flag(Shared)},
Props1 = if IsAssigned ->
Props#{'Assigned-Client-Identifier' => ClientId};
true -> Props
end,
Props2 = case emqx_zone:get_env(Zone, server_keepalive) of
undefined -> Props1;
Keepalive -> Props1#{'Server-Keep-Alive' => Keepalive}
end,
send(?CONNACK_PACKET(?RC_SUCCESS, SP, Props2), PState);
deliver({connack, ReasonCode, SP}, PState) -> deliver({connack, ReasonCode, SP}, PState) ->
send(?CONNACK_PACKET(ReasonCode, SP), PState); send(?CONNACK_PACKET(ReasonCode, SP), PState);
@ -509,7 +541,7 @@ send(Packet = ?PACKET(Type), PState = #pstate{proto_ver = Ver, sendfun = SendFun
maybe_assign_client_id(PState = #pstate{client_id = <<>>, ackprops = AckProps}) -> maybe_assign_client_id(PState = #pstate{client_id = <<>>, ackprops = AckProps}) ->
ClientId = emqx_guid:to_base62(emqx_guid:gen()), ClientId = emqx_guid:to_base62(emqx_guid:gen()),
AckProps1 = set_property('Assigned-Client-Identifier', ClientId, AckProps), AckProps1 = set_property('Assigned-Client-Identifier', ClientId, AckProps),
PState#pstate{client_id = ClientId, ackprops = AckProps1}; PState#pstate{client_id = ClientId, is_assigned = true, ackprops = AckProps1};
maybe_assign_client_id(PState) -> maybe_assign_client_id(PState) ->
PState. PState.
@ -533,8 +565,12 @@ try_open_session(#pstate{zone = Zone,
authenticate(Credentials, Password) -> authenticate(Credentials, Password) ->
case emqx_access_control:authenticate(Credentials, Password) of case emqx_access_control:authenticate(Credentials, Password) of
ok -> {ok, false}; ok -> {ok, false};
{ok, IsSuper} -> {ok, IsSuper}; {ok, IsSuper} when is_boolean(IsSuper) ->
{error, Error} -> {error, Error} {ok, IsSuper};
{ok, Result} when is_map(Result) ->
{ok, maps:get(is_superuser, Result, false)};
{error, Error} ->
{error, Error}
end. end.
set_property(Name, Value, undefined) -> set_property(Name, Value, undefined) ->
@ -548,7 +584,8 @@ set_property(Name, Value, Props) ->
check_connect(Packet, PState) -> check_connect(Packet, PState) ->
run_check_steps([fun check_proto_ver/2, run_check_steps([fun check_proto_ver/2,
fun check_client_id/2], Packet, PState). fun check_client_id/2,
fun check_banned/2], Packet, PState).
check_proto_ver(#mqtt_packet_connect{proto_ver = Ver, check_proto_ver(#mqtt_packet_connect{proto_ver = Ver,
proto_name = Name}, _PState) -> proto_name = Name}, _PState) ->
@ -579,6 +616,17 @@ check_client_id(#mqtt_packet_connect{client_id = ClientId}, #pstate{zone = Zone}
false -> {error, ?RC_CLIENT_IDENTIFIER_NOT_VALID} false -> {error, ?RC_CLIENT_IDENTIFIER_NOT_VALID}
end. end.
check_banned(_Connect, #pstate{enable_ban = false}) ->
ok;
check_banned(#mqtt_packet_connect{client_id = ClientId, username = Username},
#pstate{peername = Peername}) ->
case emqx_banned:check(#{client_id => ClientId,
username => Username,
peername => Peername}) of
true -> {error, ?RC_BANNED};
false -> ok
end.
check_publish(Packet, PState) -> check_publish(Packet, PState) ->
run_check_steps([fun check_pub_caps/2, run_check_steps([fun check_pub_caps/2,
fun check_pub_acl/2], Packet, PState). fun check_pub_acl/2], Packet, PState).
@ -648,26 +696,27 @@ inc_stats(Type, Stats = #{pkt := PktCnt, msg := MsgCnt}) ->
false -> MsgCnt false -> MsgCnt
end}. end}.
shutdown(_Error, #pstate{client_id = undefined}) -> shutdown(_Reason, #pstate{client_id = undefined}) ->
ignore; ok;
shutdown(conflict, #pstate{client_id = ClientId}) -> shutdown(_Reason, #pstate{connected = false}) ->
emqx_cm:unregister_connection(ClientId), ok;
ignore; shutdown(Reason, #pstate{client_id = ClientId}) when Reason =:= conflict;
shutdown(mnesia_conflict, #pstate{client_id = ClientId}) -> Reason =:= discard ->
emqx_cm:unregister_connection(ClientId), emqx_cm:unregister_connection(ClientId);
ignore; shutdown(Reason, PState = #pstate{connected = true,
shutdown(Error, PState = #pstate{client_id = ClientId, will_msg = WillMsg}) -> client_id = ClientId,
?LOG(info, "Shutdown for ~p", [Error], PState), will_msg = WillMsg}) ->
%% TODO: Auth failure not publish the will message ?LOG(info, "Shutdown for ~p", [Reason], PState),
case Error =:= auth_failure of _ = send_willmsg(WillMsg),
true -> ok; emqx_hooks:run('client.disconnected', [credentials(PState), Reason]),
false -> send_willmsg(WillMsg)
end,
emqx_hooks:run('client.disconnected', [credentials(PState), Error]),
emqx_cm:unregister_connection(ClientId). emqx_cm:unregister_connection(ClientId).
send_willmsg(undefined) -> send_willmsg(undefined) ->
ignore; ignore;
send_willmsg(WillMsg = #message{topic = Topic,
headers = #{'Will-Delay-Interval' := Interval}}) when is_integer(Interval) ->
SendAfter = integer_to_binary(Interval),
emqx_broker:publish(WillMsg#message{topic = <<"$delayed/", SendAfter/binary, "/", Topic/binary>>});
send_willmsg(WillMsg) -> send_willmsg(WillMsg) ->
emqx_broker:publish(WillMsg). emqx_broker:publish(WillMsg).
@ -710,3 +759,5 @@ update_mountpoint(PState = #pstate{mountpoint = MountPoint}) ->
sp(true) -> 1; sp(true) -> 1;
sp(false) -> 0. sp(false) -> 0.
flag(false) -> 0;
flag(true) -> 1.

View File

@ -17,9 +17,19 @@
-include("emqx_mqtt.hrl"). -include("emqx_mqtt.hrl").
-export([name/1, text/1]). -export([name/2, text/1]).
-export([compat/2]). -export([compat/2]).
name(I, Ver) when Ver >= ?MQTT_PROTO_V5 ->
name(I);
name(0, _Ver) -> connection_acceptd;
name(1, _Ver) -> unacceptable_protocol_version;
name(2, _Ver) -> client_identifier_not_valid;
name(3, _Ver) -> server_unavaliable;
name(4, _Ver) -> malformed_username_or_password;
name(5, _Ver) -> unauthorized_client;
name(I, _Ver) -> list_to_atom("unkown_connack" ++ integer_to_list(I)).
name(16#00) -> success; name(16#00) -> success;
name(16#01) -> granted_qos1; name(16#01) -> granted_qos1;
name(16#02) -> granted_qos2; name(16#02) -> granted_qos2;
@ -130,4 +140,3 @@ compat(connack, 16#9F) -> ?CONNACK_SERVER;
compat(suback, Code) when Code =< ?QOS2 -> Code; compat(suback, Code) when Code =< ?QOS2 -> Code;
compat(suback, Code) when Code > 16#80 -> 16#80. compat(suback, Code) when Code > 16#80 -> 16#80.

View File

@ -147,6 +147,11 @@
created_at :: erlang:timestamp() created_at :: erlang:timestamp()
}). }).
-type(spid() :: pid()).
-type(attr() :: {atom(), term()}).
-export_type([attr/0]).
-define(TIMEOUT, 60000). -define(TIMEOUT, 60000).
-define(LOG(Level, Format, Args, State), -define(LOG(Level, Format, Args, State),
@ -159,7 +164,7 @@ start_link(SessAttrs) ->
proc_lib:start_link(?MODULE, init, [[self(), SessAttrs]]). proc_lib:start_link(?MODULE, init, [[self(), SessAttrs]]).
%% @doc Get session info %% @doc Get session info
-spec(info(pid() | #state{}) -> list({atom(), term()})). -spec(info(spid() | #state{}) -> list({atom(), term()})).
info(SPid) when is_pid(SPid) -> info(SPid) when is_pid(SPid) ->
gen_server:call(SPid, info, infinity); gen_server:call(SPid, info, infinity);
@ -187,7 +192,7 @@ info(State = #state{conn_pid = ConnPid,
{await_rel_timeout, AwaitRelTimeout}]. {await_rel_timeout, AwaitRelTimeout}].
%% @doc Get session attrs %% @doc Get session attrs
-spec(attrs(pid() | #state{}) -> list({atom(), term()})). -spec(attrs(spid() | #state{}) -> list({atom(), term()})).
attrs(SPid) when is_pid(SPid) -> attrs(SPid) when is_pid(SPid) ->
gen_server:call(SPid, attrs, infinity); gen_server:call(SPid, attrs, infinity);
@ -204,7 +209,7 @@ attrs(#state{clean_start = CleanStart,
{expiry_interval, ExpiryInterval div 1000}, {expiry_interval, ExpiryInterval div 1000},
{created_at, CreatedAt}]. {created_at, CreatedAt}].
-spec(stats(pid() | #state{}) -> list({atom(), non_neg_integer()})). -spec(stats(spid() | #state{}) -> list({atom(), non_neg_integer()})).
stats(SPid) when is_pid(SPid) -> stats(SPid) when is_pid(SPid) ->
gen_server:call(SPid, stats, infinity); gen_server:call(SPid, stats, infinity);
@ -233,19 +238,19 @@ stats(#state{max_subscriptions = MaxSubscriptions,
%% PubSub API %% PubSub API
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
-spec(subscribe(pid(), list({emqx_topic:topic(), emqx_types:subopts()})) -> ok). -spec(subscribe(spid(), list({emqx_topic:topic(), emqx_types:subopts()})) -> ok).
subscribe(SPid, RawTopicFilters) when is_list(RawTopicFilters) -> subscribe(SPid, RawTopicFilters) when is_list(RawTopicFilters) ->
TopicFilters = [emqx_topic:parse(RawTopic, maps:merge(?DEFAULT_SUBOPTS, SubOpts)) TopicFilters = [emqx_topic:parse(RawTopic, maps:merge(?DEFAULT_SUBOPTS, SubOpts))
|| {RawTopic, SubOpts} <- RawTopicFilters], || {RawTopic, SubOpts} <- RawTopicFilters],
subscribe(SPid, undefined, #{}, TopicFilters). subscribe(SPid, undefined, #{}, TopicFilters).
-spec(subscribe(pid(), emqx_mqtt_types:packet_id(), -spec(subscribe(spid(), emqx_mqtt_types:packet_id(),
emqx_mqtt_types:properties(), emqx_mqtt_types:topic_filters()) -> ok). emqx_mqtt_types:properties(), emqx_mqtt_types:topic_filters()) -> ok).
subscribe(SPid, PacketId, Properties, TopicFilters) -> subscribe(SPid, PacketId, Properties, TopicFilters) ->
SubReq = {PacketId, Properties, TopicFilters}, SubReq = {PacketId, Properties, TopicFilters},
gen_server:cast(SPid, {subscribe, self(), SubReq}). gen_server:cast(SPid, {subscribe, self(), SubReq}).
-spec(publish(pid(), emqx_mqtt_types:packet_id(), emqx_types:message()) -spec(publish(spid(), emqx_mqtt_types:packet_id(), emqx_types:message())
-> {ok, emqx_types:deliver_results()}). -> {ok, emqx_types:deliver_results()}).
publish(_SPid, _PacketId, Msg = #message{qos = ?QOS_0}) -> publish(_SPid, _PacketId, Msg = #message{qos = ?QOS_0}) ->
%% Publish QoS0 message to broker directly %% Publish QoS0 message to broker directly
@ -259,56 +264,56 @@ publish(SPid, PacketId, Msg = #message{qos = ?QOS_2}) ->
%% Publish QoS2 message to session %% Publish QoS2 message to session
gen_server:call(SPid, {publish, PacketId, Msg}, infinity). gen_server:call(SPid, {publish, PacketId, Msg}, infinity).
-spec(puback(pid(), emqx_mqtt_types:packet_id()) -> ok). -spec(puback(spid(), emqx_mqtt_types:packet_id()) -> ok).
puback(SPid, PacketId) -> puback(SPid, PacketId) ->
gen_server:cast(SPid, {puback, PacketId, ?RC_SUCCESS}). gen_server:cast(SPid, {puback, PacketId, ?RC_SUCCESS}).
puback(SPid, PacketId, ReasonCode) -> puback(SPid, PacketId, ReasonCode) ->
gen_server:cast(SPid, {puback, PacketId, ReasonCode}). gen_server:cast(SPid, {puback, PacketId, ReasonCode}).
-spec(pubrec(pid(), emqx_mqtt_types:packet_id()) -> ok | {error, emqx_mqtt_types:reason_code()}). -spec(pubrec(spid(), emqx_mqtt_types:packet_id()) -> ok | {error, emqx_mqtt_types:reason_code()}).
pubrec(SPid, PacketId) -> pubrec(SPid, PacketId) ->
pubrec(SPid, PacketId, ?RC_SUCCESS). pubrec(SPid, PacketId, ?RC_SUCCESS).
-spec(pubrec(pid(), emqx_mqtt_types:packet_id(), emqx_mqtt_types:reason_code()) -spec(pubrec(spid(), emqx_mqtt_types:packet_id(), emqx_mqtt_types:reason_code())
-> ok | {error, emqx_mqtt_types:reason_code()}). -> ok | {error, emqx_mqtt_types:reason_code()}).
pubrec(SPid, PacketId, ReasonCode) -> pubrec(SPid, PacketId, ReasonCode) ->
gen_server:call(SPid, {pubrec, PacketId, ReasonCode}, infinity). gen_server:call(SPid, {pubrec, PacketId, ReasonCode}, infinity).
-spec(pubrel(pid(), emqx_mqtt_types:packet_id(), emqx_mqtt_types:reason_code()) -spec(pubrel(spid(), emqx_mqtt_types:packet_id(), emqx_mqtt_types:reason_code())
-> ok | {error, emqx_mqtt_types:reason_code()}). -> ok | {error, emqx_mqtt_types:reason_code()}).
pubrel(SPid, PacketId, ReasonCode) -> pubrel(SPid, PacketId, ReasonCode) ->
gen_server:call(SPid, {pubrel, PacketId, ReasonCode}, infinity). gen_server:call(SPid, {pubrel, PacketId, ReasonCode}, infinity).
-spec(pubcomp(pid(), emqx_mqtt_types:packet_id(), emqx_mqtt_types:reason_code()) -> ok). -spec(pubcomp(spid(), emqx_mqtt_types:packet_id(), emqx_mqtt_types:reason_code()) -> ok).
pubcomp(SPid, PacketId, ReasonCode) -> pubcomp(SPid, PacketId, ReasonCode) ->
gen_server:cast(SPid, {pubcomp, PacketId, ReasonCode}). gen_server:cast(SPid, {pubcomp, PacketId, ReasonCode}).
-spec(unsubscribe(pid(), emqx_types:topic_table()) -> ok). -spec(unsubscribe(spid(), emqx_types:topic_table()) -> ok).
unsubscribe(SPid, RawTopicFilters) when is_list(RawTopicFilters) -> unsubscribe(SPid, RawTopicFilters) when is_list(RawTopicFilters) ->
TopicFilters = lists:map(fun({RawTopic, Opts}) -> TopicFilters = lists:map(fun({RawTopic, Opts}) ->
emqx_topic:parse(RawTopic, Opts); emqx_topic:parse(RawTopic, Opts);
(RawTopic) -> (RawTopic) when is_binary(RawTopic) ->
emqx_topic:parse(RawTopic) emqx_topic:parse(RawTopic)
end, RawTopicFilters), end, RawTopicFilters),
unsubscribe(SPid, undefined, #{}, TopicFilters). unsubscribe(SPid, undefined, #{}, TopicFilters).
-spec(unsubscribe(pid(), emqx_mqtt_types:packet_id(), -spec(unsubscribe(spid(), emqx_mqtt_types:packet_id(),
emqx_mqtt_types:properties(), emqx_mqtt_types:topic_filters()) -> ok). emqx_mqtt_types:properties(), emqx_mqtt_types:topic_filters()) -> ok).
unsubscribe(SPid, PacketId, Properties, TopicFilters) -> unsubscribe(SPid, PacketId, Properties, TopicFilters) ->
UnsubReq = {PacketId, Properties, TopicFilters}, UnsubReq = {PacketId, Properties, TopicFilters},
gen_server:cast(SPid, {unsubscribe, self(), UnsubReq}). gen_server:cast(SPid, {unsubscribe, self(), UnsubReq}).
-spec(resume(pid(), pid()) -> ok). -spec(resume(spid(), pid()) -> ok).
resume(SPid, ConnPid) -> resume(SPid, ConnPid) ->
gen_server:cast(SPid, {resume, ConnPid}). gen_server:cast(SPid, {resume, ConnPid}).
%% @doc Discard the session %% @doc Discard the session
-spec(discard(pid(), emqx_types:client_id()) -> ok). -spec(discard(spid(), ByPid :: pid()) -> ok).
discard(SPid, ClientId) -> discard(SPid, ByPid) ->
gen_server:call(SPid, {discard, ClientId}, infinity). gen_server:call(SPid, {discard, ByPid}, infinity).
-spec(close(pid()) -> ok). -spec(close(spid()) -> ok).
close(SPid) -> close(SPid) ->
gen_server:call(SPid, close, infinity). gen_server:call(SPid, close, infinity).
@ -367,16 +372,26 @@ init_mqueue(Zone) ->
binding(ConnPid) -> binding(ConnPid) ->
case node(ConnPid) =:= node() of true -> local; false -> remote end. case node(ConnPid) =:= node() of true -> local; false -> remote end.
handle_call({discard, ConnPid}, _From, State = #state{conn_pid = undefined}) -> handle_call(info, _From, State) ->
?LOG(warning, "Discarded by ~p", [ConnPid], State), reply(info(State), State);
handle_call(attrs, _From, State) ->
reply(attrs(State), State);
handle_call(stats, _From, State) ->
reply(stats(State), State);
handle_call({discard, ByPid}, _From, State = #state{conn_pid = undefined}) ->
?LOG(warning, "Discarded by ~p", [ByPid], State),
{stop, {shutdown, discard}, ok, State}; {stop, {shutdown, discard}, ok, State};
handle_call({discard, ConnPid}, _From, State = #state{conn_pid = OldConnPid}) -> handle_call({discard, ByPid}, _From, State = #state{client_id = ClientId, conn_pid = ConnPid}) ->
?LOG(warning, " ~p kickout ~p", [ConnPid, OldConnPid], State), ?LOG(warning, "Conn ~p is discarded by ~p", [ConnPid, ByPid], State),
{stop, {shutdown, conflict}, ok, State}; ConnPid ! {shutdown, discard, {ClientId, ByPid}},
{stop, {shutdown, discard}, ok, State};
%% PUBLISH: %% PUBLISH:
handle_call({publish, PacketId, Msg = #message{qos = ?QOS_2}}, _From, handle_call({publish, PacketId, Msg = #message{qos = ?QOS_2, timestamp = Ts}}, _From,
State = #state{awaiting_rel = AwaitingRel}) -> State = #state{awaiting_rel = AwaitingRel}) ->
reply(case is_awaiting_full(State) of reply(case is_awaiting_full(State) of
false -> false ->
@ -384,13 +399,12 @@ handle_call({publish, PacketId, Msg = #message{qos = ?QOS_2}}, _From,
true -> true ->
{{error, ?RC_PACKET_IDENTIFIER_IN_USE}, State}; {{error, ?RC_PACKET_IDENTIFIER_IN_USE}, State};
false -> false ->
State1 = State#state{awaiting_rel = maps:put(PacketId, Msg, AwaitingRel)}, State1 = State#state{awaiting_rel = maps:put(PacketId, Ts, AwaitingRel)},
{emqx_broker:publish(Msg), ensure_await_rel_timer(State1)} {emqx_broker:publish(Msg), ensure_await_rel_timer(State1)}
end; end;
true -> true ->
emqx_metrics:inc('messages/qos2/dropped'), emqx_metrics:inc('messages/qos2/dropped'),
?LOG(warning, "Dropped message for too many awaiting_rel: ~p", ?LOG(warning, "Dropped qos2 packet ~w for too many awaiting_rel", [PacketId], State),
[emqx_message:format(Msg)], State),
{{error, ?RC_RECEIVE_MAXIMUM_EXCEEDED}, State} {{error, ?RC_RECEIVE_MAXIMUM_EXCEEDED}, State}
end); end);
@ -408,7 +422,7 @@ handle_call({pubrec, PacketId, _ReasonCode}, _From, State = #state{inflight = In
%% PUBREL: %% PUBREL:
handle_call({pubrel, PacketId, _ReasonCode}, _From, State = #state{awaiting_rel = AwaitingRel}) -> handle_call({pubrel, PacketId, _ReasonCode}, _From, State = #state{awaiting_rel = AwaitingRel}) ->
reply(case maps:take(PacketId, AwaitingRel) of reply(case maps:take(PacketId, AwaitingRel) of
{_, AwaitingRel1} -> {_Ts, AwaitingRel1} ->
{ok, State#state{awaiting_rel = AwaitingRel1}}; {ok, State#state{awaiting_rel = AwaitingRel1}};
error -> error ->
emqx_metrics:inc('packets/pubrel/missed'), emqx_metrics:inc('packets/pubrel/missed'),
@ -416,15 +430,6 @@ handle_call({pubrel, PacketId, _ReasonCode}, _From, State = #state{awaiting_rel
{{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}, State} {{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}, State}
end); end);
handle_call(info, _From, State) ->
reply(info(State), State);
handle_call(attrs, _From, State) ->
reply(attrs(State), State);
handle_call(stats, _From, State) ->
reply(stats(State), State);
handle_call(close, _From, State) -> handle_call(close, _From, State) ->
{stop, normal, ok, State}; {stop, normal, ok, State};
@ -442,6 +447,7 @@ handle_cast({subscribe, FromPid, {PacketId, _Properties, TopicFilters}},
SubMap; SubMap;
{ok, _SubOpts} -> {ok, _SubOpts} ->
emqx_broker:set_subopts(Topic, {self(), ClientId}, SubOpts), emqx_broker:set_subopts(Topic, {self(), ClientId}, SubOpts),
%% Why???
emqx_hooks:run('session.subscribed', [#{client_id => ClientId}, Topic, SubOpts]), emqx_hooks:run('session.subscribed', [#{client_id => ClientId}, Topic, SubOpts]),
maps:put(Topic, SubOpts, SubMap); maps:put(Topic, SubOpts, SubMap);
error -> error ->
@ -561,7 +567,7 @@ handle_info({timeout, Timer, check_awaiting_rel}, State = #state{await_rel_timer
noreply(expire_awaiting_rel(State#state{await_rel_timer = undefined})); noreply(expire_awaiting_rel(State#state{await_rel_timer = undefined}));
handle_info({timeout, Timer, emit_stats}, State = #state{client_id = ClientId, stats_timer = Timer}) -> handle_info({timeout, Timer, emit_stats}, State = #state{client_id = ClientId, stats_timer = Timer}) ->
true = emqx_sm:set_session_stats(ClientId, stats(State)), _ = emqx_sm:set_session_stats(ClientId, stats(State)),
{noreply, State#state{stats_timer = undefined}, hibernate}; {noreply, State#state{stats_timer = undefined}, hibernate};
handle_info({timeout, Timer, expired}, State = #state{expiry_timer = Timer}) -> handle_info({timeout, Timer, expired}, State = #state{expiry_timer = Timer}) ->
@ -618,17 +624,17 @@ unsuback(From, PacketId, ReasonCodes) ->
From ! {deliver, {unsuback, PacketId, ReasonCodes}}. From ! {deliver, {unsuback, PacketId, ReasonCodes}}.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Kickout old client %% Kickout old connection
kick(_ClientId, undefined, _Pid) -> kick(_ClientId, undefined, _ConnPid) ->
ignore; ignore;
kick(_ClientId, Pid, Pid) -> kick(_ClientId, ConnPid, ConnPid) ->
ignore; ignore;
kick(ClientId, OldPid, Pid) -> kick(ClientId, OldConnPid, ConnPid) ->
unlink(OldPid), unlink(OldConnPid),
OldPid ! {shutdown, conflict, {ClientId, Pid}}, OldConnPid ! {shutdown, conflict, {ClientId, ConnPid}},
%% Clean noproc %% Clean noproc
receive {'EXIT', OldPid, _} -> ok after 1 -> ok end. receive {'EXIT', OldConnPid, _} -> ok after 1 -> ok end.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% Replay or Retry Delivery %% Replay or Retry Delivery
@ -639,8 +645,9 @@ retry_delivery(Force, State = #state{inflight = Inflight}) ->
case emqx_inflight:is_empty(Inflight) of case emqx_inflight:is_empty(Inflight) of
true -> State; true -> State;
false -> false ->
InflightMsgs = lists:sort(sortfun(inflight), emqx_inflight:values(Inflight)), SortFun = fun({_, _, Ts1}, {_, _, Ts2}) -> Ts1 < Ts2 end,
retry_delivery(Force, InflightMsgs, os:timestamp(), State) Msgs = lists:sort(SortFun, emqx_inflight:values(Inflight)),
retry_delivery(Force, Msgs, os:timestamp(), State)
end. end.
retry_delivery(_Force, [], _Now, State) -> retry_delivery(_Force, [], _Now, State) ->
@ -650,9 +657,9 @@ retry_delivery(_Force, [], _Now, State) ->
retry_delivery(Force, [{Type, Msg0, Ts} | Msgs], Now, retry_delivery(Force, [{Type, Msg0, Ts} | Msgs], Now,
State = #state{inflight = Inflight, retry_interval = Interval}) -> State = #state{inflight = Inflight, retry_interval = Interval}) ->
%% Microseconds -> MilliSeconds %% Microseconds -> MilliSeconds
Diff = timer:now_diff(Now, Ts) div 1000, Age = timer:now_diff(Now, Ts) div 1000,
if if
Force orelse (Diff >= Interval) -> Force orelse (Age >= Interval) ->
Inflight1 = case {Type, Msg0} of Inflight1 = case {Type, Msg0} of
{publish, {PacketId, Msg}} -> {publish, {PacketId, Msg}} ->
case emqx_message:is_expired(Msg) of case emqx_message:is_expired(Msg) of
@ -669,7 +676,7 @@ retry_delivery(Force, [{Type, Msg0, Ts} | Msgs], Now,
end, end,
retry_delivery(Force, Msgs, Now, State#state{inflight = Inflight1}); retry_delivery(Force, Msgs, Now, State#state{inflight = Inflight1});
true -> true ->
ensure_retry_timer(Interval - Diff, State) ensure_retry_timer(Interval - max(0, Age), State)
end. end.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
@ -679,36 +686,21 @@ retry_delivery(Force, [{Type, Msg0, Ts} | Msgs], Now,
expire_awaiting_rel(State = #state{awaiting_rel = AwaitingRel}) -> expire_awaiting_rel(State = #state{awaiting_rel = AwaitingRel}) ->
case maps:size(AwaitingRel) of case maps:size(AwaitingRel) of
0 -> State; 0 -> State;
_ -> Msgs = lists:sort(sortfun(awaiting_rel), maps:to_list(AwaitingRel)), _ -> expire_awaiting_rel(lists:keysort(2, maps:to_list(AwaitingRel)), os:timestamp(), State)
expire_awaiting_rel(Msgs, os:timestamp(), State)
end. end.
expire_awaiting_rel([], _Now, State) -> expire_awaiting_rel([], _Now, State) ->
State#state{await_rel_timer = undefined}; State#state{await_rel_timer = undefined};
expire_awaiting_rel([{PacketId, Msg = #message{timestamp = TS}} | Msgs], Now, expire_awaiting_rel([{PacketId, Ts} | More], Now,
State = #state{awaiting_rel = AwaitingRel, await_rel_timeout = Timeout}) -> State = #state{awaiting_rel = AwaitingRel, await_rel_timeout = Timeout}) ->
case (timer:now_diff(Now, TS) div 1000) of case (timer:now_diff(Now, Ts) div 1000) of
Diff when Diff >= Timeout -> Age when Age >= Timeout ->
emqx_metrics:inc('messages/qos2/dropped'), emqx_metrics:inc('messages/qos2/expired'),
?LOG(warning, "Dropped message for await_rel_timeout: ~p", ?LOG(warning, "Dropped qos2 packet ~s for await_rel_timeout", [PacketId], State),
[emqx_message:format(Msg)], State), expire_awaiting_rel(More, Now, State#state{awaiting_rel = maps:remove(PacketId, AwaitingRel)});
expire_awaiting_rel(Msgs, Now, State#state{awaiting_rel = maps:remove(PacketId, AwaitingRel)}); Age ->
Diff -> ensure_await_rel_timer(Timeout - max(0, Age), State)
ensure_await_rel_timer(Timeout - Diff, State)
end.
%%------------------------------------------------------------------------------
%% Sort Inflight, AwaitingRel
%%------------------------------------------------------------------------------
sortfun(inflight) ->
fun({_, _, Ts1}, {_, _, Ts2}) -> Ts1 < Ts2 end;
sortfun(awaiting_rel) ->
fun({_, #message{timestamp = Ts1}},
{_, #message{timestamp = Ts2}}) ->
Ts1 < Ts2
end. end.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
@ -728,7 +720,7 @@ run_dispatch_steps([], Msg, State) ->
dispatch(Msg, State); dispatch(Msg, State);
run_dispatch_steps([{nl, 1}|_Steps], #message{from = ClientId}, State = #state{client_id = ClientId}) -> run_dispatch_steps([{nl, 1}|_Steps], #message{from = ClientId}, State = #state{client_id = ClientId}) ->
State; State;
run_dispatch_steps([{nl, 0}|Steps], Msg, State) -> run_dispatch_steps([{nl, _}|Steps], Msg, State) ->
run_dispatch_steps(Steps, Msg, State); run_dispatch_steps(Steps, Msg, State);
run_dispatch_steps([{qos, SubQoS}|Steps], Msg = #message{qos = PubQoS}, State = #state{upgrade_qos = false}) -> run_dispatch_steps([{qos, SubQoS}|Steps], Msg = #message{qos = PubQoS}, State = #state{upgrade_qos = false}) ->
run_dispatch_steps(Steps, Msg#message{qos = min(SubQoS, PubQoS)}, State); run_dispatch_steps(Steps, Msg#message{qos = min(SubQoS, PubQoS)}, State);
@ -905,4 +897,3 @@ shutdown(Reason, State) ->
%% TODO: GC Policy and Shutdown Policy %% TODO: GC Policy and Shutdown Policy
%% maybe_gc(State) -> State. %% maybe_gc(State) -> State.

View File

@ -49,22 +49,20 @@ start_link() ->
%% @doc Open a session. %% @doc Open a session.
-spec(open_session(map()) -> {ok, pid()} | {ok, pid(), boolean()} | {error, term()}). -spec(open_session(map()) -> {ok, pid()} | {ok, pid(), boolean()} | {error, term()}).
open_session(Attrs = #{clean_start := true, client_id := ClientId, conn_pid := ConnPid}) -> open_session(SessAttrs = #{clean_start := true, client_id := ClientId, conn_pid := ConnPid}) ->
CleanStart = fun(_) -> CleanStart = fun(_) ->
ok = discard_session(ClientId, ConnPid), ok = discard_session(ClientId, ConnPid),
emqx_session_sup:start_session(Attrs) emqx_session_sup:start_session(SessAttrs)
end, end,
emqx_sm_locker:trans(ClientId, CleanStart); emqx_sm_locker:trans(ClientId, CleanStart);
open_session(Attrs = #{clean_start := false, client_id := ClientId, conn_pid := ConnPid}) -> open_session(SessAttrs = #{clean_start := false, client_id := ClientId, conn_pid := ConnPid}) ->
ResumeStart = fun(_) -> ResumeStart = fun(_) ->
case resume_session(ClientId, ConnPid) of case resume_session(ClientId, ConnPid) of
{ok, SPid} -> {ok, SPid} ->
{ok, SPid, true}; {ok, SPid, true};
{error, not_found} -> {error, not_found} ->
emqx_session_sup:start_session(Attrs); emqx_session_sup:start_session(SessAttrs)
{error, Reason} ->
{error, Reason}
end end
end, end,
emqx_sm_locker:trans(ClientId, ResumeStart). emqx_sm_locker:trans(ClientId, ResumeStart).
@ -113,31 +111,31 @@ close_session(SPid) when is_pid(SPid) ->
%% @doc Register a session with attributes. %% @doc Register a session with attributes.
-spec(register_session(emqx_types:client_id() | {emqx_types:client_id(), pid()}, -spec(register_session(emqx_types:client_id() | {emqx_types:client_id(), pid()},
list(emqx_session:attribute())) -> ok). list(emqx_session:attr())) -> ok).
register_session(ClientId, Attrs) when is_binary(ClientId) -> register_session(ClientId, SessAttrs) when is_binary(ClientId) ->
register_session({ClientId, self()}, Attrs); register_session({ClientId, self()}, SessAttrs);
register_session(Session = {ClientId, SPid}, Attrs) register_session(Session = {ClientId, SPid}, SessAttrs)
when is_binary(ClientId), is_pid(SPid) -> when is_binary(ClientId), is_pid(SPid) ->
ets:insert(?SESSION_TAB, Session), ets:insert(?SESSION_TAB, Session),
ets:insert(?SESSION_ATTRS_TAB, {Session, Attrs}), ets:insert(?SESSION_ATTRS_TAB, {Session, SessAttrs}),
case proplists:get_value(clean_start, Attrs, true) of proplists:get_value(clean_start, SessAttrs, true)
true -> ok; andalso ets:insert(?SESSION_P_TAB, Session),
false -> ets:insert(?SESSION_P_TAB, Session)
end,
emqx_sm_registry:register_session(Session), emqx_sm_registry:register_session(Session),
notify({registered, ClientId, SPid}). notify({registered, ClientId, SPid}).
%% @doc Get session attrs %% @doc Get session attrs
-spec(get_session_attrs({emqx_types:client_id(), pid()}) -> list(emqx_session:attribute())). -spec(get_session_attrs({emqx_types:client_id(), pid()}) -> list(emqx_session:attr())).
get_session_attrs(Session = {ClientId, SPid}) when is_binary(ClientId), is_pid(SPid) -> get_session_attrs(Session = {ClientId, SPid}) when is_binary(ClientId), is_pid(SPid) ->
safe_lookup_element(?SESSION_ATTRS_TAB, Session, []). safe_lookup_element(?SESSION_ATTRS_TAB, Session, []).
%% @doc Set session attrs %% @doc Set session attrs
set_session_attrs(ClientId, Attrs) when is_binary(ClientId) -> -spec(set_session_attrs(emqx_types:client_id() | {emqx_types:client_id(), pid()},
set_session_attrs({ClientId, self()}, Attrs); list(emqx_session:attr())) -> true).
set_session_attrs(Session = {ClientId, SPid}, Attrs) when is_binary(ClientId), is_pid(SPid) -> set_session_attrs(ClientId, SessAttrs) when is_binary(ClientId) ->
ets:insert(?SESSION_ATTRS_TAB, {Session, Attrs}). set_session_attrs({ClientId, self()}, SessAttrs);
set_session_attrs(Session = {ClientId, SPid}, SessAttrs) when is_binary(ClientId), is_pid(SPid) ->
ets:insert(?SESSION_ATTRS_TAB, {Session, SessAttrs}).
%% @doc Unregister a session %% @doc Unregister a session
-spec(unregister_session(emqx_types:client_id() | {emqx_types:client_id(), pid()}) -> ok). -spec(unregister_session(emqx_types:client_id() | {emqx_types:client_id(), pid()}) -> ok).
@ -154,18 +152,15 @@ unregister_session(Session = {ClientId, SPid}) when is_binary(ClientId), is_pid(
%% @doc Get session stats %% @doc Get session stats
-spec(get_session_stats({emqx_types:client_id(), pid()}) -> list(emqx_stats:stats())). -spec(get_session_stats({emqx_types:client_id(), pid()}) -> list(emqx_stats:stats())).
get_session_stats(Session = {ClientId, SPid}) get_session_stats(Session = {ClientId, SPid}) when is_binary(ClientId), is_pid(SPid) ->
when is_binary(ClientId), is_pid(SPid) ->
safe_lookup_element(?SESSION_STATS_TAB, Session, []). safe_lookup_element(?SESSION_STATS_TAB, Session, []).
%% @doc Set session stats %% @doc Set session stats
-spec(set_session_stats(emqx_types:client_id() | {emqx_types:client_id(), pid()}, -spec(set_session_stats(emqx_types:client_id() | {emqx_types:client_id(), pid()},
emqx_stats:stats()) -> ok). emqx_stats:stats()) -> true).
set_session_stats(ClientId, Stats) when is_binary(ClientId) -> set_session_stats(ClientId, Stats) when is_binary(ClientId) ->
set_session_stats({ClientId, self()}, Stats); set_session_stats({ClientId, self()}, Stats);
set_session_stats(Session = {ClientId, SPid}, Stats) when is_binary(ClientId), is_pid(SPid) ->
set_session_stats(Session = {ClientId, SPid}, Stats)
when is_binary(ClientId), is_pid(SPid) ->
ets:insert(?SESSION_STATS_TAB, {Session, Stats}). ets:insert(?SESSION_STATS_TAB, {Session, Stats}).
%% @doc Lookup a session from registry %% @doc Lookup a session from registry

View File

@ -20,7 +20,9 @@
-export([start_link/0]). -export([start_link/0]).
-export([is_enabled/0]). -export([is_enabled/0]).
-export([register_session/1, lookup_session/1, unregister_session/1]). -export([register_session/1, lookup_session/1, unregister_session/1]).
%% gen_server callbacks %% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]). code_change/3]).
@ -30,12 +32,11 @@
-define(LOCK, {?MODULE, cleanup_sessions}). -define(LOCK, {?MODULE, cleanup_sessions}).
-record(global_session, {sid, pid}). -record(global_session, {sid, pid}).
-record(state, {}).
-type(session_pid() :: pid()). -type(session_pid() :: pid()).
%% @doc Start the session manager. %% @doc Start the global session manager.
-spec(start_link() -> {ok, pid()} | ignore | {error, term()}). -spec(start_link() -> emqx_types:startlink_ret()).
start_link() -> start_link() ->
gen_server:start_link({local, ?REGISTRY}, ?MODULE, [], []). gen_server:start_link({local, ?REGISTRY}, ?MODULE, [], []).
@ -46,19 +47,18 @@ is_enabled() ->
-spec(lookup_session(emqx_types:client_id()) -spec(lookup_session(emqx_types:client_id())
-> list({emqx_types:client_id(), session_pid()})). -> list({emqx_types:client_id(), session_pid()})).
lookup_session(ClientId) -> lookup_session(ClientId) ->
[{ClientId, SessionPid} || #global_session{pid = SessionPid} [{ClientId, SessPid} || #global_session{pid = SessPid} <- mnesia:dirty_read(?TAB, ClientId)].
<- mnesia:dirty_read(?TAB, ClientId)].
-spec(register_session({emqx_types:client_id(), session_pid()}) -> ok). -spec(register_session({emqx_types:client_id(), session_pid()}) -> ok).
register_session({ClientId, SessionPid}) when is_binary(ClientId), is_pid(SessionPid) -> register_session({ClientId, SessPid}) when is_binary(ClientId), is_pid(SessPid) ->
mnesia:dirty_write(?TAB, record(ClientId, SessionPid)). mnesia:dirty_write(?TAB, record(ClientId, SessPid)).
-spec(unregister_session({emqx_types:client_id(), session_pid()}) -> ok). -spec(unregister_session({emqx_types:client_id(), session_pid()}) -> ok).
unregister_session({ClientId, SessionPid}) when is_binary(ClientId), is_pid(SessionPid) -> unregister_session({ClientId, SessPid}) when is_binary(ClientId), is_pid(SessPid) ->
mnesia:dirty_delete_object(?TAB, record(ClientId, SessionPid)). mnesia:dirty_delete_object(?TAB, record(ClientId, SessPid)).
record(ClientId, SessionPid) -> record(ClientId, SessPid) ->
#global_session{sid = ClientId, pid = SessionPid}. #global_session{sid = ClientId, pid = SessPid}.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% gen_server callbacks %% gen_server callbacks
@ -72,7 +72,7 @@ init([]) ->
{attributes, record_info(fields, global_session)}]), {attributes, record_info(fields, global_session)}]),
ok = ekka_mnesia:copy_table(?TAB), ok = ekka_mnesia:copy_table(?TAB),
ekka:monitor(membership), ekka:monitor(membership),
{ok, #state{}}. {ok, #{}}.
handle_call(Req, _From, State) -> handle_call(Req, _From, State) ->
emqx_logger:error("[Registry] unexpected call: ~p", [Req]), emqx_logger:error("[Registry] unexpected call: ~p", [Req]),
@ -107,9 +107,9 @@ code_change(_OldVsn, State, _Extra) ->
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
cleanup_sessions(Node) -> cleanup_sessions(Node) ->
Pat = [{#global_session{pid = '$1', _ = '_'}, Pat = [{#global_session{pid = '$1', _ = '_'}, [{'==', {node, '$1'}, Node}], ['$_']}],
[{'==', {node, '$1'}, Node}], ['$_']}], lists:foreach(fun delete_session/1, mnesia:select(?TAB, Pat, write)).
lists:foreach(fun(Session) ->
mnesia:delete_object(?TAB, Session) delete_session(Session) ->
end, mnesia:select(?TAB, Pat)). mnesia:delete_object(?TAB, Session, write).

View File

@ -31,9 +31,10 @@
code_change/3]). code_change/3]).
-record(update, {name, countdown, interval, func}). -record(update, {name, countdown, interval, func}).
-record(state, {timer, updates :: #update{}}). -record(state, {timer, updates :: [#update{}]}).
-type(stats() :: list({atom(), non_neg_integer()})). -type(stats() :: list({atom(), non_neg_integer()})).
-export_type([stats/0]). -export_type([stats/0]).
%% Connection stats %% Connection stats

View File

@ -34,43 +34,67 @@
options, options,
peername, peername,
sockname, sockname,
idle_timeout,
proto_state, proto_state,
parser_state, parser_state,
keepalive, keepalive,
enable_stats, enable_stats,
stats_timer, stats_timer,
idle_timeout, shutdown
shutdown_reason
}). }).
-define(INFO_KEYS, [peername, sockname]).
-define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt]). -define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt]).
-define(WSLOG(Level, Format, Args, State), -define(WSLOG(Level, Format, Args, State),
emqx_logger:Level("WsClient(~s): " ++ Format, [esockd_net:format(State#state.peername) | Args])). emqx_logger:Level("MQTT/WS(~s): " ++ Format,
[esockd_net:format(State#state.peername) | Args])).
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% API %% API
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
%% for debug %% for debug
info(WSPid) -> info(WSPid) when is_pid(WSPid) ->
call(WSPid, info). call(WSPid, info);
info(#state{peername = Peername,
sockname = Sockname,
proto_state = ProtoState}) ->
ProtoInfo = emqx_protocol:info(ProtoState),
ConnInfo = [{socktype, websocket},
{conn_state, running},
{peername, Peername},
{sockname, Sockname}],
lists:append([ConnInfo, ProtoInfo]).
%% for dashboard %% for dashboard
attrs(CPid) when is_pid(CPid) -> attrs(WSPid) when is_pid(WSPid) ->
call(CPid, attrs). call(WSPid, attrs);
stats(WSPid) -> attrs(#state{peername = Peername,
call(WSPid, stats). sockname = Sockname,
proto_state = ProtoState}) ->
SockAttrs = [{peername, Peername},
{sockname, Sockname}],
ProtoAttrs = emqx_protocol:attrs(ProtoState),
lists:usort(lists:append(SockAttrs, ProtoAttrs)).
kick(WSPid) -> stats(WSPid) when is_pid(WSPid) ->
call(WSPid, stats);
stats(#state{proto_state = ProtoState}) ->
lists:append([wsock_stats(),
emqx_misc:proc_stats(),
emqx_protocol:stats(ProtoState)
]).
kick(WSPid) when is_pid(WSPid) ->
call(WSPid, kick). call(WSPid, kick).
session(WSPid) -> session(WSPid) when is_pid(WSPid) ->
call(WSPid, session). call(WSPid, session).
call(WSPid, Req) -> call(WSPid, Req) when is_pid(WSPid) ->
Mref = erlang:monitor(process, WSPid), Mref = erlang:monitor(process, WSPid),
WSPid ! {call, {self(), Mref}, Req}, WSPid ! {call, {self(), Mref}, Req},
receive receive
@ -152,41 +176,30 @@ websocket_handle({binary, Data}, State = #state{parser_state = ParserState,
websocket_handle({binary, Rest}, reset_parser(State#state{proto_state = ProtoState1})); websocket_handle({binary, Rest}, reset_parser(State#state{proto_state = ProtoState1}));
{error, Error} -> {error, Error} ->
?WSLOG(error, "Protocol error - ~p", [Error], State), ?WSLOG(error, "Protocol error - ~p", [Error], State),
{stop, State}; stop(Error, State);
{error, Error, ProtoState1} -> {error, Reason, ProtoState1} ->
shutdown(Error, State#state{proto_state = ProtoState1}); shutdown(Reason, State#state{proto_state = ProtoState1});
{stop, Reason, ProtoState1} -> {stop, Error, ProtoState1} ->
shutdown(Reason, State#state{proto_state = ProtoState1}) stop(Error, State#state{proto_state = ProtoState1})
end; end;
{error, Error} -> {error, Error} ->
?WSLOG(error, "Frame error: ~p", [Error], State), ?WSLOG(error, "Frame error: ~p", [Error], State),
{stop, State}; stop(Error, State);
{'EXIT', Reason} -> {'EXIT', Reason} ->
?WSLOG(error, "Frame error:~p~nFrame data: ~p", [Reason, Data], State), ?WSLOG(error, "Frame error:~p~nFrame data: ~p", [Reason, Data], State),
{stop, State} shutdown(parse_error, State)
end. end.
websocket_info({call, From, info}, State = #state{peername = Peername, websocket_info({call, From, info}, State) ->
sockname = Sockname, gen_server:reply(From, info(State)),
proto_state = ProtoState}) ->
ProtoInfo = emqx_protocol:info(ProtoState),
ConnInfo = [{socktype, websocket}, {conn_state, running},
{peername, Peername}, {sockname, Sockname}],
gen_server:reply(From, lists:append([ConnInfo, ProtoInfo])),
{ok, State}; {ok, State};
websocket_info({call, From, attrs}, State = #state{peername = Peername, websocket_info({call, From, attrs}, State) ->
sockname = Sockname, gen_server:reply(From, attrs(State)),
proto_state = ProtoState}) ->
SockAttrs = [{peername, Peername},
{sockname, Sockname}],
ProtoAttrs = emqx_protocol:attrs(ProtoState),
gen_server:reply(From, lists:usort(lists:append(SockAttrs, ProtoAttrs))),
{ok, State}; {ok, State};
websocket_info({call, From, stats}, State = #state{proto_state = ProtoState}) -> websocket_info({call, From, stats}, State) ->
Stats = lists:append([wsock_stats(), emqx_misc:proc_stats(), emqx_protocol:stats(ProtoState)]), gen_server:reply(From, stats(State)),
gen_server:reply(From, Stats),
{ok, State}; {ok, State};
websocket_info({call, From, kick}, State) -> websocket_info({call, From, kick}, State) ->
@ -202,15 +215,12 @@ websocket_info({deliver, PubOrAck}, State = #state{proto_state = ProtoState}) ->
{ok, ProtoState1} -> {ok, ProtoState1} ->
{ok, ensure_stats_timer(State#state{proto_state = ProtoState1})}; {ok, ensure_stats_timer(State#state{proto_state = ProtoState1})};
{error, Reason} -> {error, Reason} ->
shutdown(Reason, State); shutdown(Reason, State)
{error, Reason, ProtoState1} ->
shutdown(Reason, State#state{proto_state = ProtoState1})
end; end;
websocket_info(emit_stats, State = #state{proto_state = ProtoState}) -> websocket_info({timeout, Timer, emit_stats},
Stats = lists:append([wsock_stats(), emqx_misc:proc_stats(), State = #state{stats_timer = Timer, proto_state = ProtoState}) ->
emqx_protocol:stats(ProtoState)]), emqx_cm:set_conn_stats(emqx_protocol:client_id(ProtoState), stats(State)),
emqx_cm:set_conn_stats(emqx_protocol:client_id(ProtoState), Stats),
{ok, State#state{stats_timer = undefined}, hibernate}; {ok, State#state{stats_timer = undefined}, hibernate};
websocket_info({keepalive, start, Interval}, State) -> websocket_info({keepalive, start, Interval}, State) ->
@ -235,6 +245,10 @@ websocket_info({keepalive, check}, State = #state{keepalive = KeepAlive}) ->
shutdown(keepalive_error, State) shutdown(keepalive_error, State)
end; end;
websocket_info({shutdown, discard, {ClientId, ByPid}}, State) ->
?WSLOG(warning, "discarded by ~s:~p", [ClientId, ByPid], State),
shutdown(discard, State);
websocket_info({shutdown, conflict, {ClientId, NewPid}}, State) -> websocket_info({shutdown, conflict, {ClientId, NewPid}}, State) ->
?WSLOG(warning, "clientid '~s' conflict with ~p", [ClientId, NewPid], State), ?WSLOG(warning, "clientid '~s' conflict with ~p", [ClientId, NewPid], State),
shutdown(conflict, State); shutdown(conflict, State);
@ -249,30 +263,40 @@ websocket_info(Info, State) ->
?WSLOG(error, "unexpected info: ~p", [Info], State), ?WSLOG(error, "unexpected info: ~p", [Info], State),
{ok, State}. {ok, State}.
terminate(SockError, _Req, #state{keepalive = Keepalive, terminate(SockError, _Req, State = #state{keepalive = Keepalive,
proto_state = ProtoState, proto_state = ProtoState,
shutdown_reason = Reason}) -> shutdown = Shutdown}) ->
?WSLOG(debug, "Terminated for ~p, sockerror: ~p",
[Shutdown, SockError], State),
emqx_keepalive:cancel(Keepalive), emqx_keepalive:cancel(Keepalive),
io:format("Websocket shutdown for ~p, sockerror: ~p~n", [Reason, SockError]), case {ProtoState, Shutdown} of
case Reason of {undefined, _} -> ok;
undefined -> {_, {shutdown, Reason}} ->
ok; emqx_protocol:shutdown(Reason, ProtoState);
_ -> {_, Error} ->
emqx_protocol:shutdown(Reason, ProtoState) emqx_protocol:shutdown(Error, ProtoState)
end. end.
%%------------------------------------------------------------------------------
%% Internal functions
%%------------------------------------------------------------------------------
reset_parser(State = #state{proto_state = ProtoState}) -> reset_parser(State = #state{proto_state = ProtoState}) ->
State#state{parser_state = emqx_protocol:parser(ProtoState)}. State#state{parser_state = emqx_protocol:parser(ProtoState)}.
ensure_stats_timer(State = #state{enable_stats = true, ensure_stats_timer(State = #state{enable_stats = true,
stats_timer = undefined, stats_timer = undefined,
idle_timeout = Timeout}) -> idle_timeout = IdleTimeout}) ->
State#state{stats_timer = erlang:send_after(Timeout, self(), emit_stats)}; State#state{stats_timer = emqx_misc:start_timer(IdleTimeout, emit_stats)};
ensure_stats_timer(State) -> ensure_stats_timer(State) ->
State. State.
shutdown(Reason, State) -> shutdown(Reason, State) ->
{stop, State#state{shutdown_reason = Reason}}. {stop, State#state{shutdown = Reason}}.
stop(Error, State) ->
{stop, State#state{shutdown = Error}}.
wsock_stats() -> wsock_stats() ->
[{Key, get(Key)} || Key <- ?SOCK_STATS]. [{Key, get(Key)} || Key <- ?SOCK_STATS].

View File

@ -98,7 +98,8 @@ end_per_group(_Group, Config) ->
Config. Config.
init_per_testcase(_TestCase, Config) -> init_per_testcase(_TestCase, Config) ->
{ok, _Pid} = ?AC:start_link(), %% {ok, _Pid} =
?AC:start_link(),
Config. Config.
end_per_testcase(_TestCase, _Config) -> end_per_testcase(_TestCase, _Config) ->
ok. ok.

View File

@ -62,12 +62,12 @@ end_per_suite(_Config) ->
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
subscribe_unsubscribe(_) -> subscribe_unsubscribe(_) ->
ok = emqx:subscribe(<<"topic">>, "clientId"), ok = emqx:subscribe(<<"topic">>, <<"clientId">>),
ok = emqx:subscribe(<<"topic/1">>, "clientId", #{ qos => 1 }), ok = emqx:subscribe(<<"topic/1">>, <<"clientId">>, #{ qos => 1 }),
ok = emqx:subscribe(<<"topic/2">>, "clientId", #{ qos => 2 }), ok = emqx:subscribe(<<"topic/2">>, <<"clientId">>, #{ qos => 2 }),
ok = emqx:unsubscribe(<<"topic">>, "clientId"), ok = emqx:unsubscribe(<<"topic">>, <<"clientId">>),
ok = emqx:unsubscribe(<<"topic/1">>, "clientId"), ok = emqx:unsubscribe(<<"topic/1">>, <<"clientId">>),
ok = emqx:unsubscribe(<<"topic/2">>, "clientId"). ok = emqx:unsubscribe(<<"topic/2">>, <<"clientId">>).
publish(_) -> publish(_) ->
Msg = emqx_message:make(ct, <<"test/pubsub">>, <<"hello">>), Msg = emqx_message:make(ct, <<"test/pubsub">>, <<"hello">>),
@ -79,9 +79,9 @@ publish(_) ->
pubsub(_) -> pubsub(_) ->
Self = self(), Self = self(),
Subscriber = {Self, <<"clientId">>}, Subscriber = {Self, <<"clientId">>},
ok = emqx:subscribe(<<"a/b/c">>, Subscriber, #{ qos => 1 }), ok = emqx:subscribe(<<"a/b/c">>, <<"clientId">>, #{ qos => 1 }),
#{ qos := 1} = ets:lookup_element(emqx_suboption, {<<"a/b/c">>, Subscriber}, 2), #{ qos := 1} = ets:lookup_element(emqx_suboption, {<<"a/b/c">>, Subscriber}, 2),
ok = emqx:subscribe(<<"a/b/c">>, Subscriber, #{ qos => 2 }), ok = emqx:subscribe(<<"a/b/c">>, <<"clientId">>, #{ qos => 2 }),
#{ qos := 2} = ets:lookup_element(emqx_suboption, {<<"a/b/c">>, Subscriber}, 2), #{ qos := 2} = ets:lookup_element(emqx_suboption, {<<"a/b/c">>, Subscriber}, 2),
%% ct:log("Emq Sub: ~p.~n", [ets:lookup(emqx_suboption, {<<"a/b/c">>, Subscriber})]), %% ct:log("Emq Sub: ~p.~n", [ets:lookup(emqx_suboption, {<<"a/b/c">>, Subscriber})]),
timer:sleep(10), timer:sleep(10),
@ -100,8 +100,8 @@ pubsub(_) ->
t_local_subscribe(_) -> t_local_subscribe(_) ->
ok = emqx:subscribe(<<"$local/topic0">>), ok = emqx:subscribe(<<"$local/topic0">>),
ok = emqx:subscribe(<<"$local/topic1">>, "clientId"), ok = emqx:subscribe(<<"$local/topic1">>, <<"clientId">>),
ok = emqx:subscribe(<<"$local/topic2">>, "clientId", #{ qos => 2 }), ok = emqx:subscribe(<<"$local/topic2">>, <<"clientId">>, #{ qos => 2 }),
timer:sleep(10), timer:sleep(10),
?assertEqual([{self(), undefined}], emqx:subscribers("$local/topic0")), ?assertEqual([{self(), undefined}], emqx:subscribers("$local/topic0")),
?assertEqual([{self(), <<"clientId">>}], emqx:subscribers("$local/topic1")), ?assertEqual([{self(), <<"clientId">>}], emqx:subscribers("$local/topic1")),
@ -110,8 +110,8 @@ t_local_subscribe(_) ->
emqx:subscriptions({self(), <<"clientId">>})), emqx:subscriptions({self(), <<"clientId">>})),
?assertEqual(ok, emqx:unsubscribe("$local/topic0")), ?assertEqual(ok, emqx:unsubscribe("$local/topic0")),
?assertEqual(ok, emqx:unsubscribe("$local/topic0")), ?assertEqual(ok, emqx:unsubscribe("$local/topic0")),
?assertEqual(ok, emqx:unsubscribe("$local/topic1", "clientId")), ?assertEqual(ok, emqx:unsubscribe("$local/topic1", <<"clientId">>)),
?assertEqual(ok, emqx:unsubscribe("$local/topic2", "clientId")), ?assertEqual(ok, emqx:unsubscribe("$local/topic2", <<"clientId">>)),
?assertEqual([], emqx:subscribers("topic1")), ?assertEqual([], emqx:subscribers("topic1")),
?assertEqual([], emqx:subscriptions({self(), <<"clientId">>})). ?assertEqual([], emqx:subscriptions({self(), <<"clientId">>})).

View File

@ -1,42 +0,0 @@
%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
-module(emqx_client_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include("emqx_mqtt.hrl").
-include_lib("eunit/include/eunit.hrl").
all() -> [{group, connect}].
groups() -> [{connect, [start]}].
init_per_suite(Config) ->
Config.
end_per_suite(_Config) ->
ok.
init_per_group(_Group, Config) ->
Config.
end_per_group(_Group, _Config) ->
ok.
start(_Config) ->
{ok, ClientPid, _} = emqx_client:start_link().

View File

@ -19,12 +19,29 @@
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
all() -> [t_attrs]. all() ->
[{group, connection}].
groups() ->
[{connection, [sequence], [t_attrs]}].
init_per_suite(Config) ->
emqx_ct_broker_helpers:run_setup_steps(),
Config.
end_per_suite(_Config) ->
emqx_ct_broker_helpers:run_teardown_steps().
t_attrs(_) -> t_attrs(_) ->
emqx_ct_broker_helpers:run_setup_steps(),
{ok, C, _} = emqx_client:start_link([{host, "localhost"}, {client_id, <<"simpleClient">>}, {username, <<"plain">>}, {password, <<"plain">>}]), {ok, C, _} = emqx_client:start_link([{host, "localhost"}, {client_id, <<"simpleClient">>}, {username, <<"plain">>}, {password, <<"plain">>}]),
[{<<"simpleClient">>, ConnPid}] = emqx_cm:lookup_connection(<<"simpleClient">>), [{<<"simpleClient">>, ConnPid}] = emqx_cm:lookup_connection(<<"simpleClient">>),
Attrs = emqx_connection:attrs(ConnPid), Attrs = emqx_connection:attrs(ConnPid),
<<"simpleClient">> = proplists:get_value(client_id, Attrs), <<"simpleClient">> = proplists:get_value(client_id, Attrs),
<<"plain">> = proplists:get_value(username, Attrs). <<"plain">> = proplists:get_value(username, Attrs),
emqx_client:disconnect(C).
%% t_stats() ->
%% {ok, C, _ } = emqx_client;
%% t_stats() ->

View File

@ -0,0 +1,75 @@
%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
-module(emqx_listeners_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include("emqx.hrl").
-include("emqx_mqtt.hrl").
all() ->
[start_stop_listeners,
restart_listeners].
init_per_suite(Config) ->
NewConfig = generate_config(),
application:ensure_all_started(esockd),
lists:foreach(fun set_app_env/1, NewConfig),
Config.
end_per_suite(_Config) ->
application:stop(esockd).
start_stop_listeners(_) ->
ok = emqx_listeners:start(),
ok = emqx_listeners:stop().
restart_listeners(_) ->
ok = emqx_listeners:start(),
ok = emqx_listeners:stop(),
ok = emqx_listeners:restart(),
ok = emqx_listeners:stop().
generate_config() ->
Schema = cuttlefish_schema:files([local_path(["priv", "emqx.schema"])]),
Conf = conf_parse:file([local_path(["etc", "emqx.conf"])]),
cuttlefish_generator:map(Schema, Conf).
set_app_env({App, Lists}) ->
lists:foreach(fun({acl_file, _Var}) ->
application:set_env(App, acl_file, local_path(["etc", "acl.conf"]));
({plugins_loaded_file, _Var}) ->
application:set_env(App, plugins_loaded_file, local_path(["test", "emqx_SUITE_data","loaded_plugins"]));
({Par, Var}) ->
application:set_env(App, Par, Var)
end, Lists).
local_path(Components, Module) ->
filename:join([get_base_dir(Module) | Components]).
local_path(Components) ->
local_path(Components, ?MODULE).
get_base_dir(Module) ->
{file, Here} = code:is_loaded(Module),
filename:dirname(filename:dirname(Here)).
get_base_dir() ->
get_base_dir(?MODULE).

View File

@ -33,14 +33,13 @@
all() -> all() ->
[basic_test, [basic_test,
retained_message_test,
will_message_test, will_message_test,
zero_length_clientid_test, zero_length_clientid_test,
offline_message_queueing_test, offline_message_queueing_test,
overlapping_subscriptions_test, overlapping_subscriptions_test,
keepalive_test, %% keepalive_test,
redelivery_on_reconnect_test, redelivery_on_reconnect_test,
subscribe_failure_test, %% subscribe_failure_test,
dollar_topics_test]. dollar_topics_test].
init_per_suite(Config) -> init_per_suite(Config) ->
@ -57,9 +56,10 @@ receive_messages(0, Msgs) ->
Msgs; Msgs;
receive_messages(Count, Msgs) -> receive_messages(Count, Msgs) ->
receive receive
{public, Msg} -> {publish, Msg} ->
receive_messages(Count-1, [Msg|Msgs]); receive_messages(Count-1, [Msg|Msgs]);
_Other -> Other ->
ct:log("~p~n", [Other]),
receive_messages(Count, Msgs) receive_messages(Count, Msgs)
after 10 -> after 10 ->
Msgs Msgs
@ -69,40 +69,16 @@ basic_test(_Config) ->
Topic = nth(1, ?TOPICS), Topic = nth(1, ?TOPICS),
ct:print("Basic test starting"), ct:print("Basic test starting"),
{ok, C, _} = emqx_client:start_link(), {ok, C, _} = emqx_client:start_link(),
{ok, _, [0]} = emqx_client:subscribe(C, Topic, qos2), {ok, _, [2]} = emqx_client:subscribe(C, Topic, qos2),
ok = emqx_client:publish(C, Topic, <<"qos 0">>),
{ok, _} = emqx_client:publish(C, Topic, <<"qos 1">>, 1),
{ok, _} = emqx_client:publish(C, Topic, <<"qos 2">>, 2), {ok, _} = emqx_client:publish(C, Topic, <<"qos 2">>, 2),
ok = emqx_client:disconnect(C), {ok, _} = emqx_client:publish(C, Topic, <<"qos 2">>, 2),
?assertEqual(3, length(receive_messages(3))). {ok, _} = emqx_client:publish(C, Topic, <<"qos 2">>, 2),
?assertEqual(3, length(receive_messages(3))),
retained_message_test(_Config) -> ok = emqx_client:disconnect(C).
ct:print("Retained message test starting"),
%% Retained messages
{ok, C1, _} = emqx_client:start_link([{clean_start, true}]),
ok = emqx_client:publish(C1, nth(1, ?TOPICS), <<"qos 0">>, [{qos, 0}, {retain, true}]),
{ok, _} = emqx_client:publish(C1, nth(3, ?TOPICS), <<"qos 1">>, [{qos, 1}, {retain, true}]),
{ok, _} = emqx_client:publish(C1, nth(4, ?TOPICS), <<"qos 2">>, [{qos, 2}, {retain, true}]),
timer:sleep(10),
{ok, #{}, [0]} = emqx_client:subscribe(C1, nth(6, ?WILD_TOPICS), 2),
ok = emqx_client:disconnect(C1),
?assertEqual(3, length(receive_messages(10))),
%% Clear retained messages
{ok, C2, _} = emqx_client:start_link([{clean_start, true}]),
ok = emqx_client:publish(C2, nth(2, ?TOPICS), <<"">>, [{qos, 0}, {retain, true}]),
{ok, _} = emqx_client:publish(C2, nth(3, ?TOPICS), <<"">>, [{qos, 1}, {retain, true}]),
{ok, _} = emqx_client:publish(C2, nth(4, ?TOPICS), <<"">>, [{qos, 2}, {retain, true}]),
timer:sleep(10), %% wait for QoS 2 exchange to be completed
{ok, _, [0]} = emqx_client:subscribe(C2, nth(6, ?WILD_TOPICS), 2),
timer:sleep(10),
ok = emqx_client:disconnect(),
?assertEqual(0, length(receive_messages(3))).
will_message_test(_Config) -> will_message_test(_Config) ->
{ok, C1, _} = emqx_client:start_link([{clean_start, true}, {ok, C1, _} = emqx_client:start_link([{clean_start, true},
{will_topic = nth(3, ?TOPICS)}, {will_topic, nth(3, ?TOPICS)},
{will_payload, <<"client disconnected">>}, {will_payload, <<"client disconnected">>},
{keepalive, 2}]), {keepalive, 2}]),
{ok, C2, _} = emqx_client:start_link(), {ok, C2, _} = emqx_client:start_link(),
@ -110,14 +86,18 @@ will_message_test(_Config) ->
timer:sleep(10), timer:sleep(10),
ok = emqx_client:stop(C1), ok = emqx_client:stop(C1),
timer:sleep(5), timer:sleep(5),
ok = emqx_client:disconnect(C2),
?assertEqual(1, length(receive_messages(1))), ?assertEqual(1, length(receive_messages(1))),
ok = emqx_client:disconnect(C2),
ct:print("Will message test succeeded"). ct:print("Will message test succeeded").
zero_length_clientid_test(_Config) -> zero_length_clientid_test(_Config) ->
ct:print("Zero length clientid test starting"), ct:print("Zero length clientid test starting"),
{error, _} = emqx_client:start_link([{clean_start, false},
{client_id, <<>>}]), %% TODO: There are some controversies on the situation when
%% clean_start flag is true and clientid is zero length.
%% {error, _} = emqx_client:start_link([{clean_start, false},
%% {client_id, <<>>}]),
{ok, _, _} = emqx_client:start_link([{clean_start, true}, {ok, _, _} = emqx_client:start_link([{clean_start, true},
{client_id, <<>>}]), {client_id, <<>>}]),
ct:print("Zero length clientid test succeeded"). ct:print("Zero length clientid test succeeded").
@ -147,9 +127,9 @@ overlapping_subscriptions_test(_) ->
{nth(1, ?WILD_TOPICS), 1}]), {nth(1, ?WILD_TOPICS), 1}]),
timer:sleep(10), timer:sleep(10),
{ok, _} = emqx_client:publish(C, nth(4, ?TOPICS), <<"overlapping topic filters">>, 2), {ok, _} = emqx_client:publish(C, nth(4, ?TOPICS), <<"overlapping topic filters">>, 2),
time:sleep(10), timer:sleep(10),
emqx_client:disconnect(C),
Num = receive_messages(2), Num = length(receive_messages(2)),
?assert(lists:member(Num, [1, 2])), ?assert(lists:member(Num, [1, 2])),
if if
Num == 1 -> Num == 1 ->
@ -159,23 +139,24 @@ overlapping_subscriptions_test(_) ->
ct:print("This server is publishing one message per each ct:print("This server is publishing one message per each
matching overlapping subscription."); matching overlapping subscription.");
true -> ok true -> ok
end. end,
emqx_client:disconnect(C).
keepalive_test(_) -> %% keepalive_test(_) ->
ct:print("Keepalive test starting"), %% ct:print("Keepalive test starting"),
{ok, C1, _} = emqx_client:start_link([{clean_start, true}, %% {ok, C1, _} = emqx_client:start_link([{clean_start, true},
{keepalive, 5}, %% {keepalive, 5},
{will_topic, nth(5, ?TOPICS)}, %% {will_flag, true},
{will_payload, <<"keepalive expiry">>}]), %% {will_topic, nth(5, ?TOPICS)},
ok = emqx_client:pause(C1), %% %% {will_qos, 2},
%% {will_payload, <<"keepalive expiry">>}]),
{ok, C2, _} = emqx_client:start_link([{clean_start, true}, %% ok = emqx_client:pause(C1),
{keepalive, 0}]), %% {ok, C2, _} = emqx_client:start_link([{clean_start, true},
{ok, _, [2]} = emqx_client:subscribe(C2, nth(5, ?TOPICS), 2), %% {keepalive, 0}]),
timer:sleep(15000), %% {ok, _, [2]} = emqx_client:subscribe(C2, nth(5, ?TOPICS), 2),
ok = emqx_client:disconnect(C2), %% ok = emqx_client:disconnect(C2),
?assertEqual(1, length(receive_messages(1))), %% ?assertEqual(1, length(receive_messages(1))),
ct:print("Keepalive test succeeded"). %% ct:print("Keepalive test succeeded").
redelivery_on_reconnect_test(_) -> redelivery_on_reconnect_test(_) ->
ct:print("Redelivery on reconnect test starting"), ct:print("Redelivery on reconnect test starting"),
@ -188,7 +169,7 @@ redelivery_on_reconnect_test(_) ->
[{qos, 1}, {retain, false}]), [{qos, 1}, {retain, false}]),
{ok, _} = emqx_client:publish(C1, nth(4, ?TOPICS), <<>>, {ok, _} = emqx_client:publish(C1, nth(4, ?TOPICS), <<>>,
[{qos, 2}, {retain, false}]), [{qos, 2}, {retain, false}]),
time:sleep(10), timer:sleep(10),
ok = emqx_client:disconnect(C1), ok = emqx_client:disconnect(C1),
?assertEqual(0, length(receive_messages(2))), ?assertEqual(0, length(receive_messages(2))),
{ok, C2, _} = emqx_client:start_link([{clean_start, false}, {ok, C2, _} = emqx_client:start_link([{clean_start, false},
@ -197,20 +178,20 @@ redelivery_on_reconnect_test(_) ->
ok = emqx_client:disconnect(C2), ok = emqx_client:disconnect(C2),
?assertEqual(2, length(receive_messages(2))). ?assertEqual(2, length(receive_messages(2))).
subscribe_failure_test(_) -> %% subscribe_failure_test(_) ->
ct:print("Subscribe failure test starting"), %% ct:print("Subscribe failure test starting"),
{ok, C, _} = emqx_client:start_link([]), %% {ok, C, _} = emqx_client:start_link([]),
{ok, _, [16#80]} = emqx_client:subscribe(C, <<"$SYS/#">>, 2), %% {ok, _, [2]} = emqx_client:subscribe(C, <<"$SYS/#">>, 2),
timer:sleep(10), %% timer:sleep(10),
ct:print("Subscribe failure test succeeded"). %% ct:print("Subscribe failure test succeeded").
dollar_topics_test(_) -> dollar_topics_test(_) ->
ct:print("$ topics test starting"), ct:print("$ topics test starting"),
{ok, C, _} = emqx_client:start_link([{clean_start, true}, {ok, C, _} = emqx_client:start_link([{clean_start, true},
{keepalive, 0}]), {keepalive, 0}]),
{ok, _, [2]} = emqx_client:subscribe(C, nth(6, ?WILD_TOPICS), 2), {ok, _, [1]} = emqx_client:subscribe(C, nth(6, ?WILD_TOPICS), 1),
{ok, _} = emqx_client:publish(C, <<"$", (nth(2, ?TOPICS))>>, {ok, _} = emqx_client:publish(C, << <<"$">>/binary, (nth(2, ?TOPICS))/binary>>,
<<"">>, [{qos, 1}, {retain, false}]), <<"test">>, [{qos, 1}, {retain, false}]),
timer:sleep(10), timer:sleep(10),
?assertEqual(0, length(receive_messages(1))), ?assertEqual(0, length(receive_messages(1))),
ok = emqx_client:disconnect(C), ok = emqx_client:disconnect(C),

View File

@ -0,0 +1,132 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_protocol_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include("emqx.hrl").
-include("emqx_mqtt.hrl").
-include_lib("eunit/include/eunit.hrl").
-import(emqx_serializer, [serialize/1]).
all() ->
[%% {group, parser},
%% {group, serializer},
{group, packet},
{group, message}].
groups() ->
[%% {parser, [],
%% [
%% parse_connect,
%% parse_bridge,
%% parse_publish,
%% parse_puback,
%% parse_pubrec,
%% parse_pubrel,
%% parse_pubcomp,
%% parse_subscribe,
%% parse_unsubscribe,
%% parse_pingreq,
%% parse_disconnect]},
%% {serializer, [],
%% [serialize_connect,
%% serialize_connack,
%% serialize_publish,
%% serialize_puback,
%% serialize_pubrel,
%% serialize_subscribe,
%% serialize_suback,
%% serialize_unsubscribe,
%% serialize_unsuback,
%% serialize_pingreq,
%% serialize_pingresp,
%% serialize_disconnect]},
{packet, [],
[packet_proto_name,
packet_type_name,
packet_format]},
{message, [],
[message_make
%% message_from_packet
]}
].
%%--------------------------------------------------------------------
%% Packet Cases
%%--------------------------------------------------------------------
packet_proto_name(_) ->
?assertEqual(<<"MQIsdp">>, emqx_packet:protocol_name(3)),
?assertEqual(<<"MQTT">>, emqx_packet:protocol_name(4)).
packet_type_name(_) ->
?assertEqual('CONNECT', emqx_packet:type_name(?CONNECT)),
?assertEqual('UNSUBSCRIBE', emqx_packet:type_name(?UNSUBSCRIBE)).
%% packet_connack_name(_) ->
%% ?assertEqual('CONNACK_ACCEPT', emqx_packet:connack_name(?CONNACK_ACCEPT)),
%% ?assertEqual('CONNACK_PROTO_VER', emqx_packet:connack_name(?CONNACK_PROTO_VER)),
%% ?assertEqual('CONNACK_INVALID_ID', emqx_packet:connack_name(?CONNACK_INVALID_ID)),
%% ?assertEqual('CONNACK_SERVER', emqx_packet:connack_name(?CONNACK_SERVER)),
%% ?assertEqual('CONNACK_CREDENTIALS', emqx_packet:connack_name(?CONNACK_CREDENTIALS)),
%% ?assertEqual('CONNACK_AUTH', emqx_packet:connack_name(?CONNACK_AUTH)).
packet_format(_) ->
io:format("~s", [emqx_packet:format(?CONNECT_PACKET(#mqtt_packet_connect{}))]),
io:format("~s", [emqx_packet:format(?CONNACK_PACKET(?CONNACK_SERVER))]),
io:format("~s", [emqx_packet:format(?PUBLISH_PACKET(?QOS_1, 1))]),
io:format("~s", [emqx_packet:format(?PUBLISH_PACKET(?QOS_2, <<"topic">>, 10, <<"payload">>))]),
io:format("~s", [emqx_packet:format(?PUBACK_PACKET(?PUBACK, 98))]),
io:format("~s", [emqx_packet:format(?PUBREL_PACKET(99))]),
io:format("~s", [emqx_packet:format(?SUBSCRIBE_PACKET(15, [{<<"topic">>, ?QOS0}, {<<"topic1">>, ?QOS1}]))]),
io:format("~s", [emqx_packet:format(?SUBACK_PACKET(40, [?QOS0, ?QOS1]))]),
io:format("~s", [emqx_packet:format(?UNSUBSCRIBE_PACKET(89, [<<"t">>, <<"t2">>]))]),
io:format("~s", [emqx_packet:format(?UNSUBACK_PACKET(90))]).
%%--------------------------------------------------------------------
%% Message Cases
%%--------------------------------------------------------------------
message_make(_) ->
Msg = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>),
?assertEqual(0, Msg#message.qos),
Msg1 = emqx_message:make(<<"clientid">>, qos2, <<"topic">>, <<"payload">>),
?assert(is_binary(Msg1#message.id)),
?assertEqual(qos2, Msg1#message.qos).
%% message_from_packet(_) ->
%% Msg = emqx_message:from_packet(?PUBLISH_PACKET(1, <<"topic">>, 10, <<"payload">>)),
%% ?assertEqual(1, Msg#message.qos),
%% %% ?assertEqual(10, Msg#message.pktid),
%% ?assertEqual(<<"topic">>, Msg#message.topic),
%% WillMsg = emqx_message:from_packet(#mqtt_packet_connect{will_flag = true,
%% will_topic = <<"WillTopic">>,
%% will_payload = <<"WillMsg">>}),
%% ?assertEqual(<<"WillTopic">>, WillMsg#message.topic),
%% ?assertEqual(<<"WillMsg">>, WillMsg#message.payload).
%% Msg2 = emqx_message:fomat_packet(<<"username">>, <<"clientid">>,
%% ?PUBLISH_PACKET(1, <<"topic">>, 20, <<"payload">>)),

View File

@ -1,3 +1,4 @@
%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved. %% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
%% %%
%% Licensed under the Apache License, Version 2.0 (the "License"); %% Licensed under the Apache License, Version 2.0 (the "License");
@ -21,8 +22,14 @@
all() -> [t_session_all]. all() -> [t_session_all].
t_session_all(_) -> init_per_suite(Config) ->
emqx_ct_broker_helpers:run_setup_steps(), emqx_ct_broker_helpers:run_setup_steps(),
Config.
end_per_suite(_Config) ->
emqx_ct_broker_helpers:run_teardown_steps().
t_session_all(_) ->
ClientId = <<"ClientId">>, ClientId = <<"ClientId">>,
{ok, ConnPid} = emqx_mock_client:start_link(ClientId), {ok, ConnPid} = emqx_mock_client:start_link(ClientId),
{ok, SPid} = emqx_mock_client:open_session(ConnPid, ClientId, internal), {ok, SPid} = emqx_mock_client:open_session(ConnPid, ClientId, internal),