Merge branch 'emqx30'

This commit is contained in:
Feng Lee 2019-01-25 15:54:11 +08:00
commit 891ef2680e
56 changed files with 1419 additions and 550 deletions

View File

@ -1,7 +1,7 @@
language: erlang
otp_release:
- 21.0.4
- 21.2
before_install:
- git clone https://github.com/erlang/rebar3.git; cd rebar3; ./bootstrap; sudo mv rebar3 /usr/local/bin/; cd ..

View File

@ -3,21 +3,22 @@
PROJECT = emqx
PROJECT_DESCRIPTION = EMQ X Broker
DEPS = jsx gproc gen_rpc ekka esockd cowboy
DEPS = jsx gproc gen_rpc ekka esockd cowboy replayq
dep_jsx = hex-emqx 2.9.0
dep_gproc = hex-emqx 0.8.0
dep_gen_rpc = git-emqx https://github.com/emqx/gen_rpc 2.3.0
dep_esockd = git-emqx https://github.com/emqx/esockd v5.4.3
dep_ekka = git-emqx https://github.com/emqx/ekka v0.5.1
dep_esockd = git-emqx https://github.com/emqx/esockd v5.4.4
dep_ekka = git-emqx https://github.com/emqx/ekka v0.5.3
dep_cowboy = hex-emqx 2.4.0
dep_replayq = git-emqx https://github.com/emqx/replayq v0.1.1
NO_AUTOPATCH = cuttlefish
ERLC_OPTS += +debug_info -DAPPLICATION=emqx
BUILD_DEPS = cuttlefish
dep_cuttlefish = git-emqx https://github.com/emqx/cuttlefish v2.2.0
dep_cuttlefish = git-emqx https://github.com/emqx/cuttlefish v2.2.1
#TEST_DEPS = emqx_ct_helplers
#dep_emqx_ct_helplers = git git@github.com:emqx/emqx-ct-helpers
@ -35,7 +36,8 @@ CT_SUITES = emqx emqx_client emqx_zone emqx_banned emqx_session \
emqx_mqtt_props emqx_mqueue emqx_net emqx_pqueue emqx_router emqx_sm \
emqx_tables emqx_time emqx_topic emqx_trie emqx_vm emqx_mountpoint \
emqx_listeners emqx_protocol emqx_pool emqx_shared_sub emqx_bridge \
emqx_hooks emqx_batch emqx_sequence emqx_pmon emqx_pd emqx_gc
emqx_hooks emqx_batch emqx_sequence emqx_pmon emqx_pd emqx_gc emqx_ws_connection \
emqx_packet emqx_connection emqx_tracer emqx_sys_mon
CT_NODE_NAME = emqxct@127.0.0.1
CT_OPTS = -cover test/ct.cover.spec -erl_args -name $(CT_NODE_NAME)

View File

@ -1604,6 +1604,16 @@ bridge.aws.start_type = manual
## Default: 30 seconds
bridge.aws.reconnect_interval = 30s
## Retry interval for bridge QoS1 message delivering.
##
## Value: Duration
bridge.aws.retry_interval = 20s
## Inflight size.
##
## Value: Integer
bridge.aws.max_inflight = 32
## Bridge address: node name for local bridge, host:port for remote.
##
## Value: String
@ -1679,16 +1689,33 @@ bridge.aws.subscription.2.topic = cmd/topic2
## Value: Number
bridge.aws.subscription.2.qos = 1
## Bridge message queue message type.
## If enabled, queue would be written into disk more quickly.
## However, If disabled, some message would be dropped in
## the situation emqx crashed.
##
## Value: Enum
## Example: memory | disk
bridge.aws.mqueue_type = memory
## Value: on | off
bridge.aws.queue.mem_cache = on
## The pending message queue of a bridge.
## Batch size for buffer queue stored
##
## Value: Number
bridge.aws.max_pending_messages = 10000
## Value: Integer
## default: 1000
bridge.aws.queue.batch_size = 1000
## Base directory for replayq to store messages on disk
## If this config entry is missing or set to undefined,
## replayq works in a mem-only manner. If the config
## entry was set to `bridge.aws.mqueue_type = memory`
## this config entry would have no effect on mqueue
##
## Value: String
bridge.aws.queue.replayq_dir = {{ platform_data_dir }}/emqx_aws_bridge/
## Replayq segment size
##
## Value: Bytesize
bridge.aws.queue.replayq_seg_bytes = 10MB
## Bribge to remote server via SSL.
##
@ -1741,6 +1768,11 @@ bridge.aws.ssl = off
## Default: 30 seconds
## bridge.azure.reconnect_time = 30s
## Retry interval for bridge QoS1 message delivering.
##
## Value: Duration
## bridge.azure.retry_interval = 20s
## Bridge address: node name for local bridge, host:port for remote.
##
## Value: String
@ -1816,17 +1848,26 @@ bridge.aws.ssl = off
## Value: Number
## bridge.azure.subscription.2.qos = 1
## Bridge store message type.
## Batch size for buffer queue stored
##
## Value: Enum
## Example: memory | disk
## bridge.azure.store_type = memory
## Value: Integer
## default: 1000
## bridge.azure.queue.batch_size = 1000
## The pending message queue of a bridge.
## Base directory for replayq to store messages on disk
## If this config entry is missing or set to undefined,
## replayq works in a mem-only manner. If the config
## entry was set to `bridge.aws.mqueue_type = memory`
## this config entry would have no effect on mqueue
##
## Value: Number
## bridge.azure.max_pending_messages = 10000
## Value: String
## Default: {{ platform_data_dir }}/emqx_aws_bridge/
## bridge.azure.queue.replayq_dir = {{ platform_data_dir }}/emqx_aws_bridge/
## Replayq segment size
##
## Value: Bytesize
## bridge.azure.queue.replayq_seg_bytes = 10MB
## PEM-encoded CA certificates of the bridge.
##
@ -2002,3 +2043,5 @@ sysmon.busy_port = false
##
## Value: true | false
sysmon.busy_dist_port = true
{{ additional_configs }}

View File

@ -92,4 +92,7 @@
#+sct L0-3c0-3p0N0:L4-7c0-3p1N1
## Sets the mapping of warning messages for error_logger
#+W w
#+W w
#Prevents loading information about source filenames and line numbers.
+L

View File

@ -1,23 +1,42 @@
%%--------------------------------------------------------------------
%% Logs with a header prefixed to the log message.
%% And the log args are puted into report_cb for lazy evaluation.
%%--------------------------------------------------------------------
-ifdef(LOG_HEADER).
%% with header
%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%% debug | info | notice | warning | error | critical | alert | emergency
-define(DEBUG(Format), ?LOG(debug, Format, [])).
-define(DEBUG(Format, Args), ?LOG(debug, Format, Args)).
-define(INFO(Format), ?LOG(info, Format, [])).
-define(INFO(Format, Args), ?LOG(info, Format, Args)).
-define(NOTICE(Format), ?LOG(notice, Format, [])).
-define(NOTICE(Format, Args), ?LOG(notice, Format, Args)).
-define(WARN(Format), ?LOG(warning, Format, [])).
-define(WARN(Format, Args), ?LOG(warning, Format, [])).
-define(ERROR(Format), ?LOG(error, Format, [])).
-define(ERROR(Format, Args), ?LOG(error, Format, Args)).
-define(CRITICAL(Format), ?LOG(critical, Format, [])).
-define(CRITICAL(Format, Args), ?LOG(critical, Format, Args)).
-define(ALERT(Format), ?LOG(alert, Format, [])).
-define(ALERT(Format, Args), ?LOG(alert, Format, Args)).
-define(LOG(Level, Format, Args),
begin
(logger:log(Level,#{},#{report_cb =>
fun(_) ->
{?LOG_HEADER ++ " "++ (Format), (Args)}
end}))
(logger:log(Level,#{},#{report_cb => fun(_) -> {(Format), (Args)} end}))
end).
-else.
%% without header
-define(LOG(Level, Format, Args),
begin
(logger:log(Level,#{},#{report_cb =>
fun(_) ->
{(Format), (Args)}
end}))
end).
-endif.

View File

