Fix conflict

This commit is contained in:
周子博 2018-11-10 17:08:15 +08:00
commit 5028c9a775
40 changed files with 1033 additions and 774 deletions

View File

@ -4,31 +4,27 @@ PROJECT = emqx
PROJECT_DESCRIPTION = EMQ X Broker
PROJECT_VERSION = 3.0
DEPS = jsx gproc gen_rpc lager ekka esockd cowboy clique emqx_passwd
DEPS = jsx gproc gen_rpc ekka esockd cowboy clique
dep_jsx = git https://github.com/talentdeficit/jsx 2.9.0
dep_gproc = git https://github.com/uwiger/gproc 0.8.0
dep_gen_rpc = git https://github.com/emqx/gen_rpc 2.2.0
dep_lager = git https://github.com/erlang-lager/lager 3.6.5
dep_gen_rpc = git https://github.com/emqx/gen_rpc 2.3.0
dep_esockd = git https://github.com/emqx/esockd v5.4.2
dep_ekka = git https://github.com/emqx/ekka v0.4.1
dep_ekka = git https://github.com/emqx/ekka v0.5.0
dep_cowboy = git https://github.com/ninenines/cowboy 2.4.0
dep_clique = git https://github.com/emqx/clique develop
dep_emqx_passwd = git https://github.com/emqx/emqx-passwd win30
NO_AUTOPATCH = cuttlefish
ERLC_OPTS += +debug_info -DAPPLICATION=emqx
ERLC_OPTS += +'{parse_transform, lager_transform}'
BUILD_DEPS = cuttlefish
dep_cuttlefish = git https://github.com/emqx/cuttlefish emqx30
dep_cuttlefish = git https://github.com/emqx/cuttlefish v2.1.0
#TEST_DEPS = emqx_ct_helplers
#dep_emqx_ct_helplers = git git@github.com:emqx/emqx-ct-helpers
TEST_ERLC_OPTS += +debug_info -DAPPLICATION=emqx
TEST_ERLC_OPTS += +'{parse_transform, lager_transform}'
EUNIT_OPTS = verbose
@ -47,7 +43,7 @@ CT_OPTS = -cover test/ct.cover.spec -erl_args -name $(CT_NODE_NAME)
COVER = true
PLT_APPS = sasl asn1 ssl syntax_tools runtime_tools crypto xmerl os_mon inets public_key ssl lager compiler mnesia
PLT_APPS = sasl asn1 ssl syntax_tools runtime_tools crypto xmerl os_mon inets public_key ssl compiler mnesia
DIALYZER_DIRS := ebin/
DIALYZER_OPTS := --verbose --statistics -Werror_handling -Wrace_conditions #-Wunmatched_returns
@ -74,8 +70,10 @@ etc/gen.emqx.conf: bbmustache etc/emqx.conf
ok = file:write_file('etc/gen.emqx.conf', Targ), \
halt(0)."
app.config: cuttlefish etc/gen.emqx.conf
$(verbose) ./cuttlefish -l info -e etc/ -c etc/gen.emqx.conf -i priv/emqx.schema -d data/
CUTTLEFISH_SCRIPT = _build/default/lib/cuttlefish/cuttlefish
app.config: $(CUTTLEFISH_SCRIPT) etc/gen.emqx.conf
$(verbose) $(CUTTLEFISH_SCRIPT) -l info -e etc/ -c etc/gen.emqx.conf -i priv/emqx.schema -d data/
ct: app.config
@ -86,11 +84,8 @@ coveralls:
@rebar3 coveralls send
cuttlefish: rebar-deps
@if [ ! -f cuttlefish ]; then \
make -C _build/default/lib/cuttlefish; \
mv _build/default/lib/cuttlefish/cuttlefish ./cuttlefish; \
fi
$(CUTTLEFISH_SCRIPT): rebar-deps
@if [ ! -f cuttlefish ]; then make -C _build/default/lib/cuttlefish; fi
rebar-xref:
@rebar3 xref
@ -98,7 +93,7 @@ rebar-xref:
rebar-deps:
@rebar3 get-deps
rebar-eunit: cuttlefish
rebar-eunit: $(CUTTLEFISH_SCRIPT)
@rebar3 eunit
rebar-compile:

View File

@ -328,81 +328,62 @@ rpc.socket_keepalive_count = 9
## Log
##--------------------------------------------------------------------
## Sets the log dir.
## Where to emit the logs.
## Enable the console (standard output) logs.
##
## Value: off | file | console | both
## - off: disable logs entirely
## - file: write logs to file
## - console: write logs to standard I/O
## - both: write logs both to file and standard I/O
log.to = console
## The log severity level.
##
## Value: debug | info | notice | warning | error | critical | alert | emergency
##
## Note: Only the messages with severity level greater than or equal to
## this level will be logged.
##
## Default: error
log.level = error
## The dir for log files.
##
## Value: Folder
log.dir = {{ platform_log_dir }}
## Where to emit the console logs.
## The log filename for logs of level specified in "log.level".
##
## Value: off | file | console | both
## - off: disabled
## - file: write to file
## - console: write to stdout
## - both: file and stdout
log.console = console
## Value: String
## Default: emqx.log
log.file = emqx.log
## Sets the severity level of console log.
##
## Value: debug | info | notice | warning | error | critical | alert | emergency
##
## Default: error
log.console.level = error
## The file where console logs will be writed to, when 'log.console' is set as 'file'.
##
## Value: File Name
## log.console.file = {{ platform_log_dir }}/console.log
## Maximum file size for console log.
##
## Value: Number(bytes)
## log.console.size = 10485760
## The rotation count for console log.
## Maximum size of each log file.
##
## Value: Number
## log.console.count = 5
## Default: 10M
## Supported Unit: KB | MB | G
log.rotation.size = 10MB
## The file where info logs will be writed to.
##
## Value: File Name
## log.info.file = {{ platform_log_dir }}/info.log
## Maximum file size for info log.
##
## Value: Number(bytes)
## log.info.size = 10485760
## The rotation count for info log.
## Maximum rotation count of log files.
##
## Value: Number
## log.info.count = 5
## Default: 5
log.rotation.count = 5
## The file where error logs will be writed to.
## To create additional log files for specific log levels.
##
## Value: File Name
log.error.file = {{ platform_log_dir }}/error.log
## Maximum file size for error log.
## Format: log.$level.file = $filename,
## where "$level" can be one of: debug, info, notice, warning,
## error, critical, alert, emergency
## Note: Log files for a specific log level will contain all the logs
## that greater than or equal to that level
##
## Value: Number(bytes)
log.error.size = 10485760
## The rotation count for error log.
##
## Value: Number
log.error.count = 5
## Enable the crash log.
##
## Value: on | off
log.crash = on
## The file for crash log.
##
## Value: File Name
log.crash.file = {{ platform_log_dir }}/crash.log
#log.info.file = info.log
#log.error.file = error.log
##--------------------------------------------------------------------
## Authentication/Access Control
@ -424,22 +405,20 @@ acl_nomatch = allow
## Value: File Name
acl_file = {{ platform_etc_dir }}/acl.conf
## Whether to enable ACL cache for publish.
## The ACL cache size
## The maximum count of ACL entries allowed for a client.
## Whether to enable ACL cache.
##
## If enabled, ACLs roles for each client will be cached in the memory
##
## Value: on | off
enable_acl_cache = on
## The ACL cache size
## The maximum count of ACL entries allowed for a client.
## The maximum count of ACL entries can be cached for a client.
##
## Value: Integer greater than 0
## Default: 32
acl_cache_max_size = 32
## The ACL cache time-to-live.
## The time after which an ACL cache entry will be invalid
## The time after which an ACL cache entry will be deleted
##
## Value: Duration
## Default: 1 minute
@ -496,17 +475,6 @@ mqtt.wildcard_subscription = true
## Value: boolean
mqtt.shared_subscription = true
## Message queue type.
##
## Value: simple | priority
mqtt.mqueue_type = simple
## Topic priorities. Default is 0.
##
## Priority: Number [0-255]
##
## mqtt.mqueue_priorities = topic/1=10,topic/2=8
##--------------------------------------------------------------------
## Zones
##--------------------------------------------------------------------
@ -639,22 +607,29 @@ zone.external.await_rel_timeout = 300s
## Default: 2h, 2 hours
zone.external.session_expiry_interval = 2h
## Message queue type.
##
## Value: simple | priority
zone.external.mqueue_type = simple
## Maximum queue length. Enqueued messages when persistent client disconnected,
## or inflight window is full. 0 means no limit.
##
## Value: Number >= 0
zone.external.max_mqueue_len = 1000
## Topic priorities. Default is 0.
## Topic priorities.
## 'none' to indicate no priority table (by default), hence all messages
## are treated equal
##
## Priority: Number [0-255]
## Priority number [1-255]
## Example: topic/1=10,topic/2=8
## NOTE: comma and equal signs are not allowed for priority topic names
## NOTE: messages for topics not in the priority table are treated as
## either highest or lowest priority depending on the configured
## value for mqueue_default_priority
##
## zone.external.mqueue_priorities = topic/1=10,topic/2=8
zone.external.mqueue_priorities = none
## Default to highest priority for topics not matching priority table
##
## Value: highest | lowest
zone.external.mqueue_default_priority = highest
## Whether to enqueue Qos0 messages.
##
@ -1608,7 +1583,11 @@ bridge.aws.client_id = bridge_aws
## The Clean start flag of a remote bridge.
##
## Value: boolean
bridge.aws.clean_start = false
## Default: true
##
## NOTE: Some IoT platforms require clean_start
## must be set to 'true'
## bridge.aws.clean_start = true
## The username for a remote bridge.
##
@ -1668,21 +1647,25 @@ bridge.aws.mqueue_type = memory
## Value: Number
bridge.aws.max_pending_messages = 10000
## Bribge to remote server via SSL.
##
## Value: on | off
bridge.aws.ssl = off
## PEM-encoded CA certificates of the bridge.
##
## Value: File
## bridge.aws.cacertfile = cacert.pem
## bridge.aws.cacertfile = {{ platform_etc_dir }}/certs/cacert.pem
## SSL Certfile of the bridge.
## Client SSL Certfile of the bridge.
##
## Value: File
## bridge.aws.certfile = cert.pem
## bridge.aws.certfile = {{ platform_etc_dir }}/certs/client-cert.pem
## SSL Keyfile of the bridge.
## Client SSL Keyfile of the bridge.
##
## Value: File
## bridge.aws.keyfile = key.pem
## bridge.aws.keyfile = {{ platform_etc_dir }}/certs/client-key.pem
## SSL Ciphers used by the bridge.
##
@ -1737,7 +1720,11 @@ bridge.aws.max_pending_messages = 10000
## The Clean start flag of a remote bridge.
##
## Value: boolean
## bridge.azure.clean_start = false
## Default: true
##
## NOTE: Some IoT platforms require clean_start
## must be set to 'true'
## bridge.azure.clean_start = true
## The username for a remote bridge.
##
@ -1803,12 +1790,12 @@ bridge.aws.max_pending_messages = 10000
## Value: File
## bridge.azure.cacertfile = cacert.pem
## SSL Certfile of the bridge.
## Client SSL Certfile of the bridge.
##
## Value: File
## bridge.azure.certfile = cert.pem
## SSL Keyfile of the bridge.
## Client SSL Keyfile of the bridge.
##
## Value: File
## bridge.azure.keyfile = key.pem

View File

@ -27,6 +27,12 @@
-define(ERTS_MINIMUM_REQUIRED, "10.0").
%%--------------------------------------------------------------------
%% Configs
%%--------------------------------------------------------------------
-define(NO_PRIORITY_TABLE, none).
%%--------------------------------------------------------------------
%% Topics' prefix: $SYS | $queue | $share
%%--------------------------------------------------------------------

View File