@ -424,6 +424,17 @@ end}.
hidden
]}.
%% disable lager
{mapping, "lager.handlers", "lager.handlers", [
{default, []},
hidden
]}.
{mapping, "lager.crash_log", "lager.crash_log", [
{default, off},
{datatype, flag},
hidden
]}.
{translation, "emqx.primary_log_level", fun(Conf) ->
cuttlefish:conf_get("log.level", Conf)
end}.
@ -861,6 +872,8 @@ end}.
maps:put(iolist_to_binary(Topic), P, Acc)
end, string:tokens(Val, ","))
end;
("mountpoint", Val) ->
{mountpoint, iolist_to_binary(Val)};
(Opt, Val) ->
{list_to_atom(Opt), Val}
end,
@ -1495,8 +1508,20 @@ end}.
%%--------------------------------------------------------------------
%% Bridges
%%--------------------------------------------------------------------
{mapping, "bridge.$name.mqueue_type", "emqx.bridges", [
{datatype, {enum, [memory, disk]}}
{mapping, "bridge.$name.queue.mem_cache", "emqx.bridges", [
{datatype, flag}
]}.
{mapping, "bridge.$name.queue.batch_size", "emqx.bridges", [
{datatype, integer}
]}.
{mapping, "bridge.$name.queue.replayq_dir", "emqx.bridges", [
{datatype, string}
]}.
{mapping, "bridge.$name.queue.replayq_seg_bytes", "emqx.bridges", [
{datatype, bytesize}
]}.
{mapping, "bridge.$name.address", "emqx.bridges", [
@ -1554,11 +1579,6 @@ end}.
{datatype, string}
]}.
{mapping, "bridge.$name.max_pending_messages", "emqx.bridges", [
{default, 10000},
{datatype, integer}
]}.
{mapping, "bridge.$name.keepalive", "emqx.bridges", [
{default, "10s"},
{datatype, {duration, ms}}
@ -1587,6 +1607,15 @@ end}.
{datatype, {duration, ms}}
]}.
{mapping, "bridge.$name.retry_interval", "emqx.bridges", [
{default, "20s"},
{datatype, {duration, ms}}
]}.
{mapping, "bridge.$name.max_inflight", "emqx.bridges", [
{default, 0},
{datatype, integer}
]}.
{translation, "emqx.bridges", fun(Conf) ->
Split = fun(undefined) -> undefined; (S) -> string:tokens(S, ",") end,
@ -1618,7 +1647,11 @@ end}.
[{Opt, Val}|Opts]
end
end,
Queue = fun(Name) ->
Configs = cuttlefish_variable:filter_by_prefix("bridge." ++ Name ++ ".queue", Conf),
QOpts = [{list_to_atom(QOpt), QValue}|| {[_, _, "queue", QOpt], QValue} <- Configs],
maps:from_list(QOpts)
end,
Subscriptions = fun(Name) ->
Configs = cuttlefish_variable:filter_by_prefix("bridge." ++ Name ++ ".subscription", Conf),
lists:zip([Topic || {_, Topic} <- lists:sort([{I, Topic} || {[_, _, "subscription", I, "topic"], Topic} <- Configs])],
@ -1629,7 +1662,7 @@ end}.
lists:foldl(
fun({["bridge", Name, Opt], Val}, Acc) ->
%% e.g #{aws => [{OptKey, OptVal}]}
Init = [{list_to_atom(Opt), Val},{subscriptions, Subscriptions(Name)}],
Init = [{list_to_atom(Opt), Val},{subscriptions, Subscriptions(Name)}, {queue, Queue(Name)}],
maps:update_with(list_to_atom(Name),
fun(Opts) -> Merge(list_to_atom(Opt), Val, Opts) end, Init, Acc);
(_, Acc) -> Acc

View File

@ -6,9 +6,10 @@
%% appended to deps in rebar.config.script
{github_emqx_deps,
[{gen_rpc, "2.3.0"},
{ekka, "v0.5.1"},
{esockd, "v5.4.3"},
{cuttlefish, "v2.2.0"}
{ekka, "v0.5.3"},
{replayq, "v0.1.1"},
{esockd, "v5.4.4"},
{cuttlefish, "v2.2.1"}
]}.
{edoc_opts, [{preprocess, true}]}.
@ -26,4 +27,3 @@
{cover_export_enabled, true}.
{plugins, [coveralls]}.

View File

@ -3,8 +3,7 @@
{vsn,"git"},
{modules,[]},
{registered,[emqx_sup]},
{applications,[kernel,stdlib,jsx,gproc,gen_rpc,esockd,
cowboy]},
{applications,[kernel,stdlib,jsx,gproc,gen_rpc,esockd,cowboy,replayq]},
{env,[]},
{mod,{emqx_app,[]}},
{maintainers,["Feng Lee <feng@emqx.io>"]},

View File

@ -17,7 +17,7 @@
-include("emqx.hrl").
%% Start/Stop the application
-export([start/0, is_running/1, stop/0]).
-export([start/0, restart/1, is_running/1, stop/0]).
%% PubSub API
-export([subscribe/1, subscribe/2, subscribe/3]).
@ -47,6 +47,12 @@ start() ->
%% Check Mnesia
application:ensure_all_started(?APP).
-spec(restart(string()) -> ok).
restart(ConfFile) ->
reload_config(ConfFile),
shutdown(),
reboot().
%% @doc Stop emqx application.
-spec(stop() -> ok | {error, term()}).
stop() ->
@ -158,3 +164,11 @@ shutdown(Reason) ->
reboot() ->
lists:foreach(fun application:start/1, [gproc, esockd, ranch, cowboy, ekka, emqx]).
%%------------------------------------------------------------------------------
%% Internal functions
%%------------------------------------------------------------------------------
reload_config(ConfFile) ->
{ok, [Conf]} = file:consult(ConfFile),
lists:foreach(fun({App, Vals}) ->
[application:set_env(App, Par, Val) || {Par, Val} <- Vals]
end, Conf).

View File

@ -17,6 +17,7 @@
-behaviour(gen_event).
-include("emqx.hrl").
-include("logger.hrl").
-export([start_link/0]).
-export([alarm_fun/0, get_alarms/0, set_alarm/1, clear_alarm/1]).
@ -84,7 +85,7 @@ handle_event({set_alarm, Alarm = #alarm{id = AlarmId}}, State = #{alarms := Alar
{ok, Json} ->
emqx_broker:safe_publish(alarm_msg(alert, AlarmId, Json));
{error, Reason} ->
emqx_logger:error("[AlarmMgr] Failed to encode alarm: ~p", [Reason])
?ERROR("[AlarmMgr] Failed to encode alarm: ~p", [Reason])
end,
{ok, State#{alarms := [Alarm|Alarms]}};
@ -93,23 +94,23 @@ handle_event({clear_alarm, AlarmId}, State = #{alarms := Alarms}) ->
{ok, Json} ->
emqx_broker:safe_publish(alarm_msg(clear, AlarmId, Json));
{error, Reason} ->
emqx_logger:error("[AlarmMgr] Failed to encode clear: ~p", [Reason])
?ERROR("[AlarmMgr] Failed to encode clear: ~p", [Reason])
end,
{ok, State#{alarms := lists:keydelete(AlarmId, 2, Alarms)}, hibernate};
handle_event(Event, State)->
emqx_logger:error("[AlarmMgr] unexpected event: ~p", [Event]),
?ERROR("[AlarmMgr] unexpected event: ~p", [Event]),
{ok, State}.
handle_info(Info, State) ->
emqx_logger:error("[AlarmMgr] unexpected info: ~p", [Info]),
?ERROR("[AlarmMgr] unexpected info: ~p", [Info]),
{ok, State}.
handle_call(get_alarms, State = #{alarms := Alarms}) ->
{ok, Alarms, State};
handle_call(Req, State) ->
emqx_logger:error("[AlarmMgr] unexpected call: ~p", [Req]),
?ERROR("[AlarmMgr] unexpected call: ~p", [Req]),
{ok, ignored, State}.
terminate(swap, State) ->

View File

@ -17,6 +17,7 @@
-behaviour(gen_server).
-include("emqx.hrl").
-include("logger.hrl").
%% Mnesia bootstrap
-export([mnesia/1]).
@ -77,11 +78,11 @@ init([]) ->
{ok, ensure_expiry_timer(#{expiry_timer => undefined})}.
handle_call(Req, _From, State) ->
emqx_logger:error("[Banned] unexpected call: ~p", [Req]),
?ERROR("[Banned] unexpected call: ~p", [Req]),
{reply, ignored, State}.
handle_cast(Msg, State) ->
emqx_logger:error("[Banned] unexpected msg: ~p", [Msg]),
?ERROR("[Banned] unexpected msg: ~p", [Msg]),
{noreply, State}.
handle_info({timeout, TRef, expire}, State = #{expiry_timer := TRef}) ->
@ -89,7 +90,7 @@ handle_info({timeout, TRef, expire}, State = #{expiry_timer := TRef}) ->
{noreply, ensure_expiry_timer(State), hibernate};
handle_info(Info, State) ->
emqx_logger:error("[Banned] unexpected info: ~p", [Info]),
?ERROR("[Banned] unexpected info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, #{expiry_timer := TRef}) ->

View File

@ -30,9 +30,17 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
-record(state, {client_pid, options, reconnect_interval,
mountpoint, queue, mqueue_type, max_pending_messages,
forwards = [], subscriptions = []}).
-record(state, {client_pid :: pid(),
options :: list(),
reconnect_interval :: pos_integer(),
mountpoint :: binary(),
readq :: list(),
writeq :: list(),
replayq :: map(),
ackref :: replayq:ack_ref(),
queue_option :: map(),
forwards :: list(),
subscriptions :: list()}).
-record(mqtt_msg, {qos = ?QOS_0, retain = false, dup = false,
packet_id, topic, props, payload}).
@ -104,20 +112,18 @@ init([Options]) ->
auto -> erlang:send_after(1000, self(), start)
end,
ReconnectInterval = get_value(reconnect_interval, Options, 30000),
MaxPendingMsg = get_value(max_pending_messages, Options, 10000),
Mountpoint = format_mountpoint(get_value(mountpoint, Options)),
MqueueType = get_value(mqueue_type, Options, memory),
Queue = [],
{ok, #state{mountpoint = Mountpoint,
queue = Queue,
mqueue_type = MqueueType,
options = Options,
reconnect_interval = ReconnectInterval,
max_pending_messages = MaxPendingMsg}}.
QueueOptions = get_value(queue, Options),
{ok, #state{mountpoint = Mountpoint,
queue_option = QueueOptions,
readq = [],
writeq = [],
options = Options,
reconnect_interval = ReconnectInterval}}.
handle_call(start_bridge, _From, State = #state{client_pid = undefined}) ->
{noreply, NewState} = handle_info(start, State),
{reply, #{msg => <<"start bridge successfully">>}, NewState};
{Msg, NewState} = bridge(start, State),
{reply, #{msg => Msg}, NewState};
handle_call(start_bridge, _From, State) ->
{reply, #{msg => <<"bridge already started">>}, State};
@ -185,45 +191,66 @@ handle_cast(Msg, State) ->
{noreply, State}.
%%----------------------------------------------------------------
%% start message bridge
%% Start or restart bridge
%%----------------------------------------------------------------
handle_info(start, State = #state{options = Options,
client_pid = undefined}) ->
case emqx_client:start_link([{owner, self()}|options(Options)]) of
{ok, ClientPid} ->
case emqx_client:connect(ClientPid) of
{ok, _} ->
emqx_logger:info("[Bridge] connected to remote sucessfully"),
Subs = subscribe_remote_topics(ClientPid, get_value(subscriptions, Options, [])),
Forwards = subscribe_local_topics(get_value(forwards, Options, [])),
{noreply, State#state{client_pid = ClientPid,
subscriptions = Subs,
forwards = Forwards}};
{error, Reason} ->
emqx_logger:error("[Bridge] connect to remote failed! error: ~p", [Reason]),
{noreply, State#state{client_pid = ClientPid}}
end;
{error, Reason} ->
emqx_logger:error("[Bridge] start failed! error: ~p", [Reason]),
{noreply, State}
end;
handle_info(start, State) ->
{_Msg, NewState} = bridge(start, State),
{noreply, NewState};
handle_info(restart, State) ->
{_Msg, NewState} = bridge(restart, State),
{noreply, NewState};
%%----------------------------------------------------------------
%% pop message from replayq and publish again
%%----------------------------------------------------------------
handle_info(pop, State = #state{writeq = WriteQ, replayq = ReplayQ,
queue_option = #{batch_size := BatchSize}}) ->
{NewReplayQ, AckRef, NewReadQ} = replayq:pop(ReplayQ, #{count_limit => BatchSize}),
{NewReadQ1, NewWriteQ} = case NewReadQ of
[] -> {WriteQ, []};
_ -> {NewReadQ, WriteQ}
end,
self() ! replay,
{noreply, State#state{readq = NewReadQ1, writeq = NewWriteQ, replayq = NewReplayQ, ackref = AckRef}};
handle_info(dump, State = #state{writeq = WriteQ, replayq = ReplayQ}) ->
NewReplayQueue = replayq:append(ReplayQ, lists:reverse(WriteQ)),
{noreply, State#state{replayq = NewReplayQueue, writeq = []}};
%%----------------------------------------------------------------
%% replay message from replayq
%%----------------------------------------------------------------
handle_info(replay, State = #state{client_pid = ClientPid, readq = ReadQ}) ->
{ok, NewReadQ} = publish_readq_msg(ClientPid, ReadQ, []),
{noreply, State#state{readq = NewReadQ}};
%%----------------------------------------------------------------
%% received local node message
%%----------------------------------------------------------------
handle_info({dispatch, _, #message{topic = Topic, payload = Payload, flags = #{retain := Retain}}},
State = #state{client_pid = Pid, mountpoint = Mountpoint, queue = Queue,
mqueue_type = MqueueType, max_pending_messages = MaxPendingMsg}) ->
handle_info({dispatch, _, #message{topic = Topic, qos = QoS, payload = Payload, flags = #{retain := Retain}}},
State = #state{client_pid = undefined,
mountpoint = Mountpoint})
when QoS =< 1 ->
Msg = #mqtt_msg{qos = 1,
retain = Retain,
topic = mountpoint(Mountpoint, Topic),
payload = Payload},
{noreply, en_writeq({undefined, Msg}, State)};
handle_info({dispatch, _, #message{topic = Topic, qos = QoS ,payload = Payload, flags = #{retain := Retain}}},
State = #state{client_pid = Pid,
mountpoint = Mountpoint})
when QoS =< 1 ->
Msg = #mqtt_msg{qos = 1,
retain = Retain,
topic = mountpoint(Mountpoint, Topic),
payload = Payload},
case emqx_client:publish(Pid, Msg) of
{ok, PkgId} ->
{noreply, State#state{queue = store(MqueueType, {PkgId, Msg}, Queue, MaxPendingMsg)}};
{error, Reason} ->
{ok, PktId} ->
{noreply, en_writeq({PktId, Msg}, State)};
{error, {PktId, Reason}} ->
emqx_logger:error("[Bridge] Publish fail:~p", [Reason]),
{noreply, State}
{noreply, en_writeq({PktId, Msg}, State)}
end;
%%----------------------------------------------------------------
@ -239,18 +266,19 @@ handle_info({publish, #{qos := QoS, dup := Dup, retain := Retain, topic := Topic
%%----------------------------------------------------------------
%% received remote puback message
%%----------------------------------------------------------------
handle_info({puback, #{packet_id := PkgId}}, State = #state{queue = Queue, mqueue_type = MqueueType}) ->
% lists:keydelete(PkgId, 1, Queue)
{noreply, State#state{queue = delete(MqueueType, PkgId, Queue)}};
handle_info({puback, #{packet_id := PktId}}, State) ->
{noreply, delete(PktId, State)};
handle_info({'EXIT', Pid, normal}, State = #state{client_pid = Pid}) ->
emqx_logger:warning("[Bridge] stop ~p", [normal]),
self() ! dump,
{noreply, State#state{client_pid = undefined}};
handle_info({'EXIT', Pid, Reason}, State = #state{client_pid = Pid,
reconnect_interval = ReconnectInterval}) ->
emqx_logger:error("[Bridge] stop ~p", [Reason]),
erlang:send_after(ReconnectInterval, self(), start),
self() ! dump,
erlang:send_after(ReconnectInterval, self(), restart),
{noreply, State#state{client_pid = undefined}};
handle_info(Info, State) ->
@ -267,8 +295,10 @@ subscribe_remote_topics(ClientPid, Subscriptions) ->
[begin emqx_client:subscribe(ClientPid, {bin(Topic), Qos}), {bin(Topic), Qos} end
|| {Topic, Qos} <- Subscriptions, emqx_topic:validate({filter, bin(Topic)})].
subscribe_local_topics(Topics) ->
[begin emqx_broker:subscribe(bin(Topic)), bin(Topic) end
subscribe_local_topics(Options) ->
Topics = get_value(forwards, Options, []),
Subid = get_value(client_id, Options, <<"bridge">>),
[begin emqx_broker:subscribe(bin(Topic), #{qos => 1, subid => Subid}), bin(Topic) end
|| Topic <- Topics, emqx_topic:validate({filter, bin(Topic)})].
proto_ver(mqttv3) -> v3;
@ -320,15 +350,110 @@ format_mountpoint(undefined) ->
format_mountpoint(Prefix) ->
binary:replace(bin(Prefix), <<"${node}">>, atom_to_binary(node(), utf8)).
store(memory, Data, Queue, MaxPendingMsg) when length(Queue) =< MaxPendingMsg ->
[Data | Queue];
store(memory, _Data, Queue, _MaxPendingMsg) ->
logger:error("Beyond max pending messages"),
Queue;
store(disk, Data, Queue, _MaxPendingMsg)->
[Data | Queue].
en_writeq(Msg, State = #state{replayq = ReplayQ,
queue_option = #{mem_cache := false}}) ->
NewReplayQ = replayq:append(ReplayQ, [Msg]),
State#state{replayq = NewReplayQ};
en_writeq(Msg, State = #state{writeq = WriteQ,
queue_option = #{batch_size := BatchSize,
mem_cache := true}})
when length(WriteQ) < BatchSize->
State#state{writeq = [Msg | WriteQ]} ;
en_writeq(Msg, State = #state{writeq = WriteQ, replayq = ReplayQ,
queue_option = #{mem_cache := true}}) ->
NewReplayQ =replayq:append(ReplayQ, lists:reverse(WriteQ)),
State#state{writeq = [Msg], replayq = NewReplayQ}.
delete(memory, PkgId, Queue) ->
lists:keydelete(PkgId, 1, Queue);
delete(disk, PkgId, Queue) ->
lists:keydelete(PkgId, 1, Queue).
publish_readq_msg(_ClientPid, [], NewReadQ) ->
{ok, NewReadQ};
publish_readq_msg(ClientPid, [{_PktId, Msg} | ReadQ], NewReadQ) ->
{ok, PktId} = emqx_client:publish(ClientPid, Msg),
publish_readq_msg(ClientPid, ReadQ, [{PktId, Msg} | NewReadQ]).
delete(PktId, State = #state{ replayq = ReplayQ,
readq = [],
queue_option = #{ mem_cache := false}}) ->
{NewReplayQ, NewAckRef, Msgs} = replayq:pop(ReplayQ, #{count_limit => 1}),
logger:debug("[Msg] PacketId ~p, Msg: ~p", [PktId, Msgs]),
ok = replayq:ack(NewReplayQ, NewAckRef),
case Msgs of
[{PktId, _Msg}] ->
self() ! pop,
State#state{ replayq = NewReplayQ, ackref = NewAckRef };
[{_PktId, _Msg}] ->
NewReplayQ1 = replayq:append(NewReplayQ, Msgs),
self() ! pop,
State#state{ replayq = NewReplayQ1, ackref = NewAckRef };
_Empty ->
State#state{ replayq = NewReplayQ, ackref = NewAckRef}
end;
delete(_PktId, State = #state{readq = [], writeq = [], replayq = ReplayQ, ackref = AckRef}) ->
ok = replayq:ack(ReplayQ, AckRef),
self() ! pop,
State;
delete(PktId, State = #state{readq = [], writeq = WriteQ}) ->
State#state{writeq = lists:keydelete(PktId, 1, WriteQ)};
delete(PktId, State = #state{readq = ReadQ, replayq = ReplayQ, ackref = AckRef}) ->
NewReadQ = lists:keydelete(PktId, 1, ReadQ),
case NewReadQ of
[] ->
ok = replayq:ack(ReplayQ, AckRef),
self() ! pop;
_NewReadQ ->
ok
end,
State#state{ readq = NewReadQ }.
bridge(Action, State = #state{options = Options,
replayq = ReplayQ,
queue_option
= QueueOption
= #{batch_size := BatchSize}})
when BatchSize > 0 ->
case emqx_client:start_link([{owner, self()} | options(Options)]) of
{ok, ClientPid} ->
case emqx_client:connect(ClientPid) of
{ok, _} ->
emqx_logger:info("[Bridge] connected to remote successfully"),
Subs = subscribe_remote_topics(ClientPid, get_value(subscriptions, Options, [])),
Forwards = subscribe_local_topics(Options),
{NewReplayQ, AckRef, ReadQ} = open_replayq(ReplayQ, QueueOption),
{ok, NewReadQ} = publish_readq_msg(ClientPid, ReadQ, []),
{<<"start bridge successfully">>,
State#state{client_pid = ClientPid,
subscriptions = Subs,
readq = NewReadQ,
replayq = NewReplayQ,
ackref = AckRef,
forwards = Forwards}};
{error, Reason} ->
emqx_logger:error("[Bridge] connect to remote failed! error: ~p", [Reason]),
{<<"connect to remote failed">>,
State#state{client_pid = ClientPid}}
end;
{error, Reason} ->
emqx_logger:error("[Bridge] ~p failed! error: ~p", [Action, Reason]),
{<<"start bridge failed">>, State}
end;
bridge(Action, State) ->
emqx_logger:error("[Bridge] ~p failed! error: batch_size should greater than zero", [Action]),
{<<"Open Replayq failed">>, State}.
open_replayq(undefined, #{batch_size := BatchSize,
replayq_dir := ReplayqDir,
replayq_seg_bytes := ReplayqSegBytes}) ->
ReplayQ = replayq:open(#{dir => ReplayqDir,
seg_bytes => ReplayqSegBytes,
sizer => fun(Term) ->
size(term_to_binary(Term))
end,
marshaller => fun({PktId, Msg}) ->
term_to_binary({PktId, Msg});
(Bin) ->
binary_to_term(Bin)
end}),
replayq:pop(ReplayQ, #{count_limit => BatchSize});
open_replayq(ReplayQ, #{batch_size := BatchSize}) ->
replayq:pop(ReplayQ, #{count_limit => BatchSize}).

View File

@ -17,6 +17,7 @@
-behaviour(gen_server).
-include("emqx.hrl").
-include("logger.hrl").
-export([start_link/2]).
-export([subscribe/1, subscribe/2, subscribe/3]).
@ -170,7 +171,7 @@ publish(Msg) when is_record(Msg, message) ->
Delivery = route(aggre(emqx_router:match_routes(Topic)), delivery(Msg1)),
Delivery#delivery.results;
{stop, _} ->
emqx_logger:warning("Stop publishing: ~s", [emqx_message:format(Msg)]),
?WARN("Stop publishing: ~s", [emqx_message:format(Msg)]),
[]
end.
@ -181,7 +182,7 @@ safe_publish(Msg) when is_record(Msg, message) ->
publish(Msg)
catch
_:Error:Stacktrace ->
emqx_logger:error("[Broker] publish error: ~p~n~p~n~p", [Error, Msg, Stacktrace])
?ERROR("[Broker] publish error: ~p~n~p~n~p", [Error, Msg, Stacktrace])
after
ok
end.
@ -228,7 +229,7 @@ forward(Node, To, Delivery) ->
%% rpc:call to ensure the delivery, but the latency:(
case emqx_rpc:call(Node, ?BROKER, dispatch, [To, Delivery]) of
{badrpc, Reason} ->
emqx_logger:error("[Broker] Failed to forward msg to ~s: ~p", [Node, Reason]),
?ERROR("[Broker] Failed to forward msg to ~s: ~p", [Node, Reason]),
Delivery;
Delivery1 -> Delivery1
end.
@ -396,14 +397,14 @@ handle_call({subscribe, Topic, I}, _From, State) ->
{reply, Ok, State};
handle_call(Req, _From, State) ->
emqx_logger:error("[Broker] unexpected call: ~p", [Req]),
?ERROR("[Broker] unexpected call: ~p", [Req]),
{reply, ignored, State}.
handle_cast({subscribe, Topic}, State) ->
case emqx_router:do_add_route(Topic) of
ok -> ok;
{error, Reason} ->
emqx_logger:error("[Broker] Failed to add route: ~p", [Reason])
?ERROR("[Broker] Failed to add route: ~p", [Reason])
end,
{noreply, State};
@ -426,11 +427,11 @@ handle_cast({unsubscribed, Topic, I}, State) ->
{noreply, State};
handle_cast(Msg, State) ->
emqx_logger:error("[Broker] unexpected cast: ~p", [Msg]),
?ERROR("[Broker] unexpected cast: ~p", [Msg]),
{noreply, State}.
handle_info(Info, State) ->
emqx_logger:error("[Broker] unexpected info: ~p", [Info]),
?ERROR("[Broker] unexpected info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, #{pool := Pool, id := Id}) ->

View File

@ -16,6 +16,8 @@
-behaviour(gen_server).
-include("logger.hrl").
-export([start_link/0]).
-export([register_sub/2]).
-export([lookup_subid/1, lookup_subpid/1]).
@ -96,7 +98,7 @@ init([]) ->
{ok, #{pmon => emqx_pmon:new()}}.
handle_call(Req, _From, State) ->
emqx_logger:error("[BrokerHelper] unexpected call: ~p", [Req]),
?ERROR("[BrokerHelper] unexpected call: ~p", [Req]),
{reply, ignored, State}.
handle_cast({register_sub, SubPid, SubId}, State = #{pmon := PMon}) ->
@ -105,7 +107,7 @@ handle_cast({register_sub, SubPid, SubId}, State = #{pmon := PMon}) ->
{noreply, State#{pmon := emqx_pmon:monitor(SubPid, PMon)}};
handle_cast(Msg, State) ->
emqx_logger:error("[BrokerHelper] unexpected cast: ~p", [Msg]),
?ERROR("[BrokerHelper] unexpected cast: ~p", [Msg]),
{noreply, State}.
handle_info({'DOWN', _MRef, process, SubPid, _Reason}, State = #{pmon := PMon}) ->
@ -116,7 +118,7 @@ handle_info({'DOWN', _MRef, process, SubPid, _Reason}, State = #{pmon := PMon})
{noreply, State#{pmon := PMon1}};
handle_info(Info, State) ->
emqx_logger:error("[BrokerHelper] unexpected info: ~p", [Info]),
?ERROR("[BrokerHelper] unexpected info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, _State) ->

View File

@ -39,10 +39,10 @@
-export([initialized/3, waiting_for_connack/3, connected/3]).
-export([init/1, callback_mode/0, handle_event/4, terminate/3, code_change/4]).
-export_type([client/0, properties/0, payload/0,
pubopt/0, subopt/0, request_input/0,
response_payload/0, request_handler/0,
-export_type([client/0, properties/0, payload/0, pubopt/0, subopt/0,
request_input/0, response_payload/0, request_handler/0,
corr_data/0]).
-export_type([host/0, option/0]).
%% Default timeout
@ -199,7 +199,7 @@ start_link() -> start_link([]).
start_link(Options) when is_map(Options) ->
start_link(maps:to_list(Options));
start_link(Options) when is_list(Options) ->
ok = emqx_mqtt_props:validate(
ok = emqx_mqtt_props:validate(
proplists:get_value(properties, Options, #{})),
case proplists:get_value(name, Options) of
undefined ->
@ -697,12 +697,13 @@ waiting_for_connack(cast, ?CONNACK_PACKET(?RC_SUCCESS,
waiting_for_connack(cast, ?CONNACK_PACKET(ReasonCode,
_SessPresent,
Properties), State = #state{ proto_ver = ProtoVer}) ->
Properties),
State = #state{proto_ver = ProtoVer}) ->
Reason = emqx_reason_codes:name(ReasonCode, ProtoVer),
case take_call(connect, State) of
{value, #call{from = From}, _State} ->
Reply = {error, {Reason, Properties}},
{stop_and_reply, Reason, [{reply, From, Reply}]};
{stop_and_reply, {shutdown, Reason}, [{reply, From, Reply}]};
false -> {stop, connack_error}
end;
@ -715,7 +716,17 @@ waiting_for_connack(timeout, _Timeout, State) ->
end;
waiting_for_connack(EventType, EventContent, State) ->
handle_event(EventType, EventContent, waiting_for_connack, State).
case take_call(connect, State) of
{value, #call{from = From}, _State} ->
case handle_event(EventType, EventContent, waiting_for_connack, State) of
{stop, Reason, State} ->
Reply = {error, {Reason, EventContent}},
{stop_and_reply, Reason, [{reply, From, Reply}]};
StateCallbackResult ->
StateCallbackResult
end;
false -> {stop, connack_timeout}
end.
connected({call, From}, subscriptions, State = #state{subscriptions = Subscriptions}) ->
{keep_state, State, [{reply, From, maps:to_list(Subscriptions)}]};
@ -769,7 +780,7 @@ connected({call, From}, {publish, Msg = #mqtt_msg{qos = QoS}},
when (QoS =:= ?QOS_1); (QoS =:= ?QOS_2) ->
case emqx_inflight:is_full(Inflight) of
true ->
{keep_state, State, [{reply, From, {error, inflight_full}}]};
{keep_state, State, [{reply, From, {error, {PacketId, inflight_full}}}]};
false ->
Msg1 = Msg#mqtt_msg{packet_id = PacketId},
case send(Msg1, State) of
@ -777,8 +788,8 @@ connected({call, From}, {publish, Msg = #mqtt_msg{qos = QoS}},
Inflight1 = emqx_inflight:insert(PacketId, {publish, Msg1, os:timestamp()}, Inflight),
{keep_state, ensure_retry_timer(NewState#state{inflight = Inflight1}),
[{reply, From, {ok, PacketId}}]};
Error = {error, Reason} ->
{stop_and_reply, Reason, [{reply, From, Error}]}
{error, Reason} ->
{stop_and_reply, Reason, [{reply, From, {error, {PacketId, Reason}}}]}
end
end;
@ -989,24 +1000,28 @@ should_ping(Sock) ->
handle_event(info, {TcpOrSsL, _Sock, Data}, _StateName, State)
when TcpOrSsL =:= tcp; TcpOrSsL =:= ssl ->
emqx_logger:debug("RECV Data: ~p", [Data]),
receive_loop(Data, run_sock(State));
process_incoming(Data, [], run_sock(State));
handle_event(info, {Error, _Sock, Reason}, _StateName, State)
when Error =:= tcp_error; Error =:= ssl_error ->
{stop, Reason, State};
emqx_logger:error("[~p] ~p, Reason: ~p", [?MODULE, Error, Reason]),
{stop, {shutdown, Reason}, State};
handle_event(info, {Closed, _Sock}, _StateName, State)
when Closed =:= tcp_closed; Closed =:= ssl_closed ->
emqx_logger:debug("[~p] ~p", [?MODULE, Closed]),
{stop, {shutdown, Closed}, State};
handle_event(info, {'EXIT', Owner, Reason}, _, #state{owner = Owner}) ->
{stop, Reason};
handle_event(info, {'EXIT', Owner, Reason}, _, State = #state{owner = Owner}) ->
emqx_logger:debug("[~p] Got EXIT from owner, Reason: ~p", [?MODULE, Reason]),
{stop, {shutdown, Reason}, State};
handle_event(info, {inet_reply, _Sock, ok}, _, State) ->
{keep_state, State};
handle_event(info, {inet_reply, _Sock, {error, Reason}}, _, State) ->
{stop, Reason, State};
emqx_logger:error("[~p] got tcp error: ~p", [?MODULE, Reason]),
{stop, {shutdown, Reason}, State};
handle_event(EventType, EventContent, StateName, StateData) ->
emqx_logger:error("State: ~s, Unexpected Event: (~p, ~p)",
@ -1294,7 +1309,7 @@ hosts(#state{hosts = Hosts}) -> Hosts.
send_puback(Packet, State) ->
case send(Packet, State) of
{ok, NewState} -> {keep_state, NewState};
{error, Reason} -> {stop, Reason}
{error, Reason} -> {stop, {shutdown, Reason}}
end.
send(Msg, State) when is_record(Msg, mqtt_msg) ->
@ -1313,24 +1328,31 @@ run_sock(State = #state{socket = Sock}) ->
emqx_client_sock:setopts(Sock, [{active, once}]), State.
%%------------------------------------------------------------------------------
%% Receive Loop
%% Process incomming
receive_loop(<<>>, State) ->
{keep_state, State};
process_incoming(<<>>, Packets, State) ->
{keep_state, State, next_events(Packets)};
receive_loop(Bytes, State = #state{parse_state = ParseState}) ->
case catch emqx_frame:parse(Bytes, ParseState) of
process_incoming(Bytes, Packets, State = #state{parse_state = ParseState}) ->
try emqx_frame:parse(Bytes, ParseState) of
{ok, Packet, Rest} ->
ok = gen_statem:cast(self(), Packet),
receive_loop(Rest, init_parse_state(State));
process_incoming(Rest, [Packet|Packets], init_parse_state(State));
{more, NewParseState} ->
{keep_state, State#state{parse_state = NewParseState}};
{keep_state, State#state{parse_state = NewParseState}, next_events(Packets)};
{error, Reason} ->
{stop, Reason};
{'EXIT', Error} ->
{stop, Reason}
catch
error:Error ->
{stop, Error}
end.
next_events([]) ->
[];
next_events([Packet]) ->
{next_event, cast, Packet};
next_events(Packets) ->
[{next_event, cast, Packet} || Packet <- lists:reverse(Packets)].
%%------------------------------------------------------------------------------
%% Next packet id

View File

@ -17,6 +17,7 @@
-behaviour(gen_server).
-include("emqx.hrl").
-include("logger.hrl").
-export([start_link/0]).
@ -140,7 +141,7 @@ init([]) ->
{ok, #{conn_pmon => emqx_pmon:new()}}.
handle_call(Req, _From, State) ->
emqx_logger:error("[CM] unexpected call: ~p", [Req]),
?ERROR("[CM] unexpected call: ~p", [Req]),
{reply, ignored, State}.
handle_cast({notify, {registered, ClientId, ConnPid}}, State = #{conn_pmon := PMon}) ->
@ -150,7 +151,7 @@ handle_cast({notify, {unregistered, ConnPid}}, State = #{conn_pmon := PMon}) ->
{noreply, State#{conn_pmon := emqx_pmon:demonitor(ConnPid, PMon)}};
handle_cast(Msg, State) ->
emqx_logger:error("[CM] unexpected cast: ~p", [Msg]),
?ERROR("[CM] unexpected cast: ~p", [Msg]),
{noreply, State}.
handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{conn_pmon := PMon}) ->
@ -161,7 +162,7 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{conn_pmon := PMon}
{noreply, State#{conn_pmon := PMon1}};
handle_info(Info, State) ->
emqx_logger:error("[CM] unexpected info: ~p", [Info]),
?ERROR("[CM] unexpected info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, _State) ->

View File

@ -18,8 +18,6 @@
-include("emqx.hrl").
-include("emqx_mqtt.hrl").
-define(LOG_HEADER, "[MQTT]").
-include("logger.hrl").
-export([start_link/3]).
@ -78,11 +76,14 @@ info(#state{transport = Transport,
{sockname, Sockname},
{conn_state, ConnState},
{active_n, ActiveN},
{rate_limit, esockd_rate_limit:info(RateLimit)},
{pub_limit, esockd_rate_limit:info(PubLimit)}],
{rate_limit, rate_limit_info(RateLimit)},
{pub_limit, rate_limit_info(PubLimit)}],
ProtoInfo = emqx_protocol:info(ProtoState),
lists:usort(lists:append(ConnInfo, ProtoInfo)).
rate_limit_info(undefined) -> #{};
rate_limit_info(Limit) -> esockd_rate_limit:info(Limit).
%% for dashboard
attrs(CPid) when is_pid(CPid) ->
call(CPid, attrs);

View File

@ -16,6 +16,8 @@
-behaviour(gen_server).
-include("logger.hrl").
-export([start_link/0]).
-export([register_command/2, register_command/3, unregister_command/1]).
-export([run_command/1, run_command/2, lookup_command/1]).
@ -63,7 +65,7 @@ run_command(Cmd, Args) when is_atom(Cmd) ->
_ -> ok
catch
_:Reason:Stacktrace ->
emqx_logger:error("[CTL] CMD Error:~p, Stacktrace:~p", [Reason, Stacktrace]),
?ERROR("[Ctl] CMD Error:~p, Stacktrace:~p", [Reason, Stacktrace]),
{error, Reason}
end;
[] ->
@ -91,14 +93,14 @@ init([]) ->
{ok, #state{seq = 0}}.
handle_call(Req, _From, State) ->
emqx_logger:error("unexpected call: ~p", [Req]),
?ERROR("[Ctl] unexpected call: ~p", [Req]),
{reply, ignored, State}.
handle_cast({register_command, Cmd, MF, Opts}, State = #state{seq = Seq}) ->
case ets:match(?TAB, {{'$1', Cmd}, '_', '_'}) of
[] -> ets:insert(?TAB, {{Seq, Cmd}, MF, Opts});
[[OriginSeq] | _] ->
emqx_logger:warning("[CTL] cmd ~s is overidden by ~p", [Cmd, MF]),
?WARN("[Ctl] cmd ~s is overidden by ~p", [Cmd, MF]),
ets:insert(?TAB, {{OriginSeq, Cmd}, MF, Opts})
end,
noreply(next_seq(State));
@ -108,11 +110,11 @@ handle_cast({unregister_command, Cmd}, State) ->
noreply(State);
handle_cast(Msg, State) ->
emqx_logger:error("Unexpected cast: ~p", [Msg]),
?ERROR("[Ctl] unexpected cast: ~p", [Msg]),
noreply(State).
handle_info(Info, State) ->
emqx_logger:error("unexpected info: ~p", [Info]),
?ERROR("[Ctl] unexpected info: ~p", [Info]),
noreply(State).
terminate(_Reason, _State) ->
@ -151,3 +153,4 @@ register_command_test_() ->
}.
-endif.

View File

@ -16,6 +16,8 @@
-behaviour(gen_server).
-include("logger.hrl").
-export([start_link/0, stop/0]).
%% Hooks API
@ -152,7 +154,7 @@ handle_call({add, HookPoint, Callback = #callback{action = Action}}, _From, Stat
{reply, Reply, State};
handle_call(Req, _From, State) ->
emqx_logger:error("[Hooks] unexpected call: ~p", [Req]),
?ERROR("[Hooks] unexpected call: ~p", [Req]),
{reply, ignored, State}.
handle_cast({del, HookPoint, Action}, State) ->
@ -165,11 +167,11 @@ handle_cast({del, HookPoint, Action}, State) ->
{noreply, State};
handle_cast(Msg, State) ->
emqx_logger:error("[Hooks] unexpected msg: ~p", [Msg]),
?ERROR("[Hooks] unexpected msg: ~p", [Msg]),
{noreply, State}.
handle_info(Info, State) ->
emqx_logger:error("[Hooks] unexpected info: ~p", [Info]),
?ERROR("[Hooks] unexpected info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, _State) ->

View File

@ -25,7 +25,7 @@ start_link() ->
init([]) ->
{ok, {{one_for_one, 10, 100},
[child_spec(emqx_pool, supervisor),
[child_spec(emqx_pool_sup, supervisor),
child_spec(emqx_alarm_mgr, worker),
child_spec(emqx_hooks, worker),
child_spec(emqx_stats, worker),
@ -41,6 +41,7 @@ child_spec(M, worker) ->
shutdown => 5000,
type => worker,
modules => [M]};
child_spec(M, supervisor) ->
#{id => M,
start => {M, start_link, []},

View File

@ -27,6 +27,7 @@
-export([get_primary_log_level/0, set_primary_log_level/1]).
-export([get_log_handlers/0, get_log_handler/1, set_log_handler_level/2]).
-export([set_log_level/1]).
debug(Msg) ->
logger:debug(Msg).
@ -89,6 +90,13 @@ get_log_handler(HandlerId) ->
set_log_handler_level(HandlerId, Level) ->
logger:set_handler_config(HandlerId, level, Level).
%% Set both the primary and all handlers level in one command
set_log_level(Level) ->
case set_primary_log_level(Level) of
ok -> set_all_log_handlers_level(Level);
{error, Error} -> {error, {primary_logger_level, Error}}
end.
%%========================
%% Internal Functions
%%========================
@ -107,4 +115,23 @@ log_hanlder_info(#{id := Id, level := Level, module := logger_disk_log_h,
config := #{file := Filename}}) ->
{Id, Level, Filename};
log_hanlder_info(#{id := Id, level := Level, module := _OtherModule}) ->
{Id, Level, unknown}.
{Id, Level, unknown}.
%% set level for all log handlers in one command
set_all_log_handlers_level(Level) ->
set_all_log_handlers_level(get_log_handlers(), Level, []).
set_all_log_handlers_level([{ID, Level, _Dst} | List], NewLevel, ChangeHistory) ->
case set_log_handler_level(ID, NewLevel) of
ok -> set_all_log_handlers_level(List, NewLevel, [{ID, Level} | ChangeHistory]);
{error, Error} ->
rollback(ChangeHistory),
{error, {handlers_logger_level, {ID, Error}}}
end;
set_all_log_handlers_level([], _NewLevel, _NewHanlder) ->
ok.
rollback([{ID, Level} | List]) ->
emqx_logger:set_log_handler_level(ID, Level),
rollback(List);
rollback([]) -> ok.

View File

@ -40,7 +40,7 @@ make(From, Topic, Payload) ->
-spec(make(atom() | emqx_types:client_id(), emqx_mqtt_types:qos(),
emqx_topic:topic(), emqx_types:payload()) -> emqx_types:message()).
make(From, QoS, Topic, Payload) ->
#message{id = msgid(QoS),
#message{id = emqx_guid:gen(),
qos = QoS,
from = From,
flags = #{dup => false},
@ -48,9 +48,6 @@ make(From, QoS, Topic, Payload) ->
payload = Payload,
timestamp = os:timestamp()}.
msgid(?QOS_0) -> undefined;
msgid(_QoS) -> emqx_guid:gen().
set_flags(Flags, Msg = #message{flags = undefined}) when is_map(Flags) ->
Msg#message{flags = Flags};
set_flags(New, Msg = #message{flags = Old}) when is_map(New) ->

View File

@ -14,6 +14,7 @@
-module(emqx_metrics).
-include("logger.hrl").
-include("emqx_mqtt.hrl").
-export([start_link/0]).
@ -182,7 +183,7 @@ commit() ->
case get('$metrics') of
undefined -> ok;
Metrics ->
maps:fold(fun({Type, Metric}, Val, _Acc) ->
maps:fold(fun({Type, Metric}, Val, _Acc) ->
update_counter(key(Type, Metric), {2, Val})
end, 0, Metrics),
erase('$metrics')
@ -279,9 +280,9 @@ qos_sent(?QOS_1) ->
qos_sent(?QOS_2) ->
inc('messages/qos2/sent').
%%-----------------------------------------------------------------------------
%%------------------------------------------------------------------------------
%% gen_server callbacks
%%-----------------------------------------------------------------------------
%%------------------------------------------------------------------------------
init([]) ->
% Create metrics table
@ -290,15 +291,15 @@ init([]) ->
{ok, #{}, hibernate}.
handle_call(Req, _From, State) ->
emqx_logger:error("[Metrics] unexpected call: ~p", [Req]),
?ERROR("[Metrics] unexpected call: ~p", [Req]),
{reply, ignored, State}.
handle_cast(Msg, State) ->
emqx_logger:error("[Metrics] unexpected cast: ~p", [Msg]),
?ERROR("[Metrics] unexpected cast: ~p", [Msg]),
{noreply, State}.
handle_info(Info, State) ->
emqx_logger:error("[Metrics] unexpected info: ~p", [Info]),
?ERROR("[Metrics] unexpected info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, #{}) ->

View File

@ -170,10 +170,13 @@ list() ->
plugin(CfgFile) ->
AppName = app_name(CfgFile),
{ok, Attrs} = application:get_all_key(AppName),
Ver = proplists:get_value(vsn, Attrs, "0"),
Descr = proplists:get_value(description, Attrs, ""),
#plugin{name = AppName, version = Ver, descr = Descr}.
case application:get_all_key(AppName) of
{ok, Attrs} ->
Ver = proplists:get_value(vsn, Attrs, "0"),
Descr = proplists:get_value(description, Attrs, ""),
#plugin{name = AppName, version = Ver, descr = Descr};
undefined -> error({plugin_not_found, AppName})
end.
%% @doc Load a Plugin
-spec(load(atom()) -> ok | {error, term()}).

View File

@ -16,9 +16,14 @@
-behaviour(gen_server).
-export([start_link/0, start_link/2]).
-include("logger.hrl").
-export([start_link/2]).
-export([submit/1, submit/2]).
-export([async_submit/1, async_submit/2]).
-ifdef(TEST).
-export([worker/0]).
-endif.
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
@ -28,10 +33,6 @@
-type(task() :: fun() | mfa() | {fun(), Args :: list(any())}).
%% @doc Start pooler supervisor.
start_link() ->
emqx_pool_sup:start_link(?POOL, random, {?MODULE, start_link, []}).
%% @doc Start pool.
-spec(start_link(atom(), pos_integer()) -> emqx_types:startlink_ret()).
start_link(Pool, Id) ->
@ -80,22 +81,22 @@ handle_call({submit, Task}, _From, State) ->
{reply, catch run(Task), State};
handle_call(Req, _From, State) ->
emqx_logger:error("[Pool] unexpected call: ~p", [Req]),
?ERROR("[Pool] unexpected call: ~p", [Req]),
{reply, ignored, State}.
handle_cast({async_submit, Task}, State) ->
try run(Task)
catch _:Error:Stacktrace ->
emqx_logger:error("[Pool] error: ~p, ~p", [Error, Stacktrace])
?ERROR("[Pool] error: ~p, ~p", [Error, Stacktrace])
end,
{noreply, State};
handle_cast(Msg, State) ->
emqx_logger:error("[Pool] unexpected cast: ~p", [Msg]),
?ERROR("[Pool] unexpected cast: ~p", [Msg]),
{noreply, State}.
handle_info(Info, State) ->
emqx_logger:error("[Pool] unexpected info: ~p", [Info]),
?ERROR("[Pool] unexpected info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, #{pool := Pool, id := Id}) ->

View File

@ -16,10 +16,13 @@
-behaviour(supervisor).
-export([spec/1, spec/2, start_link/3, start_link/4]).
-export([spec/1, spec/2]).
-export([start_link/0, start_link/3, start_link/4]).
-export([init/1]).
-define(POOL, emqx_pool).
-spec(spec(list()) -> supervisor:child_spec()).
spec(Args) ->
spec(pool_sup, Args).
@ -33,6 +36,10 @@ spec(ChildId, Args) ->
type => supervisor,
modules => [?MODULE]}.
%% @doc Start the default pool supervisor.
start_link() ->
start_link(?POOL, random, {?POOL, start_link, []}).
-spec(start_link(atom() | tuple(), atom(), mfa()) -> {ok, pid()} | {error, term()}).
start_link(Pool, Type, MFA) ->
start_link(Pool, Type, emqx_vm:schedulers(), MFA).

View File

@ -14,8 +14,6 @@
-module(emqx_protocol).
-define(LOG_HEADER, "[MQTT]").
-include("emqx.hrl").
-include("emqx_mqtt.hrl").
-include("logger.hrl").
@ -23,6 +21,7 @@
-export([init/2]).
-export([info/1]).
-export([attrs/1]).
-export([attr/2]).
-export([caps/1]).
-export([stats/1]).
-export([client_id/1]).
@ -52,8 +51,6 @@
clean_start,
topic_aliases,
packet_size,
will_topic,
will_msg,
keepalive,
mountpoint,
is_super,
@ -132,13 +129,11 @@ info(PState = #pstate{conn_props = ConnProps,
ack_props = AckProps,
session = Session,
topic_aliases = Aliases,
will_msg = WillMsg,
enable_acl = EnableAcl}) ->
attrs(PState) ++ [{conn_props, ConnProps},
{ack_props, AckProps},
{session, Session},
{topic_aliases, Aliases},
{will_msg, WillMsg},
{enable_acl, EnableAcl}].
attrs(#pstate{zone = Zone,
@ -168,6 +163,28 @@ attrs(#pstate{zone = Zone,
{is_bridge, IsBridge},
{connected_at, ConnectedAt}].
attr(max_inflight, #pstate{proto_ver = ?MQTT_PROTO_V5, conn_props = ConnProps}) ->
get_property('Receive-Maximum', ConnProps, 65535);
attr(max_inflight, #pstate{zone = Zone}) ->
emqx_zone:get_env(Zone, max_inflight, 65535);
attr(expiry_interval, #pstate{proto_ver = ?MQTT_PROTO_V5, conn_props = ConnProps}) ->
get_property('Session-Expiry-Interval', ConnProps, 0);
attr(expiry_interval, #pstate{zone = Zone, clean_start = CleanStart}) ->
case CleanStart of
true -> 0;
false -> emqx_zone:get_env(Zone, session_expiry_interval, 16#ffffffff)
end;
attr(topic_alias_maximum, #pstate{proto_ver = ?MQTT_PROTO_V5, conn_props = ConnProps}) ->
get_property('Topic-Alias-Maximum', ConnProps, 0);
attr(topic_alias_maximum, #pstate{zone = Zone}) ->
emqx_zone:get_env(Zone, max_topic_alias, 0);
attr(Name, PState) ->
Attrs = lists:zip(record_info(fields, pstate), tl(tuple_to_list(PState))),
case lists:keyfind(Name, 1, Attrs) of
{_, Value} -> Value;
false -> undefined
end.
caps(#pstate{zone = Zone}) ->
emqx_mqtt_caps:get_caps(Zone).
@ -351,11 +368,11 @@ process_packet(?CONNECT_PACKET(
case authenticate(credentials(PState2), Password) of
{ok, IsSuper} ->
%% Maybe assign a clientId
PState3 = maybe_assign_client_id(PState2#pstate{is_super = IsSuper,
will_msg = make_will_msg(ConnPkt)}),
PState3 = maybe_assign_client_id(PState2#pstate{is_super = IsSuper}),
emqx_logger:set_metadata_client_id(PState3#pstate.client_id),
%% Open session
case try_open_session(PState3) of
SessAttrs = #{will_msg => make_will_msg(ConnPkt)},
case try_open_session(SessAttrs, PState3) of
{ok, SPid, SP} ->
PState4 = PState3#pstate{session = SPid, connected = true},
ok = emqx_cm:register_connection(client_id(PState4)),
@ -394,8 +411,8 @@ process_packet(Packet = ?PUBLISH_PACKET(?QOS_1, Topic, PacketId, _Payload), PSta
?LOG(warning, "Cannot publish qos1 message to ~s for ~s",
[Topic, emqx_reason_codes:text(ReasonCode)]),
case deliver({puback, PacketId, ReasonCode}, PState) of
{ok, _PState} ->
do_acl_deny_action(Packet, ReasonCode, PState);
{ok, PState1} ->
do_acl_deny_action(Packet, ReasonCode, PState1);
Error ->
Error
end
@ -408,9 +425,9 @@ process_packet(Packet = ?PUBLISH_PACKET(?QOS_2, Topic, PacketId, _Payload), PSta
{error, ReasonCode} ->
?LOG(warning, "Cannot publish qos2 message to ~s for ~s",
[Topic, emqx_reason_codes:text(ReasonCode)]),
case deliver({pubrec, PacketId, ?RC_NOT_AUTHORIZED}, PState) of
{ok, _PState} ->
do_acl_deny_action(Packet, ReasonCode, PState);
case deliver({pubrec, PacketId, ReasonCode}, PState) of
{ok, PState1} ->
do_acl_deny_action(Packet, ReasonCode, PState1);
Error ->
Error
end
@ -474,8 +491,12 @@ process_packet(Packet = ?SUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters)
{SubTopics, ReasonCodes} = {lists:reverse(ReverseSubTopics), lists:reverse(ReverseReasonCodes)},
?LOG(warning, "Cannot subscribe ~p for ~p",
[SubTopics, [emqx_reason_codes:text(R) || R <- ReasonCodes]]),
deliver({suback, PacketId, ReasonCodes}, PState),
do_acl_deny_action(Packet, ReasonCodes, PState)
case deliver({suback, PacketId, ReasonCodes}, PState) of
{ok, PState1} ->
do_acl_deny_action(Packet, ReasonCodes, PState1);
Error ->
Error
end
end;
process_packet(?UNSUBSCRIBE_PACKET(PacketId, Properties, RawTopicFilters),
@ -500,16 +521,15 @@ process_packet(?DISCONNECT_PACKET(?RC_SUCCESS, #{'Session-Expiry-Interval' := In
case Interval =/= 0 andalso OldInterval =:= 0 of
true ->
deliver({disconnect, ?RC_PROTOCOL_ERROR}, PState),
{error, protocol_error, PState#pstate{will_msg = undefined}};
{error, protocol_error, PState};
false ->
emqx_session:update_expiry_interval(SPid, Interval),
%% Clean willmsg
{stop, normal, PState#pstate{will_msg = undefined}}
{stop, normal, PState}
end;
process_packet(?DISCONNECT_PACKET(?RC_SUCCESS), PState) ->
{stop, normal, PState#pstate{will_msg = undefined}};
{stop, normal, PState};
process_packet(?DISCONNECT_PACKET(_), PState) ->
{stop, normal, PState}.
{stop, {shutdown, abnormal_disconnet}, PState}.
%%------------------------------------------------------------------------------
%% ConnAck --> Client
@ -676,53 +696,26 @@ maybe_assign_client_id(PState = #pstate{client_id = <<>>, ack_props = AckProps})
maybe_assign_client_id(PState) ->
PState.
try_open_session(PState = #pstate{zone = Zone,
client_id = ClientId,
conn_pid = ConnPid,
username = Username,
clean_start = CleanStart,
will_msg = WillMsg}) ->
SessAttrs = #{
zone => Zone,
client_id => ClientId,
conn_pid => ConnPid,
username => Username,
clean_start => CleanStart,
will_msg => WillMsg
},
SessAttrs1 = lists:foldl(fun set_session_attrs/2, SessAttrs, [{max_inflight, PState}, {expiry_interval, PState}]),
case emqx_sm:open_session(SessAttrs1) of
try_open_session(SessAttrs, PState = #pstate{zone = Zone,
client_id = ClientId,
conn_pid = ConnPid,
username = Username,
clean_start = CleanStart}) ->
case emqx_sm:open_session(
maps:merge(#{zone => Zone,
client_id => ClientId,
conn_pid => ConnPid,
username => Username,
clean_start => CleanStart,
max_inflight => attr(max_inflight, PState),
expiry_interval => attr(expiry_interval, PState),
topic_alias_maximum => attr(topic_alias_maximum, PState)},
SessAttrs)) of
{ok, SPid} ->
{ok, SPid, false};
Other -> Other
end.
set_session_attrs({max_inflight, #pstate{proto_ver = ?MQTT_PROTO_V5, conn_props = ConnProps}}, SessAttrs) ->
maps:put(max_inflight, get_property('Receive-Maximum', ConnProps, 65535), SessAttrs);
set_session_attrs({max_inflight, #pstate{zone = Zone}}, SessAttrs) ->
maps:put(max_inflight, emqx_zone:get_env(Zone, max_inflight, 65535), SessAttrs);
set_session_attrs({expiry_interval, #pstate{proto_ver = ?MQTT_PROTO_V5, conn_props = ConnProps}}, SessAttrs) ->
maps:put(expiry_interval, get_property('Session-Expiry-Interval', ConnProps, 0), SessAttrs);
set_session_attrs({expiry_interval, #pstate{zone = Zone, clean_start = CleanStart}}, SessAttrs) ->
maps:put(expiry_interval, case CleanStart of
true -> 0;
false -> emqx_zone:get_env(Zone, session_expiry_interval, 16#ffffffff)
end, SessAttrs);
set_session_attrs({topic_alias_maximum, #pstate{proto_ver = ?MQTT_PROTO_V5, conn_props = ConnProps}}, SessAttrs) ->
maps:put(topic_alias_maximum, get_property('Topic-Alias-Maximum', ConnProps, 0), SessAttrs);
set_session_attrs({topic_alias_maximum, #pstate{zone = Zone}}, SessAttrs) ->
maps:put(topic_alias_maximum, emqx_zone:get_env(Zone, max_topic_alias, 0), SessAttrs);
set_session_attrs(_, SessAttrs) ->
SessAttrs.
authenticate(Credentials, Password) ->
case emqx_access_control:authenticate(Credentials, Password) of
ok -> {ok, false};
@ -822,8 +815,8 @@ check_will_acl(#mqtt_packet_connect{will_topic = WillTopic}, PState) ->
case emqx_access_control:check_acl(credentials(PState), publish, WillTopic) of
allow -> ok;
deny ->
?LOG(warning, "Cannot publish will message to ~p for acl checking failed", [WillTopic]),
{error, ?RC_UNSPECIFIED_ERROR}
?LOG(warning, "Will message (to ~s) validation failed, acl denied", [WillTopic]),
{error, ?RC_NOT_AUTHORIZED}
end.
check_publish(Packet, PState) ->
@ -980,3 +973,4 @@ reason_codes_compat(unsuback, _ReasonCodes, _ProtoVer) ->
undefined;
reason_codes_compat(PktType, ReasonCodes, _ProtoVer) ->
[emqx_reason_codes:compat(PktType, RC) || RC <- ReasonCodes].

View File

@ -17,6 +17,7 @@
-behaviour(gen_server).
-include("emqx.hrl").
-include("logger.hrl").
-include_lib("ekka/include/ekka.hrl").
%% Mnesia bootstrap
@ -101,9 +102,19 @@ do_add_route(Topic, Dest) when is_binary(Topic) ->
%% @doc Match routes
-spec(match_routes(emqx_topic:topic()) -> [emqx_types:route()]).
match_routes(Topic) when is_binary(Topic) ->
%% Optimize: routing table will be replicated to all router nodes.
Matched = mnesia:ets(fun emqx_trie:match/1, [Topic]),
lists:append([lookup_routes(To) || To <- [Topic | Matched]]).
case match_trie(Topic) of
[] -> lookup_routes(Topic);
Matched ->
lists:append([lookup_routes(To) || To <- [Topic | Matched]])
end.
%% @private
%% Optimize: routing table will be replicated to all router nodes.
match_trie(Topic) ->
case emqx_trie:empty() of
true -> [];
false -> mnesia:ets(fun emqx_trie:match/1, [Topic])
end.
-spec(lookup_routes(emqx_topic:topic()) -> [emqx_types:route()]).
lookup_routes(Topic) ->
@ -167,15 +178,15 @@ handle_call({delete_route, Topic, Dest}, _From, State) ->
{reply, Ok, State};
handle_call(Req, _From, State) ->
emqx_logger:error("[Router] unexpected call: ~p", [Req]),
?ERROR("[Router] unexpected call: ~p", [Req]),
{reply, ignored, State}.
handle_cast(Msg, State) ->
emqx_logger:error("[Router] unexpected cast: ~p", [Msg]),
?ERROR("[Router] unexpected cast: ~p", [Msg]),
{noreply, State}.
handle_info(Info, State) ->
emqx_logger:error("[Router] unexpected info: ~p", [Info]),
?ERROR("[Router] unexpected info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, #{pool := Pool, id := Id}) ->

View File

@ -17,6 +17,7 @@
-behaviour(gen_server).
-include("emqx.hrl").
-include("logger.hrl").
%% Mnesia bootstrap
-export([mnesia/1]).
@ -94,11 +95,11 @@ init([]) ->
{ok, #{nodes => Nodes}, hibernate}.
handle_call(Req, _From, State) ->
emqx_logger:error("[RouterHelper] unexpected call: ~p", [Req]),
?ERROR("[RouterHelper] unexpected call: ~p", [Req]),
{reply, ignored, State}.
handle_cast(Msg, State) ->
emqx_logger:error("[RouterHelper] unexpected cast: ~p", [Msg]),
?ERROR("[RouterHelper] unexpected cast: ~p", [Msg]),
{noreply, State}.
handle_info({mnesia_table_event, {write, {?ROUTING_NODE, Node, _}, _}}, State = #{nodes := Nodes}) ->
@ -114,7 +115,7 @@ handle_info({mnesia_table_event, {delete, {?ROUTING_NODE, _Node}, _}}, State) ->
{noreply, State};
handle_info({mnesia_table_event, Event}, State) ->
emqx_logger:error("[RouterHelper] unexpected mnesia_table_event: ~p", [Event]),
?ERROR("[RouterHelper] unexpected mnesia_table_event: ~p", [Event]),
{noreply, State};
handle_info({nodedown, Node}, State = #{nodes := Nodes}) ->
@ -132,7 +133,7 @@ handle_info({membership, _Event}, State) ->
{noreply, State};
handle_info(Info, State) ->
emqx_logger:error("[RouteHelper] unexpected info: ~p", [Info]),
?ERROR("[RouteHelper] unexpected info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, _State) ->

View File

@ -42,6 +42,7 @@
-include("emqx.hrl").
-include("emqx_mqtt.hrl").
-include("logger.hrl").
-export([start_link/1]).
-export([info/1, attrs/1]).
@ -155,9 +156,6 @@
-export_type([attr/0]).
-define(LOG(Level, Format, Args, _State),
emqx_logger:Level("[Session] " ++ Format, Args)).
%% @doc Start a session proc.
-spec(start_link(SessAttrs :: map()) -> {ok, pid()}).
start_link(SessAttrs) ->
@ -186,7 +184,7 @@ info(State = #state{conn_pid = ConnPid,
{upgrade_qos, UpgradeQoS},
{inflight, Inflight},
{retry_interval, RetryInterval},
{mqueue_len, MQueue},
{mqueue_len, emqx_mqueue:len(MQueue)},
{awaiting_rel, AwaitingRel},
{max_awaiting_rel, MaxAwaitingRel},
{await_rel_timeout, AwaitRelTimeout}].
@ -390,11 +388,11 @@ handle_call(stats, _From, State) ->
reply(stats(State), State);
handle_call({discard, ByPid}, _From, State = #state{conn_pid = undefined}) ->
?LOG(warning, "Discarded by ~p", [ByPid], State),
?LOG(warning, "Discarded by ~p", [ByPid]),
{stop, {shutdown, discarded}, ok, State};
handle_call({discard, ByPid}, _From, State = #state{client_id = ClientId, conn_pid = ConnPid}) ->
?LOG(warning, "Conn ~p is discarded by ~p", [ConnPid, ByPid], State),
?LOG(warning, "Conn ~p is discarded by ~p", [ConnPid, ByPid]),
ConnPid ! {shutdown, discard, {ClientId, ByPid}},
{stop, {shutdown, discarded}, ok, State};
@ -413,7 +411,7 @@ handle_call({register_publish_packet_id, PacketId, Ts}, _From,
{ok, ensure_stats_timer(ensure_await_rel_timer(State1))}
end;
true ->
?LOG(warning, "Dropped qos2 packet ~w for too many awaiting_rel", [PacketId], State),
?LOG(warning, "Dropped qos2 packet ~w for too many awaiting_rel", [PacketId]),
emqx_metrics:trans(inc, 'messages/qos2/dropped'),
{{error, ?RC_RECEIVE_MAXIMUM_EXCEEDED}, State}
end);
@ -425,7 +423,7 @@ handle_call({pubrec, PacketId, _ReasonCode}, _From, State = #state{inflight = In
true ->
{ok, ensure_stats_timer(acked(pubrec, PacketId, State))};
false ->
?LOG(warning, "The PUBREC PacketId ~w is not found.", [PacketId], State),
?LOG(warning, "The PUBREC PacketId ~w is not found.", [PacketId]),
emqx_metrics:trans(inc, 'packets/pubrec/missed'),
{{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}, State}
end);
@ -437,7 +435,7 @@ handle_call({pubrel, PacketId, _ReasonCode}, _From, State = #state{awaiting_rel
{_Ts, AwaitingRel1} ->
{ok, ensure_stats_timer(State#state{awaiting_rel = AwaitingRel1})};
error ->
?LOG(warning, "The PUBREL PacketId ~w is not found", [PacketId], State),
?LOG(warning, "The PUBREL PacketId ~w is not found", [PacketId]),
emqx_metrics:trans(inc, 'packets/pubrel/missed'),
{{error, ?RC_PACKET_IDENTIFIER_NOT_FOUND}, State}
end);
@ -451,21 +449,21 @@ handle_call(Req, _From, State) ->
%% SUBSCRIBE:
handle_cast({subscribe, FromPid, {PacketId, _Properties, TopicFilters}},
State = #state{client_id = ClientId, subscriptions = Subscriptions}) ->
State = #state{client_id = ClientId, username = Username, subscriptions = Subscriptions}) ->
{ReasonCodes, Subscriptions1} =
lists:foldr(fun({Topic, SubOpts = #{qos := QoS}}, {RcAcc, SubMap}) ->
{[QoS|RcAcc], case maps:find(Topic, SubMap) of
{ok, SubOpts} ->
emqx_hooks:run('session.subscribed', [#{client_id => ClientId}, Topic, SubOpts#{first => false}]),
emqx_hooks:run('session.subscribed', [#{client_id => ClientId, username => Username}, Topic, SubOpts#{first => false}]),
SubMap;
{ok, _SubOpts} ->
emqx_broker:set_subopts(Topic, SubOpts),
%% Why???
emqx_hooks:run('session.subscribed', [#{client_id => ClientId}, Topic, SubOpts#{first => false}]),
emqx_hooks:run('session.subscribed', [#{client_id => ClientId, username => Username}, Topic, SubOpts#{first => false}]),
maps:put(Topic, SubOpts, SubMap);
error ->
emqx_broker:subscribe(Topic, ClientId, SubOpts),
emqx_hooks:run('session.subscribed', [#{client_id => ClientId}, Topic, SubOpts#{first => true}]),
emqx_hooks:run('session.subscribed', [#{client_id => ClientId, username => Username}, Topic, SubOpts#{first => true}]),
maps:put(Topic, SubOpts, SubMap)
end}
end, {[], Subscriptions}, TopicFilters),
@ -474,13 +472,13 @@ handle_cast({subscribe, FromPid, {PacketId, _Properties, TopicFilters}},
%% UNSUBSCRIBE:
handle_cast({unsubscribe, From, {PacketId, _Properties, TopicFilters}},
State = #state{client_id = ClientId, subscriptions = Subscriptions}) ->
State = #state{client_id = ClientId, username = Username, subscriptions = Subscriptions}) ->
{ReasonCodes, Subscriptions1} =
lists:foldr(fun({Topic, _SubOpts}, {Acc, SubMap}) ->
case maps:find(Topic, SubMap) of
{ok, SubOpts} ->
ok = emqx_broker:unsubscribe(Topic),
emqx_hooks:run('session.unsubscribed', [#{client_id => ClientId}, Topic, SubOpts]),
emqx_hooks:run('session.unsubscribed', [#{client_id => ClientId, username => Username}, Topic, SubOpts]),
{[?RC_SUCCESS|Acc], maps:remove(Topic, SubMap)};
error ->
{[?RC_NO_SUBSCRIPTION_EXISTED|Acc], SubMap}
@ -496,7 +494,7 @@ handle_cast({puback, PacketId, _ReasonCode}, State = #state{inflight = Inflight}
true ->
ensure_stats_timer(dequeue(acked(puback, PacketId, State)));
false ->
?LOG(warning, "The PUBACK PacketId ~w is not found", [PacketId], State),
?LOG(warning, "The PUBACK PacketId ~w is not found", [PacketId]),
emqx_metrics:trans(inc, 'packets/puback/missed'),
State
end);
@ -508,7 +506,7 @@ handle_cast({pubcomp, PacketId, _ReasonCode}, State = #state{inflight = Inflight
true ->
ensure_stats_timer(dequeue(acked(pubcomp, PacketId, State)));
false ->
?LOG(warning, "The PUBCOMP PacketId ~w is not found", [PacketId], State),
?LOG(warning, "The PUBCOMP PacketId ~w is not found", [PacketId]),
emqx_metrics:trans(inc, 'packets/pubcomp/missed'),
State
end);
@ -526,14 +524,14 @@ handle_cast({resume, #{conn_pid := ConnPid,
expiry_timer = ExpireTimer,
will_delay_timer = WillDelayTimer}) ->
?LOG(info, "Resumed by connection ~p ", [ConnPid], State),
?LOG(info, "Resumed by connection ~p ", [ConnPid]),
%% Cancel Timers
lists:foreach(fun emqx_misc:cancel_timer/1,
[RetryTimer, AwaitTimer, ExpireTimer, WillDelayTimer]),
case kick(ClientId, OldConnPid, ConnPid) of
ok -> ?LOG(warning, "Connection ~p kickout ~p", [ConnPid, OldConnPid], State);
ok -> ?LOG(warning, "Connection ~p kickout ~p", [ConnPid, OldConnPid]);
ignore -> ok
end,
@ -614,12 +612,12 @@ handle_info({timeout, Timer, emit_stats},
GcState1 = emqx_gc:reset(GcState),
{noreply, NewState#state{gc_state = GcState1}, hibernate};
{shutdown, Reason} ->
?LOG(warning, "shutdown due to ~p", [Reason], NewState),
?LOG(warning, "shutdown due to ~p", [Reason]),
shutdown(Reason, NewState)
end;
handle_info({timeout, Timer, expired}, State = #state{expiry_timer = Timer}) ->
?LOG(info, "expired, shutdown now.", [], State),
?LOG(info, "expired, shutdown now.", []),
shutdown(expired, State);
handle_info({timeout, Timer, will_delay}, State = #state{will_msg = WillMsg, will_delay_timer = Timer}) ->
@ -632,11 +630,21 @@ handle_info({'EXIT', ConnPid, Reason}, #state{conn_pid = ConnPid})
exit(Reason);
handle_info({'EXIT', ConnPid, Reason}, State = #state{will_msg = WillMsg, expiry_interval = 0, conn_pid = ConnPid}) ->
send_willmsg(WillMsg),
case Reason of
normal ->
ignore;
_ ->
send_willmsg(WillMsg)
end,
{stop, Reason, State#state{will_msg = undefined, conn_pid = undefined}};
handle_info({'EXIT', ConnPid, _Reason}, State = #state{conn_pid = ConnPid}) ->
State1 = ensure_will_delay_timer(State),
handle_info({'EXIT', ConnPid, Reason}, State = #state{conn_pid = ConnPid}) ->
State1 = case Reason of
normal ->
State#state{will_msg = undefined};
_ ->
ensure_will_delay_timer(State)
end,
{noreply, ensure_expire_timer(State1#state{conn_pid = undefined})};
handle_info({'EXIT', OldPid, _Reason}, State = #state{old_conn_pid = OldPid}) ->
@ -645,7 +653,7 @@ handle_info({'EXIT', OldPid, _Reason}, State = #state{old_conn_pid = OldPid}) ->
handle_info({'EXIT', Pid, Reason}, State = #state{conn_pid = ConnPid}) ->
?LOG(error, "Unexpected EXIT: conn_pid=~p, exit_pid=~p, reason=~p",
[ConnPid, Pid, Reason], State),
[ConnPid, Pid, Reason]),
{noreply, State};
handle_info(Info, State) ->
@ -654,11 +662,12 @@ handle_info(Info, State) ->
terminate(Reason, #state{will_msg = WillMsg,
client_id = ClientId,
username = Username,
conn_pid = ConnPid,
old_conn_pid = OldConnPid}) ->
send_willmsg(WillMsg),
[maybe_shutdown(Pid, Reason) || Pid <- [ConnPid, OldConnPid]],
emqx_hooks:run('session.terminated', [#{client_id => ClientId}, Reason]).
emqx_hooks:run('session.terminated', [#{client_id => ClientId, username => Username}, Reason]).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
@ -666,7 +675,7 @@ code_change(_OldVsn, State, _Extra) ->
maybe_shutdown(undefined, _Reason) ->
ok;
maybe_shutdown(Pid, normal) ->
Pid ! {shutdown, normal};
Pid ! {shutdown, normal};
maybe_shutdown(Pid, Reason) ->
exit(Pid, Reason).
@ -779,7 +788,7 @@ expire_awaiting_rel([{PacketId, Ts} | More], Now,
case (timer:now_diff(Now, Ts) div 1000) of
Age when Age >= Timeout ->
emqx_metrics:trans(inc, 'messages/qos2/expired'),
?LOG(warning, "Dropped qos2 packet ~s for await_rel_timeout", [PacketId], State),
?LOG(warning, "Dropped qos2 packet ~s for await_rel_timeout", [PacketId]),
expire_awaiting_rel(More, Now, State#state{awaiting_rel = maps:remove(PacketId, AwaitingRel)});
Age ->
ensure_await_rel_timer(Timeout - max(0, Age), State)
@ -823,8 +832,8 @@ run_dispatch_steps([{subid, SubId}|Steps], Msg, State) ->
run_dispatch_steps(Steps, emqx_message:set_header('Subscription-Identifier', SubId, Msg), State).
%% Enqueue message if the client has been disconnected
dispatch(Msg, State = #state{client_id = ClientId, conn_pid = undefined}) ->
case emqx_hooks:run('message.dropped', [#{client_id => ClientId}, Msg]) of
dispatch(Msg, State = #state{client_id = ClientId, username = Username, conn_pid = undefined}) ->
case emqx_hooks:run('message.dropped', [#{client_id => ClientId, username => Username}, Msg]) of
ok -> enqueue_msg(Msg, State);
stop -> State
end;
@ -888,26 +897,26 @@ await(PacketId, Msg, State = #state{inflight = Inflight}) ->
PacketId, {publish, {PacketId, Msg}, os:timestamp()}, Inflight),
ensure_retry_timer(State#state{inflight = Inflight1}).
acked(puback, PacketId, State = #state{client_id = ClientId, inflight = Inflight}) ->
acked(puback, PacketId, State = #state{client_id = ClientId, username = Username, inflight = Inflight}) ->
case emqx_inflight:lookup(PacketId, Inflight) of
{value, {publish, {_, Msg}, _Ts}} ->
emqx_hooks:run('message.acked', [#{client_id => ClientId}], Msg),
emqx_hooks:run('message.acked', [#{client_id => ClientId, username => Username}], Msg),
State#state{inflight = emqx_inflight:delete(PacketId, Inflight)};
none ->
?LOG(warning, "Duplicated PUBACK PacketId ~w", [PacketId], State),
?LOG(warning, "Duplicated PUBACK PacketId ~w", [PacketId]),
State
end;
acked(pubrec, PacketId, State = #state{client_id = ClientId, inflight = Inflight}) ->
acked(pubrec, PacketId, State = #state{client_id = ClientId, username = Username, inflight = Inflight}) ->
case emqx_inflight:lookup(PacketId, Inflight) of
{value, {publish, {_, Msg}, _Ts}} ->
emqx_hooks:run('message.acked', [#{client_id => ClientId}], Msg),
emqx_hooks:run('message.acked', [#{client_id => ClientId, username => Username}], Msg),
State#state{inflight = emqx_inflight:update(PacketId, {pubrel, PacketId, os:timestamp()}, Inflight)};
{value, {pubrel, PacketId, _Ts}} ->
?LOG(warning, "Duplicated PUBREC PacketId ~w", [PacketId], State),
?LOG(warning, "Duplicated PUBREC PacketId ~w", [PacketId]),
State;
none ->
?LOG(warning, "Unexpected PUBREC PacketId ~w", [PacketId], State),
?LOG(warning, "Unexpected PUBREC PacketId ~w", [PacketId]),
State
end;

View File

@ -16,6 +16,8 @@
-behaviour(gen_server).
-include("logger.hrl").
-export([start_link/1]).
-export([start_session/1, count_sessions/0]).
@ -34,8 +36,6 @@
-define(SUP, ?MODULE).
-define(BATCH_EXIT, 100000).
-define(ERROR_MSG(Format, Args),
error_logger:error_msg("[~s] " ++ Format, [?MODULE | Args])).
%% @doc Start session supervisor.
-spec(start_link(map()) -> emqx_types:startlink_ret()).
@ -83,8 +83,8 @@ handle_call({start_session, SessAttrs = #{client_id := ClientId}}, _From,
reply({error, Reason}, State)
catch
_:Error:Stk ->
?ERROR_MSG("Failed to start session ~p: ~p, stacktrace:~n~p",
[ClientId, Error, Stk]),
?ERROR("Failed to start session ~p: ~p, stacktrace:~n~p",
[ClientId, Error, Stk]),
reply({error, Error}, State)
end;
@ -92,11 +92,11 @@ handle_call(count_sessions, _From, State = #state{sessions = SessMap}) ->
{reply, maps:size(SessMap), State};
handle_call(Req, _From, State) ->
?ERROR_MSG("unexpected call: ~p", [Req]),
?ERROR("unexpected call: ~p", [Req]),
{reply, ignored, State}.
handle_cast(Msg, State) ->
?ERROR_MSG("unexpected cast: ~p", [Msg]),
?ERROR("unexpected cast: ~p", [Msg]),
{noreply, State}.
handle_info({'EXIT', Pid, _Reason}, State = #state{sessions = SessMap, clean_down = CleanDown}) ->
@ -108,7 +108,7 @@ handle_info({'EXIT', Pid, _Reason}, State = #state{sessions = SessMap, clean_dow
{noreply, State#state{sessions = SessMap1}};
handle_info(Info, State) ->
?ERROR_MSG("unexpected info: ~p", [Info]),
?ERROR("unexpected info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, State) ->

View File

@ -18,6 +18,7 @@
-include("emqx.hrl").
-include("emqx_mqtt.hrl").
-include("logger.hrl").
%% Mnesia bootstrap
-export([mnesia/1]).
@ -90,18 +91,12 @@ dispatch(Group, Topic, Delivery = #delivery{message = Msg, results = Results}, F
case pick(strategy(), ClientId, Group, Topic, FailedSubs) of
false ->
Delivery;
SubPid ->
case do_dispatch(SubPid, Topic, Msg) of
{Type, SubPid} ->
case do_dispatch(SubPid, Topic, Msg, Type) of
ok ->
Delivery#delivery{results = [{dispatch, {Group, Topic}, 1} | Results]};
{error, _Reason} ->
%% failed to dispatch to this sub, try next
%% 'Reason' is discarded so far, meaning for QoS1/2 messages
%% if all subscribers are off line, the dispatch would faile
%% even if there are sessions not expired yet.
%% If required, we can make use of the 'no_connection' reason to perform
%% retry without requiring acks, so the messages can be delivered
%% to sessions of offline clients
%% Failed to dispatch to this sub, try next.
dispatch(Group, Topic, Delivery, [SubPid | FailedSubs])
end
end.
@ -114,19 +109,23 @@ strategy() ->
ack_enabled() ->
emqx_config:get_env(shared_dispatch_ack_enabled, false).
do_dispatch(SubPid, Topic, Msg) when SubPid =:= self() ->
do_dispatch(SubPid, Topic, Msg, _Type) when SubPid =:= self() ->
%% Deadlock otherwise
_ = erlang:send(SubPid, {dispatch, Topic, Msg}),
ok;
do_dispatch(SubPid, Topic, Msg) ->
dispatch_per_qos(SubPid, Topic, Msg).
do_dispatch(SubPid, Topic, Msg, Type) ->
dispatch_per_qos(SubPid, Topic, Msg, Type).
%% return either 'ok' (when everything is fine) or 'error'
dispatch_per_qos(SubPid, Topic, #message{qos = ?QOS_0} = Msg) ->
dispatch_per_qos(SubPid, Topic, #message{qos = ?QOS_0} = Msg, _Type) ->
%% For QoS 0 message, send it as regular dispatch
_ = erlang:send(SubPid, {dispatch, Topic, Msg}),
ok;
dispatch_per_qos(SubPid, Topic, Msg) ->
dispatch_per_qos(SubPid, Topic, Msg, retry) ->
%% Retry implies all subscribers nack:ed, send again without ack
_ = erlang:send(SubPid, {dispatch, Topic, Msg}),
ok;
dispatch_per_qos(SubPid, Topic, Msg, fresh) ->
case ack_enabled() of
true ->
dispatch_with_ack(SubPid, Topic, Msg);
@ -210,24 +209,32 @@ pick(sticky, ClientId, Group, Topic, FailedSubs) ->
true ->
%% the old subscriber is still alive
%% keep using it for sticky strategy
Sub0;
{fresh, Sub0};
false ->
%% randomly pick one for the first message
Sub = do_pick(random, ClientId, Group, Topic, FailedSubs),
{Type, Sub} = do_pick(random, ClientId, Group, Topic, [Sub0 | FailedSubs]),
%% stick to whatever pick result
erlang:put({shared_sub_sticky, Group, Topic}, Sub),
Sub
{Type, Sub}
end;
pick(Strategy, ClientId, Group, Topic, FailedSubs) ->
do_pick(Strategy, ClientId, Group, Topic, FailedSubs).
do_pick(Strategy, ClientId, Group, Topic, FailedSubs) ->
case subscribers(Group, Topic) -- FailedSubs of
[] -> false;
[Sub] -> Sub;
All -> pick_subscriber(Group, Topic, Strategy, ClientId, All)
All = subscribers(Group, Topic),
case All -- FailedSubs of
[] when FailedSubs =:= [] ->
%% Genuinely no subscriber
false;
[] ->
%% All offline? pick one anyway
{retry, pick_subscriber(Group, Topic, Strategy, ClientId, All)};
Subs ->
%% More than one available
{fresh, pick_subscriber(Group, Topic, Strategy, ClientId, Subs)}
end.
pick_subscriber(_Group, _Topic, _Strategy, _ClientId, [Sub]) -> Sub;
pick_subscriber(Group, Topic, Strategy, ClientId, Subs) ->
Nth = do_pick_subscriber(Group, Topic, Strategy, ClientId, length(Subs)),
lists:nth(Nth, Subs).
@ -284,11 +291,11 @@ handle_call({unsubscribe, Group, Topic, SubPid}, _From, State) ->
{reply, ok, State};
handle_call(Req, _From, State) ->
emqx_logger:error("[SharedSub] unexpected call: ~p", [Req]),
?ERROR("[SharedSub] unexpected call: ~p", [Req]),
{reply, ignored, State}.
handle_cast(Msg, State) ->
emqx_logger:error("[SharedSub] unexpected cast: ~p", [Msg]),
?ERROR("[SharedSub] unexpected cast: ~p", [Msg]),
{noreply, State}.
handle_info({mnesia_table_event, {write, NewRecord, _}}, State = #state{pmon = PMon}) ->
@ -303,12 +310,12 @@ handle_info({mnesia_table_event, _Event}, State) ->
{noreply, State};
handle_info({'DOWN', _MRef, process, SubPid, _Reason}, State = #state{pmon = PMon}) ->
emqx_logger:info("[SharedSub] shared subscriber down: ~p", [SubPid]),
?INFO("[SharedSub] shared subscriber down: ~p", [SubPid]),
cleanup_down(SubPid),
{noreply, update_stats(State#state{pmon = emqx_pmon:erase(SubPid, PMon)})};
handle_info(Info, State) ->
emqx_logger:error("[SharedSub] unexpected info: ~p", [Info]),
?ERROR("[SharedSub] unexpected info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, _State) ->

View File

@ -17,6 +17,7 @@
-behaviour(gen_server).
-include("emqx.hrl").
-include("logger.hrl").
-export([start_link/0]).
@ -90,7 +91,7 @@ discard_session(ClientId, ConnPid) when is_binary(ClientId) ->
try emqx_session:discard(SessPid, ConnPid)
catch
_:Error:_Stk ->
emqx_logger:error("[SM] Failed to discard ~p: ~p", [SessPid, Error])
?ERROR("[SM] Failed to discard ~p: ~p", [SessPid, Error])
end
end, lookup_session_pids(ClientId)).
@ -104,7 +105,7 @@ resume_session(ClientId, SessAttrs = #{conn_pid := ConnPid}) ->
{ok, SessPid};
SessPids ->
[SessPid|StalePids] = lists:reverse(SessPids),
emqx_logger:error("[SM] More than one session found: ~p", [SessPids]),
?ERROR("[SM] More than one session found: ~p", [SessPids]),
lists:foreach(fun(StalePid) ->
catch emqx_session:discard(StalePid, ConnPid)
end, StalePids),
@ -226,15 +227,15 @@ init([]) ->
{ok, #{}}.
handle_call(Req, _From, State) ->
emqx_logger:error("[SM] unexpected call: ~p", [Req]),
?ERROR("[SM] unexpected call: ~p", [Req]),
{reply, ignored, State}.
handle_cast(Msg, State) ->
emqx_logger:error("[SM] unexpected cast: ~p", [Msg]),
?ERROR("[SM] unexpected cast: ~p", [Msg]),
{noreply, State}.
handle_info(Info, State) ->
emqx_logger:error("[SM] unexpected info: ~p", [Info]),
?ERROR("[SM] unexpected info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, _State) ->

View File

@ -17,6 +17,7 @@
-behaviour(gen_server).
-include("emqx.hrl").
-include("logger.hrl").
-export([start_link/0]).
-export([is_enabled/0]).
@ -81,11 +82,11 @@ init([]) ->
{ok, #{}}.
handle_call(Req, _From, State) ->
emqx_logger:error("[Registry] unexpected call: ~p", [Req]),
?ERROR("[Registry] unexpected call: ~p", [Req]),
{reply, ignored, State}.
handle_cast(Msg, State) ->
emqx_logger:error("[Registry] unexpected cast: ~p", [Msg]),
?ERROR("[Registry] unexpected cast: ~p", [Msg]),
{noreply, State}.
handle_info({membership, {mnesia, down, Node}}, State) ->
@ -99,7 +100,7 @@ handle_info({membership, _Event}, State) ->
{noreply, State};
handle_info(Info, State) ->
emqx_logger:error("[Registry] unexpected info: ~p", [Info]),
?ERROR("[Registry] unexpected info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, _State) ->

View File

@ -17,6 +17,7 @@
-behaviour(gen_server).
-include("emqx.hrl").
-include("logger.hrl").
-export([start_link/0, start_link/1, stop/0]).
@ -164,7 +165,7 @@ start_timer(#state{tick_ms = Ms} = State) ->
handle_call(stop, _From, State) ->
{stop, normal, _Reply = ok, State};
handle_call(Req, _From, State) ->
emqx_logger:error("[Stats] unexpected call: ~p", [Req]),
?ERROR("[Stats] unexpected call: ~p", [Req]),
{reply, ignored, State}.
handle_cast({setstat, Stat, MaxStat, Val}, State) ->
@ -182,7 +183,7 @@ handle_cast({setstat, Stat, MaxStat, Val}, State) ->
handle_cast({update_interval, Update = #update{name = Name}}, State = #state{updates = Updates}) ->
case lists:keyfind(Name, #update.name, Updates) of
#update{} ->
emqx_logger:error("[Stats]: duplicated update: ~s", [Name]),
?ERROR("[Stats]: duplicated update: ~s", [Name]),
{noreply, State};
false ->
{noreply, State#state{updates = [Update | Updates]}}
@ -192,7 +193,7 @@ handle_cast({cancel_update, Name}, State = #state{updates = Updates}) ->
{noreply, State#state{updates = lists:keydelete(Name, #update.name, Updates)}};
handle_cast(Msg, State) ->
emqx_logger:error("[Stats] unexpected cast: ~p", [Msg]),
?ERROR("[Stats] unexpected cast: ~p", [Msg]),
{noreply, State}.
handle_info({timeout, TRef, tick}, State = #state{timer = TRef, updates = Updates}) ->
@ -200,8 +201,9 @@ handle_info({timeout, TRef, tick}, State = #state{timer = TRef, updates = Update
fun(Update = #update{name = Name, countdown = C, interval = I,
func = UpFun}, Acc) when C =< 0 ->
try UpFun()
catch _:Error ->
emqx_logger:error("[Stats] update ~s error: ~p", [Name, Error])
catch
_:Error ->
?ERROR("[Stats] update ~s error: ~p", [Name, Error])
end,
[Update#update{countdown = I} | Acc];
(Update = #update{countdown = C}, Acc) ->
@ -210,7 +212,7 @@ handle_info({timeout, TRef, tick}, State = #state{timer = TRef, updates = Update
{noreply, start_timer(State#state{updates = Updates1}), hibernate};
handle_info(Info, State) ->
emqx_logger:error("[Stats] unexpected info: ~p", [Info]),
?ERROR("[Stats] unexpected info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, #state{timer = TRef}) ->

View File

@ -17,6 +17,7 @@
-behaviour(gen_server).
-include("emqx.hrl").
-include("logger.hrl").
-export([start_link/0]).
-export([version/0, uptime/0, datetime/0, sysdescr/0, sys_interval/0]).
@ -98,11 +99,11 @@ handle_call(uptime, _From, State) ->
{reply, uptime(State), State};
handle_call(Req, _From, State) ->
emqx_logger:error("[SYS] unexpected call: ~p", [Req]),
?ERROR("[SYS] unexpected call: ~p", [Req]),
{reply, ignored, State}.
handle_cast(Msg, State) ->
emqx_logger:error("[SYS] unexpected cast: ~p", [Msg]),
?ERROR("[SYS] unexpected cast: ~p", [Msg]),
{noreply, State}.
handle_info({timeout, TRef, heartbeat}, State = #state{heartbeat = TRef}) ->
@ -119,7 +120,7 @@ handle_info({timeout, TRef, tick}, State = #state{ticker = TRef, version = Versi
{noreply, tick(State), hibernate};
handle_info(Info, State) ->
emqx_logger:error("[SYS] unexpected info: ~p", [Info]),
?ERROR("[SYS] unexpected info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, #state{heartbeat = TRef1, ticker = TRef2}) ->

View File

@ -16,34 +16,32 @@
-behavior(gen_server).
-export([start_link/1]).
-include("logger.hrl").
-export([start_link/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
-record(state, {timer, events}).
%% compress unused warning
-export([procinfo/1]).
-define(SYSMON, ?MODULE).
-define(LOG(Msg, ProcInfo),
emqx_logger:warning(#{sysmon => true}, "[SYSMON] ~s~n~p", [WarnMsg, ProcInfo])).
-define(LOG(Msg, ProcInfo, PortInfo),
emqx_logger:warning(#{sysmon => true}, "[SYSMON] ~s~n~p~n~p", [WarnMsg, ProcInfo, PortInfo])).
%% @doc Start system monitor
-spec(start_link(Opts :: list(tuple())) -> {ok, pid()} | ignore | {error, term()}).
-spec(start_link(Opts :: list(tuple())) -> emqx_types:startlink_ret()).
start_link(Opts) ->
gen_server:start_link({local, ?SYSMON}, ?MODULE, [Opts], []).
%%-----------------------------------------------------------------------------
%%------------------------------------------------------------------------------
%% gen_server callbacks
%%-----------------------------------------------------------------------------
%%------------------------------------------------------------------------------
init([Opts]) ->
erlang:system_monitor(self(), parse_opt(Opts)),
{ok, start_timer(#state{events = []})}.
emqx_logger:set_proc_metadata(#{sysmon => true}),
{ok, start_timer(#{timer => undefined, events => []})}.
start_timer(State) ->
State#state{timer = emqx_misc:start_timer(timer:seconds(2), reset)}.
State#{timer := emqx_misc:start_timer(timer:seconds(2), reset)}.
parse_opt(Opts) ->
parse_opt(Opts, []).
@ -71,18 +69,18 @@ parse_opt([_Opt|Opts], Acc) ->
parse_opt(Opts, Acc).
handle_call(Req, _From, State) ->
emqx_logger:error("[SYSMON] unexpected call: ~p", [Req]),
?ERROR("[SYSMON] unexpected call: ~p", [Req]),
{reply, ignored, State}.
handle_cast(Msg, State) ->
emqx_logger:error("[SYSMON] unexpected cast: ~p", [Msg]),
?ERROR("[SYSMON] unexpected cast: ~p", [Msg]),
{noreply, State}.
handle_info({monitor, Pid, long_gc, Info}, State) ->
suppress({long_gc, Pid},
fun() ->
WarnMsg = io_lib:format("long_gc warning: pid = ~p, info: ~p", [Pid, Info]),
?LOG(WarnMsg, procinfo(Pid)),
?WARN("[SYSMON] ~s~n~p", [WarnMsg, procinfo(Pid)]),
safe_publish(long_gc, WarnMsg)
end, State);
@ -90,7 +88,7 @@ handle_info({monitor, Pid, long_schedule, Info}, State) when is_pid(Pid) ->
suppress({long_schedule, Pid},
fun() ->
WarnMsg = io_lib:format("long_schedule warning: pid = ~p, info: ~p", [Pid, Info]),
?LOG(WarnMsg, procinfo(Pid)),
?WARN("[SYSMON] ~s~n~p", [WarnMsg, procinfo(Pid)]),
safe_publish(long_schedule, WarnMsg)
end, State);
@ -98,7 +96,7 @@ handle_info({monitor, Port, long_schedule, Info}, State) when is_port(Port) ->
suppress({long_schedule, Port},
fun() ->
WarnMsg = io_lib:format("long_schedule warning: port = ~p, info: ~p", [Port, Info]),
?LOG(WarnMsg, erlang:port_info(Port)),
?WARN("[SYSMON] ~s~n~p", [WarnMsg, erlang:port_info(Port)]),
safe_publish(long_schedule, WarnMsg)
end, State);
@ -106,7 +104,7 @@ handle_info({monitor, Pid, large_heap, Info}, State) ->
suppress({large_heap, Pid},
fun() ->
WarnMsg = io_lib:format("large_heap warning: pid = ~p, info: ~p", [Pid, Info]),
?LOG(WarnMsg, procinfo(Pid)),
?WARN("[SYSMON] ~s~n~p", [WarnMsg, procinfo(Pid)]),
safe_publish(large_heap, WarnMsg)
end, State);
@ -114,7 +112,7 @@ handle_info({monitor, SusPid, busy_port, Port}, State) ->
suppress({busy_port, Port},
fun() ->
WarnMsg = io_lib:format("busy_port warning: suspid = ~p, port = ~p", [SusPid, Port]),
?LOG(WarnMsg, procinfo(SusPid), erlang:port_info(Port)),
?WARN("[SYSMON] ~s~n~p~n~p", [WarnMsg, procinfo(SusPid), erlang:port_info(Port)]),
safe_publish(busy_port, WarnMsg)
end, State);
@ -122,28 +120,28 @@ handle_info({monitor, SusPid, busy_dist_port, Port}, State) ->
suppress({busy_dist_port, Port},
fun() ->
WarnMsg = io_lib:format("busy_dist_port warning: suspid = ~p, port = ~p", [SusPid, Port]),
?LOG(WarnMsg, procinfo(SusPid), erlang:port_info(Port)),
?WARN("[SYSMON] ~s~n~p~n~p", [WarnMsg, procinfo(SusPid), erlang:port_info(Port)]),
safe_publish(busy_dist_port, WarnMsg)
end, State);
handle_info({timeout, _Ref, reset}, State) ->
{noreply, State#state{events = []}, hibernate};
{noreply, State#{events := []}, hibernate};
handle_info(Info, State) ->
logger:error("[SYSMON] unexpected Info: ~p", [Info]),
?ERROR("[SYSMON] unexpected Info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, #state{timer = TRef}) ->
terminate(_Reason, #{timer := TRef}) ->
emqx_misc:cancel_timer(TRef).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
suppress(Key, SuccFun, State = #state{events = Events}) ->
suppress(Key, SuccFun, State = #{events := Events}) ->
case lists:member(Key, Events) of
true -> {noreply, State};
false -> SuccFun(),
{noreply, State#state{events = [Key|Events]}}
{noreply, State#{events := [Key|Events]}}
end.
procinfo(Pid) ->

View File

@ -14,7 +14,7 @@
-module(emqx_time).
-export([seed/0, now_secs/0, now_secs/1, now_ms/0, now_ms/1]).
-export([seed/0, now_secs/0, now_secs/1, now_ms/0, now_ms/1, ts_from_ms/1]).
seed() ->
rand:seed(exsplus, erlang:timestamp()).
@ -29,4 +29,7 @@ now_ms() ->
erlang:system_time(millisecond).
now_ms({MegaSecs, Secs, MicroSecs}) ->
(MegaSecs * 1000000 + Secs) * 1000 + round(MicroSecs/1000).
(MegaSecs * 1000000 + Secs) * 1000 + round(MicroSecs/1000).
ts_from_ms(Ms) ->
{Ms div 1000000, Ms rem 1000000, 0}.

View File

@ -24,6 +24,7 @@
%% Trie APIs
-export([insert/1, match/1, lookup/1, delete/1]).
-export([empty/0]).
%% Mnesia tables
-define(TRIE, emqx_trie).
@ -100,6 +101,11 @@ delete(Topic) when is_binary(Topic) ->
[] -> ok
end.
%% @doc Is the trie empty?
-spec(empty() -> boolean()).
empty() ->
ets:info(?TRIE, size) == 0.
%%------------------------------------------------------------------------------
%% Internal functions
%%------------------------------------------------------------------------------

View File

@ -14,8 +14,6 @@
-module(emqx_ws_connection).
-define(LOG_HEADER, "[WS]").
-include("emqx.hrl").
-include("emqx_mqtt.hrl").
-include("logger.hrl").

View File

@ -17,6 +17,7 @@
-behaviour(gen_server).
-include("emqx.hrl").
-include("logger.hrl").
-export([start_link/0]).
-export([get_env/2, get_env/3]).
@ -76,7 +77,7 @@ handle_call(force_reload, _From, State) ->
{reply, ok, State};
handle_call(Req, _From, State) ->
emqx_logger:error("[Zone] unexpected call: ~p", [Req]),
?ERROR("[Zone] unexpected call: ~p", [Req]),
{reply, ignored, State}.
handle_cast({set_env, Zone, Key, Val}, State) ->
@ -84,7 +85,7 @@ handle_cast({set_env, Zone, Key, Val}, State) ->
{noreply, State};
handle_cast(Msg, State) ->
emqx_logger:error("[Zone] unexpected cast: ~p", [Msg]),
?ERROR("[Zone] unexpected cast: ~p", [Msg]),
{noreply, State}.
handle_info(reload, State) ->
@ -92,7 +93,7 @@ handle_info(reload, State) ->
{noreply, ensure_reload_timer(State#{timer := undefined}), hibernate};
handle_info(Info, State) ->
emqx_logger:error("[Zone] unexpected info: ~p", [Info]),
?ERROR("[Zone] unexpected info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, _State) ->

View File

@ -1,4 +1,3 @@
{deny, {user, "emqx"}, pubsub, ["acl_deny_action"]}.
{allow, all}.
{deny, {user, "pub_deny"}, publish, ["pub_deny"]}.
{allow, all}.

View File

@ -31,27 +31,28 @@ end_per_suite(_Config) ->
emqx_ct_broker_helpers:run_teardown_steps().
bridge_test(_) ->
{ok, _Pid} = emqx_bridge:start_link(emqx, []),
#{msg := <<"start bridge successfully">>}
= emqx_bridge:start_bridge(emqx),
= emqx_bridge:start_bridge(aws),
test_forwards(),
test_subscriptions(0),
test_subscriptions(1),
test_subscriptions(2),
#{msg := <<"stop bridge successfully">>}
= emqx_bridge:stop_bridge(emqx),
= emqx_bridge:stop_bridge(aws),
ok.
test_forwards() ->
emqx_bridge:add_forward(emqx, <<"test_forwards">>),
[<<"test_forwards">>] = emqx_bridge:show_forwards(emqx),
emqx_bridge:del_forward(emqx, <<"test_forwards">>),
[] = emqx_bridge:show_forwards(emqx),
emqx_bridge:add_forward(aws, <<"test_forwards">>),
[<<"test_forwards">>, <<"topic1/#">>, <<"topic2/#">>] = emqx_bridge:show_forwards(aws),
emqx_bridge:del_forward(aws, <<"test_forwards">>),
[<<"topic1/#">>, <<"topic2/#">>] = emqx_bridge:show_forwards(aws),
ok.
test_subscriptions(QoS) ->
emqx_bridge:add_subscription(emqx, <<"test_subscriptions">>, QoS),
[{<<"test_subscriptions">>, QoS}] = emqx_bridge:show_subscriptions(emqx),
emqx_bridge:del_subscription(emqx, <<"test_subscriptions">>),
[] = emqx_bridge:show_subscriptions(emqx),
emqx_bridge:add_subscription(aws, <<"test_subscriptions">>, QoS),
[{<<"test_subscriptions">>, QoS},
{<<"cmd/topic1">>, 1},
{<<"cmd/topic2">>, 1}] = emqx_bridge:show_subscriptions(aws),
emqx_bridge:del_subscription(aws, <<"test_subscriptions">>),
[{<<"cmd/topic1">>,1}, {<<"cmd/topic2">>,1}] = emqx_bridge:show_subscriptions(aws),
ok.

View File

@ -0,0 +1,101 @@
%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
-module(emqx_connection_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include("emqx_mqtt.hrl").
-define(STATS, [{mailbox_len, _},
{heap_size, _},
{reductions, _},
{recv_pkt, _},
{recv_msg, _},
{send_pkt, _},
{send_msg, _},
{recv_oct, _},
{recv_cnt, _},
{send_oct, _},
{send_cnt, _},
{send_pend, _}]).
-define(ATTRS, [{clean_start, _},
{client_id, _},
{connected_at, _},
{is_bridge, _},
{is_super, _},
{keepalive, _},
{mountpoint, _},
{peercert, _},
{peername, _},
{proto_name, _},
{proto_ver, _},
{sockname, _},
{username, _},
{zone, _}]).
-define(INFO, [{ack_props, _},
{active_n, _},
{clean_start, _},
{client_id, _},
{conn_props, _},
{conn_state, _},
{connected_at, _},
{enable_acl, _},
{is_bridge, _},
{is_super, _},
{keepalive, _},
{mountpoint, _},
{peercert, _},
{peername, _},
{proto_name, _},
{proto_ver, _},
{pub_limit, _},
{rate_limit, _},
{session, _},
{sockname, _},
{socktype, _},
{topic_aliases, _},
{username, _},
{zone, _}]).
all() ->
[t_connect_api].
init_per_suite(Config) ->
emqx_ct_broker_helpers:run_setup_steps(),
Config.
end_per_suite(_Config) ->
emqx_ct_broker_helpers:run_teardown_steps().
t_connect_api(_Config) ->
{ok, T1} = emqx_client:start_link([{host, "localhost"},
{client_id, <<"client1">>},
{username, <<"testuser1">>},
{password, <<"pass1">>}]),
{ok, _} = emqx_client:connect(T1),
CPid = emqx_cm:lookup_conn_pid(<<"client1">>),
?STATS = emqx_connection:stats(CPid),
?ATTRS = emqx_connection:attrs(CPid),
?INFO = emqx_connection:info(CPid),
SessionPid = emqx_connection:session(CPid),
true = is_pid(SessionPid),
emqx_client:disconnect(T1).

View File

@ -124,3 +124,28 @@ client_ssl_twoway() ->
client_ssl() ->
?CIPHERS ++ [{reuse_sessions, true}].
wait_mqtt_payload(Payload) ->
receive
{publish, #{payload := Payload}} ->
ct:pal("OK - received msg: ~p~n", [Payload])
after 1000 ->
ct:fail({timeout, Payload, {msg_box, flush()}})
end.
not_wait_mqtt_payload(Payload) ->
receive
{publish, #{payload := Payload}} ->
ct:fail({received, Payload})
after 1000 ->
ct:pal("OK - msg ~p is not received", [Payload])
end.
flush() ->
flush([]).
flush(Msgs) ->
receive
M -> flush([M|Msgs])
after
0 -> lists:reverse(Msgs)
end.

View File

@ -21,8 +21,23 @@
all() -> [t_child_all].
start_link() ->
Pid = spawn_link(?MODULE, echo, [0]),
{ok, Pid}.
echo(State) ->
receive
{From, Req} ->
ct:pal("======from:~p, req:~p", [From, Req]),
From ! Req,
echo(State)
end.
t_child_all(_) ->
{ok, _Pid} = emqx_mod_sup:start_link(),
{ok, _Child} = emqx_mod_sup:start_child(emqx_banned, worker),
{ok, Pid} = emqx_mod_sup:start_link(),
{ok, Child} = emqx_mod_sup:start_child(?MODULE, worker),
timer:sleep(10),
ok = emqx_mod_sup:stop_child(emqx_banned).
Child ! {self(), hi},
receive hi -> ok after 100 -> ct:fail({timeout, wait_echo}) end,
ok = emqx_mod_sup:stop_child(?MODULE),
exit(Pid, normal).

View File

@ -102,7 +102,7 @@ packet_message(_) ->
Pkt = emqx_packet:from_message(10, Msg2),
Msg3 = emqx_message:set_header(username, "test", Msg2),
Msg4 = emqx_packet:to_message(#{client_id => <<"clientid">>, username => "test"}, Pkt),
Msg5 = Msg4#message{timestamp = Msg3#message.timestamp},
Msg5 = Msg4#message{timestamp = Msg3#message.timestamp, id = Msg3#message.id},
Msg5 = Msg3.
packet_format(_) ->

View File

@ -15,22 +15,22 @@
-module(emqx_pool_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include("emqx_mqtt.hrl").
-include_lib("eunit/include/eunit.hrl").
all() -> [
{group, submit_case},
{group, async_submit_case}
].
all() ->
[
{group, submit_case},
{group, async_submit_case},
t_unexpected
].
groups() ->
[
{submit_case, [sequence], [submit_mfa, submit_fa]},
{async_submit_case, [sequence], [async_submit_mfa]}
{async_submit_case, [sequence], [async_submit_mfa, async_submit_crash]}
].
init_per_suite(Config) ->
@ -40,26 +40,36 @@ init_per_suite(Config) ->
end_per_suite(_Config) ->
ok.
init_per_testcase(_, Config) ->
{ok, Sup} = emqx_pool_sup:start_link(),
[{pool_sup, Sup}|Config].
end_per_testcase(_, Config) ->
Sup = proplists:get_value(pool_sup, Config),
exit(Sup, normal).
submit_mfa(_Config) ->
erlang:process_flag(trap_exit, true),
{ok, Pid} = emqx_pool:start_link(),
Result = emqx_pool:submit({?MODULE, test_mfa, []}),
?assertEqual(15, Result),
gen_server:stop(Pid, normal, 3000),
ok.
?assertEqual(15, Result).
submit_fa(_Config) ->
{ok, Pid} = emqx_pool:start_link(),
Fun = fun(X) -> case X rem 2 of 0 -> {true, X div 2}; _ -> false end end,
Result = emqx_pool:submit(Fun, [2]),
?assertEqual({true, 1}, Result),
exit(Pid, normal).
?assertEqual({true, 1}, Result).
async_submit_mfa(_Config) ->
emqx_pool:async_submit({?MODULE, test_mfa, []}),
emqx_pool:async_submit(fun ?MODULE:test_mfa/0, []).
async_submit_crash(_) ->
emqx_pool:async_submit(fun() -> A = 1, A = 0 end).
t_unexpected(_) ->
Pid = emqx_pool:worker(),
?assertEqual(ignored, gen_server:call(Pid, bad_request)),
?assertEqual(ok, gen_server:cast(Pid, bad_msg)),
Pid ! bad_info,
ok = gen_server:stop(Pid).
test_mfa() ->
lists:foldl(fun(X, Sum) -> X + Sum end, 0, [1,2,3,4,5]).
async_submit_mfa(_Config) ->
{ok, Pid} = emqx_pool:start_link(),
emqx_pool:async_submit({?MODULE, test_mfa, []}),
exit(Pid, normal).

View File

@ -28,10 +28,69 @@
-define(TOPICS, [<<"TopicA">>, <<"TopicA/B">>, <<"Topic/C">>, <<"TopicA/C">>,
<<"/TopicA">>]).
-define(CLIENT2, ?CONNECT_PACKET(#mqtt_packet_connect{
username = <<"admin">>,
clean_start = false,
password = <<"public">>})).
-define(CLIENT, ?CONNECT_PACKET(#mqtt_packet_connect{
client_id = <<"mqtt_client">>,
username = <<"emqx">>,
password = <<"public">>})).
-record(pstate, {
zone,
sendfun,
peername,
peercert,
proto_ver,
proto_name,
client_id,
is_assigned,
conn_pid,
conn_props,
ack_props,
username,
session,
clean_start,
topic_aliases,
packet_size,
keepalive,
mountpoint,
is_super,
is_bridge,
enable_ban,
enable_acl,
acl_deny_action,
recv_stats,
send_stats,
connected,
connected_at,
ignore_loop,
topic_alias_maximum
}).
-define(TEST_PSTATE(ProtoVer, SendStats),
#pstate{zone = test,
sendfun = fun(_Packet, _Options) -> ok end,
peername = test_peername,
peercert = test_peercert,
proto_ver = ProtoVer,
proto_name = <<"MQTT">>,
client_id = <<"test_pstate">>,
is_assigned = false,
conn_pid = self(),
username = <<"emqx">>,
is_super = false,
clean_start = false,
topic_aliases = #{},
packet_size = 1000,
mountpoint = <<>>,
is_bridge = false,
enable_ban = false,
enable_acl = true,
acl_deny_action = disconnect,
recv_stats = #{msg => 0, pkt => 0},
send_stats = SendStats,
connected = false,
ignore_loop = false,
topic_alias_maximum = #{to_client => 0, from_client => 0}}).
all() ->
[
@ -42,26 +101,25 @@ all() ->
].
groups() ->
[{mqtt_common,
[sequence],
[will_check]},
{mqttv4,
[sequence],
[{mqtt_common, [sequence],
[will_topic_check,
will_acl_check
]},
{mqttv4, [sequence],
[connect_v4,
subscribe_v4]},
{mqttv5,
[sequence],
{mqttv5, [sequence],
[connect_v5,
subscribe_v5]},
{acl,
[sequence],
[acl_deny_action]}].
{acl, [sequence],
[acl_deny_action_ct,
acl_deny_action_eunit]}].
init_per_suite(Config) ->
[start_apps(App, SchemaFile, ConfigFile) ||
{App, SchemaFile, ConfigFile}
<- [{emqx, deps_path(emqx, "priv/emqx.schema"),
deps_path(emqx, "etc/emqx.conf")}]],
deps_path(emqx, "etc/gen.emqx.conf")}]],
emqx_zone:set_env(external, max_topic_alias, 20),
Config.
@ -206,11 +264,10 @@ connect_v5(_) ->
raw_recv_parse(Data, ?MQTT_PROTO_V5),
emqx_client_sock:send(Sock, raw_send_serialize(?SUBSCRIBE_PACKET(1, [{<<"TopicA">>, #{rh => 1,
qos => ?QOS_2,
rap => 0,
nl => 0,
rc => 0}}]),
#{version => ?MQTT_PROTO_V5})),
qos => ?QOS_2,
rap => 0,
nl => 0,
rc => 0}}]), #{version => ?MQTT_PROTO_V5})),
{ok, Data2} = gen_tcp:recv(Sock, 0),
{ok, ?SUBACK_PACKET(1, #{}, [2]), _} = raw_recv_parse(Data2, ?MQTT_PROTO_V5),
@ -290,7 +347,7 @@ connect_v5(_) ->
will_props = #{'Will-Delay-Interval' => 5},
will_topic = <<"TopicA">>,
will_payload = <<"will message">>,
properties = #{'Session-Expiry-Interval' => 3}
properties = #{'Session-Expiry-Interval' => 0}
}
)
)
@ -305,21 +362,18 @@ connect_v5(_) ->
do_connect(Sock2, ?MQTT_PROTO_V5),
emqx_client_sock:send(Sock2, raw_send_serialize(?SUBSCRIBE_PACKET(1, [{<<"TopicA">>, #{rh => 1,
qos => ?QOS_2,
rap => 0,
nl => 0,
rc => 0}}]),
#{version => ?MQTT_PROTO_V5})),
qos => ?QOS_2,
rap => 0,
nl => 0,
rc => 0}}]), #{version => ?MQTT_PROTO_V5})),
{ok, SubData} = gen_tcp:recv(Sock2, 0),
{ok, ?SUBACK_PACKET(1, #{}, [2]), _} = raw_recv_parse(SubData, ?MQTT_PROTO_V5),
emqx_client_sock:send(Sock, raw_send_serialize(
?DISCONNECT_PACKET(?RC_DISCONNECT_WITH_WILL_MESSAGE)
)
),
?DISCONNECT_PACKET(?RC_SUCCESS))),
{error, timeout} = gen_tcp:recv(Sock2, 0, 1000),
{error, timeout} = gen_tcp:recv(Sock2, 0, 2000),
% session resumed
{ok, Sock3} = emqx_client_sock:connect({127, 0, 0, 1}, 1883,
@ -341,18 +395,20 @@ connect_v5(_) ->
will_payload = <<"will message 2">>,
properties = #{'Session-Expiry-Interval' => 3}
}
)
),
#{version => ?MQTT_PROTO_V5}
)
),
{ok, Data3} = gen_tcp:recv(Sock3, 0),
{ok, ?CONNACK_PACKET(?RC_SUCCESS, 1), _} = raw_recv_parse(Data3, ?MQTT_PROTO_V5),
{ok, ?CONNACK_PACKET(?RC_SUCCESS, 0), _} = raw_recv_parse(Data3, ?MQTT_PROTO_V5),
emqx_client_sock:send(Sock3, raw_send_serialize(
?DISCONNECT_PACKET(?RC_DISCONNECT_WITH_WILL_MESSAGE)
?DISCONNECT_PACKET(?RC_DISCONNECT_WITH_WILL_MESSAGE),
#{version => ?MQTT_PROTO_V5}
)
),
{ok, WillData} = gen_tcp:recv(Sock2, 0),
{ok, WillData} = gen_tcp:recv(Sock2, 0, 5000),
{ok, ?PUBLISH_PACKET(?QOS_1, <<"TopicA">>, _, <<"will message 2">>), _} = raw_recv_parse(WillData, ?MQTT_PROTO_V5),
emqx_client_sock:close(Sock2)
@ -507,65 +563,78 @@ raw_recv_parse(P, ProtoVersion) ->
emqx_frame:parse(P, {none, #{max_packet_size => ?MAX_PACKET_SIZE,
version => ProtoVersion}}).
acl_deny_action(_) ->
acl_deny_action_ct(_) ->
emqx_zone:set_env(external, acl_deny_action, disconnect),
process_flag(trap_exit, true),
[acl_deny_do_disconnect(publish, QoS, <<"acl_deny_action">>) || QoS <- lists:seq(0, 2)],
[acl_deny_do_disconnect(subscribe, QoS, <<"acl_deny_action">>) || QoS <- lists:seq(0, 2)],
[acl_deny_do_disconnect(publish, QoS, <<"acl_deny_action">>) || QoS <- lists:seq(0, 2)],
emqx_zone:set_env(external, acl_deny_action, ignore),
ok.
will_check(_) ->
acl_deny_action_eunit(_) ->
PState = ?TEST_PSTATE(?MQTT_PROTO_V5, #{msg => 0, pkt => 0}),
CodeName = emqx_reason_codes:name(?RC_NOT_AUTHORIZED, ?MQTT_PROTO_V5),
{error, CodeName, NEWPSTATE1} = emqx_protocol:process_packet(?PUBLISH_PACKET(?QOS_1, <<"acl_deny_action">>, 1, <<"payload">>), PState),
?assertEqual(#{pkt => 1, msg => 0}, NEWPSTATE1#pstate.send_stats),
{error, CodeName, NEWPSTATE2} = emqx_protocol:process_packet(?PUBLISH_PACKET(?QOS_2, <<"acl_deny_action">>, 2, <<"payload">>), PState),
?assertEqual(#{pkt => 1, msg => 0}, NEWPSTATE2#pstate.send_stats).
will_topic_check(_) ->
{ok, Client} = emqx_client:start_link([{username, <<"emqx">>},
{will_flag, true},
{will_topic, <<"aaa">>},
{will_payload, <<"I have died">>},
{will_qos, 0}]),
{ok, _} = emqx_client:connect(Client),
{ok, T} = emqx_client:start_link([{client_id, <<"client">>}]),
emqx_client:connect(T),
emqx_client:subscribe(T, <<"aaa">>),
ct:sleep(200),
emqx_client:stop(Client),
ct:sleep(100),
false = is_process_alive(Client),
emqx_ct_broker_helpers:wait_mqtt_payload(<<"I have died">>),
emqx_client:stop(T).
will_acl_check(_) ->
%% The connection will be rejected if publishing of the will message is not allowed by
%% ACL rules
process_flag(trap_exit, true),
will_topic_check(0),
will_acl_check(0).
will_topic_check(QoS) ->
{ok, Client} = emqx_client:start_link([{username, <<"emqx">>},
{ok, Client} = emqx_client:start_link([{username, <<"pub_deny">>},
{will_flag, true},
{will_topic, <<"">>},
{will_topic, <<"pub_deny">>},
{will_payload, <<"I have died">>},
{will_qos, QoS}]),
try emqx_client:connect(Client) of
_ ->
ok
catch
exit : _Reason ->
false = is_process_alive(Client)
end.
will_acl_check(QoS) ->
{ok, Client} = emqx_client:start_link([{username, <<"emqx">>},
{will_flag, true},
{will_topic, <<"acl_deny_action">>},
{will_payload, <<"I have died">>},
{will_qos, QoS}]),
try emqx_client:connect(Client) of
_ ->
ok
catch
exit : _Reason ->
false = is_process_alive(Client)
end.
{will_qos, 0}]),
?assertMatch({error,{_,_}}, emqx_client:connect(Client)).
acl_deny_do_disconnect(publish, QoS, Topic) ->
process_flag(trap_exit, true),
{ok, Client} = emqx_client:start_link([{username, <<"emqx">>}]),
{ok, _} = emqx_client:connect(Client),
emqx_client:publish(Client, Topic, <<"test">>, QoS),
receive
{'EXIT', Client, _Reason} ->
false = is_process_alive(Client)
{'EXIT', Client, {shutdown,tcp_closed}} ->
ct:pal(info, "[OK] after publish, received exit: {shutdown,tcp_closed}"),
false = is_process_alive(Client);
{'EXIT', Client, Reason} ->
ct:pal(info, "[OK] after publish, client got disconnected: ~p", [Reason])
after 1000 -> ct:fail({timeout, wait_tcp_closed})
end;
acl_deny_do_disconnect(subscribe, QoS, Topic) ->
process_flag(trap_exit, true),
{ok, Client} = emqx_client:start_link([{username, <<"emqx">>}]),
{ok, _} = emqx_client:connect(Client),
try emqx_client:subscribe(Client, Topic, QoS) of
_ ->
ok
catch
exit : _Reason ->
false = is_process_alive(Client)
{ok, _, [128]} = emqx_client:subscribe(Client, Topic, QoS),
receive
{'EXIT', Client, {shutdown,tcp_closed}} ->
ct:pal(info, "[OK] after subscribe, received exit: {shutdown,tcp_closed}"),
false = is_process_alive(Client);
{'EXIT', Client, Reason} ->
ct:pal(info, "[OK] after subscribe, client got disconnected: ~p", [Reason])
after 1000 -> ct:fail({timeout, wait_tcp_closed})
end.
start_apps(App, SchemaFile, ConfigFile) ->

View File

@ -27,10 +27,13 @@ all() ->
groups() ->
[{route, [sequence],
[t_add_delete,
[t_mnesia,
t_add_delete,
t_do_add_delete,
t_match_routes,
t_has_routes]}].
t_print_routes,
t_has_routes,
t_unexpected]}].
init_per_suite(Config) ->
emqx_ct_broker_helpers:run_setup_steps(),
@ -46,18 +49,21 @@ init_per_testcase(_TestCase, Config) ->
end_per_testcase(_TestCase, _Config) ->
clear_tables().
t_mnesia(_) ->
%% for coverage
ok = emqx_router:mnesia(copy).
t_add_delete(_) ->
?R:add_route(<<"a/b/c">>, node()),
?R:add_route(<<"a/b/c">>),
?R:add_route(<<"a/b/c">>, node()),
?R:add_route(<<"a/+/b">>, node()),
?assertEqual([<<"a/+/b">>, <<"a/b/c">>], lists:sort(?R:topics())),
?R:delete_route(<<"a/b/c">>),
?R:delete_route(<<"a/+/b">>, node()),
?assertEqual([], ?R:topics()).
t_do_add_delete(_) ->
?R:do_add_route(<<"a/b/c">>, node()),
?R:do_add_route(<<"a/b/c">>),
?R:do_add_route(<<"a/b/c">>, node()),
?R:do_add_route(<<"a/+/b">>, node()),
?assertEqual([<<"a/+/b">>, <<"a/b/c">>], lists:sort(?R:topics())),
@ -67,7 +73,7 @@ t_do_add_delete(_) ->
?assertEqual([], ?R:topics()).
t_match_routes(_) ->
?R:add_route(<<"a/b/c">>, node()),
?R:add_route(<<"a/b/c">>),
?R:add_route(<<"a/+/c">>, node()),
?R:add_route(<<"a/b/#">>, node()),
?R:add_route(<<"#">>, node()),
@ -82,11 +88,22 @@ t_match_routes(_) ->
?R:delete_route(<<"#">>, node()),
?assertEqual([], lists:sort(?R:match_routes(<<"a/b/c">>))).
t_print_routes(_) ->
?R:add_route(<<"+/#">>),
?R:add_route(<<"+/+">>),
?R:print_routes(<<"a/b">>).
t_has_routes(_) ->
?R:add_route(<<"devices/+/messages">>, node()),
?assert(?R:has_routes(<<"devices/+/messages">>)),
?R:delete_route(<<"devices/+/messages">>).
t_unexpected(_) ->
Router = emqx_misc:proc_name(?R, 1),
?assertEqual(ignored, gen_server:call(Router, bad_request)),
?assertEqual(ok, gen_server:cast(Router, bad_message)),
Router ! bad_info.
clear_tables() ->
lists:foreach(fun mnesia:clear_table/1, [emqx_route, emqx_trie, emqx_trie_node]).

View File

@ -72,6 +72,9 @@ t_random_basic(_) ->
%% out which member it picked, then close its connection
%% send the second message, the message should be 'nack'ed
%% by the sticky session and delivered to the 2nd session.
%% After the connection for the 2nd session is also closed,
%% i.e. when all clients are offline, the following message(s)
%% should be delivered randomly.
t_no_connection_nack(_) ->
ok = ensure_config(sticky),
Publisher = <<"publisher">>,
@ -117,7 +120,7 @@ t_no_connection_nack(_) ->
%% sleep then make synced calls to session processes to ensure that
%% the connection pid's 'EXIT' message is propagated to the session process
%% also to be sure sessions are still alive
timer:sleep(5),
timer:sleep(2),
_ = emqx_session:info(SPid1),
_ = emqx_session:info(SPid2),
%% Now we know what is the other still alive connection
@ -128,11 +131,21 @@ t_no_connection_nack(_) ->
SendF(Id),
?wait(Received(Id, TheOtherConnPid), 1000)
end, PacketIdList),
%% Now close the 2nd (last connection)
emqx_mock_client:stop(TheOtherConnPid),
timer:sleep(2),
%% both sessions should have conn_pid = undefined
?assertEqual({conn_pid, undefined}, lists:keyfind(conn_pid, 1, emqx_session:info(SPid1))),
?assertEqual({conn_pid, undefined}, lists:keyfind(conn_pid, 1, emqx_session:info(SPid2))),
%% send more messages, but all should be queued in session state
lists:foreach(fun(Id) -> SendF(Id) end, PacketIdList),
{_, L1} = lists:keyfind(mqueue_len, 1, emqx_session:info(SPid1)),
{_, L2} = lists:keyfind(mqueue_len, 1, emqx_session:info(SPid2)),
?assertEqual(length(PacketIdList), L1 + L2),
%% clean up
emqx_mock_client:close_session(PubConnPid),
emqx_sm:close_session(SPid1),
emqx_sm:close_session(SPid2),
emqx_mock_client:close_session(TheOtherConnPid),
ok.
t_random(_) ->

View File

@ -16,6 +16,7 @@
-include("emqx.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-compile(export_all).
-compile(nowarn_export_all).
@ -33,7 +34,7 @@ all() -> [{group, sm}].
groups() ->
[{sm, [non_parallel_tests],
[t_open_close_session,
[
t_resume_session,
t_discard_session,
t_register_unregister_session,
@ -48,45 +49,47 @@ init_per_suite(Config) ->
end_per_suite(_Config) ->
emqx_ct_broker_helpers:run_teardown_steps().
t_open_close_session(_) ->
{ok, ClientPid} = emqx_mock_client:start_link(<<"client">>),
{ok, SPid} = emqx_sm:open_session(?ATTRS#{conn_pid => ClientPid}),
?assertEqual(ok, emqx_sm:close_session(SPid)).
init_per_testcase(_All, Config) ->
{ok, SPid} = emqx_sm:open_session(?ATTRS#{conn_pid => self()}),
[{session_pid, SPid}|Config].
t_resume_session(_) ->
{ok, ClientPid} = emqx_mock_client:start_link(<<"client">>),
{ok, SPid} = emqx_sm:open_session(?ATTRS#{conn_pid => ClientPid}),
?assertEqual({ok, SPid}, emqx_sm:resume_session(<<"client">>, ?ATTRS#{conn_pid => ClientPid})).
end_per_testcase(_All, Config) ->
emqx_sm:close_session(?config(session_pid, Config)),
receive
{shutdown, normal} -> ok
after 500 -> ct:fail({timeout, wait_session_shutdown})
end.
t_resume_session(Config) ->
?assertEqual({ok, ?config(session_pid, Config)}, emqx_sm:resume_session(<<"client">>, ?ATTRS#{conn_pid => self()})).
t_discard_session(_) ->
{ok, ClientPid} = emqx_mock_client:start_link(<<"client1">>),
{ok, _SPid} = emqx_sm:open_session(?ATTRS#{conn_pid => ClientPid}),
?assertEqual(ok, emqx_sm:discard_session(<<"client1">>)).
t_register_unregister_session(_) ->
Pid = self(),
{ok, _ClientPid} = emqx_mock_client:start_link(<<"client">>),
?assertEqual(ok, emqx_sm:register_session(<<"client">>)),
?assertEqual(ok, emqx_sm:register_session(<<"client">>, Pid)),
?assertEqual(ok, emqx_sm:unregister_session(<<"client">>)),
?assertEqual(ok, emqx_sm:unregister_session(<<"client">>), Pid).
t_get_set_session_attrs(_) ->
{ok, ClientPid} = emqx_mock_client:start_link(<<"client">>),
{ok, SPid} = emqx_sm:open_session(?ATTRS#{conn_pid => ClientPid}),
?assertEqual(true, emqx_sm:set_session_attrs(<<"client">>, [?ATTRS#{conn_pid => ClientPid}])),
?assertEqual(true, emqx_sm:set_session_attrs(<<"client">>, SPid, [?ATTRS#{conn_pid => ClientPid}])),
[SAttr] = emqx_sm:get_session_attrs(<<"client">>, SPid),
?assertEqual(<<"client">>, maps:get(client_id, SAttr)).
t_get_set_session_attrs(Config) ->
SPid = ?config(session_pid, Config),
ClientPid0 = spawn(fun() -> receive _ -> ok end end),
?assertEqual(true, emqx_sm:set_session_attrs(<<"client">>, [?ATTRS#{conn_pid => ClientPid0}])),
?assertEqual(true, emqx_sm:set_session_attrs(<<"client">>, SPid, [?ATTRS#{conn_pid => ClientPid0}])),
[SAttr0] = emqx_sm:get_session_attrs(<<"client">>, SPid),
?assertEqual(ClientPid0, maps:get(conn_pid, SAttr0)),
?assertEqual(true, emqx_sm:set_session_attrs(<<"client">>, SPid, [?ATTRS#{conn_pid => self()}])),
[SAttr1] = emqx_sm:get_session_attrs(<<"client">>, SPid),
?assertEqual(self(), maps:get(conn_pid, SAttr1)).
t_get_set_session_stats(_) ->
{ok, ClientPid} = emqx_mock_client:start_link(<<"client">>),
{ok, SPid} = emqx_sm:open_session(?ATTRS#{conn_pid => ClientPid}),
t_get_set_session_stats(Config) ->
SPid = ?config(session_pid, Config),
?assertEqual(true, emqx_sm:set_session_stats(<<"client">>, [{inflight, 10}])),
?assertEqual(true, emqx_sm:set_session_stats(<<"client">>, SPid, [{inflight, 10}])),
?assertEqual([{inflight, 10}], emqx_sm:get_session_stats(<<"client">>, SPid)).
t_lookup_session_pids(_) ->
{ok, ClientPid} = emqx_mock_client:start_link(<<"client">>),
{ok, SPid} = emqx_sm:open_session(?ATTRS#{conn_pid => ClientPid}),
t_lookup_session_pids(Config) ->
SPid = ?config(session_pid, Config),
?assertEqual([SPid], emqx_sm:lookup_session_pids(<<"client">>)).

View File

@ -0,0 +1,67 @@
%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
-module(emqx_sys_mon_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include("emqx_mqtt.hrl").
-define(SYSMONPID, emqx_sys_mon).
-define(INPUTINFO, [{self(), long_gc, concat_str("long_gc warning: pid = ~p, info: ~p", self(), "hello"), "hello"},
{self(), long_schedule, concat_str("long_schedule warning: pid = ~p, info: ~p", self(), "hello"), "hello"},
{self(), busy_port, concat_str("busy_port warning: suspid = ~p, port = ~p", self(), list_to_port("#Port<0.4>")), list_to_port("#Port<0.4>")},
{self(), busy_dist_port, concat_str("busy_dist_port warning: suspid = ~p, port = ~p", self(), list_to_port("#Port<0.4>")),list_to_port("#Port<0.4>")},
{list_to_port("#Port<0.4>"), long_schedule, concat_str("long_schedule warning: port = ~p, info: ~p", list_to_port("#Port<0.4>"), "hello"), "hello"}
]).
all() -> [t_sys_mon].
init_per_suite(Config) ->
emqx_ct_broker_helpers:run_setup_steps(),
Config.
end_per_suite(_Config) ->
emqx_ct_broker_helpers:run_teardown_steps().
t_sys_mon(_Config) ->
lists:foreach(fun({PidOrPort, SysMonName,ValidateInfo, InfoOrPort}) ->
validate_sys_mon_info(PidOrPort, SysMonName,ValidateInfo, InfoOrPort)
end, ?INPUTINFO).
validate_sys_mon_info(PidOrPort, SysMonName,ValidateInfo, InfoOrPort) ->
{ok, C} = emqx_client:start_link([{host, "localhost"}]),
{ok, _} = emqx_client:connect(C),
emqx_client:subscribe(C, emqx_topic:systop(lists:concat(['sysmon/', SysMonName])), qos1),
timer:sleep(100),
?SYSMONPID ! {monitor, PidOrPort, SysMonName, InfoOrPort},
receive
{publish, #{payload := Info}} ->
?assertEqual(ValidateInfo, binary_to_list(Info)),
ct:pal("OK - received msg: ~p~n", [Info])
after
1000 ->
ct:fail("flase")
end,
emqx_client:stop(C).
concat_str(ValidateInfo, InfoOrPort, Info) ->
WarnInfo = io_lib:format(ValidateInfo, [InfoOrPort, Info]),
lists:flatten(WarnInfo).

View File

@ -0,0 +1,74 @@
%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
-module(emqx_tracer_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
all() -> [start_traces].
init_per_suite(Config) ->
emqx_ct_broker_helpers:run_setup_steps(),
Config.
end_per_suite(_Config) ->
emqx_ct_broker_helpers:run_teardown_steps().
start_traces(_Config) ->
{ok, T} = emqx_client:start_link([{host, "localhost"},
{client_id, <<"client">>},
{username, <<"testuser">>},
{password, <<"pass">>}]),
emqx_client:connect(T),
%% Start tracing
ok = emqx_tracer:start_trace({client_id, <<"client">>}, debug, "tmp/client.log"),
ok = emqx_tracer:start_trace({client_id, <<"client2">>}, all, "tmp/client2.log"),
ok = emqx_tracer:start_trace({topic, <<"a/#">>}, all, "tmp/topic_trace.log"),
ct:sleep(100),
%% Verify the tracing file exits
?assert(filelib:is_regular("tmp/client.log")),
?assert(filelib:is_regular("tmp/client2.log")),
?assert(filelib:is_regular("tmp/topic_trace.log")),
%% Get current traces
?assertEqual([{{client_id,<<"client">>},{debug,"tmp/client.log"}},
{{client_id,<<"client2">>},{all,"tmp/client2.log"}},
{{topic,<<"a/#">>},{all,"tmp/topic_trace.log"}}], emqx_tracer:lookup_traces()),
%% set the overall log level to debug
emqx_logger:set_log_level(debug),
%% Client with clientid = "client" publishes a "hi" message to "a/b/c".
emqx_client:publish(T, <<"a/b/c">>, <<"hi">>),
ct:sleep(200),
%% Verify messages are logged to "tmp/client.log" and "tmp/topic_trace.log", but not "tmp/client2.log".
?assert(filelib:file_size("tmp/client.log") > 0),
?assert(filelib:file_size("tmp/topic_trace.log") > 0),
?assert(filelib:file_size("tmp/client2.log") == 0),
%% Stop tracing
ok = emqx_tracer:stop_trace({client_id, <<"client">>}),
ok = emqx_tracer:stop_trace({client_id, <<"client2">>}),
ok = emqx_tracer:stop_trace({topic, <<"a/#">>}),
emqx_client:disconnect(T),
emqx_logger:set_log_level(error).

View File

@ -24,7 +24,7 @@
-define(TRIE_TABS, [emqx_trie, emqx_trie_node]).
all() ->
[t_mnesia, t_insert, t_match, t_match2, t_match3, t_delete, t_delete2, t_delete3].
[t_mnesia, t_insert, t_match, t_match2, t_match3, t_empty, t_delete, t_delete2, t_delete3].
init_per_suite(Config) ->
application:load(emqx),
@ -58,7 +58,7 @@ t_insert(_) ->
?TRIE:insert(<<"sensor">>),
?TRIE:lookup(<<"sensor">>)
end,
?assertEqual({atomic, [TN]}, mnesia:transaction(Fun)).
?assertEqual({atomic, [TN]}, trans(Fun)).
t_match(_) ->
Machted = [<<"sensor/+/#">>, <<"sensor/#">>],
@ -68,7 +68,7 @@ t_match(_) ->
?TRIE:insert(<<"sensor/#">>),
?TRIE:match(<<"sensor/1">>)
end,
?assertEqual({atomic, Machted}, mnesia:transaction(Fun)).
?assertEqual({atomic, Machted}, trans(Fun)).
t_match2(_) ->
Matched = {[<<"+/+/#">>, <<"+/#">>, <<"#">>], []},
@ -79,16 +79,23 @@ t_match2(_) ->
{?TRIE:match(<<"a/b/c">>),
?TRIE:match(<<"$SYS/broker/zenmq">>)}
end,
?assertEqual({atomic, Matched}, mnesia:transaction(Fun)).
?assertEqual({atomic, Matched}, trans(Fun)).
t_match3(_) ->
Topics = [<<"d/#">>, <<"a/b/c">>, <<"a/b/+">>, <<"a/#">>, <<"#">>, <<"$SYS/#">>],
mnesia:transaction(fun() -> [emqx_trie:insert(Topic) || Topic <- Topics] end),
trans(fun() -> [emqx_trie:insert(Topic) || Topic <- Topics] end),
Matched = mnesia:async_dirty(fun emqx_trie:match/1, [<<"a/b/c">>]),
?assertEqual(4, length(Matched)),
SysMatched = mnesia:async_dirty(fun emqx_trie:match/1, [<<"$SYS/a/b/c">>]),
?assertEqual([<<"$SYS/#">>], SysMatched).
t_empty(_) ->
?assert(?TRIE:empty()),
trans(fun ?TRIE:insert/1, [<<"topic/x/#">>]),
?assertNot(?TRIE:empty()),
trans(fun ?TRIE:delete/1, [<<"topic/x/#">>]),
?assert(?TRIE:empty()).
t_delete(_) ->
TN = #trie_node{node_id = <<"sensor/1">>,
edge_count = 2,
@ -103,19 +110,20 @@ t_delete(_) ->
?TRIE:delete(<<"sensor/1/metric">>),
?TRIE:lookup(<<"sensor/1">>)
end,
?assertEqual({atomic, [TN]}, mnesia:transaction(Fun)).
?assertEqual({atomic, [TN]}, trans(Fun)).
t_delete2(_) ->
Fun = fun() ->
?TRIE:insert(<<"sensor">>),
?TRIE:insert(<<"sensor/1/metric/2">>),
?TRIE:insert(<<"sensor/1/metric/3">>),
?TRIE:insert(<<"sensor/+/metric/3">>),
?TRIE:delete(<<"sensor">>),
?TRIE:delete(<<"sensor/1/metric/2">>),
?TRIE:delete(<<"sensor/1/metric/3">>),
?TRIE:delete(<<"sensor/+/metric/3">>),
?TRIE:delete(<<"sensor/+/metric/3">>),
{?TRIE:lookup(<<"sensor">>), ?TRIE:lookup(<<"sensor/1">>)}
end,
?assertEqual({atomic, {[], []}}, mnesia:transaction(Fun)).
?assertEqual({atomic, {[], []}}, trans(Fun)).
t_delete3(_) ->
Fun = fun() ->
@ -129,8 +137,13 @@ t_delete3(_) ->
?TRIE:delete(<<"sensor/+/unknown">>),
{?TRIE:lookup(<<"sensor">>), ?TRIE:lookup(<<"sensor/+">>)}
end,
?assertEqual({atomic, {[], []}}, mnesia:transaction(Fun)).
?assertEqual({atomic, {[], []}}, trans(Fun)).
clear_tables() ->
lists:foreach(fun mnesia:clear_table/1, ?TRIE_TABS).
trans(Fun) ->
mnesia:transaction(Fun).
trans(Fun, Args) ->
mnesia:transaction(Fun, Args).

View File

@ -0,0 +1,120 @@
%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
-module(emqx_ws_connection_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include("emqx_mqtt.hrl").
-define(CLIENT, ?CONNECT_PACKET(#mqtt_packet_connect{
client_id = <<"mqtt_client">>,
username = <<"admin">>,
password = <<"public">>})).
-define(SUBCODE, [0]).
-define(PACKETID, 1).
-define(PUBQOS, 1).
-define(INFO, [{socktype, _},
{conn_state, _},
{peername, _},
{sockname, _},
{zone, _},
{client_id, <<"mqtt_client">>},
{username, <<"admin">>},
{peername, _},
{peercert, _},
{proto_ver, _},
{proto_name, _},
{clean_start, _},
{keepalive, _},
{mountpoint, _},
{is_super, _},
{is_bridge, _},
{connected_at, _},
{conn_props, _},
{ack_props, _},
{session, _},
{topic_aliases, _},
{enable_acl, _}]).
-define(ATTRS, [{clean_start,true},
{client_id, <<"mqtt_client">>},
{connected_at, _},
{is_bridge, _},
{is_super, _},
{keepalive, _},
{mountpoint, _},
{peercert, _},
{peername, _},
{proto_name, _},
{proto_ver, _},
{sockname, _},
{username, <<"admin">>},
{zone, _}]).
-define(STATS, [{recv_oct, _},
{recv_cnt, _},
{send_oct, _},
{send_cnt, _},
{mailbox_len, _},
{heap_size, _},
{reductions, _},
{recv_pkt, _},
{recv_msg, _},
{send_pkt, _},
{send_msg, _}]).
all() ->
[t_ws_connect_api].
init_per_suite(Config) ->
emqx_ct_broker_helpers:run_setup_steps(),
Config.
end_per_suite(_Config) ->
emqx_ct_broker_helpers:run_teardown_steps().
t_ws_connect_api(_Config) ->
WS = rfc6455_client:new("ws://127.0.0.1:8083" ++ "/mqtt", self()),
{ok, _} = rfc6455_client:open(WS),
Packet = raw_send_serialize(?CLIENT),
ok = rfc6455_client:send_binary(WS, Packet),
{binary, CONACK} = rfc6455_client:recv(WS),
{ok, ?CONNACK_PACKET(?CONNACK_ACCEPT), _} = raw_recv_pase(CONACK),
Pid = emqx_cm:lookup_conn_pid(<<"mqtt_client">>),
?INFO = emqx_ws_connection:info(Pid),
?ATTRS = emqx_ws_connection:attrs(Pid),
?STATS = emqx_ws_connection:stats(Pid),
SessionPid = emqx_ws_connection:session(Pid),
true = is_pid(SessionPid),
ok = emqx_ws_connection:kick(Pid),
{close, _} = rfc6455_client:close(WS),
ok.
raw_send_serialize(Packet) ->
emqx_frame:serialize(Packet).
raw_recv_pase(P) ->
emqx_frame:parse(P, {none, #{max_packet_size => ?MAX_PACKET_SIZE,
version => ?MQTT_PROTO_V4} }).