@ -43,11 +43,7 @@
-define(QOS_1, 1). %% At least once
-define(QOS_2, 2). %% Exactly once
-define(QOS0, 0). %% At most once
-define(QOS1, 1). %% At least once
-define(QOS2, 2). %% Exactly once
-define(IS_QOS(I), (I >= ?QOS0 andalso I =< ?QOS2)).
-define(IS_QOS(I), (I >= ?QOS_0 andalso I =< ?QOS_2)).
-define(QOS_I(Name),
begin

View File

@ -382,145 +382,114 @@ end}.
%% Log
%%--------------------------------------------------------------------
{mapping, "log.dir", "lager.log_dir", [
{mapping, "log.to", "kernel.logger", [
{default, console},
{datatype, {enum, [off, file, console, both]}}
]}.
{mapping, "log.level", "kernel.logger_level", [
{default, error},
{datatype, {enum, [debug, info, notice, warning, error, critical, alert, emergency, all]}}
]}.
{mapping, "log.logger_sasl_compatible", "kernel.logger_sasl_compatible", [
{default, true},
{datatype, {enum, [true, false]}}
]}.
{mapping, "log.dir", "kernel.logger", [
{default, "log"},
{datatype, string}
]}.
{mapping, "log.console", "lager.handlers", [
{default, file},
{datatype, {enum, [off, file, console, both]}}
]}.
{mapping, "log.console.level", "lager.handlers", [
{default, info},
{datatype, {enum, [debug, info, notice, warning, error, critical, alert, emergency, none]}}
]}.
{mapping, "log.console.file", "lager.handlers", [
{default, "log/console.log"},
{mapping, "log.file", "kernel.logger", [
{default, "emqx.log"},
{datatype, file}
]}.
{mapping, "log.console.size", "lager.handlers", [
{default, 10485760},
{datatype, integer}
{mapping, "log.rotation.size", "kernel.logger", [
{default, "10MB"},
{datatype, bytesize}
]}.
{mapping, "log.console.count", "lager.handlers", [
{mapping, "log.rotation.count", "kernel.logger", [
{default, 5},
{datatype, integer}
]}.
{mapping, "log.info.file", "lager.handlers", [
{mapping, "log.$level.file", "kernel.logger", [
{datatype, file}
]}.
{mapping, "log.info.size", "lager.handlers", [
{default, 10485760},
{datatype, integer}
]}.
{mapping, "log.info.count", "lager.handlers", [
{default, 5},
{datatype, integer}
]}.
{mapping, "log.error.file", "lager.handlers", [
{default, "log/error.log"},
{datatype, file}
]}.
{mapping, "log.error.size", "lager.handlers", [
{default, 10485760},
{datatype, integer}
]}.
{mapping, "log.error.count", "lager.handlers", [
{default, 5},
{datatype, integer}
]}.
{mapping, "log.error.redirect", "lager.error_logger_redirect", [
{default, on},
{datatype, flag},
hidden
]}.
{mapping, "log.error.messages_per_second", "lager.error_logger_hwm", [
{default, 1000},
{datatype, integer},
hidden
]}.
{translation,
"lager.handlers",
fun(Conf) ->
ErrorHandler = case cuttlefish:conf_get("log.error.file", Conf, undefined) of
undefined -> [];
ErrorFilename -> [{lager_file_backend, [{file, ErrorFilename},
{level, error},
{size, cuttlefish:conf_get("log.error.size", Conf)},
{date, "$D0"},
{count, cuttlefish:conf_get("log.error.count", Conf)}]}]
end,
InfoHandler = case cuttlefish:conf_get("log.info.file", Conf, undefined) of
undefined -> [];
InfoFilename -> [{lager_file_backend, [{file, InfoFilename},
{level, info},
{size, cuttlefish:conf_get("log.info.size", Conf)},
{date, "$D0"},
{count, cuttlefish:conf_get("log.info.count", Conf)}]}]
end,
ConsoleLogLevel = cuttlefish:conf_get("log.console.level", Conf),
ConsoleLogFile = cuttlefish:conf_get("log.console.file", Conf),
ConsoleHandler = {lager_console_backend, [{level, ConsoleLogLevel}]},
ConsoleFileHandler = {lager_file_backend, [{file, ConsoleLogFile},
{level, ConsoleLogLevel},
{size, cuttlefish:conf_get("log.console.size", Conf)},
{date, "$D0"},
{count, cuttlefish:conf_get("log.console.count", Conf)}]},
ConsoleHandlers = case cuttlefish:conf_get("log.console", Conf) of
off -> [];
file -> [ConsoleFileHandler];
console -> [ConsoleHandler];
both -> [ConsoleHandler, ConsoleFileHandler];
_ -> []
end,
ConsoleHandlers ++ ErrorHandler ++ InfoHandler
end
}.
{mapping, "log.crash", "lager.crash_log", [
{default, on},
{datatype, flag}
]}.
{mapping, "log.crash.file", "lager.crash_log", [
{default, "log/crash.log"},
{datatype, file}
]}.
{translation,
"lager.crash_log",
fun(Conf) ->
case cuttlefish:conf_get("log.crash", Conf) of
false -> undefined;
_ ->
cuttlefish:conf_get("log.crash.file", Conf, "./log/crash.log")
end
end}.
{mapping, "sasl", "sasl.sasl_error_logger", [
{default, off},
{datatype, flag},
hidden
]}.
{translation, "kernel.logger", fun(Conf) ->
LogTo = cuttlefish:conf_get("log.to", Conf),
TopLogLevel = cuttlefish:conf_get("log.level", Conf),
Formatter = {emqx_logger_formatter,
#{template =>
[time," [",level,"] ",
{client_id,
[{peername,
[client_id,"@",peername," "],
[client_id, " "]}],
[]},
msg,"\n"]}},
FileConf = fun(Filename) ->
#{type => wrap,
file => filename:join(cuttlefish:conf_get("log.dir", Conf), Filename),
max_no_files => cuttlefish:conf_get("log.rotation.count", Conf),
max_no_bytes => cuttlefish:conf_get("log.rotation.size", Conf)}
end,
%% For the default logger that outputs to console
DefaultHandler =
if LogTo =:= console orelse LogTo =:= both ->
[{handler, default, logger_std_h,
#{level => TopLogLevel,
config => #{type => standard_io},
formatter => Formatter}}];
true ->
[{handler, default, undefined}]
end,
%% For the file logger
FileHandler =
if LogTo =:= file orelse LogTo =:= both ->
[{handler, file, logger_disk_log_h,
#{level => TopLogLevel,
config => FileConf(cuttlefish:conf_get("log.file", Conf)),
formatter => Formatter,
filesync_repeat_interval => no_repeat}}];
true -> []
end,
%% For creating additional log files for specific log levels.
AdditionalLogFiles =
lists:foldl(
fun({[_, Level, _] = K, Filename}, Acc) when LogTo =:= file; LogTo =:= both ->
case cuttlefish_variable:is_fuzzy_match(K, ["log", "$level", "file"]) of
true -> [{Level, Filename} | Acc];
false -> Acc
end;
({_K, _V}, Acc) ->
Acc
end, [], Conf),
AdditionalHandlers =
[{handler, list_to_atom("file_for_"++Level), logger_disk_log_h,
#{level => list_to_atom(Level),
config => FileConf(Filename),
formatter => Formatter,
filesync_repeat_interval => no_repeat}}
|| {Level, Filename} <- AdditionalLogFiles],
DefaultHandler ++ FileHandler ++ AdditionalHandlers
end}.
%%--------------------------------------------------------------------
%% Authentication/ACL
%%--------------------------------------------------------------------
@ -624,18 +593,6 @@ end}.
{datatype, {enum, [true, false]}}
]}.
%% @doc Type: simple | priority
{mapping, "mqtt.mqueue_type", "emqx.mqueue_type", [
{default, simple},
{datatype, {enum, [simple, priority]}}
]}.
%% @doc Topic Priorities: 0~255, Default is 0
{mapping, "mqtt.mqueue_priorities", "emqx.mqueue_priorities", [
{default, ""},
{datatype, string}
]}.
%%--------------------------------------------------------------------
%% Zones
%%--------------------------------------------------------------------
@ -777,12 +734,6 @@ end}.
{datatype, {duration, s}}
]}.
%% @doc Type: simple | priority
{mapping, "zone.$name.mqueue_type", "emqx.zones", [
{default, simple},
{datatype, {enum, [simple, priority]}}
]}.
%% @doc Max queue length. Enqueued messages when persistent client
%% disconnected, or inflight window is full. 0 means no limit.
{mapping, "zone.$name.max_mqueue_len", "emqx.zones", [
@ -790,11 +741,23 @@ end}.
{datatype, integer}
]}.
%% @doc Topic Priorities: 0~255, Default is 0
%% @doc Topic Priorities, comma separated topic=priority pairs,
%% where priority should be integer in range 1-255 (inclusive)
%% 1 being the lowest and 255 being the highest.
%% default value `none` to indicate no priority table, hence all
%% messages are treated equal, which means either highest ('infinity'),
%% or lowest (0) depending on mqueue_default_priority config.
{mapping, "zone.$name.mqueue_priorities", "emqx.zones", [
{default, "none"},
{datatype, string}
]}.
%% @doc Default priority for topics not in priority table.
{mapping, "zone.$name.mqueue_default_priority", "emqx.zones", [
{default, lowest},
{datatype, {enum, [highest, lowest]}}
]}.
%% @doc Queue Qos0 messages?
{mapping, "zone.$name.mqueue_store_qos0", "emqx.zones", [
{default, true},
@ -859,6 +822,18 @@ end}.
max_heap_size => Siz1}
end,
{force_shutdown_policy, ShutdownPolicy};
("mqueue_priorities", Val) ->
case Val of
"none" -> none; % NO_PRIORITY_TABLE
_ ->
lists:foldl(fun(T, Acc) ->
%% NOTE: space in "= " is intended
[{Topic, Prio}] = string:tokens(T, "= "),
P = list_to_integer(Prio),
(P < 0 orelse P > 255) andalso error({bad_priority, Topic, Prio}),
maps:put(iolist_to_binary(Topic), P, Acc)
end, string:tokens(Val, ","))
end;
(Opt, Val) ->
{list_to_atom(Opt), Val}
end,
@ -1516,7 +1491,13 @@ end}.
]}.
{mapping, "bridge.$name.forwards", "emqx.bridges", [
{datatype, string}
{datatype, string},
{default, ""}
]}.
{mapping, "bridge.$name.ssl", "emqx.bridges", [
{datatype, flag},
{default, off}
]}.
{mapping, "bridge.$name.cacertfile", "emqx.bridges", [
@ -1542,11 +1523,12 @@ end}.
{mapping, "bridge.$name.keepalive", "emqx.bridges", [
{default, "10s"},
{datatype, {duration, s}}
{datatype, {duration, ms}}
]}.
{mapping, "bridge.$name.tls_versions", "emqx.bridges", [
{datatype, string}
{datatype, string},
{default, "tlsv1,tlsv1.1,tlsv1.2"}
]}.
{mapping, "bridge.$name.subscription.$id.topic", "emqx.bridges", [
@ -1564,12 +1546,11 @@ end}.
{mapping, "bridge.$name.reconnect_interval", "emqx.bridges", [
{default, "30s"},
{datatype, {duration, s}}
{datatype, {duration, ms}}
]}.
{translation, "emqx.bridges", fun(Conf) ->
Split = fun(undefined) -> undefined; (S) -> string:tokens(S, ",") end,
IsSsl = fun(cacertfile) -> true;
@ -1586,31 +1567,33 @@ end}.
{ciphers, Split(Ciphers)};
(Opt, Val) ->
{Opt, Val}
end,
Merge = fun(Opt, Val, Opts) ->
case IsSsl(Opt) of
true ->
SslOpts = [Parse(Opt, Val)|proplists:get_value(ssl_opts, Opts, [])],
lists:ukeymerge(1, [{ssl_opts, SslOpts}], Opts);
false ->
[{Opt, Val}|Opts]
end
end,
Merge = fun(forwards, Val, Opts) ->
[{forwards, string:tokens(Val, ",")}|Opts];
(Opt, Val, Opts) ->
case IsSsl(Opt) of
true ->
SslOpts = [Parse(Opt, Val)|proplists:get_value(ssl_opts, Opts, [])],
lists:ukeymerge(1, [{ssl_opts, SslOpts}], lists:usort(Opts));
false ->
[{Opt, Val}|Opts]
end
end,
Subscriptions = fun(Name) ->
Configs = cuttlefish_variable:filter_by_prefix("bridge." ++ Name ++ ".subscription", Conf),
lists:zip([Topic || {_, Topic} <- lists:sort([{I, Topic} || {[_, _, "subscription", I, "topic"], Topic} <- Configs])],
[QoS || {_, QoS} <- lists:sort([{I, QoS} || {[_, _, "subscription", I, "qos"], QoS} <- Configs])])
Configs = cuttlefish_variable:filter_by_prefix("bridge." ++ Name ++ ".subscription", Conf),
lists:zip([Topic || {_, Topic} <- lists:sort([{I, Topic} || {[_, _, "subscription", I, "topic"], Topic} <- Configs])],
[QoS || {_, QoS} <- lists:sort([{I, QoS} || {[_, _, "subscription", I, "qos"], QoS} <- Configs])])
end,
maps:to_list(
lists:foldl(
fun({["bridge", Name, Opt], Val}, Acc) ->
%% e.g #{aws => [{OptKey, OptVal}]}
Init = [{list_to_atom(Opt), Val},{subscriptions, Subscriptions(Name)}],
maps:update_with(list_to_atom(Name),
fun(Opts) ->
Merge(list_to_atom(Opt), Val, Opts)
end, [{list_to_atom(Opt), Val},
{subscriptions, Subscriptions(Name)}], Acc);
fun(Opts) -> Merge(list_to_atom(Opt), Val, Opts) end, Init, Acc);
(_, Acc) -> Acc
end, #{}, lists:usort(cuttlefish_variable:filter_by_prefix("bridge.", Conf))))

View File

@ -1,17 +1,15 @@
{deps, [{jsx, "2.9.0"},
{gproc, "0.8.0"},
{lager, "3.6.5"},
{cowboy, "2.4.0"},
{lager_syslog, {git, "https://github.com/basho/lager_syslog", {branch, "3.0.1"}}}
{cowboy, "2.4.0"}
]}.
%% appended to deps in rebar.config.script
{github_emqx_deps,
[{gen_rpc, "2.2.0"},
{ekka, "v0.4.1"},
[{gen_rpc, "2.3.0"},
{ekka, "v0.5.0"},
{clique, "develop"},
{esockd, "v5.4.2"},
{cuttlefish, "emqx30"}
{cuttlefish, "v2.1.0"}
]}.
{edoc_opts, [{preprocess, true}]}.
@ -20,7 +18,6 @@
warn_unused_import,
warn_obsolete_guard,
debug_info,
{parse_transform, lager_transform},
{d, 'APPLICATION', emqx}]}.
{xref_checks, [undefined_function_calls, undefined_functions,
locals_not_used, deprecated_function_calls,
@ -29,6 +26,5 @@
{cover_opts, [verbose]}.
{cover_export_enabled, true}.
%% rebar3_neotoma_plugin is needed to compile the .peg file for cuttlefish
{plugins, [coveralls, rebar3_neotoma_plugin]}.
{plugins, [coveralls]}.

View File

@ -1,9 +1,9 @@
{application,emqx,
[{description,"EMQ X Broker"},
{vsn,"3.0"},
{vsn,"3.0-rc.3"},
{modules,[]},
{registered,[emqx_sup]},
{applications,[kernel,stdlib,jsx,gproc,gen_rpc,lager,esockd,
{applications,[kernel,stdlib,jsx,gproc,gen_rpc,esockd,
cowboy]},
{env,[]},
{mod,{emqx_app,[]}},

View File

@ -39,4 +39,3 @@ behaviour_info(_Other) ->
undefined.
-endif.

View File

@ -34,7 +34,7 @@
mountpoint, queue, mqueue_type, max_pending_messages,
forwards = [], subscriptions = []}).
-record(mqtt_msg, {qos = ?QOS0, retain = false, dup = false,
-record(mqtt_msg, {qos = ?QOS_0, retain = false, dup = false,
packet_id, topic, props, payload}).
start_link(Name, Options) ->
@ -188,19 +188,23 @@ handle_cast(Msg, State) ->
%% start message bridge
%%----------------------------------------------------------------
handle_info(start, State = #state{options = Options,
client_pid = undefined,
reconnect_interval = ReconnectInterval}) ->
client_pid = undefined}) ->
case emqx_client:start_link([{owner, self()}|options(Options)]) of
{ok, ClientPid, _} ->
Subs = [{i2b(Topic), Qos} || {Topic, Qos} <- get_value(subscriptions, Options, []),
emqx_topic:validate({filter, i2b(Topic)})],
Forwards = [i2b(Topic) || Topic <- string:tokens(get_value(forwards, Options, ""), ","),
emqx_topic:validate({filter, i2b(Topic)})],
[emqx_client:subscribe(ClientPid, {Topic, Qos}) || {Topic, Qos} <- Subs],
[emqx_broker:subscribe(Topic) || Topic <- Forwards],
{noreply, State#state{client_pid = ClientPid, subscriptions = Subs, forwards = Forwards}};
{error,_} ->
erlang:send_after(ReconnectInterval, self(), start),
{ok, ClientPid} ->
case emqx_client:connect(ClientPid) of
{ok, _} ->
emqx_logger:info("[Bridge] connected to remote sucessfully"),
Subs = subscribe_remote_topics(ClientPid, get_value(subscriptions, Options, [])),
Forwards = subscribe_local_topics(get_value(forwards, Options, [])),
{noreply, State#state{client_pid = ClientPid,
subscriptions = Subs,
forwards = Forwards}};
{error, Reason} ->
emqx_logger:error("[Bridge] connect to remote failed! error: ~p", [Reason]),
{noreply, State#state{client_pid = ClientPid}}
end;
{error, Reason} ->
emqx_logger:error("[Bridge] start failed! error: ~p", [Reason]),
{noreply, State}
end;
@ -218,7 +222,7 @@ handle_info({dispatch, _, #message{topic = Topic, payload = Payload, flags = #{r
{ok, PkgId} ->
{noreply, State#state{queue = store(MqueueType, {PkgId, Msg}, Queue, MaxPendingMsg)}};
{error, Reason} ->
emqx_logger:error("Publish fail:~p", [Reason]),
emqx_logger:error("[Bridge] Publish fail:~p", [Reason]),
{noreply, State}
end;
@ -240,11 +244,12 @@ handle_info({puback, #{packet_id := PkgId}}, State = #state{queue = Queue, mqueu
{noreply, State#state{queue = delete(MqueueType, PkgId, Queue)}};
handle_info({'EXIT', Pid, normal}, State = #state{client_pid = Pid}) ->
emqx_logger:warning("[Bridge] stop ~p", [normal]),
{noreply, State#state{client_pid = undefined}};
handle_info({'EXIT', Pid, Reason}, State = #state{client_pid = Pid,
reconnect_interval = ReconnectInterval}) ->
lager:warning("emqx bridge stop reason:~p", [Reason]),
emqx_logger:error("[Bridge] stop ~p", [Reason]),
erlang:send_after(ReconnectInterval, self(), start),
{noreply, State#state{client_pid = undefined}};
@ -258,6 +263,14 @@ terminate(_Reason, #state{}) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
subscribe_remote_topics(ClientPid, Subscriptions) ->
[begin emqx_client:subscribe(ClientPid, {bin(Topic), Qos}), {bin(Topic), Qos} end
|| {Topic, Qos} <- Subscriptions, emqx_topic:validate({filter, bin(Topic)})].
subscribe_local_topics(Topics) ->
[begin emqx_broker:subscribe(bin(Topic)), bin(Topic) end
|| Topic <- Topics, emqx_topic:validate({filter, bin(Topic)})].
proto_ver(mqttv3) -> v3;
proto_ver(mqttv4) -> v4;
proto_ver(mqttv5) -> v5.
@ -285,13 +298,17 @@ options([{clean_start, CleanStart}| Options], Acc) ->
options([{address, Address}| Options], Acc) ->
{Host, Port} = address(Address),
options(Options, [{host, Host}, {port, Port}|Acc]);
options([{ssl, Ssl}| Options], Acc) ->
options(Options, [{ssl, Ssl}|Acc]);
options([{ssl_opts, SslOpts}| Options], Acc) ->
options(Options, [{ssl_opts, SslOpts}|Acc]);
options([_Option | Options], Acc) ->
options(Options, Acc).
name(Id) ->
list_to_atom(lists:concat([?MODULE, "_", Id])).
i2b(L) -> iolist_to_binary(L).
bin(L) -> iolist_to_binary(L).
mountpoint(undefined, Topic) ->
Topic;
@ -301,12 +318,12 @@ mountpoint(Prefix, Topic) ->
format_mountpoint(undefined) ->
undefined;
format_mountpoint(Prefix) ->
binary:replace(i2b(Prefix), <<"${node}">>, atom_to_binary(node(), utf8)).
binary:replace(bin(Prefix), <<"${node}">>, atom_to_binary(node(), utf8)).
store(memory, Data, Queue, MaxPendingMsg) when length(Queue) =< MaxPendingMsg ->
[Data | Queue];
store(memory, _Data, Queue, _MaxPendingMsg) ->
lager:error("Beyond max pending messages"),
logger:error("Beyond max pending messages"),
Queue;
store(disk, Data, Queue, _MaxPendingMsg)->
[Data | Queue].

View File

@ -337,7 +337,6 @@ handle_cast({From, #subscribe{topic = Topic, subpid = SubPid, subid = SubId, sub
true ->
case ets:lookup_element(?SUBOPTION, {Topic, Subscriber}, 2) =:= SubOpts of
true ->
io:format("Ets: ~p, SubOpts: ~p", [ets:lookup_element(?SUBOPTION, Topic, Subscriber), SubOpts]),
gen_server:reply(From, ok),
{noreply, State};
false ->

View File

@ -21,6 +21,7 @@
-export([start_link/0, start_link/1]).
-export([request/5, request/6, request_async/7, receive_response/3]).
-export([set_request_handler/2, sub_request_topic/3, sub_request_topic/4]).
-export([connect/1]).
-export([subscribe/2, subscribe/3, subscribe/4]).
-export([publish/2, publish/3, publish/4, publish/5]).
-export([unsubscribe/2, unsubscribe/3]).
@ -95,7 +96,7 @@
| {force_ping, boolean()}
| {properties, properties()}).
-record(mqtt_msg, {qos = ?QOS0, retain = false, dup = false,
-record(mqtt_msg, {qos = ?QOS_0, retain = false, dup = false,
packet_id, topic, props, payload}).
-type(mqtt_msg() :: #mqtt_msg{}).
@ -200,18 +201,11 @@ start_link(Options) when is_map(Options) ->
start_link(Options) when is_list(Options) ->
ok = emqx_mqtt_props:validate(
proplists:get_value(properties, Options, #{})),
case start_client(with_owner(Options)) of
{ok, Client} ->
connect(Client);
Error -> Error
end.
start_client(Options) ->
case proplists:get_value(name, Options) of
undefined ->
gen_statem:start_link(?MODULE, [Options], []);
gen_statem:start_link(?MODULE, [with_owner(Options)], []);
Name when is_atom(Name) ->
gen_statem:start_link({local, Name}, ?MODULE, [Options], [])
gen_statem:start_link({local, Name}, ?MODULE, [with_owner(Options)], [])
end.
with_owner(Options) ->
@ -220,8 +214,7 @@ with_owner(Options) ->
undefined -> [{owner, self()} | Options]
end.
%% @private
-spec(connect(client()) -> {ok, client(), properties()} | {error, term()}).
-spec(connect(client()) -> {ok, properties()} | {error, term()}).
connect(Client) ->
gen_statem:call(Client, connect, infinity).
@ -538,14 +531,24 @@ init([{hosts, Hosts} | Opts], State) ->
init(Opts, State#state{hosts = Hosts1});
init([{tcp_opts, TcpOpts} | Opts], State = #state{sock_opts = SockOpts}) ->
init(Opts, State#state{sock_opts = emqx_misc:merge_opts(SockOpts, TcpOpts)});
init([ssl | Opts], State = #state{sock_opts = SockOpts}) ->
ok = ssl:start(),
SockOpts1 = emqx_misc:merge_opts([{ssl_opts, []}], SockOpts),
init(Opts, State#state{sock_opts = SockOpts1});
init([{ssl, EnableSsl} | Opts], State) ->
case lists:keytake(ssl_opts, 1, Opts) of
{value, SslOpts, WithOutSslOpts} ->
init([SslOpts, {ssl, EnableSsl}| WithOutSslOpts], State);
false ->
init([{ssl_opts, []}, {ssl, EnableSsl}| Opts], State)
end;
init([{ssl_opts, SslOpts} | Opts], State = #state{sock_opts = SockOpts}) ->
ok = ssl:start(),
SockOpts1 = emqx_misc:merge_opts(SockOpts, [{ssl_opts, SslOpts}]),
init(Opts, State#state{sock_opts = SockOpts1});
case lists:keytake(ssl, 1, Opts) of
{value, {ssl, true}, WithOutEnableSsl} ->
ok = ssl:start(),
SockOpts1 = emqx_misc:merge_opts(SockOpts, [{ssl_opts, SslOpts}]),
init(WithOutEnableSsl, State#state{sock_opts = SockOpts1});
{value, {ssl, false}, WithOutEnableSsl} ->
init(WithOutEnableSsl, State);
false ->
init(Opts, State)
end;
init([{client_id, ClientId} | Opts], State) ->
init(Opts, State#state{client_id = iolist_to_binary(ClientId)});
init([{clean_start, CleanStart} | Opts], State) when is_boolean(CleanStart) ->
@ -682,7 +685,7 @@ waiting_for_connack(cast, ?CONNACK_PACKET(?RC_SUCCESS,
undefined -> AllProps;
_ -> maps:merge(AllProps, Properties)
end,
Reply = {ok, self(), Properties},
Reply = {ok, Properties},
State2 = State1#state{client_id = assign_id(ClientId, AllProps1),
properties = AllProps1,
session_present = SessPresent},
@ -994,7 +997,7 @@ handle_event(info, {Error, _Sock, Reason}, _StateName, State)
handle_event(info, {Closed, _Sock}, _StateName, State)
when Closed =:= tcp_closed; Closed =:= ssl_closed ->
{stop, Closed, State};
{stop, {shutdown, Closed}, State};
handle_event(info, {'EXIT', Owner, Reason}, _, #state{owner = Owner}) ->
{stop, Reason};
@ -1217,7 +1220,7 @@ retry_send([{Type, Msg, Ts} | Msgs], Now, State = #state{retry_interval = Interv
retry_send(publish, Msg = #mqtt_msg{qos = QoS, packet_id = PacketId},
Now, State = #state{inflight = Inflight}) ->
Msg1 = Msg#mqtt_msg{dup = (QoS =:= ?QOS1)},
Msg1 = Msg#mqtt_msg{dup = (QoS =:= ?QOS_1)},
case send(Msg1, State) of
{ok, NewState} ->
Inflight1 = emqx_inflight:update(PacketId, {publish, Msg1, Now}, Inflight),

View File

@ -49,7 +49,10 @@ connect(Host, Port, SockOpts, Timeout) ->
end.
ssl_upgrade(Sock, SslOpts, Timeout) ->
case ssl:connect(Sock, SslOpts, Timeout) of
TlsVersions = proplists:get_value(versions, SslOpts, []),
Ciphers = proplists:get_value(ciphers, SslOpts, default_ciphers(TlsVersions)),
SslOpts2 = emqx_misc:merge_opts(SslOpts, [{ciphers, Ciphers}]),
case ssl:connect(Sock, SslOpts2, Timeout) of
{ok, SslSock} ->
ok = ssl:controlling_process(SslSock, self()),
{ok, #ssl_socket{tcp = Sock, ssl = SslSock}};
@ -91,3 +94,8 @@ sockname(Sock) when is_port(Sock) ->
sockname(#ssl_socket{ssl = SslSock}) ->
ssl:sockname(SslSock).
default_ciphers(TlsVersions) ->
lists:foldl(
fun(TlsVer, Ciphers) ->
Ciphers ++ ssl:cipher_suites(all, TlsVer)
end, [], TlsVersions).

View File

@ -165,7 +165,8 @@ init_limiter({Rate, Burst}) ->
esockd_rate_limit:new(Rate, Burst).
send_fun(Transport, Socket, Peername) ->
fun(Data) ->
fun(Packet, Options) ->
Data = emqx_frame:serialize(Packet, Options),
try Transport:async_send(Socket, Data) of
ok ->
?LOG(debug, "SEND ~p", [iolist_to_binary(Data)], #state{peername = Peername}),
@ -408,4 +409,3 @@ maybe_gc(#state{}, {publish, _PacketId, #message{payload = Payload}}) ->
ok = emqx_gc:inc(1, Oct);
maybe_gc(_, _) ->
ok.

View File

@ -634,4 +634,3 @@ fixqos(?PUBREL, 0) -> 1;
fixqos(?SUBSCRIBE, 0) -> 1;
fixqos(?UNSUBSCRIBE, 0) -> 1;
fixqos(_Type, QoS) -> QoS.

View File

@ -35,7 +35,7 @@ start_listener({Proto, ListenOn, Options}) ->
{ok, _} ->
io:format("Start mqtt:~s listener on ~s successfully.~n", [Proto, format(ListenOn)]);
{error, Reason} ->
io:format(standard_error, "Failed to start mqtt:~s listener on ~s - ~p!",
io:format(standard_error, "Failed to start mqtt:~s listener on ~s - ~p~n!",
[Proto, format(ListenOn), Reason])
end.
@ -119,7 +119,7 @@ stop_listener({Proto, ListenOn, Opts}) ->
ok ->
io:format("Stop mqtt:~s listener on ~s successfully.~n", [Proto, format(ListenOn)]);
{error, Reason} ->
io:format(standard_error, "Failed to stop mqtt:~s listener on ~s - ~p.",
io:format(standard_error, "Failed to stop mqtt:~s listener on ~s - ~p~n.",
[Proto, format(ListenOn), Reason])
end.

View File

@ -63,8 +63,7 @@ init([Pool, Id, Node, Topic, Options]) ->
Group = iolist_to_binary(["$bridge:", atom_to_list(Node), ":", Topic]),
emqx_broker:subscribe(Topic, self(), #{share => Group, qos => ?QOS_0}),
State = parse_opts(Options, #state{node = Node, subtopic = Topic}),
MQueue = emqx_mqueue:init(#{type => simple,
max_len => State#state.max_queue_len,
MQueue = emqx_mqueue:init(#{max_len => State#state.max_queue_len,
store_qos0 => true}),
{ok, State#state{pool = Pool, id = Id, mqueue = MQueue}};
false ->
@ -96,7 +95,8 @@ handle_cast(Msg, State) ->
handle_info({dispatch, _Topic, Msg}, State = #state{mqueue = Q, status = down}) ->
%% TODO: how to drop???
{noreply, State#state{mqueue = emqx_mqueue:in(Msg, Q)}};
{_Dropped, NewQ} = emqx_mqueue:in(Msg, Q),
{noreply, State#state{mqueue = NewQ}};
handle_info({dispatch, _Topic, Msg}, State = #state{node = Node, status = up}) ->
emqx_rpc:cast(Node, emqx_broker, publish, [transform(Msg, State)]),

View File

@ -22,38 +22,47 @@
-export([error/1, error/2, error/3]).
-export([critical/1, critical/2, critical/3]).
-export([add_proc_metadata/1]).
debug(Msg) ->
lager:debug(Msg).
logger:debug(Msg).
debug(Format, Args) ->
lager:debug(Format, Args).
debug(Metadata, Format, Args) when is_list(Metadata) ->
lager:debug(Metadata, Format, Args).
logger:debug(Format, Args).
debug(Metadata, Format, Args) when is_map(Metadata) ->
logger:debug(Format, Args, Metadata).
info(Msg) ->
lager:info(Msg).
logger:info(Msg).
info(Format, Args) ->
lager:info(Format, Args).
info(Metadata, Format, Args) when is_list(Metadata) ->
lager:info(Metadata, Format, Args).
logger:info(Format, Args).
info(Metadata, Format, Args) when is_map(Metadata) ->
logger:info(Format, Args, Metadata).
warning(Msg) ->
lager:warning(Msg).
logger:warning(Msg).
warning(Format, Args) ->
lager:warning(Format, Args).
warning(Metadata, Format, Args) when is_list(Metadata) ->
lager:warning(Metadata, Format, Args).
logger:warning(Format, Args).
warning(Metadata, Format, Args) when is_map(Metadata) ->
logger:warning(Format, Args, Metadata).
error(Msg) ->
lager:error(Msg).
logger:error(Msg).
error(Format, Args) ->
lager:error(Format, Args).
error(Metadata, Format, Args) when is_list(Metadata) ->
lager:error(Metadata, Format, Args).
logger:error(Format, Args).
error(Metadata, Format, Args) when is_map(Metadata) ->
logger:error(Format, Args, Metadata).
critical(Msg) ->
lager:critical(Msg).
logger:critical(Msg).
critical(Format, Args) ->
lager:critical(Format, Args).
critical(Metadata, Format, Args) when is_list(Metadata) ->
lager:critical(Metadata, Format, Args).
logger:critical(Format, Args).
critical(Metadata, Format, Args) when is_map(Metadata) ->
logger:critical(Format, Args, Metadata).
add_proc_metadata(Meta) ->
case logger:get_process_metadata() of
undefined ->
logger:set_process_metadata(Meta);
OldMeta ->
logger:set_process_metadata(maps:merge(OldMeta, Meta))
end.

View File

@ -0,0 +1,360 @@
%%
%% %CopyrightBegin%
%%
%% Copyright Ericsson AB 2017-2018. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%
%% %CopyrightEnd%
%%
%% This file is copied from lib/kernel/src/logger_formatter.erl, and
%% modified for a more concise time format other than the default RFC3339.
-module(emqx_logger_formatter).
-export([format/2]).
-export([check_config/1]).
-define(DEFAULT_FORMAT_TEMPLATE_SINGLE, [time," ",level,": ",msg,"\n"]).
-define(FormatP, "~0tp").
-define(IS_STRING(String),
(is_list(String) orelse is_binary(String))).
%%%-----------------------------------------------------------------
%%% Types
-type config() :: #{chars_limit => pos_integer() | unlimited,
depth => pos_integer() | unlimited,
max_size => pos_integer() | unlimited,
report_cb => logger:report_cb(),
quit => template()}.
-type template() :: [metakey() | {metakey(),template(),template()} | string()].
-type metakey() :: atom() | [atom()].
%%%-----------------------------------------------------------------
%%% API
-spec format(LogEvent,Config) -> unicode:chardata() when
LogEvent :: logger:log_event(),
Config :: config().
format(#{level:=Level,msg:=Msg0,meta:=Meta},Config0)
when is_map(Config0) ->
Config = add_default_config(Config0),
Template = maps:get(template,Config),
{BT,AT0} = lists:splitwith(fun(msg) -> false; (_) -> true end, Template),
{DoMsg,AT} =
case AT0 of
[msg|Rest] -> {true,Rest};
_ ->{false,AT0}
end,
B = do_format(Level,Meta,BT,Config),
A = do_format(Level,Meta,AT,Config),
MsgStr =
if DoMsg ->
Config1 =
case maps:get(chars_limit,Config) of
unlimited ->
Config;
Size0 ->
Size =
case Size0 - string:length([B,A]) of
S when S>=0 -> S;
_ -> 0
end,
Config#{chars_limit=>Size}
end,
string:trim(format_msg(Msg0,Meta,Config1));
true ->
""
end,
truncate([B,MsgStr,A],maps:get(max_size,Config)).
do_format(Level,Data,[level|Format],Config) ->
[to_string(level,Level,Config)|do_format(Level,Data,Format,Config)];
do_format(Level,Data,[{Key,IfExist,Else}|Format],Config) ->
String =
case value(Key,Data) of
{ok,Value} -> do_format(Level,Data#{Key=>Value},IfExist,Config);
error -> do_format(Level,Data,Else,Config)
end,
[String|do_format(Level,Data,Format,Config)];
do_format(Level,Data,[Key|Format],Config)
when is_atom(Key) orelse
(is_list(Key) andalso is_atom(hd(Key))) ->
String =
case value(Key,Data) of
{ok,Value} -> to_string(Key,Value,Config);
error -> ""
end,
[String|do_format(Level,Data,Format,Config)];
do_format(Level,Data,[Str|Format],Config) ->
[Str|do_format(Level,Data,Format,Config)];
do_format(_Level,_Data,[],_Config) ->
[].
value(Key,Meta) when is_map_key(Key,Meta) ->
{ok,maps:get(Key,Meta)};
value([Key|Keys],Meta) when is_map_key(Key,Meta) ->
value(Keys,maps:get(Key,Meta));
value([],Value) ->
{ok,Value};
value(_,_) ->
error.
to_string(time,Time,Config) ->
format_time(Time,Config);
to_string(mfa,MFA,Config) ->
format_mfa(MFA,Config);
to_string(_,Value,Config) ->
to_string(Value,Config).
to_string(X,_) when is_atom(X) ->
atom_to_list(X);
to_string(X,_) when is_integer(X) ->
integer_to_list(X);
to_string(X,_) when is_pid(X) ->
pid_to_list(X);
to_string(X,_) when is_reference(X) ->
ref_to_list(X);
to_string(X,_) when is_list(X) ->
case printable_list(lists:flatten(X)) of
true -> X;
_ -> io_lib:format(?FormatP,[X])
end;
to_string(X,_) ->
io_lib:format("~s",[X]).
printable_list([]) ->
false;
printable_list(X) ->
io_lib:printable_list(X).
format_msg({string,Chardata},Meta,Config) ->
format_msg({"~ts",[Chardata]},Meta,Config);
format_msg({report,_}=Msg,Meta,#{report_cb:=Fun}=Config)
when is_function(Fun,1); is_function(Fun,2) ->
format_msg(Msg,Meta#{report_cb=>Fun},maps:remove(report_cb,Config));
format_msg({report,Report},#{report_cb:=Fun}=Meta,Config) when is_function(Fun,1) ->
try Fun(Report) of
{Format,Args} when is_list(Format), is_list(Args) ->
format_msg({Format,Args},maps:remove(report_cb,Meta),Config);
Other ->
format_msg({"REPORT_CB/1 ERROR: ~0tp; Returned: ~0tp",
[Report,Other]},Meta,Config)
catch C:R:S ->
format_msg({"REPORT_CB/1 CRASH: ~0tp; Reason: ~0tp",
[Report,{C,R,logger:filter_stacktrace(?MODULE,S)}]},Meta,Config)
end;
format_msg({report,Report},#{report_cb:=Fun}=Meta,Config) when is_function(Fun,2) ->
try Fun(Report,maps:with([depth,chars_limit,single_line],Config)) of
Chardata when ?IS_STRING(Chardata) ->
try chardata_to_list(Chardata) % already size limited by report_cb
catch _:_ ->
format_msg({"REPORT_CB/2 ERROR: ~0tp; Returned: ~0tp",
[Report,Chardata]},Meta,Config)
end;
Other ->
format_msg({"REPORT_CB/2 ERROR: ~0tp; Returned: ~0tp",
[Report,Other]},Meta,Config)
catch C:R:S ->
format_msg({"REPORT_CB/2 CRASH: ~0tp; Reason: ~0tp",
[Report,{C,R,logger:filter_stacktrace(?MODULE,S)}]},
Meta,Config)
end;
format_msg({report,Report},Meta,Config) ->
format_msg({report,Report},
Meta#{report_cb=>fun logger:format_report/1},
Config);
format_msg(Msg,_Meta,#{depth:=Depth,chars_limit:=CharsLimit}) ->
Opts = chars_limit_to_opts(CharsLimit),
do_format_msg(Msg, Depth, Opts).
chars_limit_to_opts(unlimited) -> [];
chars_limit_to_opts(CharsLimit) -> [{chars_limit,CharsLimit}].
do_format_msg({Format0,Args},Depth,Opts) ->
try
Format1 = io_lib:scan_format(Format0, Args),
Format = reformat(Format1, Depth),
io_lib:build_text(Format,Opts)
catch C:R:S ->
FormatError = "FORMAT ERROR: ~0tp - ~0tp",
case Format0 of
FormatError ->
%% already been here - avoid failing cyclically
erlang:raise(C,R,S);
_ ->
format_msg({FormatError,[Format0,Args]},Depth,Opts)
end
end.
reformat(Format,unlimited) ->
Format;
reformat([#{control_char:=C}=M|T], Depth) when C =:= $p ->
[limit_depth(M#{width => 0}, Depth)|reformat(T, Depth)];
reformat([#{control_char:=C}=M|T], Depth) when C =:= $P ->
[M#{width => 0}|reformat(T, Depth)];
reformat([#{control_char:=C}=M|T], Depth) when C =:= $p; C =:= $w ->
[limit_depth(M, Depth)|reformat(T, Depth)];
reformat([H|T], Depth) ->
[H|reformat(T, Depth)];
reformat([], _) ->
[].
limit_depth(M0, unlimited) ->
M0;
limit_depth(#{control_char:=C0, args:=Args}=M0, Depth) ->
C = C0 - ($a - $A), %To uppercase.
M0#{control_char:=C,args:=Args++[Depth]}.
chardata_to_list(Chardata) ->
case unicode:characters_to_list(Chardata,unicode) of
List when is_list(List) ->
List;
Error ->
throw(Error)
end.
truncate(String,unlimited) ->
String;
truncate(String,Size) ->
Length = string:length(String),
if Length>Size ->
case lists:reverse(lists:flatten(String)) of
[$\n|_] ->
string:slice(String,0,Size-4)++"...\n";
_ ->
string:slice(String,0,Size-3)++"..."
end;
true ->
String
end.
%% Convert microseconds-timestamp into local datatime string in milliseconds
format_time(SysTime,#{})
when is_integer(SysTime) ->
Ms = SysTime rem 1000000 div 1000,
{Date, _Time = {H, Mi, S}} = calendar:system_time_to_local_time(SysTime, microsecond),
format_time({Date, {H, Mi, S, Ms}}).
format_time({{Y, M, D}, {H, Mi, S, Ms}}) ->
io_lib:format("~b-~2..0b-~2..0b ~2..0b:~2..0b:~2..0b.~3..0b", [Y, M, D, H, Mi, S, Ms]);
format_time({{Y, M, D}, {H, Mi, S}}) ->
io_lib:format("~b-~2..0b-~2..0b ~2..0b:~2..0b:~2..0b", [Y, M, D, H, Mi, S]).
format_mfa({M,F,A},_) when is_atom(M), is_atom(F), is_integer(A) ->
atom_to_list(M)++":"++atom_to_list(F)++"/"++integer_to_list(A);
format_mfa({M,F,A},Config) when is_atom(M), is_atom(F), is_list(A) ->
format_mfa({M,F,length(A)},Config);
format_mfa(MFA,Config) ->
to_string(MFA,Config).
%% Ensure that all valid configuration parameters exist in the final
%% configuration map
add_default_config(Config0) ->
Default =
#{chars_limit=>unlimited,
error_logger_notice_header=>info},
MaxSize = get_max_size(maps:get(max_size,Config0,undefined)),
Depth = get_depth(maps:get(depth,Config0,undefined)),
add_default_template(maps:merge(Default,Config0#{max_size=>MaxSize,
depth=>Depth})).
add_default_template(#{template:=_}=Config) ->
Config;
add_default_template(Config) ->
Config#{template=>?DEFAULT_FORMAT_TEMPLATE_SINGLE}.
get_max_size(undefined) ->
unlimited;
get_max_size(S) ->
max(10,S).
get_depth(undefined) ->
error_logger:get_format_depth();
get_depth(S) ->
max(5,S).
-spec check_config(Config) -> ok | {error,term()} when
Config :: config().
check_config(Config) when is_map(Config) ->
do_check_config(maps:to_list(Config));
check_config(Config) ->
{error,{invalid_formatter_config,?MODULE,Config}}.
do_check_config([{Type,L}|Config]) when Type == chars_limit;
Type == depth;
Type == max_size ->
case check_limit(L) of
ok -> do_check_config(Config);
error -> {error,{invalid_formatter_config,?MODULE,{Type,L}}}
end;
do_check_config([{error_logger_notice_header,ELNH}|Config]) when ELNH == info;
ELNH == notice ->
do_check_config(Config);
do_check_config([{report_cb,RCB}|Config]) when is_function(RCB,1);
is_function(RCB,2) ->
do_check_config(Config);
do_check_config([{template,T}|Config]) ->
case check_template(T) of
ok -> do_check_config(Config);
error -> {error,{invalid_formatter_template,?MODULE,T}}
end;
do_check_config([C|_]) ->
{error,{invalid_formatter_config,?MODULE,C}};
do_check_config([]) ->
ok.
check_limit(L) when is_integer(L), L>0 ->
ok;
check_limit(unlimited) ->
ok;
check_limit(_) ->
error.
check_template([Key|T]) when is_atom(Key) ->
check_template(T);
check_template([Key|T]) when is_list(Key), is_atom(hd(Key)) ->
case lists:all(fun(X) when is_atom(X) -> true;
(_) -> false
end,
Key) of
true ->
check_template(T);
false ->
error
end;
check_template([{Key,IfExist,Else}|T])
when is_atom(Key) orelse
(is_list(Key) andalso is_atom(hd(Key))) ->
case check_template(IfExist) of
ok ->
case check_template(Else) of
ok ->
check_template(T);
error ->
error
end;
error ->
error
end;
check_template([Str|T]) when is_list(Str) ->
case io_lib:printable_unicode_list(Str) of
true -> check_template(T);
false -> error
end;
check_template([]) ->
ok;
check_template(_) ->
error.

View File

@ -34,7 +34,7 @@ make(Topic, Payload) ->
-spec(make(atom() | emqx_types:client_id(), emqx_topic:topic(), emqx_types:payload())
-> emqx_types:message()).
make(From, Topic, Payload) ->
make(From, ?QOS0, Topic, Payload).
make(From, ?QOS_0, Topic, Payload).
-spec(make(atom() | emqx_types:client_id(), emqx_mqtt_types:qos(),
emqx_topic:topic(), emqx_types:payload()) -> emqx_types:message()).
@ -47,7 +47,7 @@ make(From, QoS, Topic, Payload) ->
payload = Payload,
timestamp = os:timestamp()}.
msgid(?QOS0) -> undefined;
msgid(?QOS_0) -> undefined;
msgid(_QoS) -> emqx_guid:gen().
set_flags(Flags, Msg = #message{flags = undefined}) when is_map(Flags) ->
@ -114,7 +114,7 @@ elapsed(Since) ->
max(0, timer:now_diff(os:timestamp(), Since) div 1000).
format(#message{id = Id, qos = QoS, topic = Topic, from = From, flags = Flags, headers = Headers}) ->
io_lib:format("Message(Id=~s, QoS=~w, Topic=~s, From=~s, Flags=~s, Headers=~s)",
io_lib:format("Message(Id=~s, QoS=~w, Topic=~s, From=~p, Flags=~s, Headers=~s)",
[Id, QoS, Topic, From, format(flags, Flags), format(headers, Headers)]).
format(_, undefined) ->

View File

@ -62,9 +62,11 @@ proc_stats(Pid) ->
-define(DISABLED, 0).
init_proc_mng_policy(undefined) -> ok;
init_proc_mng_policy(Zone) ->
#{max_heap_size := MaxHeapSizeInBytes} = ShutdownPolicy =
emqx_zone:get_env(Zone, force_shutdown_policy),
#{max_heap_size := MaxHeapSizeInBytes}
= ShutdownPolicy
= emqx_zone:get_env(Zone, force_shutdown_policy),
MaxHeapSize = MaxHeapSizeInBytes div erlang:system_info(wordsize),
_ = erlang:process_flag(max_heap_size, MaxHeapSize), % zero is discarded
erlang:put(force_shutdown_policy, ShutdownPolicy),
@ -106,4 +108,3 @@ is_enabled(Max) -> is_integer(Max) andalso Max > ?DISABLED.
proc_info(Key) ->
{Key, Value} = erlang:process_info(self(), Key),
Value.

View File

@ -21,7 +21,7 @@ load() ->
lists:foreach(
fun({Mod, Env}) ->
ok = Mod:load(Env),
io:format("Load ~s module successfully.~n", [Mod])
logger:info("Load ~s module successfully.", [Mod])
end, emqx_config:get_env(modules, [])).
-spec(unload() -> ok).

View File

@ -22,7 +22,7 @@
-export_type([topic_filters/0]).
-export_type([packet_id/0, packet_type/0, packet/0]).
-type(qos() :: ?QOS0 | ?QOS1 | ?QOS2).
-type(qos() :: ?QOS_0 | ?QOS_1 | ?QOS_2).
-type(version() :: ?MQTT_PROTO_V3 | ?MQTT_PROTO_V4 | ?MQTT_PROTO_V5).
-type(qos_name() :: qos0 | at_most_once |
qos1 | at_least_once |
@ -40,4 +40,3 @@
}).
-type(topic_filters() :: [{emqx_topic:topic(), subopts()}]).
-type(packet() :: #mqtt_packet{}).

View File

@ -13,8 +13,8 @@
%% @doc A Simple in-memory message queue.
%%
%% Notice that MQTT is not an enterprise messaging queue. MQTT assume that client
%% should be online in most of the time.
%% Notice that MQTT is not a (on-disk) persistent messaging queue.
%% It assumes that clients should be online in most of the time.
%%
%% This module implements a simple in-memory queue for MQTT persistent session.
%%
@ -37,7 +37,8 @@
%% 3. QoS=0 messages are only enqueued when `store_qos0' is given `true`
%% in init options
%%
%% 4. If the queue is full drop the oldest one unless `max_len' is set to `0'.
%% 4. If the queue is full, drop the oldest one
%% unless `max_len' is set to `0' which implies (`infinity').
%%
%% @end
@ -46,132 +47,121 @@
-include("emqx.hrl").
-include("emqx_mqtt.hrl").
-export([init/1, type/1]).
-export([init/1]).
-export([is_empty/1]).
-export([len/1, max_len/1]).
-export([in/2, out/1]).
-export([stats/1, dropped/1]).
-define(PQUEUE, emqx_pqueue).
-export_type([mqueue/0, options/0]).
-type(priority() :: {iolist(), pos_integer()}).
-type(options() :: #{type := simple | priority,
max_len := non_neg_integer(),
priorities => list(priority()),
store_qos0 => boolean()}).
-type(topic() :: emqx_topic:topic()).
-type(priority() :: infinity | integer()).
-type(pq() :: emqx_pqueue:q()).
-type(count() :: non_neg_integer()).
-type(p_table() :: ?NO_PRIORITY_TABLE | #{topic() := priority()}).
-type(options() :: #{max_len := count(),
priorities => p_table(),
default_priority => highest | lowest,
store_qos0 => boolean()
}).
-type(message() :: pemqx_types:message()).
-type(stat() :: {len, non_neg_integer()}
| {max_len, non_neg_integer()}
| {dropped, non_neg_integer()}).
-define(PQUEUE, emqx_pqueue).
-define(LOWEST_PRIORITY, 0).
-define(HIGHEST_PRIORITY, infinity).
-define(MAX_LEN_INFINITY, 0).
-record(mqueue, {
type :: simple | priority,
q :: queue:queue() | ?PQUEUE:q(),
%% priority table
priorities = [],
pseq = 0,
len = 0,
max_len = 0,
qos0 = false,
dropped = 0
store_qos0 = false :: boolean(),
max_len = ?MAX_LEN_INFINITY :: count(),
len = 0 :: count(),
dropped = 0 :: count(),
p_table = ?NO_PRIORITY_TABLE :: p_table(),
default_p = ?LOWEST_PRIORITY :: priority(),
q = ?PQUEUE:new() :: pq()
}).
-type(mqueue() :: #mqueue{}).
-export_type([mqueue/0, priority/0, options/0]).
-opaque(mqueue() :: #mqueue{}).
-spec(init(options()) -> mqueue()).
init(Opts = #{type := Type, max_len := MaxLen, store_qos0 := QoS0}) ->
init_q(#mqueue{type = Type, len = 0, max_len = MaxLen, qos0 = QoS0}, Opts).
init(Opts = #{max_len := MaxLen0, store_qos0 := QoS_0}) ->
MaxLen = case (is_integer(MaxLen0) andalso MaxLen0 > ?MAX_LEN_INFINITY) of
true -> MaxLen0;
false -> ?MAX_LEN_INFINITY
end,
#mqueue{max_len = MaxLen,
store_qos0 = QoS_0,
p_table = get_opt(priorities, Opts, ?NO_PRIORITY_TABLE),
default_p = get_priority_opt(Opts)
}.
init_q(MQ = #mqueue{type = simple}, _Opts) ->
MQ#mqueue{q = queue:new()};
init_q(MQ = #mqueue{type = priority}, #{priorities := Priorities}) ->
init_pq(Priorities, MQ#mqueue{q = ?PQUEUE:new()}).
is_empty(#mqueue{len = Len}) -> Len =:= 0.
init_pq([], MQ) ->
MQ;
init_pq([{Topic, P} | L], MQ) ->
{_, MQ1} = insert_p(iolist_to_binary(Topic), P, MQ),
init_pq(L, MQ1).
insert_p(Topic, P, MQ = #mqueue{priorities = L, pseq = Seq}) ->
<<PInt:48>> = <<P:8, (erlang:phash2(Topic)):32, Seq:8>>,
{PInt, MQ#mqueue{priorities = [{Topic, PInt} | L], pseq = Seq + 1}}.
-spec(type(mqueue()) -> simple | priority).
type(#mqueue{type = Type}) -> Type.
is_empty(#mqueue{type = simple, len = Len}) -> Len =:= 0;
is_empty(#mqueue{type = priority, q = Q}) -> ?PQUEUE:is_empty(Q).
len(#mqueue{type = simple, len = Len}) -> Len;
len(#mqueue{type = priority, q = Q}) -> ?PQUEUE:len(Q).
len(#mqueue{len = Len}) -> Len.
max_len(#mqueue{max_len = MaxLen}) -> MaxLen.
%% @doc Dropped of the mqueue
-spec(dropped(mqueue()) -> non_neg_integer()).
%% @doc Return number of dropped messages.
-spec(dropped(mqueue()) -> count()).
dropped(#mqueue{dropped = Dropped}) -> Dropped.
%% @doc Stats of the mqueue
-spec(stats(mqueue()) -> [stat()]).
stats(#mqueue{type = Type, q = Q, max_len = MaxLen, len = Len, dropped = Dropped}) ->
[{len, case Type of
simple -> Len;
priority -> ?PQUEUE:len(Q)
end} | [{max_len, MaxLen}, {dropped, Dropped}]].
stats(#mqueue{max_len = MaxLen, dropped = Dropped} = MQ) ->
[{len, len(MQ)}, {max_len, MaxLen}, {dropped, Dropped}].
%% @doc Enqueue a message.
-spec(in(emqx_types:message(), mqueue()) -> mqueue()).
in(#message{qos = ?QOS_0}, MQ = #mqueue{qos0 = false}) ->
MQ;
in(Msg, MQ = #mqueue{type = simple, q = Q, len = Len, max_len = 0}) ->
MQ#mqueue{q = queue:in(Msg, Q), len = Len + 1};
in(Msg, MQ = #mqueue{type = simple, q = Q, len = Len, max_len = MaxLen, dropped = Dropped})
when Len >= MaxLen ->
{{value, _Old}, Q2} = queue:out(Q),
MQ#mqueue{q = queue:in(Msg, Q2), dropped = Dropped +1};
in(Msg, MQ = #mqueue{type = simple, q = Q, len = Len}) ->
MQ#mqueue{q = queue:in(Msg, Q), len = Len + 1};
in(Msg = #message{topic = Topic}, MQ = #mqueue{type = priority, q = Q,
priorities = Priorities,
max_len = 0}) ->
case lists:keysearch(Topic, 1, Priorities) of
{value, {_, Pri}} ->
MQ#mqueue{q = ?PQUEUE:in(Msg, Pri, Q)};
-spec(in(message(), mqueue()) -> {undefined | message(), mqueue()}).
in(#message{qos = ?QOS_0}, MQ = #mqueue{store_qos0 = false}) ->
{_Dropped = undefined, MQ};
in(Msg = #message{topic = Topic}, MQ = #mqueue{default_p = Dp,
p_table = PTab,
q = Q,
len = Len,
max_len = MaxLen,
dropped = Dropped
} = MQ) ->
Priority = get_priority(Topic, PTab, Dp),
PLen = ?PQUEUE:plen(Priority, Q),
case MaxLen =/= ?MAX_LEN_INFINITY andalso PLen =:= MaxLen of
true ->
%% reached max length, drop the oldest message
{{value, DroppedMsg}, Q1} = ?PQUEUE:out(Priority, Q),
Q2 = ?PQUEUE:in(Msg, Priority, Q1),
{DroppedMsg, MQ#mqueue{q = Q2, dropped = Dropped + 1}};
false ->
{Pri, MQ1} = insert_p(Topic, 0, MQ),
MQ1#mqueue{q = ?PQUEUE:in(Msg, Pri, Q)}
end;
in(Msg = #message{topic = Topic}, MQ = #mqueue{type = priority, q = Q,
priorities = Priorities,
max_len = MaxLen}) ->
case lists:keysearch(Topic, 1, Priorities) of
{value, {_, Pri}} ->
case ?PQUEUE:plen(Pri, Q) >= MaxLen of
true ->
{_, Q1} = ?PQUEUE:out(Pri, Q),
MQ#mqueue{q = ?PQUEUE:in(Msg, Pri, Q1)};
false ->
MQ#mqueue{q = ?PQUEUE:in(Msg, Pri, Q)}
end;
false ->
{Pri, MQ1} = insert_p(Topic, 0, MQ),
MQ1#mqueue{q = ?PQUEUE:in(Msg, Pri, Q)}
{_DroppedMsg = undefined, MQ#mqueue{len = Len + 1, q = ?PQUEUE:in(Msg, Priority, Q)}}
end.
out(MQ = #mqueue{type = simple, len = 0}) ->
-spec(out(mqueue()) -> {empty | {value, message()}, mqueue()}).
out(MQ = #mqueue{len = 0, q = Q}) ->
0 = ?PQUEUE:len(Q), %% assert, in this case, ?PQUEUE:len should be very cheap
{empty, MQ};
out(MQ = #mqueue{type = simple, q = Q, len = Len, max_len = 0}) ->
{R, Q2} = queue:out(Q),
{R, MQ#mqueue{q = Q2, len = Len - 1}};
out(MQ = #mqueue{type = simple, q = Q, len = Len}) ->
{R, Q2} = queue:out(Q),
{R, MQ#mqueue{q = Q2, len = Len - 1}};
out(MQ = #mqueue{type = priority, q = Q}) ->
{R, Q2} = ?PQUEUE:out(Q),
{R, MQ#mqueue{q = Q2}}.
out(MQ = #mqueue{q = Q, len = Len}) ->
{R, Q1} = ?PQUEUE:out(Q),
{R, MQ#mqueue{q = Q1, len = Len - 1}}.
get_opt(Key, Opts, Default) ->
case maps:get(Key, Opts, Default) of
undefined -> Default;
X -> X
end.
get_priority_opt(Opts) ->
case get_opt(default_priority, Opts, ?LOWEST_PRIORITY) of
lowest -> ?LOWEST_PRIORITY;
highest -> ?HIGHEST_PRIORITY;
N when is_integer(N) -> N
end.
%% MICRO-OPTIMIZATION: When there is no priority table defined (from config),
%% disregard default priority from config, always use lowest (?LOWEST_PRIORITY=0)
%% because the lowest priority in emqx_pqueue is a fallback to queue:queue()
%% while the highest 'infinity' is a [{infinity, queue:queue()}]
get_priority(_Topic, ?NO_PRIORITY_TABLE, _) -> ?LOWEST_PRIORITY;
get_priority(Topic, PTab, Dp) -> maps:get(Topic, PTab, Dp).

View File

@ -103,7 +103,7 @@ validate_properties(_, _) ->
validate_subscription({Topic, #{qos := QoS}}) ->
emqx_topic:validate(filter, Topic) andalso validate_qos(QoS).
validate_qos(QoS) when ?QOS0 =< QoS, QoS =< ?QOS2 ->
validate_qos(QoS) when ?QOS_0 =< QoS, QoS =< ?QOS_2 ->
true;
validate_qos(_) -> error(bad_qos).

View File

@ -72,9 +72,8 @@
-define(NO_PROPS, undefined).
-define(LOG(Level, Format, Args, PState),
emqx_logger:Level([{client, PState#pstate.client_id}], "MQTT(~s@~s): " ++ Format,
[PState#pstate.client_id, esockd_net:format(PState#pstate.peername) | Args])).
-define(LOG(Level, Format, Args, _PState),
emqx_logger:Level("[MQTT] " ++ Format, Args)).
%%------------------------------------------------------------------------------
%% Init
@ -82,6 +81,7 @@
-spec(init(map(), list()) -> state()).
init(#{peername := Peername, peercert := Peercert, sendfun := SendFun}, Options) ->
emqx_logger:add_proc_metadata(#{peername => esockd_net:format(Peername)}),
Zone = proplists:get_value(zone, Options),
#pstate{zone = Zone,
sendfun = SendFun,
@ -287,7 +287,7 @@ process_packet(?CONNECT_PACKET(
client_id = ClientId,
username = Username,
password = Password} = Connect), PState) ->
emqx_logger:add_proc_metadata(#{client_id => ClientId}),
%% TODO: Mountpoint...
%% Msg -> emqx_mountpoint:mount(MountPoint, Msg)
WillMsg = make_will_msg(Connect),
@ -586,13 +586,10 @@ deliver({disconnect, _ReasonCode}, PState) ->
-spec(send(emqx_mqtt_types:packet(), state()) -> {ok, state()} | {error, term()}).
send(Packet = ?PACKET(Type), PState = #pstate{proto_ver = Ver, sendfun = SendFun}) ->
trace(send, Packet, PState),
case SendFun(emqx_frame:serialize(Packet, #{version => Ver})) of
case SendFun(Packet, #{version => Ver}) of
ok ->
emqx_metrics:sent(Packet),
{ok, inc_stats(send, Type, PState)};
{binary, _Data} ->
emqx_metrics:sent(Packet),
{ok, inc_stats(send, Type, PState)};
{error, Reason} ->
{error, Reason}
end.

View File

@ -138,5 +138,5 @@ compat(connack, 16#9C) -> ?CONNACK_SERVER;
compat(connack, 16#9D) -> ?CONNACK_SERVER;
compat(connack, 16#9F) -> ?CONNACK_SERVER;
compat(suback, Code) when Code =< ?QOS2 -> Code;
compat(suback, Code) when Code =< ?QOS_2 -> Code;
compat(suback, Code) when Code >= 16#80 -> 16#80.

View File

@ -161,9 +161,8 @@
-define(TIMEOUT, 60000).
-define(LOG(Level, Format, Args, State),
emqx_logger:Level([{client, State#state.client_id}],
"Session(~s): " ++ Format, [State#state.client_id | Args])).
-define(LOG(Level, Format, Args, _State),
emqx_logger:Level("[Session] " ++ Format, Args)).
%% @doc Start a session proc.
-spec(start_link(SessAttrs :: map()) -> {ok, pid()}).
@ -341,6 +340,7 @@ init([Parent, #{zone := Zone,
max_inflight := MaxInflight,
topic_alias_maximum := TopicAliasMaximum,
will_msg := WillMsg}]) ->
emqx_logger:add_proc_metadata(#{client_id => ClientId}),
process_flag(trap_exit, true),
true = link(ConnPid),
IdleTimout = get_env(Zone, idle_timeout, 30000),
@ -377,10 +377,10 @@ init([Parent, #{zone := Zone,
gen_server:enter_loop(?MODULE, [{hibernate_after, IdleTimout}], State).
init_mqueue(Zone) ->
emqx_mqueue:init(#{type => get_env(Zone, mqueue_type, simple),
max_len => get_env(Zone, max_mqueue_len, 1000),
priorities => get_env(Zone, mqueue_priorities, ""),
store_qos0 => get_env(Zone, mqueue_store_qos0, true)
emqx_mqueue:init(#{max_len => get_env(Zone, max_mqueue_len, 1000),
store_qos0 => get_env(Zone, mqueue_store_qos0, true),
priorities => get_env(Zone, mqueue_priorities),
default_priority => get_env(Zone, mqueue_default_priority)
}).
binding(ConnPid) ->
@ -802,13 +802,13 @@ dispatch(Msg, State = #state{client_id = ClientId, conn_pid = undefined}) ->
end;
%% Deliver qos0 message directly to client
dispatch(Msg = #message{qos = ?QOS0} = Msg, State) ->
dispatch(Msg = #message{qos = ?QOS_0} = Msg, State) ->
deliver(undefined, Msg, State),
inc_stats(deliver, Msg, State);
dispatch(Msg = #message{qos = QoS} = Msg,
State = #state{next_pkt_id = PacketId, inflight = Inflight})
when QoS =:= ?QOS1 orelse QoS =:= ?QOS2 ->
when QoS =:= ?QOS_1 orelse QoS =:= ?QOS_2 ->
case emqx_inflight:is_full(Inflight) of
true -> enqueue_msg(Msg, State);
false ->
@ -817,14 +817,15 @@ dispatch(Msg = #message{qos = QoS} = Msg,
end.
enqueue_msg(Msg, State = #state{mqueue = Q}) ->
inc_stats(enqueue, Msg, State#state{mqueue = emqx_mqueue:in(Msg, Q)}).
{_Dropped, NewQ} = emqx_mqueue:in(Msg, Q),
inc_stats(enqueue, Msg, State#state{mqueue = NewQ}).
%%------------------------------------------------------------------------------
%% Deliver
%%------------------------------------------------------------------------------
redeliver({PacketId, Msg = #message{qos = QoS}}, State) ->
deliver(PacketId, if QoS =:= ?QOS2 -> Msg;
deliver(PacketId, if QoS =:= ?QOS_2 -> Msg;
true -> emqx_message:set_flag(dup, Msg)
end, State);
@ -973,4 +974,3 @@ noreply(State) ->
shutdown(Reason, State) ->
{stop, {shutdown, Reason}, State}.

View File

@ -60,9 +60,7 @@ open_session(SessAttrs = #{clean_start := true, client_id := ClientId, conn_pid
emqx_sm_locker:trans(ClientId, CleanStart);
open_session(SessAttrs = #{clean_start := false,
client_id := ClientId,
max_inflight := MaxInflight,
topic_alias_maximum := TopicAliasMaximum}) ->
client_id := ClientId}) ->
ResumeStart = fun(_) ->
case resume_session(ClientId, SessAttrs) of
{ok, SPid} ->

View File

@ -25,9 +25,9 @@
-define(SYSMON, ?MODULE).
-define(LOG(Msg, ProcInfo),
emqx_logger:warning([{sysmon, true}], "[SYSMON] ~s~n~p", [WarnMsg, ProcInfo])).
emqx_logger:warning(#{sysmon => true}, "[SYSMON] ~s~n~p", [WarnMsg, ProcInfo])).
-define(LOG(Msg, ProcInfo, PortInfo),
emqx_logger:warning([{sysmon, true}], "[SYSMON] ~s~n~p~n~p", [WarnMsg, ProcInfo, PortInfo])).
emqx_logger:warning(#{sysmon => true}, "[SYSMON] ~s~n~p~n~p", [WarnMsg, ProcInfo, PortInfo])).
%% @doc Start system monitor
-spec(start_link(Opts :: list(tuple())) -> {ok, pid()} | ignore | {error, term()}).
@ -130,7 +130,7 @@ handle_info({timeout, _Ref, reset}, State) ->
{noreply, State#state{events = []}, hibernate};
handle_info(Info, State) ->
lager:error("[SYSMON] unexpected Info: ~p", [Info]),
logger:error("[SYSMON] unexpected Info: ~p", [Info]),
{noreply, State}.
terminate(_Reason, #state{timer = TRef}) ->

View File

@ -25,12 +25,20 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
-record(state, {level, traces}).
-record(state, {level, org_top_level, traces}).
-type(trace_who() :: {client | topic, binary()}).
-type(trace_who() :: {client_id | topic, binary()}).
-define(TRACER, ?MODULE).
-define(OPTIONS, [{formatter_config, [time, " [",severity,"] ", message, "\n"]}]).
-define(FORMAT, {emqx_logger_formatter,
#{template =>
[time," [",level,"] ",
{client_id,
[{peername,
[client_id,"@",peername," "],
[client_id, " "]}],
[]},
msg,"\n"]}}).
-spec(start_link() -> {ok, pid()} | ignore | {error, term()}).
start_link() ->
@ -40,26 +48,26 @@ trace(publish, #message{topic = <<"$SYS/", _/binary>>}) ->
%% Dont' trace '$SYS' publish
ignore;
trace(publish, #message{from = From, topic = Topic, payload = Payload})
when is_binary(From); is_atom(From) ->
emqx_logger:info([{client, From}, {topic, Topic}], "~s PUBLISH to ~s: ~p", [From, Topic, Payload]).
when is_binary(From); is_atom(From) ->
emqx_logger:info(#{topic => Topic}, "PUBLISH to ~s: ~p", [Topic, Payload]).
%%------------------------------------------------------------------------------
%% Start/Stop trace
%%------------------------------------------------------------------------------
%% @doc Start to trace client or topic.
%% @doc Start to trace client_id or topic.
-spec(start_trace(trace_who(), string()) -> ok | {error, term()}).
start_trace({client, ClientId}, LogFile) ->
start_trace({start_trace, {client, ClientId}, LogFile});
start_trace({client_id, ClientId}, LogFile) ->
start_trace({start_trace, {client_id, ClientId}, LogFile});
start_trace({topic, Topic}, LogFile) ->
start_trace({start_trace, {topic, Topic}, LogFile}).
start_trace(Req) -> gen_server:call(?MODULE, Req, infinity).
%% @doc Stop tracing client or topic.
%% @doc Stop tracing client_id or topic.
-spec(stop_trace(trace_who()) -> ok | {error, term()}).
stop_trace({client, ClientId}) ->
gen_server:call(?MODULE, {stop_trace, {client, ClientId}});
stop_trace({client_id, ClientId}) ->
gen_server:call(?MODULE, {stop_trace, {client_id, ClientId}});
stop_trace({topic, Topic}) ->
gen_server:call(?MODULE, {stop_trace, {topic, Topic}}).
@ -73,37 +81,45 @@ lookup_traces() ->
%%------------------------------------------------------------------------------
init([]) ->
{ok, #state{level = emqx_config:get_env(trace_level, debug), traces = #{}}}.
{ok, #state{level = emqx_config:get_env(trace_level, debug),
org_top_level = get_top_level(),
traces = #{}}}.
handle_call({start_trace, Who, LogFile}, _From, State = #state{level = Level, traces = Traces}) ->
case catch lager:trace_file(LogFile, [Who], Level, ?OPTIONS) of
{ok, exists} ->
{reply, {error, already_exists}, State};
{ok, Trace} ->
{reply, ok, State#state{traces = maps:put(Who, {Trace, LogFile}, Traces)}};
case logger:add_handler(handler_id(Who), logger_disk_log_h,
#{level => Level,
formatter => ?FORMAT,
filesync_repeat_interval => 1000,
config => #{type => halt, file => LogFile},
filter_default => stop,
filters => [{meta_key_filter,
{fun filter_by_meta_key/2, Who} }]}) of
ok ->
set_top_level(all), % open the top logger level to 'all'
emqx_logger:info("[Tracer] start trace for ~p", [Who]),
{reply, ok, State#state{traces = maps:put(Who, LogFile, Traces)}};
{error, Reason} ->
emqx_logger:error("[Tracer] trace error: ~p", [Reason]),
{reply, {error, Reason}, State};
{'EXIT', Error} ->
emqx_logger:error("[Tracer] trace exit: ~p", [Error]),
{reply, {error, Error}, State}
emqx_logger:error("[Tracer] start trace for ~p failed, error: ~p", [Who, Reason]),
{reply, {error, Reason}, State}
end;
handle_call({stop_trace, Who}, _From, State = #state{traces = Traces}) ->
handle_call({stop_trace, Who}, _From, State = #state{org_top_level = OrgTopLevel, traces = Traces}) ->
case maps:find(Who, Traces) of
{ok, {Trace, _LogFile}} ->
case lager:stop_trace(Trace) of
ok -> ok;
{error, Error} ->
emqx_logger:error("[Tracer] stop trace ~p error: ~p", [Who, Error])
{ok, _LogFile} ->
case logger:remove_handler(handler_id(Who)) of
ok ->
emqx_logger:info("[Tracer] stop trace for ~p", [Who]);
{error, Reason} ->
emqx_logger:error("[Tracer] stop trace for ~p failed, error: ~p", [Who, Reason])
end,
set_top_level(OrgTopLevel), % reset the top logger level to original value
{reply, ok, State#state{traces = maps:remove(Who, Traces)}};
error ->
{reply, {error, not_found}, State}
end;
handle_call(lookup_traces, _From, State = #state{traces = Traces}) ->
{reply, [{Who, LogFile} || {Who, {_Trace, LogFile}} <- maps:to_list(Traces)], State};
{reply, [{Who, LogFile} || {Who, LogFile} <- maps:to_list(Traces)], State};
handle_call(Req, _From, State) ->
emqx_logger:error("[Tracer] unexpected call: ~p", [Req]),
@ -123,3 +139,25 @@ terminate(_Reason, _State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
handler_id({topic, Topic}) ->
list_to_atom("topic_" ++ binary_to_list(Topic));
handler_id({client_id, ClientId}) ->
list_to_atom("clientid_" ++ binary_to_list(ClientId)).
get_top_level() ->
#{level := OrgTopLevel} = logger:get_primary_config(),
OrgTopLevel.
set_top_level(Level) ->
logger:set_primary_config(level, Level).
filter_by_meta_key(#{meta:=Meta}=LogEvent, {MetaKey, MetaValue}) ->
case maps:find(MetaKey, Meta) of
{ok, MetaValue} -> LogEvent;
{ok, Topic} when MetaKey =:= topic ->
case emqx_topic:match(Topic, MetaValue) of
true -> LogEvent;
false -> ignore
end;
_ -> ignore
end.

View File

@ -45,9 +45,8 @@
-define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt]).
-define(WSLOG(Level, Format, Args, State),
emqx_logger:Level("MQTT/WS(~s): " ++ Format,
[esockd_net:format(State#state.peername) | Args])).
-define(WSLOG(Level, Format, Args, _State),
emqx_logger:Level("[MQTT/WS] " ++ Format, Args)).
%%------------------------------------------------------------------------------
%% API
@ -144,12 +143,14 @@ websocket_init(#state{request = Req, options = Options}) ->
idle_timeout = IdleTimout}}.
send_fun(WsPid) ->
fun(Data) ->
fun(Packet, Options) ->
Data = emqx_frame:serialize(Packet, Options),
BinSize = iolist_size(Data),
emqx_metrics:inc('bytes/sent', BinSize),
put(send_oct, get(send_oct) + BinSize),
put(send_cnt, get(send_cnt) + 1),
WsPid ! {binary, iolist_to_binary(Data)}
WsPid ! {binary, iolist_to_binary(Data)},
ok
end.
stat_fun() ->
@ -299,4 +300,3 @@ stop(Error, State) ->
wsock_stats() ->
[{Key, get(Key)} || Key <- ?SOCK_STATS].

View File

@ -33,21 +33,6 @@
{large_heap,8388608},
{busy_port,false},
{busy_dist_port,true}]}]},
{sasl,[{sasl_error_logger,false}]},
{lager,
[{error_logger_hwm,1000},
{error_logger_redirect,true},
{log_dir,"{{ platform_log_dir }}"},
{handlers,
[{lager_console_backend,error},
{lager_file_backend,
[{file,"{{ platform_log_dir }}/error.log"},
{level,error},
{size,10485760},
{date,"$D0"},
{count,5}]},
{lager_syslog_backend,["emq",local0,error]}]},
{crash_log,"{{ platform_log_dir }}/crash.log"}]},
{gen_rpc,
[{socket_keepalive_count,2},
{socket_keepalive_interval,5},

View File

@ -59,20 +59,26 @@ end_per_suite(_Config) ->
emqx_ct_broker_helpers:run_teardown_steps().
request_response_exception(QoS) ->
{ok, Client, _} = emqx_client:start_link([{proto_ver, v5},
{properties, #{ 'Request-Response-Information' => 0 }}]),
{ok, Client} = emqx_client:start_link([{proto_ver, v5},
{properties, #{ 'Request-Response-Information' => 0 }}]),
{ok, _} = emqx_client:connect(Client),
?assertError(no_response_information,
emqx_client:sub_request_topic(Client, QoS, <<"request_topic">>)),
ok = emqx_client:disconnect(Client).
request_response_per_qos(QoS) ->
{ok, Requester, _} = emqx_client:start_link([{proto_ver, v5},
{client_id, <<"requester">>},
{properties, #{ 'Request-Response-Information' => 1}}]),
{ok, Responser, _} = emqx_client:start_link([{proto_ver, v5},
{client_id, <<"responser">>},
{properties, #{ 'Request-Response-Information' => 1}},
{request_handler, fun(_Req) -> <<"ResponseTest">> end}]),
{ok, Requester} = emqx_client:start_link([{proto_ver, v5},
{client_id, <<"requester">>},
{properties, #{ 'Request-Response-Information' => 1}}]),
{ok, _} = emqx_client:connect(Requester),
{ok, Responser} = emqx_client:start_link([{proto_ver, v5},
{client_id, <<"responser">>},
{properties, #{
'Request-Response-Information' => 1}},
{request_handler,
fun(_Req) -> <<"ResponseTest">> end}
]),
{ok, _} = emqx_client:connect(Responser),
ok = emqx_client:sub_request_topic(Responser, QoS, <<"request_topic">>),
{ok, <<"ResponseTest">>} = emqx_client:request(Requester, <<"response_topic">>, <<"request_topic">>, <<"request_payload">>, QoS),
ok = emqx_client:set_request_handler(Responser, fun(<<"request_payload">>) ->
@ -108,9 +114,15 @@ share_sub_request_topic_per_qos(QoS) ->
{client_id, atom_to_binary(ClientId, utf8)},
{properties, Properties}
] end,
{ok, Requester, _} = emqx_client:start_link(Opts(requester)),
{ok, Responser1, _} = emqx_client:start_link([{request_handler, fun(Req) -> <<"1-", Req/binary>> end} | Opts(requester1)]),
{ok, Responser2, _} = emqx_client:start_link([{request_handler, fun(Req) -> <<"2-", Req/binary>> end} | Opts(requester2)]),
{ok, Requester} = emqx_client:start_link(Opts(requester)),
{ok, _} = emqx_client:connect(Requester),
{ok, Responser1} = emqx_client:start_link([{request_handler, fun(Req) -> <<"1-", Req/binary>> end} | Opts(requester1)]),
{ok, _} = emqx_client:connect(Responser1),
{ok, Responser2} = emqx_client:start_link([{request_handler, fun(Req) -> <<"2-", Req/binary>> end} | Opts(requester2)]),
{ok, _} = emqx_client:connect(Responser2),
ok = emqx_client:sub_request_topic(Responser1, QoS, ReqTopic, Group),
ok = emqx_client:sub_request_topic(Responser2, QoS, ReqTopic, Group),
%% Send a request, wait for response, validate response then return responser ID
@ -148,7 +160,9 @@ receive_messages(Count, Msgs) ->
basic_test(_Config) ->
Topic = nth(1, ?TOPICS),
ct:print("Basic test starting"),
{ok, C, _} = emqx_client:start_link(),
{ok, C} = emqx_client:start_link(),
{ok, _} = emqx_client:connect(C),
{ok, _, [2]} = emqx_client:subscribe(C, Topic, qos2),
{ok, _} = emqx_client:publish(C, Topic, <<"qos 2">>, 2),
{ok, _} = emqx_client:publish(C, Topic, <<"qos 2">>, 2),
@ -157,11 +171,15 @@ basic_test(_Config) ->
ok = emqx_client:disconnect(C).
will_message_test(_Config) ->
{ok, C1, _} = emqx_client:start_link([{clean_start, true},
{ok, C1} = emqx_client:start_link([{clean_start, true},
{will_topic, nth(3, ?TOPICS)},
{will_payload, <<"client disconnected">>},
{keepalive, 2}]),
{ok, C2, _} = emqx_client:start_link(),
{ok, _} = emqx_client:connect(C1),
{ok, C2} = emqx_client:start_link(),
{ok, _} = emqx_client:connect(C2),
{ok, _, [2]} = emqx_client:subscribe(C2, nth(3, ?TOPICS), 2),
timer:sleep(10),
ok = emqx_client:stop(C1),
@ -171,26 +189,33 @@ will_message_test(_Config) ->
ct:print("Will message test succeeded").
offline_message_queueing_test(_) ->
{ok, C1, _} = emqx_client:start_link([{clean_start, false},
{client_id, <<"c1">>}]),
{ok, C1} = emqx_client:start_link([{clean_start, false},
{client_id, <<"c1">>}]),
{ok, _} = emqx_client:connect(C1),
{ok, _, [2]} = emqx_client:subscribe(C1, nth(6, ?WILD_TOPICS), 2),
ok = emqx_client:disconnect(C1),
{ok, C2, _} = emqx_client:start_link([{clean_start, true},
{client_id, <<"c2">>}]),
{ok, C2} = emqx_client:start_link([{clean_start, true},
{client_id, <<"c2">>}]),
{ok, _} = emqx_client:connect(C2),
ok = emqx_client:publish(C2, nth(2, ?TOPICS), <<"qos 0">>, 0),
{ok, _} = emqx_client:publish(C2, nth(3, ?TOPICS), <<"qos 1">>, 1),
{ok, _} = emqx_client:publish(C2, nth(4, ?TOPICS), <<"qos 2">>, 2),
timer:sleep(10),
emqx_client:disconnect(C2),
{ok, C3, _} = emqx_client:start_link([{clean_start, false},
{ok, C3} = emqx_client:start_link([{clean_start, false},
{client_id, <<"c1">>}]),
{ok, _} = emqx_client:connect(C3),
timer:sleep(10),
emqx_client:disconnect(C3),
?assertEqual(3, length(receive_messages(3))).
overlapping_subscriptions_test(_) ->
{ok, C, _} = emqx_client:start_link([]),
{ok, C} = emqx_client:start_link([]),
{ok, _} = emqx_client:connect(C),
{ok, _, [2, 1]} = emqx_client:subscribe(C, [{nth(7, ?WILD_TOPICS), 2},
{nth(1, ?WILD_TOPICS), 1}]),
timer:sleep(10),
@ -228,8 +253,10 @@ overlapping_subscriptions_test(_) ->
redelivery_on_reconnect_test(_) ->
ct:print("Redelivery on reconnect test starting"),
{ok, C1, _} = emqx_client:start_link([{clean_start, false},
{client_id, <<"c">>}]),
{ok, C1} = emqx_client:start_link([{clean_start, false},
{client_id, <<"c">>}]),
{ok, _} = emqx_client:connect(C1),
{ok, _, [2]} = emqx_client:subscribe(C1, nth(7, ?WILD_TOPICS), 2),
timer:sleep(10),
ok = emqx_client:pause(C1),
@ -240,8 +267,10 @@ redelivery_on_reconnect_test(_) ->
timer:sleep(10),
ok = emqx_client:disconnect(C1),
?assertEqual(0, length(receive_messages(2))),
{ok, C2, _} = emqx_client:start_link([{clean_start, false},
{ok, C2} = emqx_client:start_link([{clean_start, false},
{client_id, <<"c">>}]),
{ok, _} = emqx_client:connect(C2),
timer:sleep(10),
ok = emqx_client:disconnect(C2),
?assertEqual(2, length(receive_messages(2))).
@ -255,8 +284,10 @@ redelivery_on_reconnect_test(_) ->
dollar_topics_test(_) ->
ct:print("$ topics test starting"),
{ok, C, _} = emqx_client:start_link([{clean_start, true},
{keepalive, 0}]),
{ok, C} = emqx_client:start_link([{clean_start, true},
{keepalive, 0}]),
{ok, _} = emqx_client:connect(C),
{ok, _, [1]} = emqx_client:subscribe(C, nth(6, ?WILD_TOPICS), 1),
{ok, _} = emqx_client:publish(C, << <<"$">>/binary, (nth(2, ?TOPICS))/binary>>,
<<"test">>, [{qos, 1}, {retain, false}]),

View File

@ -100,7 +100,7 @@ serialize_parse_connect(_) ->
?assertEqual({ok, Packet1, <<>>}, parse_serialize(Packet1)),
Packet2 = ?CONNECT_PACKET(#mqtt_packet_connect{
client_id = <<"clientId">>,
will_qos = ?QOS1,
will_qos = ?QOS_1,
will_flag = true,
will_retain = true,
will_topic = <<"will">>,
@ -427,4 +427,3 @@ parse(Bin, Opts) when is_map(Opts) ->
payload() ->
iolist_to_binary(["payload." || _I <- lists:seq(1, 1000)]).

View File

@ -35,7 +35,7 @@ all() ->
message_make(_) ->
Msg = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>),
?assertEqual(0, Msg#message.qos),
Msg1 = emqx_message:make(<<"clientid">>, ?QOS2, <<"topic">>, <<"payload">>),
Msg1 = emqx_message:make(<<"clientid">>, ?QOS_2, <<"topic">>, <<"payload">>),
?assert(is_binary(Msg1#message.id)),
?assertEqual(2, Msg1#message.qos).

View File

@ -28,8 +28,8 @@ t_mount_unmount(_) ->
Msg = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>),
Msg2 = emqx_mountpoint:mount(<<"mount">>, Msg),
?assertEqual(<<"mounttopic">>, Msg2#message.topic),
TopicFilter = [{<<"mounttopic">>, #{qos => ?QOS2}}],
TopicFilter = emqx_mountpoint:mount(<<"mount">>, [{<<"topic">>, #{qos => ?QOS2}}]),
TopicFilter = [{<<"mounttopic">>, #{qos => ?QOS_2}}],
TopicFilter = emqx_mountpoint:mount(<<"mount">>, [{<<"topic">>, #{qos => ?QOS_2}}]),
Msg = emqx_mountpoint:unmount(<<"mount">>, Msg2).
t_replvar(_) ->

View File

@ -28,57 +28,58 @@ all() -> [t_in, t_in_qos0, t_out, t_simple_mqueue, t_infinity_simple_mqueue,
t_priority_mqueue, t_infinity_priority_mqueue].
t_in(_) ->
Opts = #{type => simple, max_len => 5, store_qos0 => true},
Opts = #{max_len => 5, store_qos0 => true},
Q = ?Q:init(Opts),
?assert(?Q:is_empty(Q)),
Q1 = ?Q:in(#message{}, Q),
{_, Q1} = ?Q:in(#message{}, Q),
?assertEqual(1, ?Q:len(Q1)),
Q2 = ?Q:in(#message{qos = 1}, Q1),
{_, Q2} = ?Q:in(#message{qos = 1}, Q1),
?assertEqual(2, ?Q:len(Q2)),
Q3 = ?Q:in(#message{qos = 2}, Q2),
Q4 = ?Q:in(#message{}, Q3),
Q5 = ?Q:in(#message{}, Q4),
{_, Q3} = ?Q:in(#message{qos = 2}, Q2),
{_, Q4} = ?Q:in(#message{}, Q3),
{_, Q5} = ?Q:in(#message{}, Q4),
?assertEqual(5, ?Q:len(Q5)).
t_in_qos0(_) ->
Opts = #{type => simple, max_len => 5, store_qos0 => false},
Opts = #{max_len => 5, store_qos0 => false},
Q = ?Q:init(Opts),
Q1 = ?Q:in(#message{qos = 0}, Q),
{_, Q1} = ?Q:in(#message{qos = 0}, Q),
?assert(?Q:is_empty(Q1)),
Q2 = ?Q:in(#message{qos = 0}, Q1),
{_, Q2} = ?Q:in(#message{qos = 0}, Q1),
?assert(?Q:is_empty(Q2)).
t_out(_) ->
Opts = #{type => simple, max_len => 5, store_qos0 => true},
Opts = #{max_len => 5, store_qos0 => true},
Q = ?Q:init(Opts),
{empty, Q} = ?Q:out(Q),
Q1 = ?Q:in(#message{}, Q),
{_, Q1} = ?Q:in(#message{}, Q),
{Value, Q2} = ?Q:out(Q1),
?assertEqual(0, ?Q:len(Q2)),
?assertEqual({value, #message{}}, Value).
t_simple_mqueue(_) ->
Opts = #{type => simple, max_len => 3, store_qos0 => false},
Opts = #{max_len => 3, store_qos0 => false},
Q = ?Q:init(Opts),
?assertEqual(simple, ?Q:type(Q)),
?assertEqual(3, ?Q:max_len(Q)),
?assert(?Q:is_empty(Q)),
Q1 = ?Q:in(#message{qos = 1, payload = <<"1">>}, Q),
Q2 = ?Q:in(#message{qos = 1, payload = <<"2">>}, Q1),
Q3 = ?Q:in(#message{qos = 1, payload = <<"3">>}, Q2),
Q4 = ?Q:in(#message{qos = 1, payload = <<"4">>}, Q3),
{_, Q1} = ?Q:in(#message{qos = 1, payload = <<"1">>}, Q),
{_, Q2} = ?Q:in(#message{qos = 1, payload = <<"2">>}, Q1),
{_, Q3} = ?Q:in(#message{qos = 1, payload = <<"3">>}, Q2),
{_, Q4} = ?Q:in(#message{qos = 1, payload = <<"4">>}, Q3),
?assertEqual(3, ?Q:len(Q4)),
{{value, Msg}, Q5} = ?Q:out(Q4),
?assertEqual(<<"2">>, Msg#message.payload),
?assertEqual([{len, 2}, {max_len, 3}, {dropped, 1}], ?Q:stats(Q5)).
t_infinity_simple_mqueue(_) ->
Opts = #{type => simple, max_len => 0, store_qos0 => false},
Opts = #{max_len => 0, store_qos0 => false},
Q = ?Q:init(Opts),
?assert(?Q:is_empty(Q)),
?assertEqual(0, ?Q:max_len(Q)),
Qx = lists:foldl(fun(I, AccQ) ->
?Q:in(#message{qos = 1, payload = iolist_to_binary([I])}, AccQ)
Qx = lists:foldl(
fun(I, AccQ) ->
{_, NewQ} = ?Q:in(#message{qos = 1, payload = iolist_to_binary([I])}, AccQ),
NewQ
end, Q, lists:seq(1, 255)),
?assertEqual(255, ?Q:len(Qx)),
?assertEqual([{len, 255}, {max_len, 0}, {dropped, 0}], ?Q:stats(Qx)),
@ -86,45 +87,55 @@ t_infinity_simple_mqueue(_) ->
?assertEqual(<<1>>, V#message.payload).
t_priority_mqueue(_) ->
Opts = #{type => priority, max_len => 3, priorities => [{<<"t1">>, 1}, {<<"t2">>, 2}, {<<"t3">>, 3}], store_qos0 => false},
Opts = #{max_len => 3,
priorities =>
#{<<"t1">> => 1,
<<"t2">> => 2,
<<"t3">> => 3
},
store_qos0 => false},
Q = ?Q:init(Opts),
?assertEqual(priority, ?Q:type(Q)),
?assertEqual(3, ?Q:max_len(Q)),
?assert(?Q:is_empty(Q)),
Q1 = ?Q:in(#message{qos = 1, topic = <<"t2">>}, Q),
Q2 = ?Q:in(#message{qos = 1, topic = <<"t1">>}, Q1),
Q3 = ?Q:in(#message{qos = 1, topic = <<"t3">>}, Q2),
{_, Q1} = ?Q:in(#message{qos = 1, topic = <<"t2">>}, Q),
{_, Q2} = ?Q:in(#message{qos = 1, topic = <<"t1">>}, Q1),
{_, Q3} = ?Q:in(#message{qos = 1, topic = <<"t3">>}, Q2),
?assertEqual(3, ?Q:len(Q3)),
Q4 = ?Q:in(#message{qos = 1, topic = <<"t2">>}, Q3),
{_, Q4} = ?Q:in(#message{qos = 1, topic = <<"t2">>}, Q3),
?assertEqual(4, ?Q:len(Q4)),
Q5 = ?Q:in(#message{qos = 1, topic = <<"t2">>}, Q4),
{_, Q5} = ?Q:in(#message{qos = 1, topic = <<"t2">>}, Q4),
?assertEqual(5, ?Q:len(Q5)),
Q6 = ?Q:in(#message{qos = 1, topic = <<"t2">>}, Q5),
{_, Q6} = ?Q:in(#message{qos = 1, topic = <<"t2">>}, Q5),
?assertEqual(5, ?Q:len(Q6)),
{{value, Msg}, Q7} = ?Q:out(Q6),
?assertEqual(4, ?Q:len(Q7)),
?assertEqual(<<"t3">>, Msg#message.topic).
t_infinity_priority_mqueue(_) ->
Opts = #{type => priority, max_len => 0, priorities => [{<<"t">>, 1}, {<<"t1">>, 2}], store_qos0 => false},
Opts = #{max_len => 0,
priorities =>
#{<<"t">> => 1,
<<"t1">> => 2
},
store_qos0 => false},
Q = ?Q:init(Opts),
?assertEqual(0, ?Q:max_len(Q)),
Qx = lists:foldl(fun(I, AccQ) ->
AccQ1 =
?Q:in(#message{topic = <<"t1">>, qos = 1, payload = iolist_to_binary([I])}, AccQ),
?Q:in(#message{topic = <<"t">>, qos = 1, payload = iolist_to_binary([I])}, AccQ1)
end, Q, lists:seq(1, 255)),
{undefined, AccQ1} = ?Q:in(#message{topic = <<"t1">>, qos = 1, payload = iolist_to_binary([I])}, AccQ),
{undefined, AccQ2} = ?Q:in(#message{topic = <<"t">>, qos = 1, payload = iolist_to_binary([I])}, AccQ1),
AccQ2
end, Q, lists:seq(1, 255)),
?assertEqual(510, ?Q:len(Qx)),
?assertEqual([{len, 510}, {max_len, 0}, {dropped, 0}], ?Q:stats(Qx)).
t_priority_mqueue2(_) ->
Opts = #{type => priority, max_length => 2, store_qos0 => false},
Opts = #{max_length => 2, store_qos0 => false},
Q = ?Q:init("priority_queue2_test", Opts),
2 = ?Q:max_len(Q),
Q1 = ?Q:in(#message{topic = <<"x">>, qos = 1, payload = <<1>>}, Q),
Q2 = ?Q:in(#message{topic = <<"x">>, qos = 1, payload = <<2>>}, Q1),
Q3 = ?Q:in(#message{topic = <<"y">>, qos = 1, payload = <<3>>}, Q2),
Q4 = ?Q:in(#message{topic = <<"y">>, qos = 1, payload = <<4>>}, Q3),
{_, Q1} = ?Q:in(#message{topic = <<"x">>, qos = 1, payload = <<1>>}, Q),
{_, Q2} = ?Q:in(#message{topic = <<"x">>, qos = 1, payload = <<2>>}, Q1),
{_, Q3} = ?Q:in(#message{topic = <<"y">>, qos = 1, payload = <<3>>}, Q2),
{_, Q4} = ?Q:in(#message{topic = <<"y">>, qos = 1, payload = <<4>>}, Q3),
4 = ?Q:len(Q4),
{{value, _Val}, Q5} = ?Q:out(Q4),
3 = ?Q:len(Q5).

View File

@ -44,7 +44,7 @@ packet_type_name(_) ->
?assertEqual('UNSUBSCRIBE', emqx_packet:type_name(?UNSUBSCRIBE)).
packet_validate(_) ->
?assert(emqx_packet:validate(?SUBSCRIBE_PACKET(15, #{'Subscription-Identifier' => 1}, [{<<"topic">>, #{qos => ?QOS0}}]))),
?assert(emqx_packet:validate(?SUBSCRIBE_PACKET(15, #{'Subscription-Identifier' => 1}, [{<<"topic">>, #{qos => ?QOS_0}}]))),
?assert(emqx_packet:validate(?UNSUBSCRIBE_PACKET(89, [<<"topic">>]))),
?assert(emqx_packet:validate(?CONNECT_PACKET(#mqtt_packet_connect{}))),
?assert(emqx_packet:validate(?PUBLISH_PACKET(1, <<"topic">>, 1, #{'Response-Topic' => <<"responsetopic">>, 'Topic-Alias' => 1}, <<"payload">>))),
@ -52,7 +52,7 @@ packet_validate(_) ->
?assertError(subscription_identifier_invalid,
emqx_packet:validate(
?SUBSCRIBE_PACKET(15, #{'Subscription-Identifier' => -1},
[{<<"topic">>, #{qos => ?QOS0}}]))),
[{<<"topic">>, #{qos => ?QOS_0}}]))),
?assertError(topic_filters_invalid,
emqx_packet:validate(?UNSUBSCRIBE_PACKET(1,[]))),
?assertError(topic_name_invalid,
@ -90,14 +90,14 @@ packet_validate(_) ->
packet_message(_) ->
Pkt = #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH,
qos = ?QOS0,
qos = ?QOS_0,
retain = false,
dup = false},
variable = #mqtt_packet_publish{topic_name = <<"topic">>,
packet_id = 10,
properties = #{}},
payload = <<"payload">>},
Msg = emqx_message:make(<<"clientid">>, ?QOS0, <<"topic">>, <<"payload">>),
Msg = emqx_message:make(<<"clientid">>, ?QOS_0, <<"topic">>, <<"payload">>),
Msg2 = emqx_message:set_flag(retain, false, Msg),
Pkt = emqx_packet:from_message(10, Msg2),
Msg3 = emqx_message:set_header(username, "test", Msg2),
@ -112,8 +112,8 @@ packet_format(_) ->
io:format("~s", [emqx_packet:format(?PUBLISH_PACKET(?QOS_2, <<"topic">>, 10, <<"payload">>))]),
io:format("~s", [emqx_packet:format(?PUBACK_PACKET(?PUBACK, 98))]),
io:format("~s", [emqx_packet:format(?PUBREL_PACKET(99))]),
io:format("~s", [emqx_packet:format(?SUBSCRIBE_PACKET(15, [{<<"topic">>, ?QOS0}, {<<"topic1">>, ?QOS1}]))]),
io:format("~s", [emqx_packet:format(?SUBACK_PACKET(40, [?QOS0, ?QOS1]))]),
io:format("~s", [emqx_packet:format(?SUBSCRIBE_PACKET(15, [{<<"topic">>, ?QOS_0}, {<<"topic1">>, ?QOS_1}]))]),
io:format("~s", [emqx_packet:format(?SUBACK_PACKET(40, [?QOS_0, ?QOS_1]))]),
io:format("~s", [emqx_packet:format(?UNSUBSCRIBE_PACKET(89, [<<"t">>, <<"t2">>]))]),
io:format("~s", [emqx_packet:format(?UNSUBACK_PACKET(90))]).
@ -122,7 +122,7 @@ packet_will_msg(_) ->
client_id = <<"clientid">>,
username = "test",
will_retain = true,
will_qos = ?QOS2,
will_qos = ?QOS_2,
will_topic = <<"topic">>,
will_props = #{},
will_payload = <<"payload">>},

View File

@ -1,147 +0,0 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_protocol_SUITE).
-compile(export_all).
-compile(nowarn_export_all).
-include("emqx.hrl").
-include("emqx_mqtt.hrl").
-include_lib("eunit/include/eunit.hrl").
-import(emqx_serializer, [serialize/1]).
all() ->
[{group, parser},
{group, serializer},
{group, packet},
{group, message}].
groups() ->
[{parser, [],
[parse_connect,
parse_bridge,
parse_publish,
parse_puback,
parse_pubrec,
parse_pubrel,
parse_pubcomp,
parse_subscribe,
parse_unsubscribe,
parse_pingreq,
parse_disconnect]},
{serializer, [],
[serialize_connect,
serialize_connack,
serialize_publish,
serialize_puback,
serialize_pubrel,
serialize_subscribe,
serialize_suback,
serialize_unsubscribe,
serialize_unsuback,
serialize_pingreq,
serialize_pingresp,
serialize_disconnect]},
{packet, [],
[packet_proto_name,
packet_type_name,
packet_connack_name,
packet_format]},
{message, [],
[message_make,
message_from_packet,
message_flag]}].
%%--------------------------------------------------------------------
%% Packet Cases
%%--------------------------------------------------------------------
packet_proto_name(_) ->
?assertEqual(<<"MQIsdp">>, emqx_packet:protocol_name(3)),
?assertEqual(<<"MQTT">>, emqx_packet:protocol_name(4)).
packet_type_name(_) ->
?assertEqual('CONNECT', emqx_packet:type_name(?CONNECT)),
?assertEqual('UNSUBSCRIBE', emqx_packet:type_name(?UNSUBSCRIBE)).
packet_connack_name(_) ->
?assertEqual('CONNACK_ACCEPT', emqx_packet:connack_name(?CONNACK_ACCEPT)),
?assertEqual('CONNACK_PROTO_VER', emqx_packet:connack_name(?CONNACK_PROTO_VER)),
?assertEqual('CONNACK_INVALID_ID', emqx_packet:connack_name(?CONNACK_INVALID_ID)),
?assertEqual('CONNACK_SERVER', emqx_packet:connack_name(?CONNACK_SERVER)),
?assertEqual('CONNACK_CREDENTIALS', emqx_packet:connack_name(?CONNACK_CREDENTIALS)),
?assertEqual('CONNACK_AUTH', emqx_packet:connack_name(?CONNACK_AUTH)).
packet_format(_) ->
io:format("~s", [emqx_packet:format(?CONNECT_PACKET(#mqtt_packet_connect{}))]),
io:format("~s", [emqx_packet:format(?CONNACK_PACKET(?CONNACK_SERVER))]),
io:format("~s", [emqx_packet:format(?PUBLISH_PACKET(?QOS_1, 1))]),
io:format("~s", [emqx_packet:format(?PUBLISH_PACKET(?QOS_2, <<"topic">>, 10, <<"payload">>))]),
io:format("~s", [emqx_packet:format(?PUBACK_PACKET(?PUBACK, 98))]),
io:format("~s", [emqx_packet:format(?PUBREL_PACKET(99))]),
io:format("~s", [emqx_packet:format(?SUBSCRIBE_PACKET(15, [{<<"topic">>, ?QOS0}, {<<"topic1">>, ?QOS1}]))]),
io:format("~s", [emqx_packet:format(?SUBACK_PACKET(40, [?QOS0, ?QOS1]))]),
io:format("~s", [emqx_packet:format(?UNSUBSCRIBE_PACKET(89, [<<"t">>, <<"t2">>]))]),
io:format("~s", [emqx_packet:format(?UNSUBACK_PACKET(90))]).
%%--------------------------------------------------------------------
%% Message Cases
%%--------------------------------------------------------------------
message_make(_) ->
Msg = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>),
?assertEqual(0, Msg#mqtt_message.qos),
Msg1 = emqx_message:make(<<"clientid">>, qos2, <<"topic">>, <<"payload">>),
?assert(is_binary(Msg1#mqtt_message.id)),
?assertEqual(2, Msg1#mqtt_message.qos).
message_from_packet(_) ->
Msg = emqx_message:from_packet(?PUBLISH_PACKET(1, <<"topic">>, 10, <<"payload">>)),
?assertEqual(1, Msg#mqtt_message.qos),
?assertEqual(10, Msg#mqtt_message.pktid),
?assertEqual(<<"topic">>, Msg#mqtt_message.topic),
WillMsg = emqx_message:from_packet(#mqtt_packet_connect{will_flag = true,
will_topic = <<"WillTopic">>,
will_msg = <<"WillMsg">>}),
?assertEqual(<<"WillTopic">>, WillMsg#mqtt_message.topic),
?assertEqual(<<"WillMsg">>, WillMsg#mqtt_message.payload),
Msg2 = emqx_message:from_packet(<<"username">>, <<"clientid">>,
?PUBLISH_PACKET(1, <<"topic">>, 20, <<"payload">>)),
?assertEqual({<<"clientid">>, <<"username">>}, Msg2#mqtt_message.from),
io:format("~s", [emqx_message:format(Msg2)]).
message_flag(_) ->
Pkt = ?PUBLISH_PACKET(1, <<"t">>, 2, <<"payload">>),
Msg2 = emqx_message:from_packet(<<"clientid">>, Pkt),
Msg3 = emqx_message:set_flag(retain, Msg2),
Msg4 = emqx_message:set_flag(dup, Msg3),
?assert(Msg4#mqtt_message.dup),
?assert(Msg4#mqtt_message.retain),
Msg5 = emqx_message:set_flag(Msg4),
Msg6 = emqx_message:unset_flag(dup, Msg5),
Msg7 = emqx_message:unset_flag(retain, Msg6),
?assertNot(Msg7#mqtt_message.dup),
?assertNot(Msg7#mqtt_message.retain),
emqx_message:unset_flag(Msg7),
emqx_message:to_packet(Msg7).

View File

@ -52,10 +52,10 @@ init(_, _WSReq) ->
{ok, #state{}}.
websocket_handle(Frame, _, State = #state{waiting = undefined, buffer = Buffer}) ->
lager:info("Client received frame~p", [Frame]),
logger:info("Client received frame~p", [Frame]),
{ok, State#state{buffer = [Frame|Buffer]}};
websocket_handle(Frame, _, State = #state{waiting = From}) ->
lager:info("Client received frame~p", [Frame]),
logger:info("Client received frame~p", [Frame]),
From ! Frame,
{ok, State#state{waiting = undefined}}.