diff --git a/Makefile b/Makefile index a3afce82a..07fe13cbb 100644 --- a/Makefile +++ b/Makefile @@ -4,31 +4,27 @@ PROJECT = emqx PROJECT_DESCRIPTION = EMQ X Broker PROJECT_VERSION = 3.0 -DEPS = jsx gproc gen_rpc lager ekka esockd cowboy clique emqx_passwd +DEPS = jsx gproc gen_rpc ekka esockd cowboy clique dep_jsx = git https://github.com/talentdeficit/jsx 2.9.0 dep_gproc = git https://github.com/uwiger/gproc 0.8.0 -dep_gen_rpc = git https://github.com/emqx/gen_rpc 2.2.0 -dep_lager = git https://github.com/erlang-lager/lager 3.6.5 +dep_gen_rpc = git https://github.com/emqx/gen_rpc 2.3.0 dep_esockd = git https://github.com/emqx/esockd v5.4.2 -dep_ekka = git https://github.com/emqx/ekka v0.4.1 +dep_ekka = git https://github.com/emqx/ekka v0.5.0 dep_cowboy = git https://github.com/ninenines/cowboy 2.4.0 dep_clique = git https://github.com/emqx/clique develop -dep_emqx_passwd = git https://github.com/emqx/emqx-passwd win30 NO_AUTOPATCH = cuttlefish ERLC_OPTS += +debug_info -DAPPLICATION=emqx -ERLC_OPTS += +'{parse_transform, lager_transform}' BUILD_DEPS = cuttlefish -dep_cuttlefish = git https://github.com/emqx/cuttlefish emqx30 +dep_cuttlefish = git https://github.com/emqx/cuttlefish v2.1.0 #TEST_DEPS = emqx_ct_helplers #dep_emqx_ct_helplers = git git@github.com:emqx/emqx-ct-helpers TEST_ERLC_OPTS += +debug_info -DAPPLICATION=emqx -TEST_ERLC_OPTS += +'{parse_transform, lager_transform}' EUNIT_OPTS = verbose @@ -47,7 +43,7 @@ CT_OPTS = -cover test/ct.cover.spec -erl_args -name $(CT_NODE_NAME) COVER = true -PLT_APPS = sasl asn1 ssl syntax_tools runtime_tools crypto xmerl os_mon inets public_key ssl lager compiler mnesia +PLT_APPS = sasl asn1 ssl syntax_tools runtime_tools crypto xmerl os_mon inets public_key ssl compiler mnesia DIALYZER_DIRS := ebin/ DIALYZER_OPTS := --verbose --statistics -Werror_handling -Wrace_conditions #-Wunmatched_returns @@ -74,8 +70,10 @@ etc/gen.emqx.conf: bbmustache etc/emqx.conf ok = file:write_file('etc/gen.emqx.conf', Targ), \ halt(0)." -app.config: cuttlefish etc/gen.emqx.conf - $(verbose) ./cuttlefish -l info -e etc/ -c etc/gen.emqx.conf -i priv/emqx.schema -d data/ +CUTTLEFISH_SCRIPT = _build/default/lib/cuttlefish/cuttlefish + +app.config: $(CUTTLEFISH_SCRIPT) etc/gen.emqx.conf + $(verbose) $(CUTTLEFISH_SCRIPT) -l info -e etc/ -c etc/gen.emqx.conf -i priv/emqx.schema -d data/ ct: app.config @@ -86,11 +84,8 @@ coveralls: @rebar3 coveralls send -cuttlefish: rebar-deps - @if [ ! -f cuttlefish ]; then \ - make -C _build/default/lib/cuttlefish; \ - mv _build/default/lib/cuttlefish/cuttlefish ./cuttlefish; \ - fi +$(CUTTLEFISH_SCRIPT): rebar-deps + @if [ ! -f cuttlefish ]; then make -C _build/default/lib/cuttlefish; fi rebar-xref: @rebar3 xref @@ -98,7 +93,7 @@ rebar-xref: rebar-deps: @rebar3 get-deps -rebar-eunit: cuttlefish +rebar-eunit: $(CUTTLEFISH_SCRIPT) @rebar3 eunit rebar-compile: diff --git a/etc/emqx.conf b/etc/emqx.conf index 1ac8c8290..02210a570 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -328,81 +328,62 @@ rpc.socket_keepalive_count = 9 ## Log ##-------------------------------------------------------------------- -## Sets the log dir. +## Where to emit the logs. +## Enable the console (standard output) logs. +## +## Value: off | file | console | both +## - off: disable logs entirely +## - file: write logs to file +## - console: write logs to standard I/O +## - both: write logs both to file and standard I/O +log.to = console + +## The log severity level. +## +## Value: debug | info | notice | warning | error | critical | alert | emergency +## +## Note: Only the messages with severity level greater than or equal to +## this level will be logged. +## +## Default: error +log.level = error + +## The dir for log files. ## ## Value: Folder log.dir = {{ platform_log_dir }} -## Where to emit the console logs. +## The log filename for logs of level specified in "log.level". ## -## Value: off | file | console | both -## - off: disabled -## - file: write to file -## - console: write to stdout -## - both: file and stdout -log.console = console +## Value: String +## Default: emqx.log +log.file = emqx.log -## Sets the severity level of console log. -## -## Value: debug | info | notice | warning | error | critical | alert | emergency -## -## Default: error -log.console.level = error - -## The file where console logs will be writed to, when 'log.console' is set as 'file'. -## -## Value: File Name -## log.console.file = {{ platform_log_dir }}/console.log - -## Maximum file size for console log. -## -## Value: Number(bytes) -## log.console.size = 10485760 - -## The rotation count for console log. +## Maximum size of each log file. ## ## Value: Number -## log.console.count = 5 +## Default: 10M +## Supported Unit: KB | MB | G +log.rotation.size = 10MB -## The file where info logs will be writed to. -## -## Value: File Name -## log.info.file = {{ platform_log_dir }}/info.log - -## Maximum file size for info log. -## -## Value: Number(bytes) -## log.info.size = 10485760 - -## The rotation count for info log. +## Maximum rotation count of log files. ## ## Value: Number -## log.info.count = 5 +## Default: 5 +log.rotation.count = 5 -## The file where error logs will be writed to. +## To create additional log files for specific log levels. ## ## Value: File Name -log.error.file = {{ platform_log_dir }}/error.log - -## Maximum file size for error log. +## Format: log.$level.file = $filename, +## where "$level" can be one of: debug, info, notice, warning, +## error, critical, alert, emergency +## Note: Log files for a specific log level will contain all the logs +## that greater than or equal to that level ## -## Value: Number(bytes) -log.error.size = 10485760 -## The rotation count for error log. -## -## Value: Number -log.error.count = 5 - -## Enable the crash log. -## -## Value: on | off -log.crash = on - -## The file for crash log. -## -## Value: File Name -log.crash.file = {{ platform_log_dir }}/crash.log +#log.info.file = info.log +#log.error.file = error.log ##-------------------------------------------------------------------- ## Authentication/Access Control @@ -424,22 +405,20 @@ acl_nomatch = allow ## Value: File Name acl_file = {{ platform_etc_dir }}/acl.conf -## Whether to enable ACL cache for publish. -## The ACL cache size -## The maximum count of ACL entries allowed for a client. +## Whether to enable ACL cache. +## +## If enabled, ACLs roles for each client will be cached in the memory ## ## Value: on | off enable_acl_cache = on -## The ACL cache size -## The maximum count of ACL entries allowed for a client. +## The maximum count of ACL entries can be cached for a client. ## ## Value: Integer greater than 0 ## Default: 32 acl_cache_max_size = 32 -## The ACL cache time-to-live. -## The time after which an ACL cache entry will be invalid +## The time after which an ACL cache entry will be deleted ## ## Value: Duration ## Default: 1 minute @@ -496,17 +475,6 @@ mqtt.wildcard_subscription = true ## Value: boolean mqtt.shared_subscription = true -## Message queue type. -## -## Value: simple | priority -mqtt.mqueue_type = simple - -## Topic priorities. Default is 0. -## -## Priority: Number [0-255] -## -## mqtt.mqueue_priorities = topic/1=10,topic/2=8 - ##-------------------------------------------------------------------- ## Zones ##-------------------------------------------------------------------- @@ -639,22 +607,29 @@ zone.external.await_rel_timeout = 300s ## Default: 2h, 2 hours zone.external.session_expiry_interval = 2h -## Message queue type. -## -## Value: simple | priority -zone.external.mqueue_type = simple - ## Maximum queue length. Enqueued messages when persistent client disconnected, ## or inflight window is full. 0 means no limit. ## ## Value: Number >= 0 zone.external.max_mqueue_len = 1000 -## Topic priorities. Default is 0. +## Topic priorities. +## 'none' to indicate no priority table (by default), hence all messages +## are treated equal ## -## Priority: Number [0-255] +## Priority number [1-255] +## Example: topic/1=10,topic/2=8 +## NOTE: comma and equal signs are not allowed for priority topic names +## NOTE: messages for topics not in the priority table are treated as +## either highest or lowest priority depending on the configured +## value for mqueue_default_priority ## -## zone.external.mqueue_priorities = topic/1=10,topic/2=8 +zone.external.mqueue_priorities = none + +## Default to highest priority for topics not matching priority table +## +## Value: highest | lowest +zone.external.mqueue_default_priority = highest ## Whether to enqueue Qos0 messages. ## @@ -1608,7 +1583,11 @@ bridge.aws.client_id = bridge_aws ## The Clean start flag of a remote bridge. ## ## Value: boolean -bridge.aws.clean_start = false +## Default: true +## +## NOTE: Some IoT platforms require clean_start +## must be set to 'true' +## bridge.aws.clean_start = true ## The username for a remote bridge. ## @@ -1668,21 +1647,25 @@ bridge.aws.mqueue_type = memory ## Value: Number bridge.aws.max_pending_messages = 10000 +## Bribge to remote server via SSL. +## +## Value: on | off +bridge.aws.ssl = off ## PEM-encoded CA certificates of the bridge. ## ## Value: File -## bridge.aws.cacertfile = cacert.pem +## bridge.aws.cacertfile = {{ platform_etc_dir }}/certs/cacert.pem -## SSL Certfile of the bridge. +## Client SSL Certfile of the bridge. ## ## Value: File -## bridge.aws.certfile = cert.pem +## bridge.aws.certfile = {{ platform_etc_dir }}/certs/client-cert.pem -## SSL Keyfile of the bridge. +## Client SSL Keyfile of the bridge. ## ## Value: File -## bridge.aws.keyfile = key.pem +## bridge.aws.keyfile = {{ platform_etc_dir }}/certs/client-key.pem ## SSL Ciphers used by the bridge. ## @@ -1737,7 +1720,11 @@ bridge.aws.max_pending_messages = 10000 ## The Clean start flag of a remote bridge. ## ## Value: boolean -## bridge.azure.clean_start = false +## Default: true +## +## NOTE: Some IoT platforms require clean_start +## must be set to 'true' +## bridge.azure.clean_start = true ## The username for a remote bridge. ## @@ -1803,12 +1790,12 @@ bridge.aws.max_pending_messages = 10000 ## Value: File ## bridge.azure.cacertfile = cacert.pem -## SSL Certfile of the bridge. +## Client SSL Certfile of the bridge. ## ## Value: File ## bridge.azure.certfile = cert.pem -## SSL Keyfile of the bridge. +## Client SSL Keyfile of the bridge. ## ## Value: File ## bridge.azure.keyfile = key.pem diff --git a/include/emqx.hrl b/include/emqx.hrl index 984f4c9e2..84feb16b7 100644 --- a/include/emqx.hrl +++ b/include/emqx.hrl @@ -27,6 +27,12 @@ -define(ERTS_MINIMUM_REQUIRED, "10.0"). +%%-------------------------------------------------------------------- +%% Configs +%%-------------------------------------------------------------------- + +-define(NO_PRIORITY_TABLE, none). + %%-------------------------------------------------------------------- %% Topics' prefix: $SYS | $queue | $share %%-------------------------------------------------------------------- diff --git a/include/emqx_mqtt.hrl b/include/emqx_mqtt.hrl index b9bf6b55e..3021e913a 100644 --- a/include/emqx_mqtt.hrl +++ b/include/emqx_mqtt.hrl @@ -43,11 +43,7 @@ -define(QOS_1, 1). %% At least once -define(QOS_2, 2). %% Exactly once --define(QOS0, 0). %% At most once --define(QOS1, 1). %% At least once --define(QOS2, 2). %% Exactly once - --define(IS_QOS(I), (I >= ?QOS0 andalso I =< ?QOS2)). +-define(IS_QOS(I), (I >= ?QOS_0 andalso I =< ?QOS_2)). -define(QOS_I(Name), begin diff --git a/priv/emqx.schema b/priv/emqx.schema index 04b3b8e12..8026a6400 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -382,145 +382,114 @@ end}. %% Log %%-------------------------------------------------------------------- -{mapping, "log.dir", "lager.log_dir", [ +{mapping, "log.to", "kernel.logger", [ + {default, console}, + {datatype, {enum, [off, file, console, both]}} +]}. + +{mapping, "log.level", "kernel.logger_level", [ + {default, error}, + {datatype, {enum, [debug, info, notice, warning, error, critical, alert, emergency, all]}} +]}. + +{mapping, "log.logger_sasl_compatible", "kernel.logger_sasl_compatible", [ + {default, true}, + {datatype, {enum, [true, false]}} +]}. + +{mapping, "log.dir", "kernel.logger", [ {default, "log"}, {datatype, string} ]}. -{mapping, "log.console", "lager.handlers", [ - {default, file}, - {datatype, {enum, [off, file, console, both]}} -]}. - -{mapping, "log.console.level", "lager.handlers", [ - {default, info}, - {datatype, {enum, [debug, info, notice, warning, error, critical, alert, emergency, none]}} -]}. - -{mapping, "log.console.file", "lager.handlers", [ - {default, "log/console.log"}, +{mapping, "log.file", "kernel.logger", [ + {default, "emqx.log"}, {datatype, file} ]}. -{mapping, "log.console.size", "lager.handlers", [ - {default, 10485760}, - {datatype, integer} +{mapping, "log.rotation.size", "kernel.logger", [ + {default, "10MB"}, + {datatype, bytesize} ]}. -{mapping, "log.console.count", "lager.handlers", [ +{mapping, "log.rotation.count", "kernel.logger", [ {default, 5}, {datatype, integer} ]}. -{mapping, "log.info.file", "lager.handlers", [ +{mapping, "log.$level.file", "kernel.logger", [ {datatype, file} ]}. -{mapping, "log.info.size", "lager.handlers", [ - {default, 10485760}, - {datatype, integer} -]}. - -{mapping, "log.info.count", "lager.handlers", [ - {default, 5}, - {datatype, integer} -]}. - -{mapping, "log.error.file", "lager.handlers", [ - {default, "log/error.log"}, - {datatype, file} -]}. - -{mapping, "log.error.size", "lager.handlers", [ - {default, 10485760}, - {datatype, integer} -]}. - -{mapping, "log.error.count", "lager.handlers", [ - {default, 5}, - {datatype, integer} -]}. - -{mapping, "log.error.redirect", "lager.error_logger_redirect", [ - {default, on}, - {datatype, flag}, - hidden -]}. - -{mapping, "log.error.messages_per_second", "lager.error_logger_hwm", [ - {default, 1000}, - {datatype, integer}, - hidden -]}. - -{translation, - "lager.handlers", - fun(Conf) -> - ErrorHandler = case cuttlefish:conf_get("log.error.file", Conf, undefined) of - undefined -> []; - ErrorFilename -> [{lager_file_backend, [{file, ErrorFilename}, - {level, error}, - {size, cuttlefish:conf_get("log.error.size", Conf)}, - {date, "$D0"}, - {count, cuttlefish:conf_get("log.error.count", Conf)}]}] - end, - - InfoHandler = case cuttlefish:conf_get("log.info.file", Conf, undefined) of - undefined -> []; - InfoFilename -> [{lager_file_backend, [{file, InfoFilename}, - {level, info}, - {size, cuttlefish:conf_get("log.info.size", Conf)}, - {date, "$D0"}, - {count, cuttlefish:conf_get("log.info.count", Conf)}]}] - end, - - ConsoleLogLevel = cuttlefish:conf_get("log.console.level", Conf), - ConsoleLogFile = cuttlefish:conf_get("log.console.file", Conf), - - ConsoleHandler = {lager_console_backend, [{level, ConsoleLogLevel}]}, - ConsoleFileHandler = {lager_file_backend, [{file, ConsoleLogFile}, - {level, ConsoleLogLevel}, - {size, cuttlefish:conf_get("log.console.size", Conf)}, - {date, "$D0"}, - {count, cuttlefish:conf_get("log.console.count", Conf)}]}, - - ConsoleHandlers = case cuttlefish:conf_get("log.console", Conf) of - off -> []; - file -> [ConsoleFileHandler]; - console -> [ConsoleHandler]; - both -> [ConsoleHandler, ConsoleFileHandler]; - _ -> [] - end, - ConsoleHandlers ++ ErrorHandler ++ InfoHandler - end -}. - -{mapping, "log.crash", "lager.crash_log", [ - {default, on}, - {datatype, flag} -]}. - -{mapping, "log.crash.file", "lager.crash_log", [ - {default, "log/crash.log"}, - {datatype, file} -]}. - -{translation, - "lager.crash_log", - fun(Conf) -> - case cuttlefish:conf_get("log.crash", Conf) of - false -> undefined; - _ -> - cuttlefish:conf_get("log.crash.file", Conf, "./log/crash.log") - end - end}. - {mapping, "sasl", "sasl.sasl_error_logger", [ {default, off}, {datatype, flag}, hidden ]}. +{translation, "kernel.logger", fun(Conf) -> + LogTo = cuttlefish:conf_get("log.to", Conf), + TopLogLevel = cuttlefish:conf_get("log.level", Conf), + Formatter = {emqx_logger_formatter, + #{template => + [time," [",level,"] ", + {client_id, + [{peername, + [client_id,"@",peername," "], + [client_id, " "]}], + []}, + msg,"\n"]}}, + FileConf = fun(Filename) -> + #{type => wrap, + file => filename:join(cuttlefish:conf_get("log.dir", Conf), Filename), + max_no_files => cuttlefish:conf_get("log.rotation.count", Conf), + max_no_bytes => cuttlefish:conf_get("log.rotation.size", Conf)} + end, + + %% For the default logger that outputs to console + DefaultHandler = + if LogTo =:= console orelse LogTo =:= both -> + [{handler, default, logger_std_h, + #{level => TopLogLevel, + config => #{type => standard_io}, + formatter => Formatter}}]; + true -> + [{handler, default, undefined}] + end, + + %% For the file logger + FileHandler = + if LogTo =:= file orelse LogTo =:= both -> + [{handler, file, logger_disk_log_h, + #{level => TopLogLevel, + config => FileConf(cuttlefish:conf_get("log.file", Conf)), + formatter => Formatter, + filesync_repeat_interval => no_repeat}}]; + true -> [] + end, + + %% For creating additional log files for specific log levels. + AdditionalLogFiles = + lists:foldl( + fun({[_, Level, _] = K, Filename}, Acc) when LogTo =:= file; LogTo =:= both -> + case cuttlefish_variable:is_fuzzy_match(K, ["log", "$level", "file"]) of + true -> [{Level, Filename} | Acc]; + false -> Acc + end; + ({_K, _V}, Acc) -> + Acc + end, [], Conf), + AdditionalHandlers = + [{handler, list_to_atom("file_for_"++Level), logger_disk_log_h, + #{level => list_to_atom(Level), + config => FileConf(Filename), + formatter => Formatter, + filesync_repeat_interval => no_repeat}} + || {Level, Filename} <- AdditionalLogFiles], + + DefaultHandler ++ FileHandler ++ AdditionalHandlers +end}. + %%-------------------------------------------------------------------- %% Authentication/ACL %%-------------------------------------------------------------------- @@ -624,18 +593,6 @@ end}. {datatype, {enum, [true, false]}} ]}. -%% @doc Type: simple | priority -{mapping, "mqtt.mqueue_type", "emqx.mqueue_type", [ - {default, simple}, - {datatype, {enum, [simple, priority]}} -]}. - -%% @doc Topic Priorities: 0~255, Default is 0 -{mapping, "mqtt.mqueue_priorities", "emqx.mqueue_priorities", [ - {default, ""}, - {datatype, string} -]}. - %%-------------------------------------------------------------------- %% Zones %%-------------------------------------------------------------------- @@ -777,12 +734,6 @@ end}. {datatype, {duration, s}} ]}. -%% @doc Type: simple | priority -{mapping, "zone.$name.mqueue_type", "emqx.zones", [ - {default, simple}, - {datatype, {enum, [simple, priority]}} -]}. - %% @doc Max queue length. Enqueued messages when persistent client %% disconnected, or inflight window is full. 0 means no limit. {mapping, "zone.$name.max_mqueue_len", "emqx.zones", [ @@ -790,11 +741,23 @@ end}. {datatype, integer} ]}. -%% @doc Topic Priorities: 0~255, Default is 0 +%% @doc Topic Priorities, comma separated topic=priority pairs, +%% where priority should be integer in range 1-255 (inclusive) +%% 1 being the lowest and 255 being the highest. +%% default value `none` to indicate no priority table, hence all +%% messages are treated equal, which means either highest ('infinity'), +%% or lowest (0) depending on mqueue_default_priority config. {mapping, "zone.$name.mqueue_priorities", "emqx.zones", [ + {default, "none"}, {datatype, string} ]}. +%% @doc Default priority for topics not in priority table. +{mapping, "zone.$name.mqueue_default_priority", "emqx.zones", [ + {default, lowest}, + {datatype, {enum, [highest, lowest]}} +]}. + %% @doc Queue Qos0 messages? {mapping, "zone.$name.mqueue_store_qos0", "emqx.zones", [ {default, true}, @@ -859,6 +822,18 @@ end}. max_heap_size => Siz1} end, {force_shutdown_policy, ShutdownPolicy}; + ("mqueue_priorities", Val) -> + case Val of + "none" -> none; % NO_PRIORITY_TABLE + _ -> + lists:foldl(fun(T, Acc) -> + %% NOTE: space in "= " is intended + [{Topic, Prio}] = string:tokens(T, "= "), + P = list_to_integer(Prio), + (P < 0 orelse P > 255) andalso error({bad_priority, Topic, Prio}), + maps:put(iolist_to_binary(Topic), P, Acc) + end, string:tokens(Val, ",")) + end; (Opt, Val) -> {list_to_atom(Opt), Val} end, @@ -1516,7 +1491,13 @@ end}. ]}. {mapping, "bridge.$name.forwards", "emqx.bridges", [ - {datatype, string} + {datatype, string}, + {default, ""} +]}. + +{mapping, "bridge.$name.ssl", "emqx.bridges", [ + {datatype, flag}, + {default, off} ]}. {mapping, "bridge.$name.cacertfile", "emqx.bridges", [ @@ -1542,11 +1523,12 @@ end}. {mapping, "bridge.$name.keepalive", "emqx.bridges", [ {default, "10s"}, - {datatype, {duration, s}} + {datatype, {duration, ms}} ]}. {mapping, "bridge.$name.tls_versions", "emqx.bridges", [ - {datatype, string} + {datatype, string}, + {default, "tlsv1,tlsv1.1,tlsv1.2"} ]}. {mapping, "bridge.$name.subscription.$id.topic", "emqx.bridges", [ @@ -1564,12 +1546,11 @@ end}. {mapping, "bridge.$name.reconnect_interval", "emqx.bridges", [ {default, "30s"}, - {datatype, {duration, s}} + {datatype, {duration, ms}} ]}. {translation, "emqx.bridges", fun(Conf) -> - Split = fun(undefined) -> undefined; (S) -> string:tokens(S, ",") end, IsSsl = fun(cacertfile) -> true; @@ -1586,31 +1567,33 @@ end}. {ciphers, Split(Ciphers)}; (Opt, Val) -> {Opt, Val} - end, - - Merge = fun(Opt, Val, Opts) -> - case IsSsl(Opt) of - true -> - SslOpts = [Parse(Opt, Val)|proplists:get_value(ssl_opts, Opts, [])], - lists:ukeymerge(1, [{ssl_opts, SslOpts}], Opts); - false -> - [{Opt, Val}|Opts] - end end, + + Merge = fun(forwards, Val, Opts) -> + [{forwards, string:tokens(Val, ",")}|Opts]; + (Opt, Val, Opts) -> + case IsSsl(Opt) of + true -> + SslOpts = [Parse(Opt, Val)|proplists:get_value(ssl_opts, Opts, [])], + lists:ukeymerge(1, [{ssl_opts, SslOpts}], lists:usort(Opts)); + false -> + [{Opt, Val}|Opts] + end + end, + Subscriptions = fun(Name) -> - Configs = cuttlefish_variable:filter_by_prefix("bridge." ++ Name ++ ".subscription", Conf), - lists:zip([Topic || {_, Topic} <- lists:sort([{I, Topic} || {[_, _, "subscription", I, "topic"], Topic} <- Configs])], - [QoS || {_, QoS} <- lists:sort([{I, QoS} || {[_, _, "subscription", I, "qos"], QoS} <- Configs])]) + Configs = cuttlefish_variable:filter_by_prefix("bridge." ++ Name ++ ".subscription", Conf), + lists:zip([Topic || {_, Topic} <- lists:sort([{I, Topic} || {[_, _, "subscription", I, "topic"], Topic} <- Configs])], + [QoS || {_, QoS} <- lists:sort([{I, QoS} || {[_, _, "subscription", I, "qos"], QoS} <- Configs])]) end, maps:to_list( lists:foldl( fun({["bridge", Name, Opt], Val}, Acc) -> + %% e.g #{aws => [{OptKey, OptVal}]} + Init = [{list_to_atom(Opt), Val},{subscriptions, Subscriptions(Name)}], maps:update_with(list_to_atom(Name), - fun(Opts) -> - Merge(list_to_atom(Opt), Val, Opts) - end, [{list_to_atom(Opt), Val}, - {subscriptions, Subscriptions(Name)}], Acc); + fun(Opts) -> Merge(list_to_atom(Opt), Val, Opts) end, Init, Acc); (_, Acc) -> Acc end, #{}, lists:usort(cuttlefish_variable:filter_by_prefix("bridge.", Conf)))) diff --git a/rebar.config b/rebar.config index 63b3a7595..0af3f6b3e 100644 --- a/rebar.config +++ b/rebar.config @@ -1,17 +1,15 @@ {deps, [{jsx, "2.9.0"}, {gproc, "0.8.0"}, - {lager, "3.6.5"}, - {cowboy, "2.4.0"}, - {lager_syslog, {git, "https://github.com/basho/lager_syslog", {branch, "3.0.1"}}} + {cowboy, "2.4.0"} ]}. %% appended to deps in rebar.config.script {github_emqx_deps, - [{gen_rpc, "2.2.0"}, - {ekka, "v0.4.1"}, + [{gen_rpc, "2.3.0"}, + {ekka, "v0.5.0"}, {clique, "develop"}, {esockd, "v5.4.2"}, - {cuttlefish, "emqx30"} + {cuttlefish, "v2.1.0"} ]}. {edoc_opts, [{preprocess, true}]}. @@ -20,7 +18,6 @@ warn_unused_import, warn_obsolete_guard, debug_info, - {parse_transform, lager_transform}, {d, 'APPLICATION', emqx}]}. {xref_checks, [undefined_function_calls, undefined_functions, locals_not_used, deprecated_function_calls, @@ -29,6 +26,5 @@ {cover_opts, [verbose]}. {cover_export_enabled, true}. -%% rebar3_neotoma_plugin is needed to compile the .peg file for cuttlefish -{plugins, [coveralls, rebar3_neotoma_plugin]}. +{plugins, [coveralls]}. diff --git a/src/emqx.app.src b/src/emqx.app.src index 18ee93a30..77305f6e9 100644 --- a/src/emqx.app.src +++ b/src/emqx.app.src @@ -1,9 +1,9 @@ {application,emqx, [{description,"EMQ X Broker"}, - {vsn,"3.0"}, + {vsn,"3.0-rc.3"}, {modules,[]}, {registered,[emqx_sup]}, - {applications,[kernel,stdlib,jsx,gproc,gen_rpc,lager,esockd, + {applications,[kernel,stdlib,jsx,gproc,gen_rpc,esockd, cowboy]}, {env,[]}, {mod,{emqx_app,[]}}, diff --git a/src/emqx_auth_mod.erl b/src/emqx_auth_mod.erl index 8f03eb4fa..00ecb659a 100644 --- a/src/emqx_auth_mod.erl +++ b/src/emqx_auth_mod.erl @@ -39,4 +39,3 @@ behaviour_info(_Other) -> undefined. -endif. - diff --git a/src/emqx_bridge.erl b/src/emqx_bridge.erl index 461564bd2..b1ccdc44b 100644 --- a/src/emqx_bridge.erl +++ b/src/emqx_bridge.erl @@ -30,11 +30,11 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, {client_pid, options, reconnect_interval, +-record(state, {client_pid, options, reconnect_interval, mountpoint, queue, mqueue_type, max_pending_messages, forwards = [], subscriptions = []}). --record(mqtt_msg, {qos = ?QOS0, retain = false, dup = false, +-record(mqtt_msg, {qos = ?QOS_0, retain = false, dup = false, packet_id, topic, props, payload}). start_link(Name, Options) -> @@ -188,19 +188,23 @@ handle_cast(Msg, State) -> %% start message bridge %%---------------------------------------------------------------- handle_info(start, State = #state{options = Options, - client_pid = undefined, - reconnect_interval = ReconnectInterval}) -> + client_pid = undefined}) -> case emqx_client:start_link([{owner, self()}|options(Options)]) of - {ok, ClientPid, _} -> - Subs = [{i2b(Topic), Qos} || {Topic, Qos} <- get_value(subscriptions, Options, []), - emqx_topic:validate({filter, i2b(Topic)})], - Forwards = [i2b(Topic) || Topic <- string:tokens(get_value(forwards, Options, ""), ","), - emqx_topic:validate({filter, i2b(Topic)})], - [emqx_client:subscribe(ClientPid, {Topic, Qos}) || {Topic, Qos} <- Subs], - [emqx_broker:subscribe(Topic) || Topic <- Forwards], - {noreply, State#state{client_pid = ClientPid, subscriptions = Subs, forwards = Forwards}}; - {error,_} -> - erlang:send_after(ReconnectInterval, self(), start), + {ok, ClientPid} -> + case emqx_client:connect(ClientPid) of + {ok, _} -> + emqx_logger:info("[Bridge] connected to remote sucessfully"), + Subs = subscribe_remote_topics(ClientPid, get_value(subscriptions, Options, [])), + Forwards = subscribe_local_topics(get_value(forwards, Options, [])), + {noreply, State#state{client_pid = ClientPid, + subscriptions = Subs, + forwards = Forwards}}; + {error, Reason} -> + emqx_logger:error("[Bridge] connect to remote failed! error: ~p", [Reason]), + {noreply, State#state{client_pid = ClientPid}} + end; + {error, Reason} -> + emqx_logger:error("[Bridge] start failed! error: ~p", [Reason]), {noreply, State} end; @@ -218,7 +222,7 @@ handle_info({dispatch, _, #message{topic = Topic, payload = Payload, flags = #{r {ok, PkgId} -> {noreply, State#state{queue = store(MqueueType, {PkgId, Msg}, Queue, MaxPendingMsg)}}; {error, Reason} -> - emqx_logger:error("Publish fail:~p", [Reason]), + emqx_logger:error("[Bridge] Publish fail:~p", [Reason]), {noreply, State} end; @@ -240,11 +244,12 @@ handle_info({puback, #{packet_id := PkgId}}, State = #state{queue = Queue, mqueu {noreply, State#state{queue = delete(MqueueType, PkgId, Queue)}}; handle_info({'EXIT', Pid, normal}, State = #state{client_pid = Pid}) -> + emqx_logger:warning("[Bridge] stop ~p", [normal]), {noreply, State#state{client_pid = undefined}}; handle_info({'EXIT', Pid, Reason}, State = #state{client_pid = Pid, reconnect_interval = ReconnectInterval}) -> - lager:warning("emqx bridge stop reason:~p", [Reason]), + emqx_logger:error("[Bridge] stop ~p", [Reason]), erlang:send_after(ReconnectInterval, self(), start), {noreply, State#state{client_pid = undefined}}; @@ -258,6 +263,14 @@ terminate(_Reason, #state{}) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. +subscribe_remote_topics(ClientPid, Subscriptions) -> + [begin emqx_client:subscribe(ClientPid, {bin(Topic), Qos}), {bin(Topic), Qos} end + || {Topic, Qos} <- Subscriptions, emqx_topic:validate({filter, bin(Topic)})]. + +subscribe_local_topics(Topics) -> + [begin emqx_broker:subscribe(bin(Topic)), bin(Topic) end + || Topic <- Topics, emqx_topic:validate({filter, bin(Topic)})]. + proto_ver(mqttv3) -> v3; proto_ver(mqttv4) -> v4; proto_ver(mqttv5) -> v5. @@ -285,13 +298,17 @@ options([{clean_start, CleanStart}| Options], Acc) -> options([{address, Address}| Options], Acc) -> {Host, Port} = address(Address), options(Options, [{host, Host}, {port, Port}|Acc]); +options([{ssl, Ssl}| Options], Acc) -> + options(Options, [{ssl, Ssl}|Acc]); +options([{ssl_opts, SslOpts}| Options], Acc) -> + options(Options, [{ssl_opts, SslOpts}|Acc]); options([_Option | Options], Acc) -> options(Options, Acc). name(Id) -> list_to_atom(lists:concat([?MODULE, "_", Id])). -i2b(L) -> iolist_to_binary(L). +bin(L) -> iolist_to_binary(L). mountpoint(undefined, Topic) -> Topic; @@ -301,12 +318,12 @@ mountpoint(Prefix, Topic) -> format_mountpoint(undefined) -> undefined; format_mountpoint(Prefix) -> - binary:replace(i2b(Prefix), <<"${node}">>, atom_to_binary(node(), utf8)). + binary:replace(bin(Prefix), <<"${node}">>, atom_to_binary(node(), utf8)). store(memory, Data, Queue, MaxPendingMsg) when length(Queue) =< MaxPendingMsg -> [Data | Queue]; store(memory, _Data, Queue, _MaxPendingMsg) -> - lager:error("Beyond max pending messages"), + logger:error("Beyond max pending messages"), Queue; store(disk, Data, Queue, _MaxPendingMsg)-> [Data | Queue]. diff --git a/src/emqx_broker.erl b/src/emqx_broker.erl index 857090c25..1a04cf997 100644 --- a/src/emqx_broker.erl +++ b/src/emqx_broker.erl @@ -337,7 +337,6 @@ handle_cast({From, #subscribe{topic = Topic, subpid = SubPid, subid = SubId, sub true -> case ets:lookup_element(?SUBOPTION, {Topic, Subscriber}, 2) =:= SubOpts of true -> - io:format("Ets: ~p, SubOpts: ~p", [ets:lookup_element(?SUBOPTION, Topic, Subscriber), SubOpts]), gen_server:reply(From, ok), {noreply, State}; false -> diff --git a/src/emqx_client.erl b/src/emqx_client.erl index 22c37d26f..2d9cb0a80 100644 --- a/src/emqx_client.erl +++ b/src/emqx_client.erl @@ -21,6 +21,7 @@ -export([start_link/0, start_link/1]). -export([request/5, request/6, request_async/7, receive_response/3]). -export([set_request_handler/2, sub_request_topic/3, sub_request_topic/4]). +-export([connect/1]). -export([subscribe/2, subscribe/3, subscribe/4]). -export([publish/2, publish/3, publish/4, publish/5]). -export([unsubscribe/2, unsubscribe/3]). @@ -95,7 +96,7 @@ | {force_ping, boolean()} | {properties, properties()}). --record(mqtt_msg, {qos = ?QOS0, retain = false, dup = false, +-record(mqtt_msg, {qos = ?QOS_0, retain = false, dup = false, packet_id, topic, props, payload}). -type(mqtt_msg() :: #mqtt_msg{}). @@ -200,18 +201,11 @@ start_link(Options) when is_map(Options) -> start_link(Options) when is_list(Options) -> ok = emqx_mqtt_props:validate( proplists:get_value(properties, Options, #{})), - case start_client(with_owner(Options)) of - {ok, Client} -> - connect(Client); - Error -> Error - end. - -start_client(Options) -> case proplists:get_value(name, Options) of undefined -> - gen_statem:start_link(?MODULE, [Options], []); + gen_statem:start_link(?MODULE, [with_owner(Options)], []); Name when is_atom(Name) -> - gen_statem:start_link({local, Name}, ?MODULE, [Options], []) + gen_statem:start_link({local, Name}, ?MODULE, [with_owner(Options)], []) end. with_owner(Options) -> @@ -220,8 +214,7 @@ with_owner(Options) -> undefined -> [{owner, self()} | Options] end. -%% @private --spec(connect(client()) -> {ok, client(), properties()} | {error, term()}). +-spec(connect(client()) -> {ok, properties()} | {error, term()}). connect(Client) -> gen_statem:call(Client, connect, infinity). @@ -538,14 +531,24 @@ init([{hosts, Hosts} | Opts], State) -> init(Opts, State#state{hosts = Hosts1}); init([{tcp_opts, TcpOpts} | Opts], State = #state{sock_opts = SockOpts}) -> init(Opts, State#state{sock_opts = emqx_misc:merge_opts(SockOpts, TcpOpts)}); -init([ssl | Opts], State = #state{sock_opts = SockOpts}) -> - ok = ssl:start(), - SockOpts1 = emqx_misc:merge_opts([{ssl_opts, []}], SockOpts), - init(Opts, State#state{sock_opts = SockOpts1}); +init([{ssl, EnableSsl} | Opts], State) -> + case lists:keytake(ssl_opts, 1, Opts) of + {value, SslOpts, WithOutSslOpts} -> + init([SslOpts, {ssl, EnableSsl}| WithOutSslOpts], State); + false -> + init([{ssl_opts, []}, {ssl, EnableSsl}| Opts], State) + end; init([{ssl_opts, SslOpts} | Opts], State = #state{sock_opts = SockOpts}) -> - ok = ssl:start(), - SockOpts1 = emqx_misc:merge_opts(SockOpts, [{ssl_opts, SslOpts}]), - init(Opts, State#state{sock_opts = SockOpts1}); + case lists:keytake(ssl, 1, Opts) of + {value, {ssl, true}, WithOutEnableSsl} -> + ok = ssl:start(), + SockOpts1 = emqx_misc:merge_opts(SockOpts, [{ssl_opts, SslOpts}]), + init(WithOutEnableSsl, State#state{sock_opts = SockOpts1}); + {value, {ssl, false}, WithOutEnableSsl} -> + init(WithOutEnableSsl, State); + false -> + init(Opts, State) + end; init([{client_id, ClientId} | Opts], State) -> init(Opts, State#state{client_id = iolist_to_binary(ClientId)}); init([{clean_start, CleanStart} | Opts], State) when is_boolean(CleanStart) -> @@ -682,7 +685,7 @@ waiting_for_connack(cast, ?CONNACK_PACKET(?RC_SUCCESS, undefined -> AllProps; _ -> maps:merge(AllProps, Properties) end, - Reply = {ok, self(), Properties}, + Reply = {ok, Properties}, State2 = State1#state{client_id = assign_id(ClientId, AllProps1), properties = AllProps1, session_present = SessPresent}, @@ -994,7 +997,7 @@ handle_event(info, {Error, _Sock, Reason}, _StateName, State) handle_event(info, {Closed, _Sock}, _StateName, State) when Closed =:= tcp_closed; Closed =:= ssl_closed -> - {stop, Closed, State}; + {stop, {shutdown, Closed}, State}; handle_event(info, {'EXIT', Owner, Reason}, _, #state{owner = Owner}) -> {stop, Reason}; @@ -1217,7 +1220,7 @@ retry_send([{Type, Msg, Ts} | Msgs], Now, State = #state{retry_interval = Interv retry_send(publish, Msg = #mqtt_msg{qos = QoS, packet_id = PacketId}, Now, State = #state{inflight = Inflight}) -> - Msg1 = Msg#mqtt_msg{dup = (QoS =:= ?QOS1)}, + Msg1 = Msg#mqtt_msg{dup = (QoS =:= ?QOS_1)}, case send(Msg1, State) of {ok, NewState} -> Inflight1 = emqx_inflight:update(PacketId, {publish, Msg1, Now}, Inflight), diff --git a/src/emqx_client_sock.erl b/src/emqx_client_sock.erl index dc19a8d91..505454e2d 100644 --- a/src/emqx_client_sock.erl +++ b/src/emqx_client_sock.erl @@ -49,7 +49,10 @@ connect(Host, Port, SockOpts, Timeout) -> end. ssl_upgrade(Sock, SslOpts, Timeout) -> - case ssl:connect(Sock, SslOpts, Timeout) of + TlsVersions = proplists:get_value(versions, SslOpts, []), + Ciphers = proplists:get_value(ciphers, SslOpts, default_ciphers(TlsVersions)), + SslOpts2 = emqx_misc:merge_opts(SslOpts, [{ciphers, Ciphers}]), + case ssl:connect(Sock, SslOpts2, Timeout) of {ok, SslSock} -> ok = ssl:controlling_process(SslSock, self()), {ok, #ssl_socket{tcp = Sock, ssl = SslSock}}; @@ -91,3 +94,8 @@ sockname(Sock) when is_port(Sock) -> sockname(#ssl_socket{ssl = SslSock}) -> ssl:sockname(SslSock). +default_ciphers(TlsVersions) -> + lists:foldl( + fun(TlsVer, Ciphers) -> + Ciphers ++ ssl:cipher_suites(all, TlsVer) + end, [], TlsVersions). diff --git a/src/emqx_connection.erl b/src/emqx_connection.erl index ccb5f59fa..6c3d4c7e6 100644 --- a/src/emqx_connection.erl +++ b/src/emqx_connection.erl @@ -165,7 +165,8 @@ init_limiter({Rate, Burst}) -> esockd_rate_limit:new(Rate, Burst). send_fun(Transport, Socket, Peername) -> - fun(Data) -> + fun(Packet, Options) -> + Data = emqx_frame:serialize(Packet, Options), try Transport:async_send(Socket, Data) of ok -> ?LOG(debug, "SEND ~p", [iolist_to_binary(Data)], #state{peername = Peername}), @@ -408,4 +409,3 @@ maybe_gc(#state{}, {publish, _PacketId, #message{payload = Payload}}) -> ok = emqx_gc:inc(1, Oct); maybe_gc(_, _) -> ok. - diff --git a/src/emqx_frame.erl b/src/emqx_frame.erl index aa7aad064..075f0a11e 100644 --- a/src/emqx_frame.erl +++ b/src/emqx_frame.erl @@ -130,7 +130,7 @@ parse_packet(#mqtt_packet_header{type = ?CONNECT}, FrameBin, _Options) -> WillQoS : 2, WillFlag : 1, CleanStart : 1, - 0 : 1, + 0 : 1, KeepAlive : 16/big, Rest2/binary>> = Rest1, @@ -634,4 +634,3 @@ fixqos(?PUBREL, 0) -> 1; fixqos(?SUBSCRIBE, 0) -> 1; fixqos(?UNSUBSCRIBE, 0) -> 1; fixqos(_Type, QoS) -> QoS. - diff --git a/src/emqx_listeners.erl b/src/emqx_listeners.erl index d3ef43069..6987f03f9 100644 --- a/src/emqx_listeners.erl +++ b/src/emqx_listeners.erl @@ -35,7 +35,7 @@ start_listener({Proto, ListenOn, Options}) -> {ok, _} -> io:format("Start mqtt:~s listener on ~s successfully.~n", [Proto, format(ListenOn)]); {error, Reason} -> - io:format(standard_error, "Failed to start mqtt:~s listener on ~s - ~p!", + io:format(standard_error, "Failed to start mqtt:~s listener on ~s - ~p~n!", [Proto, format(ListenOn), Reason]) end. @@ -119,7 +119,7 @@ stop_listener({Proto, ListenOn, Opts}) -> ok -> io:format("Stop mqtt:~s listener on ~s successfully.~n", [Proto, format(ListenOn)]); {error, Reason} -> - io:format(standard_error, "Failed to stop mqtt:~s listener on ~s - ~p.", + io:format(standard_error, "Failed to stop mqtt:~s listener on ~s - ~p~n.", [Proto, format(ListenOn), Reason]) end. diff --git a/src/emqx_local_bridge.erl b/src/emqx_local_bridge.erl index 228a64cff..7c4e7cea1 100644 --- a/src/emqx_local_bridge.erl +++ b/src/emqx_local_bridge.erl @@ -63,8 +63,7 @@ init([Pool, Id, Node, Topic, Options]) -> Group = iolist_to_binary(["$bridge:", atom_to_list(Node), ":", Topic]), emqx_broker:subscribe(Topic, self(), #{share => Group, qos => ?QOS_0}), State = parse_opts(Options, #state{node = Node, subtopic = Topic}), - MQueue = emqx_mqueue:init(#{type => simple, - max_len => State#state.max_queue_len, + MQueue = emqx_mqueue:init(#{max_len => State#state.max_queue_len, store_qos0 => true}), {ok, State#state{pool = Pool, id = Id, mqueue = MQueue}}; false -> @@ -96,7 +95,8 @@ handle_cast(Msg, State) -> handle_info({dispatch, _Topic, Msg}, State = #state{mqueue = Q, status = down}) -> %% TODO: how to drop??? - {noreply, State#state{mqueue = emqx_mqueue:in(Msg, Q)}}; + {_Dropped, NewQ} = emqx_mqueue:in(Msg, Q), + {noreply, State#state{mqueue = NewQ}}; handle_info({dispatch, _Topic, Msg}, State = #state{node = Node, status = up}) -> emqx_rpc:cast(Node, emqx_broker, publish, [transform(Msg, State)]), diff --git a/src/emqx_logger.erl b/src/emqx_logger.erl index 2af5cd6b7..64bac9968 100644 --- a/src/emqx_logger.erl +++ b/src/emqx_logger.erl @@ -22,38 +22,47 @@ -export([error/1, error/2, error/3]). -export([critical/1, critical/2, critical/3]). +-export([add_proc_metadata/1]). + debug(Msg) -> - lager:debug(Msg). + logger:debug(Msg). debug(Format, Args) -> - lager:debug(Format, Args). -debug(Metadata, Format, Args) when is_list(Metadata) -> - lager:debug(Metadata, Format, Args). + logger:debug(Format, Args). +debug(Metadata, Format, Args) when is_map(Metadata) -> + logger:debug(Format, Args, Metadata). info(Msg) -> - lager:info(Msg). + logger:info(Msg). info(Format, Args) -> - lager:info(Format, Args). -info(Metadata, Format, Args) when is_list(Metadata) -> - lager:info(Metadata, Format, Args). + logger:info(Format, Args). +info(Metadata, Format, Args) when is_map(Metadata) -> + logger:info(Format, Args, Metadata). warning(Msg) -> - lager:warning(Msg). + logger:warning(Msg). warning(Format, Args) -> - lager:warning(Format, Args). -warning(Metadata, Format, Args) when is_list(Metadata) -> - lager:warning(Metadata, Format, Args). + logger:warning(Format, Args). +warning(Metadata, Format, Args) when is_map(Metadata) -> + logger:warning(Format, Args, Metadata). error(Msg) -> - lager:error(Msg). + logger:error(Msg). error(Format, Args) -> - lager:error(Format, Args). -error(Metadata, Format, Args) when is_list(Metadata) -> - lager:error(Metadata, Format, Args). + logger:error(Format, Args). +error(Metadata, Format, Args) when is_map(Metadata) -> + logger:error(Format, Args, Metadata). critical(Msg) -> - lager:critical(Msg). + logger:critical(Msg). critical(Format, Args) -> - lager:critical(Format, Args). -critical(Metadata, Format, Args) when is_list(Metadata) -> - lager:critical(Metadata, Format, Args). + logger:critical(Format, Args). +critical(Metadata, Format, Args) when is_map(Metadata) -> + logger:critical(Format, Args, Metadata). +add_proc_metadata(Meta) -> + case logger:get_process_metadata() of + undefined -> + logger:set_process_metadata(Meta); + OldMeta -> + logger:set_process_metadata(maps:merge(OldMeta, Meta)) + end. \ No newline at end of file diff --git a/src/emqx_logger_formatter.erl b/src/emqx_logger_formatter.erl new file mode 100644 index 000000000..3c5d9fc16 --- /dev/null +++ b/src/emqx_logger_formatter.erl @@ -0,0 +1,360 @@ +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 2017-2018. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%% +%% %CopyrightEnd% +%% + +%% This file is copied from lib/kernel/src/logger_formatter.erl, and +%% modified for a more concise time format other than the default RFC3339. + +-module(emqx_logger_formatter). + +-export([format/2]). +-export([check_config/1]). + +-define(DEFAULT_FORMAT_TEMPLATE_SINGLE, [time," ",level,": ",msg,"\n"]). + +-define(FormatP, "~0tp"). + +-define(IS_STRING(String), + (is_list(String) orelse is_binary(String))). + +%%%----------------------------------------------------------------- +%%% Types +-type config() :: #{chars_limit => pos_integer() | unlimited, + depth => pos_integer() | unlimited, + max_size => pos_integer() | unlimited, + report_cb => logger:report_cb(), + quit => template()}. +-type template() :: [metakey() | {metakey(),template(),template()} | string()]. +-type metakey() :: atom() | [atom()]. + +%%%----------------------------------------------------------------- +%%% API +-spec format(LogEvent,Config) -> unicode:chardata() when + LogEvent :: logger:log_event(), + Config :: config(). +format(#{level:=Level,msg:=Msg0,meta:=Meta},Config0) + when is_map(Config0) -> + Config = add_default_config(Config0), + Template = maps:get(template,Config), + {BT,AT0} = lists:splitwith(fun(msg) -> false; (_) -> true end, Template), + {DoMsg,AT} = + case AT0 of + [msg|Rest] -> {true,Rest}; + _ ->{false,AT0} + end, + B = do_format(Level,Meta,BT,Config), + A = do_format(Level,Meta,AT,Config), + MsgStr = + if DoMsg -> + Config1 = + case maps:get(chars_limit,Config) of + unlimited -> + Config; + Size0 -> + Size = + case Size0 - string:length([B,A]) of + S when S>=0 -> S; + _ -> 0 + end, + Config#{chars_limit=>Size} + end, + string:trim(format_msg(Msg0,Meta,Config1)); + true -> + "" + end, + truncate([B,MsgStr,A],maps:get(max_size,Config)). + +do_format(Level,Data,[level|Format],Config) -> + [to_string(level,Level,Config)|do_format(Level,Data,Format,Config)]; +do_format(Level,Data,[{Key,IfExist,Else}|Format],Config) -> + String = + case value(Key,Data) of + {ok,Value} -> do_format(Level,Data#{Key=>Value},IfExist,Config); + error -> do_format(Level,Data,Else,Config) + end, + [String|do_format(Level,Data,Format,Config)]; +do_format(Level,Data,[Key|Format],Config) + when is_atom(Key) orelse + (is_list(Key) andalso is_atom(hd(Key))) -> + String = + case value(Key,Data) of + {ok,Value} -> to_string(Key,Value,Config); + error -> "" + end, + [String|do_format(Level,Data,Format,Config)]; +do_format(Level,Data,[Str|Format],Config) -> + [Str|do_format(Level,Data,Format,Config)]; +do_format(_Level,_Data,[],_Config) -> + []. + +value(Key,Meta) when is_map_key(Key,Meta) -> + {ok,maps:get(Key,Meta)}; +value([Key|Keys],Meta) when is_map_key(Key,Meta) -> + value(Keys,maps:get(Key,Meta)); +value([],Value) -> + {ok,Value}; +value(_,_) -> + error. + +to_string(time,Time,Config) -> + format_time(Time,Config); +to_string(mfa,MFA,Config) -> + format_mfa(MFA,Config); +to_string(_,Value,Config) -> + to_string(Value,Config). + +to_string(X,_) when is_atom(X) -> + atom_to_list(X); +to_string(X,_) when is_integer(X) -> + integer_to_list(X); +to_string(X,_) when is_pid(X) -> + pid_to_list(X); +to_string(X,_) when is_reference(X) -> + ref_to_list(X); +to_string(X,_) when is_list(X) -> + case printable_list(lists:flatten(X)) of + true -> X; + _ -> io_lib:format(?FormatP,[X]) + end; +to_string(X,_) -> + io_lib:format("~s",[X]). + +printable_list([]) -> + false; +printable_list(X) -> + io_lib:printable_list(X). + +format_msg({string,Chardata},Meta,Config) -> + format_msg({"~ts",[Chardata]},Meta,Config); +format_msg({report,_}=Msg,Meta,#{report_cb:=Fun}=Config) + when is_function(Fun,1); is_function(Fun,2) -> + format_msg(Msg,Meta#{report_cb=>Fun},maps:remove(report_cb,Config)); +format_msg({report,Report},#{report_cb:=Fun}=Meta,Config) when is_function(Fun,1) -> + try Fun(Report) of + {Format,Args} when is_list(Format), is_list(Args) -> + format_msg({Format,Args},maps:remove(report_cb,Meta),Config); + Other -> + format_msg({"REPORT_CB/1 ERROR: ~0tp; Returned: ~0tp", + [Report,Other]},Meta,Config) + catch C:R:S -> + format_msg({"REPORT_CB/1 CRASH: ~0tp; Reason: ~0tp", + [Report,{C,R,logger:filter_stacktrace(?MODULE,S)}]},Meta,Config) + end; +format_msg({report,Report},#{report_cb:=Fun}=Meta,Config) when is_function(Fun,2) -> + try Fun(Report,maps:with([depth,chars_limit,single_line],Config)) of + Chardata when ?IS_STRING(Chardata) -> + try chardata_to_list(Chardata) % already size limited by report_cb + catch _:_ -> + format_msg({"REPORT_CB/2 ERROR: ~0tp; Returned: ~0tp", + [Report,Chardata]},Meta,Config) + end; + Other -> + format_msg({"REPORT_CB/2 ERROR: ~0tp; Returned: ~0tp", + [Report,Other]},Meta,Config) + catch C:R:S -> + format_msg({"REPORT_CB/2 CRASH: ~0tp; Reason: ~0tp", + [Report,{C,R,logger:filter_stacktrace(?MODULE,S)}]}, + Meta,Config) + end; +format_msg({report,Report},Meta,Config) -> + format_msg({report,Report}, + Meta#{report_cb=>fun logger:format_report/1}, + Config); +format_msg(Msg,_Meta,#{depth:=Depth,chars_limit:=CharsLimit}) -> + Opts = chars_limit_to_opts(CharsLimit), + do_format_msg(Msg, Depth, Opts). + +chars_limit_to_opts(unlimited) -> []; +chars_limit_to_opts(CharsLimit) -> [{chars_limit,CharsLimit}]. + +do_format_msg({Format0,Args},Depth,Opts) -> + try + Format1 = io_lib:scan_format(Format0, Args), + Format = reformat(Format1, Depth), + io_lib:build_text(Format,Opts) + catch C:R:S -> + FormatError = "FORMAT ERROR: ~0tp - ~0tp", + case Format0 of + FormatError -> + %% already been here - avoid failing cyclically + erlang:raise(C,R,S); + _ -> + format_msg({FormatError,[Format0,Args]},Depth,Opts) + end + end. + +reformat(Format,unlimited) -> + Format; +reformat([#{control_char:=C}=M|T], Depth) when C =:= $p -> + [limit_depth(M#{width => 0}, Depth)|reformat(T, Depth)]; +reformat([#{control_char:=C}=M|T], Depth) when C =:= $P -> + [M#{width => 0}|reformat(T, Depth)]; +reformat([#{control_char:=C}=M|T], Depth) when C =:= $p; C =:= $w -> + [limit_depth(M, Depth)|reformat(T, Depth)]; +reformat([H|T], Depth) -> + [H|reformat(T, Depth)]; +reformat([], _) -> + []. + +limit_depth(M0, unlimited) -> + M0; +limit_depth(#{control_char:=C0, args:=Args}=M0, Depth) -> + C = C0 - ($a - $A), %To uppercase. + M0#{control_char:=C,args:=Args++[Depth]}. + +chardata_to_list(Chardata) -> + case unicode:characters_to_list(Chardata,unicode) of + List when is_list(List) -> + List; + Error -> + throw(Error) + end. + +truncate(String,unlimited) -> + String; +truncate(String,Size) -> + Length = string:length(String), + if Length>Size -> + case lists:reverse(lists:flatten(String)) of + [$\n|_] -> + string:slice(String,0,Size-4)++"...\n"; + _ -> + string:slice(String,0,Size-3)++"..." + end; + true -> + String + end. + +%% Convert microseconds-timestamp into local datatime string in milliseconds +format_time(SysTime,#{}) + when is_integer(SysTime) -> + Ms = SysTime rem 1000000 div 1000, + {Date, _Time = {H, Mi, S}} = calendar:system_time_to_local_time(SysTime, microsecond), + format_time({Date, {H, Mi, S, Ms}}). +format_time({{Y, M, D}, {H, Mi, S, Ms}}) -> + io_lib:format("~b-~2..0b-~2..0b ~2..0b:~2..0b:~2..0b.~3..0b", [Y, M, D, H, Mi, S, Ms]); +format_time({{Y, M, D}, {H, Mi, S}}) -> + io_lib:format("~b-~2..0b-~2..0b ~2..0b:~2..0b:~2..0b", [Y, M, D, H, Mi, S]). + +format_mfa({M,F,A},_) when is_atom(M), is_atom(F), is_integer(A) -> + atom_to_list(M)++":"++atom_to_list(F)++"/"++integer_to_list(A); +format_mfa({M,F,A},Config) when is_atom(M), is_atom(F), is_list(A) -> + format_mfa({M,F,length(A)},Config); +format_mfa(MFA,Config) -> + to_string(MFA,Config). + +%% Ensure that all valid configuration parameters exist in the final +%% configuration map +add_default_config(Config0) -> + Default = + #{chars_limit=>unlimited, + error_logger_notice_header=>info}, + MaxSize = get_max_size(maps:get(max_size,Config0,undefined)), + Depth = get_depth(maps:get(depth,Config0,undefined)), + add_default_template(maps:merge(Default,Config0#{max_size=>MaxSize, + depth=>Depth})). + +add_default_template(#{template:=_}=Config) -> + Config; +add_default_template(Config) -> + Config#{template=>?DEFAULT_FORMAT_TEMPLATE_SINGLE}. + +get_max_size(undefined) -> + unlimited; +get_max_size(S) -> + max(10,S). + +get_depth(undefined) -> + error_logger:get_format_depth(); +get_depth(S) -> + max(5,S). + +-spec check_config(Config) -> ok | {error,term()} when + Config :: config(). +check_config(Config) when is_map(Config) -> + do_check_config(maps:to_list(Config)); +check_config(Config) -> + {error,{invalid_formatter_config,?MODULE,Config}}. + +do_check_config([{Type,L}|Config]) when Type == chars_limit; + Type == depth; + Type == max_size -> + case check_limit(L) of + ok -> do_check_config(Config); + error -> {error,{invalid_formatter_config,?MODULE,{Type,L}}} + end; +do_check_config([{error_logger_notice_header,ELNH}|Config]) when ELNH == info; + ELNH == notice -> + do_check_config(Config); +do_check_config([{report_cb,RCB}|Config]) when is_function(RCB,1); + is_function(RCB,2) -> + do_check_config(Config); +do_check_config([{template,T}|Config]) -> + case check_template(T) of + ok -> do_check_config(Config); + error -> {error,{invalid_formatter_template,?MODULE,T}} + end; + +do_check_config([C|_]) -> + {error,{invalid_formatter_config,?MODULE,C}}; +do_check_config([]) -> + ok. + +check_limit(L) when is_integer(L), L>0 -> + ok; +check_limit(unlimited) -> + ok; +check_limit(_) -> + error. + +check_template([Key|T]) when is_atom(Key) -> + check_template(T); +check_template([Key|T]) when is_list(Key), is_atom(hd(Key)) -> + case lists:all(fun(X) when is_atom(X) -> true; + (_) -> false + end, + Key) of + true -> + check_template(T); + false -> + error + end; +check_template([{Key,IfExist,Else}|T]) + when is_atom(Key) orelse + (is_list(Key) andalso is_atom(hd(Key))) -> + case check_template(IfExist) of + ok -> + case check_template(Else) of + ok -> + check_template(T); + error -> + error + end; + error -> + error + end; +check_template([Str|T]) when is_list(Str) -> + case io_lib:printable_unicode_list(Str) of + true -> check_template(T); + false -> error + end; +check_template([]) -> + ok; +check_template(_) -> + error. diff --git a/src/emqx_message.erl b/src/emqx_message.erl index 91e5d4d59..085565403 100644 --- a/src/emqx_message.erl +++ b/src/emqx_message.erl @@ -34,7 +34,7 @@ make(Topic, Payload) -> -spec(make(atom() | emqx_types:client_id(), emqx_topic:topic(), emqx_types:payload()) -> emqx_types:message()). make(From, Topic, Payload) -> - make(From, ?QOS0, Topic, Payload). + make(From, ?QOS_0, Topic, Payload). -spec(make(atom() | emqx_types:client_id(), emqx_mqtt_types:qos(), emqx_topic:topic(), emqx_types:payload()) -> emqx_types:message()). @@ -47,7 +47,7 @@ make(From, QoS, Topic, Payload) -> payload = Payload, timestamp = os:timestamp()}. -msgid(?QOS0) -> undefined; +msgid(?QOS_0) -> undefined; msgid(_QoS) -> emqx_guid:gen(). set_flags(Flags, Msg = #message{flags = undefined}) when is_map(Flags) -> @@ -114,7 +114,7 @@ elapsed(Since) -> max(0, timer:now_diff(os:timestamp(), Since) div 1000). format(#message{id = Id, qos = QoS, topic = Topic, from = From, flags = Flags, headers = Headers}) -> - io_lib:format("Message(Id=~s, QoS=~w, Topic=~s, From=~s, Flags=~s, Headers=~s)", + io_lib:format("Message(Id=~s, QoS=~w, Topic=~s, From=~p, Flags=~s, Headers=~s)", [Id, QoS, Topic, From, format(flags, Flags), format(headers, Headers)]). format(_, undefined) -> diff --git a/src/emqx_misc.erl b/src/emqx_misc.erl index 03c42510c..cf4a555ca 100644 --- a/src/emqx_misc.erl +++ b/src/emqx_misc.erl @@ -62,9 +62,11 @@ proc_stats(Pid) -> -define(DISABLED, 0). +init_proc_mng_policy(undefined) -> ok; init_proc_mng_policy(Zone) -> - #{max_heap_size := MaxHeapSizeInBytes} = ShutdownPolicy = - emqx_zone:get_env(Zone, force_shutdown_policy), + #{max_heap_size := MaxHeapSizeInBytes} + = ShutdownPolicy + = emqx_zone:get_env(Zone, force_shutdown_policy), MaxHeapSize = MaxHeapSizeInBytes div erlang:system_info(wordsize), _ = erlang:process_flag(max_heap_size, MaxHeapSize), % zero is discarded erlang:put(force_shutdown_policy, ShutdownPolicy), @@ -106,4 +108,3 @@ is_enabled(Max) -> is_integer(Max) andalso Max > ?DISABLED. proc_info(Key) -> {Key, Value} = erlang:process_info(self(), Key), Value. - diff --git a/src/emqx_modules.erl b/src/emqx_modules.erl index 09e732f68..60c4b6351 100644 --- a/src/emqx_modules.erl +++ b/src/emqx_modules.erl @@ -21,7 +21,7 @@ load() -> lists:foreach( fun({Mod, Env}) -> ok = Mod:load(Env), - io:format("Load ~s module successfully.~n", [Mod]) + logger:info("Load ~s module successfully.", [Mod]) end, emqx_config:get_env(modules, [])). -spec(unload() -> ok). diff --git a/src/emqx_mqtt_types.erl b/src/emqx_mqtt_types.erl index 6beb17780..71451cca7 100644 --- a/src/emqx_mqtt_types.erl +++ b/src/emqx_mqtt_types.erl @@ -22,7 +22,7 @@ -export_type([topic_filters/0]). -export_type([packet_id/0, packet_type/0, packet/0]). --type(qos() :: ?QOS0 | ?QOS1 | ?QOS2). +-type(qos() :: ?QOS_0 | ?QOS_1 | ?QOS_2). -type(version() :: ?MQTT_PROTO_V3 | ?MQTT_PROTO_V4 | ?MQTT_PROTO_V5). -type(qos_name() :: qos0 | at_most_once | qos1 | at_least_once | @@ -40,4 +40,3 @@ }). -type(topic_filters() :: [{emqx_topic:topic(), subopts()}]). -type(packet() :: #mqtt_packet{}). - diff --git a/src/emqx_mqueue.erl b/src/emqx_mqueue.erl index d9270dd5f..56390d412 100644 --- a/src/emqx_mqueue.erl +++ b/src/emqx_mqueue.erl @@ -13,8 +13,8 @@ %% @doc A Simple in-memory message queue. %% -%% Notice that MQTT is not an enterprise messaging queue. MQTT assume that client -%% should be online in most of the time. +%% Notice that MQTT is not a (on-disk) persistent messaging queue. +%% It assumes that clients should be online in most of the time. %% %% This module implements a simple in-memory queue for MQTT persistent session. %% @@ -37,7 +37,8 @@ %% 3. QoS=0 messages are only enqueued when `store_qos0' is given `true` %% in init options %% -%% 4. If the queue is full drop the oldest one unless `max_len' is set to `0'. +%% 4. If the queue is full, drop the oldest one +%% unless `max_len' is set to `0' which implies (`infinity'). %% %% @end @@ -46,132 +47,121 @@ -include("emqx.hrl"). -include("emqx_mqtt.hrl"). --export([init/1, type/1]). +-export([init/1]). -export([is_empty/1]). -export([len/1, max_len/1]). -export([in/2, out/1]). -export([stats/1, dropped/1]). --define(PQUEUE, emqx_pqueue). +-export_type([mqueue/0, options/0]). --type(priority() :: {iolist(), pos_integer()}). - --type(options() :: #{type := simple | priority, - max_len := non_neg_integer(), - priorities => list(priority()), - store_qos0 => boolean()}). +-type(topic() :: emqx_topic:topic()). +-type(priority() :: infinity | integer()). +-type(pq() :: emqx_pqueue:q()). +-type(count() :: non_neg_integer()). +-type(p_table() :: ?NO_PRIORITY_TABLE | #{topic() := priority()}). +-type(options() :: #{max_len := count(), + priorities => p_table(), + default_priority => highest | lowest, + store_qos0 => boolean() + }). +-type(message() :: pemqx_types:message()). -type(stat() :: {len, non_neg_integer()} | {max_len, non_neg_integer()} | {dropped, non_neg_integer()}). +-define(PQUEUE, emqx_pqueue). +-define(LOWEST_PRIORITY, 0). +-define(HIGHEST_PRIORITY, infinity). +-define(MAX_LEN_INFINITY, 0). + -record(mqueue, { - type :: simple | priority, - q :: queue:queue() | ?PQUEUE:q(), - %% priority table - priorities = [], - pseq = 0, - len = 0, - max_len = 0, - qos0 = false, - dropped = 0 + store_qos0 = false :: boolean(), + max_len = ?MAX_LEN_INFINITY :: count(), + len = 0 :: count(), + dropped = 0 :: count(), + p_table = ?NO_PRIORITY_TABLE :: p_table(), + default_p = ?LOWEST_PRIORITY :: priority(), + q = ?PQUEUE:new() :: pq() }). --type(mqueue() :: #mqueue{}). - --export_type([mqueue/0, priority/0, options/0]). +-opaque(mqueue() :: #mqueue{}). -spec(init(options()) -> mqueue()). -init(Opts = #{type := Type, max_len := MaxLen, store_qos0 := QoS0}) -> - init_q(#mqueue{type = Type, len = 0, max_len = MaxLen, qos0 = QoS0}, Opts). +init(Opts = #{max_len := MaxLen0, store_qos0 := QoS_0}) -> + MaxLen = case (is_integer(MaxLen0) andalso MaxLen0 > ?MAX_LEN_INFINITY) of + true -> MaxLen0; + false -> ?MAX_LEN_INFINITY + end, + #mqueue{max_len = MaxLen, + store_qos0 = QoS_0, + p_table = get_opt(priorities, Opts, ?NO_PRIORITY_TABLE), + default_p = get_priority_opt(Opts) + }. -init_q(MQ = #mqueue{type = simple}, _Opts) -> - MQ#mqueue{q = queue:new()}; -init_q(MQ = #mqueue{type = priority}, #{priorities := Priorities}) -> - init_pq(Priorities, MQ#mqueue{q = ?PQUEUE:new()}). +is_empty(#mqueue{len = Len}) -> Len =:= 0. -init_pq([], MQ) -> - MQ; -init_pq([{Topic, P} | L], MQ) -> - {_, MQ1} = insert_p(iolist_to_binary(Topic), P, MQ), - init_pq(L, MQ1). - -insert_p(Topic, P, MQ = #mqueue{priorities = L, pseq = Seq}) -> - <> = <>, - {PInt, MQ#mqueue{priorities = [{Topic, PInt} | L], pseq = Seq + 1}}. - --spec(type(mqueue()) -> simple | priority). -type(#mqueue{type = Type}) -> Type. - -is_empty(#mqueue{type = simple, len = Len}) -> Len =:= 0; -is_empty(#mqueue{type = priority, q = Q}) -> ?PQUEUE:is_empty(Q). - -len(#mqueue{type = simple, len = Len}) -> Len; -len(#mqueue{type = priority, q = Q}) -> ?PQUEUE:len(Q). +len(#mqueue{len = Len}) -> Len. max_len(#mqueue{max_len = MaxLen}) -> MaxLen. -%% @doc Dropped of the mqueue --spec(dropped(mqueue()) -> non_neg_integer()). +%% @doc Return number of dropped messages. +-spec(dropped(mqueue()) -> count()). dropped(#mqueue{dropped = Dropped}) -> Dropped. %% @doc Stats of the mqueue -spec(stats(mqueue()) -> [stat()]). -stats(#mqueue{type = Type, q = Q, max_len = MaxLen, len = Len, dropped = Dropped}) -> - [{len, case Type of - simple -> Len; - priority -> ?PQUEUE:len(Q) - end} | [{max_len, MaxLen}, {dropped, Dropped}]]. +stats(#mqueue{max_len = MaxLen, dropped = Dropped} = MQ) -> + [{len, len(MQ)}, {max_len, MaxLen}, {dropped, Dropped}]. %% @doc Enqueue a message. --spec(in(emqx_types:message(), mqueue()) -> mqueue()). -in(#message{qos = ?QOS_0}, MQ = #mqueue{qos0 = false}) -> - MQ; -in(Msg, MQ = #mqueue{type = simple, q = Q, len = Len, max_len = 0}) -> - MQ#mqueue{q = queue:in(Msg, Q), len = Len + 1}; -in(Msg, MQ = #mqueue{type = simple, q = Q, len = Len, max_len = MaxLen, dropped = Dropped}) - when Len >= MaxLen -> - {{value, _Old}, Q2} = queue:out(Q), - MQ#mqueue{q = queue:in(Msg, Q2), dropped = Dropped +1}; -in(Msg, MQ = #mqueue{type = simple, q = Q, len = Len}) -> - MQ#mqueue{q = queue:in(Msg, Q), len = Len + 1}; - -in(Msg = #message{topic = Topic}, MQ = #mqueue{type = priority, q = Q, - priorities = Priorities, - max_len = 0}) -> - case lists:keysearch(Topic, 1, Priorities) of - {value, {_, Pri}} -> - MQ#mqueue{q = ?PQUEUE:in(Msg, Pri, Q)}; +-spec(in(message(), mqueue()) -> {undefined | message(), mqueue()}). +in(#message{qos = ?QOS_0}, MQ = #mqueue{store_qos0 = false}) -> + {_Dropped = undefined, MQ}; +in(Msg = #message{topic = Topic}, MQ = #mqueue{default_p = Dp, + p_table = PTab, + q = Q, + len = Len, + max_len = MaxLen, + dropped = Dropped + } = MQ) -> + Priority = get_priority(Topic, PTab, Dp), + PLen = ?PQUEUE:plen(Priority, Q), + case MaxLen =/= ?MAX_LEN_INFINITY andalso PLen =:= MaxLen of + true -> + %% reached max length, drop the oldest message + {{value, DroppedMsg}, Q1} = ?PQUEUE:out(Priority, Q), + Q2 = ?PQUEUE:in(Msg, Priority, Q1), + {DroppedMsg, MQ#mqueue{q = Q2, dropped = Dropped + 1}}; false -> - {Pri, MQ1} = insert_p(Topic, 0, MQ), - MQ1#mqueue{q = ?PQUEUE:in(Msg, Pri, Q)} - end; -in(Msg = #message{topic = Topic}, MQ = #mqueue{type = priority, q = Q, - priorities = Priorities, - max_len = MaxLen}) -> - case lists:keysearch(Topic, 1, Priorities) of - {value, {_, Pri}} -> - case ?PQUEUE:plen(Pri, Q) >= MaxLen of - true -> - {_, Q1} = ?PQUEUE:out(Pri, Q), - MQ#mqueue{q = ?PQUEUE:in(Msg, Pri, Q1)}; - false -> - MQ#mqueue{q = ?PQUEUE:in(Msg, Pri, Q)} - end; - false -> - {Pri, MQ1} = insert_p(Topic, 0, MQ), - MQ1#mqueue{q = ?PQUEUE:in(Msg, Pri, Q)} + {_DroppedMsg = undefined, MQ#mqueue{len = Len + 1, q = ?PQUEUE:in(Msg, Priority, Q)}} end. -out(MQ = #mqueue{type = simple, len = 0}) -> +-spec(out(mqueue()) -> {empty | {value, message()}, mqueue()}). +out(MQ = #mqueue{len = 0, q = Q}) -> + 0 = ?PQUEUE:len(Q), %% assert, in this case, ?PQUEUE:len should be very cheap {empty, MQ}; -out(MQ = #mqueue{type = simple, q = Q, len = Len, max_len = 0}) -> - {R, Q2} = queue:out(Q), - {R, MQ#mqueue{q = Q2, len = Len - 1}}; -out(MQ = #mqueue{type = simple, q = Q, len = Len}) -> - {R, Q2} = queue:out(Q), - {R, MQ#mqueue{q = Q2, len = Len - 1}}; -out(MQ = #mqueue{type = priority, q = Q}) -> - {R, Q2} = ?PQUEUE:out(Q), - {R, MQ#mqueue{q = Q2}}. +out(MQ = #mqueue{q = Q, len = Len}) -> + {R, Q1} = ?PQUEUE:out(Q), + {R, MQ#mqueue{q = Q1, len = Len - 1}}. +get_opt(Key, Opts, Default) -> + case maps:get(Key, Opts, Default) of + undefined -> Default; + X -> X + end. + +get_priority_opt(Opts) -> + case get_opt(default_priority, Opts, ?LOWEST_PRIORITY) of + lowest -> ?LOWEST_PRIORITY; + highest -> ?HIGHEST_PRIORITY; + N when is_integer(N) -> N + end. + +%% MICRO-OPTIMIZATION: When there is no priority table defined (from config), +%% disregard default priority from config, always use lowest (?LOWEST_PRIORITY=0) +%% because the lowest priority in emqx_pqueue is a fallback to queue:queue() +%% while the highest 'infinity' is a [{infinity, queue:queue()}] +get_priority(_Topic, ?NO_PRIORITY_TABLE, _) -> ?LOWEST_PRIORITY; +get_priority(Topic, PTab, Dp) -> maps:get(Topic, PTab, Dp). diff --git a/src/emqx_packet.erl b/src/emqx_packet.erl index 2040e595e..6f430c5f2 100644 --- a/src/emqx_packet.erl +++ b/src/emqx_packet.erl @@ -103,7 +103,7 @@ validate_properties(_, _) -> validate_subscription({Topic, #{qos := QoS}}) -> emqx_topic:validate(filter, Topic) andalso validate_qos(QoS). -validate_qos(QoS) when ?QOS0 =< QoS, QoS =< ?QOS2 -> +validate_qos(QoS) when ?QOS_0 =< QoS, QoS =< ?QOS_2 -> true; validate_qos(_) -> error(bad_qos). diff --git a/src/emqx_protocol.erl b/src/emqx_protocol.erl index 35ec7ec9a..bad919728 100644 --- a/src/emqx_protocol.erl +++ b/src/emqx_protocol.erl @@ -72,9 +72,8 @@ -define(NO_PROPS, undefined). --define(LOG(Level, Format, Args, PState), - emqx_logger:Level([{client, PState#pstate.client_id}], "MQTT(~s@~s): " ++ Format, - [PState#pstate.client_id, esockd_net:format(PState#pstate.peername) | Args])). +-define(LOG(Level, Format, Args, _PState), + emqx_logger:Level("[MQTT] " ++ Format, Args)). %%------------------------------------------------------------------------------ %% Init @@ -82,6 +81,7 @@ -spec(init(map(), list()) -> state()). init(#{peername := Peername, peercert := Peercert, sendfun := SendFun}, Options) -> + emqx_logger:add_proc_metadata(#{peername => esockd_net:format(Peername)}), Zone = proplists:get_value(zone, Options), #pstate{zone = Zone, sendfun = SendFun, @@ -287,7 +287,7 @@ process_packet(?CONNECT_PACKET( client_id = ClientId, username = Username, password = Password} = Connect), PState) -> - + emqx_logger:add_proc_metadata(#{client_id => ClientId}), %% TODO: Mountpoint... %% Msg -> emqx_mountpoint:mount(MountPoint, Msg) WillMsg = make_will_msg(Connect), @@ -586,13 +586,10 @@ deliver({disconnect, _ReasonCode}, PState) -> -spec(send(emqx_mqtt_types:packet(), state()) -> {ok, state()} | {error, term()}). send(Packet = ?PACKET(Type), PState = #pstate{proto_ver = Ver, sendfun = SendFun}) -> trace(send, Packet, PState), - case SendFun(emqx_frame:serialize(Packet, #{version => Ver})) of + case SendFun(Packet, #{version => Ver}) of ok -> emqx_metrics:sent(Packet), {ok, inc_stats(send, Type, PState)}; - {binary, _Data} -> - emqx_metrics:sent(Packet), - {ok, inc_stats(send, Type, PState)}; {error, Reason} -> {error, Reason} end. diff --git a/src/emqx_reason_codes.erl b/src/emqx_reason_codes.erl index 75118b563..bb0d8bb0b 100644 --- a/src/emqx_reason_codes.erl +++ b/src/emqx_reason_codes.erl @@ -138,5 +138,5 @@ compat(connack, 16#9C) -> ?CONNACK_SERVER; compat(connack, 16#9D) -> ?CONNACK_SERVER; compat(connack, 16#9F) -> ?CONNACK_SERVER; -compat(suback, Code) when Code =< ?QOS2 -> Code; +compat(suback, Code) when Code =< ?QOS_2 -> Code; compat(suback, Code) when Code >= 16#80 -> 16#80. diff --git a/src/emqx_session.erl b/src/emqx_session.erl index 286066a59..6cfef70d2 100644 --- a/src/emqx_session.erl +++ b/src/emqx_session.erl @@ -161,9 +161,8 @@ -define(TIMEOUT, 60000). --define(LOG(Level, Format, Args, State), - emqx_logger:Level([{client, State#state.client_id}], - "Session(~s): " ++ Format, [State#state.client_id | Args])). +-define(LOG(Level, Format, Args, _State), + emqx_logger:Level("[Session] " ++ Format, Args)). %% @doc Start a session proc. -spec(start_link(SessAttrs :: map()) -> {ok, pid()}). @@ -341,6 +340,7 @@ init([Parent, #{zone := Zone, max_inflight := MaxInflight, topic_alias_maximum := TopicAliasMaximum, will_msg := WillMsg}]) -> + emqx_logger:add_proc_metadata(#{client_id => ClientId}), process_flag(trap_exit, true), true = link(ConnPid), IdleTimout = get_env(Zone, idle_timeout, 30000), @@ -377,10 +377,10 @@ init([Parent, #{zone := Zone, gen_server:enter_loop(?MODULE, [{hibernate_after, IdleTimout}], State). init_mqueue(Zone) -> - emqx_mqueue:init(#{type => get_env(Zone, mqueue_type, simple), - max_len => get_env(Zone, max_mqueue_len, 1000), - priorities => get_env(Zone, mqueue_priorities, ""), - store_qos0 => get_env(Zone, mqueue_store_qos0, true) + emqx_mqueue:init(#{max_len => get_env(Zone, max_mqueue_len, 1000), + store_qos0 => get_env(Zone, mqueue_store_qos0, true), + priorities => get_env(Zone, mqueue_priorities), + default_priority => get_env(Zone, mqueue_default_priority) }). binding(ConnPid) -> @@ -517,7 +517,7 @@ handle_cast({pubcomp, PacketId, _ReasonCode}, State = #state{inflight = Inflight handle_cast({resume, #{conn_pid := ConnPid, will_msg := WillMsg, expiry_interval := SessionExpiryInterval, - max_inflight := MaxInflight, + max_inflight := MaxInflight, topic_alias_maximum := TopicAliasMaximum}}, State = #state{client_id = ClientId, conn_pid = OldConnPid, clean_start = CleanStart, @@ -547,7 +547,7 @@ handle_cast({resume, #{conn_pid := ConnPid, await_rel_timer = undefined, expiry_timer = undefined, expiry_interval = SessionExpiryInterval, - inflight = emqx_inflight:update_size(MaxInflight, State#state.inflight), + inflight = emqx_inflight:update_size(MaxInflight, State#state.inflight), topic_alias_maximum = TopicAliasMaximum, will_delay_timer = undefined, will_msg = WillMsg}, @@ -574,10 +574,10 @@ handle_info({dispatch, Topic, Msgs}, State) when is_list(Msgs) -> end, State, Msgs)}; %% Dispatch message -handle_info({dispatch, Topic, Msg = #message{headers = Headers}}, +handle_info({dispatch, Topic, Msg = #message{headers = Headers}}, State = #state{subscriptions = SubMap, topic_alias_maximum = TopicAliasMaximum}) when is_record(Msg, message) -> TopicAlias = maps:get('Topic-Alias', Headers, undefined), - if + if TopicAlias =:= undefined orelse TopicAlias =< TopicAliasMaximum -> noreply(case maps:find(Topic, SubMap) of {ok, #{nl := Nl, qos := QoS, rap := Rap, subid := SubId}} -> @@ -802,13 +802,13 @@ dispatch(Msg, State = #state{client_id = ClientId, conn_pid = undefined}) -> end; %% Deliver qos0 message directly to client -dispatch(Msg = #message{qos = ?QOS0} = Msg, State) -> +dispatch(Msg = #message{qos = ?QOS_0} = Msg, State) -> deliver(undefined, Msg, State), inc_stats(deliver, Msg, State); dispatch(Msg = #message{qos = QoS} = Msg, State = #state{next_pkt_id = PacketId, inflight = Inflight}) - when QoS =:= ?QOS1 orelse QoS =:= ?QOS2 -> + when QoS =:= ?QOS_1 orelse QoS =:= ?QOS_2 -> case emqx_inflight:is_full(Inflight) of true -> enqueue_msg(Msg, State); false -> @@ -817,14 +817,15 @@ dispatch(Msg = #message{qos = QoS} = Msg, end. enqueue_msg(Msg, State = #state{mqueue = Q}) -> - inc_stats(enqueue, Msg, State#state{mqueue = emqx_mqueue:in(Msg, Q)}). + {_Dropped, NewQ} = emqx_mqueue:in(Msg, Q), + inc_stats(enqueue, Msg, State#state{mqueue = NewQ}). %%------------------------------------------------------------------------------ %% Deliver %%------------------------------------------------------------------------------ redeliver({PacketId, Msg = #message{qos = QoS}}, State) -> - deliver(PacketId, if QoS =:= ?QOS2 -> Msg; + deliver(PacketId, if QoS =:= ?QOS_2 -> Msg; true -> emqx_message:set_flag(dup, Msg) end, State); @@ -973,4 +974,3 @@ noreply(State) -> shutdown(Reason, State) -> {stop, {shutdown, Reason}, State}. - diff --git a/src/emqx_sm.erl b/src/emqx_sm.erl index bc3f6ff68..55d0e26a7 100644 --- a/src/emqx_sm.erl +++ b/src/emqx_sm.erl @@ -60,9 +60,7 @@ open_session(SessAttrs = #{clean_start := true, client_id := ClientId, conn_pid emqx_sm_locker:trans(ClientId, CleanStart); open_session(SessAttrs = #{clean_start := false, - client_id := ClientId, - max_inflight := MaxInflight, - topic_alias_maximum := TopicAliasMaximum}) -> + client_id := ClientId}) -> ResumeStart = fun(_) -> case resume_session(ClientId, SessAttrs) of {ok, SPid} -> diff --git a/src/emqx_sys_mon.erl b/src/emqx_sys_mon.erl index b42d96aa4..e3bd4a2a9 100644 --- a/src/emqx_sys_mon.erl +++ b/src/emqx_sys_mon.erl @@ -25,9 +25,9 @@ -define(SYSMON, ?MODULE). -define(LOG(Msg, ProcInfo), - emqx_logger:warning([{sysmon, true}], "[SYSMON] ~s~n~p", [WarnMsg, ProcInfo])). + emqx_logger:warning(#{sysmon => true}, "[SYSMON] ~s~n~p", [WarnMsg, ProcInfo])). -define(LOG(Msg, ProcInfo, PortInfo), - emqx_logger:warning([{sysmon, true}], "[SYSMON] ~s~n~p~n~p", [WarnMsg, ProcInfo, PortInfo])). + emqx_logger:warning(#{sysmon => true}, "[SYSMON] ~s~n~p~n~p", [WarnMsg, ProcInfo, PortInfo])). %% @doc Start system monitor -spec(start_link(Opts :: list(tuple())) -> {ok, pid()} | ignore | {error, term()}). @@ -130,7 +130,7 @@ handle_info({timeout, _Ref, reset}, State) -> {noreply, State#state{events = []}, hibernate}; handle_info(Info, State) -> - lager:error("[SYSMON] unexpected Info: ~p", [Info]), + logger:error("[SYSMON] unexpected Info: ~p", [Info]), {noreply, State}. terminate(_Reason, #state{timer = TRef}) -> diff --git a/src/emqx_tracer.erl b/src/emqx_tracer.erl index 44ad6c26f..3d060dee9 100644 --- a/src/emqx_tracer.erl +++ b/src/emqx_tracer.erl @@ -25,12 +25,20 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, {level, traces}). +-record(state, {level, org_top_level, traces}). --type(trace_who() :: {client | topic, binary()}). +-type(trace_who() :: {client_id | topic, binary()}). -define(TRACER, ?MODULE). --define(OPTIONS, [{formatter_config, [time, " [",severity,"] ", message, "\n"]}]). +-define(FORMAT, {emqx_logger_formatter, + #{template => + [time," [",level,"] ", + {client_id, + [{peername, + [client_id,"@",peername," "], + [client_id, " "]}], + []}, + msg,"\n"]}}). -spec(start_link() -> {ok, pid()} | ignore | {error, term()}). start_link() -> @@ -40,26 +48,26 @@ trace(publish, #message{topic = <<"$SYS/", _/binary>>}) -> %% Dont' trace '$SYS' publish ignore; trace(publish, #message{from = From, topic = Topic, payload = Payload}) - when is_binary(From); is_atom(From) -> - emqx_logger:info([{client, From}, {topic, Topic}], "~s PUBLISH to ~s: ~p", [From, Topic, Payload]). + when is_binary(From); is_atom(From) -> + emqx_logger:info(#{topic => Topic}, "PUBLISH to ~s: ~p", [Topic, Payload]). %%------------------------------------------------------------------------------ %% Start/Stop trace %%------------------------------------------------------------------------------ -%% @doc Start to trace client or topic. +%% @doc Start to trace client_id or topic. -spec(start_trace(trace_who(), string()) -> ok | {error, term()}). -start_trace({client, ClientId}, LogFile) -> - start_trace({start_trace, {client, ClientId}, LogFile}); +start_trace({client_id, ClientId}, LogFile) -> + start_trace({start_trace, {client_id, ClientId}, LogFile}); start_trace({topic, Topic}, LogFile) -> start_trace({start_trace, {topic, Topic}, LogFile}). start_trace(Req) -> gen_server:call(?MODULE, Req, infinity). -%% @doc Stop tracing client or topic. +%% @doc Stop tracing client_id or topic. -spec(stop_trace(trace_who()) -> ok | {error, term()}). -stop_trace({client, ClientId}) -> - gen_server:call(?MODULE, {stop_trace, {client, ClientId}}); +stop_trace({client_id, ClientId}) -> + gen_server:call(?MODULE, {stop_trace, {client_id, ClientId}}); stop_trace({topic, Topic}) -> gen_server:call(?MODULE, {stop_trace, {topic, Topic}}). @@ -73,37 +81,45 @@ lookup_traces() -> %%------------------------------------------------------------------------------ init([]) -> - {ok, #state{level = emqx_config:get_env(trace_level, debug), traces = #{}}}. + {ok, #state{level = emqx_config:get_env(trace_level, debug), + org_top_level = get_top_level(), + traces = #{}}}. handle_call({start_trace, Who, LogFile}, _From, State = #state{level = Level, traces = Traces}) -> - case catch lager:trace_file(LogFile, [Who], Level, ?OPTIONS) of - {ok, exists} -> - {reply, {error, already_exists}, State}; - {ok, Trace} -> - {reply, ok, State#state{traces = maps:put(Who, {Trace, LogFile}, Traces)}}; + case logger:add_handler(handler_id(Who), logger_disk_log_h, + #{level => Level, + formatter => ?FORMAT, + filesync_repeat_interval => 1000, + config => #{type => halt, file => LogFile}, + filter_default => stop, + filters => [{meta_key_filter, + {fun filter_by_meta_key/2, Who} }]}) of + ok -> + set_top_level(all), % open the top logger level to 'all' + emqx_logger:info("[Tracer] start trace for ~p", [Who]), + {reply, ok, State#state{traces = maps:put(Who, LogFile, Traces)}}; {error, Reason} -> - emqx_logger:error("[Tracer] trace error: ~p", [Reason]), - {reply, {error, Reason}, State}; - {'EXIT', Error} -> - emqx_logger:error("[Tracer] trace exit: ~p", [Error]), - {reply, {error, Error}, State} + emqx_logger:error("[Tracer] start trace for ~p failed, error: ~p", [Who, Reason]), + {reply, {error, Reason}, State} end; -handle_call({stop_trace, Who}, _From, State = #state{traces = Traces}) -> +handle_call({stop_trace, Who}, _From, State = #state{org_top_level = OrgTopLevel, traces = Traces}) -> case maps:find(Who, Traces) of - {ok, {Trace, _LogFile}} -> - case lager:stop_trace(Trace) of - ok -> ok; - {error, Error} -> - emqx_logger:error("[Tracer] stop trace ~p error: ~p", [Who, Error]) + {ok, _LogFile} -> + case logger:remove_handler(handler_id(Who)) of + ok -> + emqx_logger:info("[Tracer] stop trace for ~p", [Who]); + {error, Reason} -> + emqx_logger:error("[Tracer] stop trace for ~p failed, error: ~p", [Who, Reason]) end, + set_top_level(OrgTopLevel), % reset the top logger level to original value {reply, ok, State#state{traces = maps:remove(Who, Traces)}}; error -> {reply, {error, not_found}, State} end; handle_call(lookup_traces, _From, State = #state{traces = Traces}) -> - {reply, [{Who, LogFile} || {Who, {_Trace, LogFile}} <- maps:to_list(Traces)], State}; + {reply, [{Who, LogFile} || {Who, LogFile} <- maps:to_list(Traces)], State}; handle_call(Req, _From, State) -> emqx_logger:error("[Tracer] unexpected call: ~p", [Req]), @@ -123,3 +139,25 @@ terminate(_Reason, _State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. +handler_id({topic, Topic}) -> + list_to_atom("topic_" ++ binary_to_list(Topic)); +handler_id({client_id, ClientId}) -> + list_to_atom("clientid_" ++ binary_to_list(ClientId)). + +get_top_level() -> + #{level := OrgTopLevel} = logger:get_primary_config(), + OrgTopLevel. + +set_top_level(Level) -> + logger:set_primary_config(level, Level). + +filter_by_meta_key(#{meta:=Meta}=LogEvent, {MetaKey, MetaValue}) -> + case maps:find(MetaKey, Meta) of + {ok, MetaValue} -> LogEvent; + {ok, Topic} when MetaKey =:= topic -> + case emqx_topic:match(Topic, MetaValue) of + true -> LogEvent; + false -> ignore + end; + _ -> ignore + end. \ No newline at end of file diff --git a/src/emqx_ws_connection.erl b/src/emqx_ws_connection.erl index fa08fa1bb..6a8b71094 100644 --- a/src/emqx_ws_connection.erl +++ b/src/emqx_ws_connection.erl @@ -45,9 +45,8 @@ -define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt]). --define(WSLOG(Level, Format, Args, State), - emqx_logger:Level("MQTT/WS(~s): " ++ Format, - [esockd_net:format(State#state.peername) | Args])). +-define(WSLOG(Level, Format, Args, _State), + emqx_logger:Level("[MQTT/WS] " ++ Format, Args)). %%------------------------------------------------------------------------------ %% API @@ -144,12 +143,14 @@ websocket_init(#state{request = Req, options = Options}) -> idle_timeout = IdleTimout}}. send_fun(WsPid) -> - fun(Data) -> + fun(Packet, Options) -> + Data = emqx_frame:serialize(Packet, Options), BinSize = iolist_size(Data), emqx_metrics:inc('bytes/sent', BinSize), put(send_oct, get(send_oct) + BinSize), put(send_cnt, get(send_cnt) + 1), - WsPid ! {binary, iolist_to_binary(Data)} + WsPid ! {binary, iolist_to_binary(Data)}, + ok end. stat_fun() -> @@ -299,4 +300,3 @@ stop(Error, State) -> wsock_stats() -> [{Key, get(Key)} || Key <- ?SOCK_STATS]. - diff --git a/test/emqx_SUITE_data/slave.config b/test/emqx_SUITE_data/slave.config index dd3a5f32b..1cdf851b5 100644 --- a/test/emqx_SUITE_data/slave.config +++ b/test/emqx_SUITE_data/slave.config @@ -33,21 +33,6 @@ {large_heap,8388608}, {busy_port,false}, {busy_dist_port,true}]}]}, - {sasl,[{sasl_error_logger,false}]}, - {lager, - [{error_logger_hwm,1000}, - {error_logger_redirect,true}, - {log_dir,"{{ platform_log_dir }}"}, - {handlers, - [{lager_console_backend,error}, - {lager_file_backend, - [{file,"{{ platform_log_dir }}/error.log"}, - {level,error}, - {size,10485760}, - {date,"$D0"}, - {count,5}]}, - {lager_syslog_backend,["emq",local0,error]}]}, - {crash_log,"{{ platform_log_dir }}/crash.log"}]}, {gen_rpc, [{socket_keepalive_count,2}, {socket_keepalive_interval,5}, diff --git a/test/emqx_client_SUITE.erl b/test/emqx_client_SUITE.erl index 2ad3dbaf7..c7b3455ad 100644 --- a/test/emqx_client_SUITE.erl +++ b/test/emqx_client_SUITE.erl @@ -59,20 +59,26 @@ end_per_suite(_Config) -> emqx_ct_broker_helpers:run_teardown_steps(). request_response_exception(QoS) -> - {ok, Client, _} = emqx_client:start_link([{proto_ver, v5}, - {properties, #{ 'Request-Response-Information' => 0 }}]), + {ok, Client} = emqx_client:start_link([{proto_ver, v5}, + {properties, #{ 'Request-Response-Information' => 0 }}]), + {ok, _} = emqx_client:connect(Client), ?assertError(no_response_information, emqx_client:sub_request_topic(Client, QoS, <<"request_topic">>)), ok = emqx_client:disconnect(Client). request_response_per_qos(QoS) -> - {ok, Requester, _} = emqx_client:start_link([{proto_ver, v5}, - {client_id, <<"requester">>}, - {properties, #{ 'Request-Response-Information' => 1}}]), - {ok, Responser, _} = emqx_client:start_link([{proto_ver, v5}, - {client_id, <<"responser">>}, - {properties, #{ 'Request-Response-Information' => 1}}, - {request_handler, fun(_Req) -> <<"ResponseTest">> end}]), + {ok, Requester} = emqx_client:start_link([{proto_ver, v5}, + {client_id, <<"requester">>}, + {properties, #{ 'Request-Response-Information' => 1}}]), + {ok, _} = emqx_client:connect(Requester), + {ok, Responser} = emqx_client:start_link([{proto_ver, v5}, + {client_id, <<"responser">>}, + {properties, #{ + 'Request-Response-Information' => 1}}, + {request_handler, + fun(_Req) -> <<"ResponseTest">> end} + ]), + {ok, _} = emqx_client:connect(Responser), ok = emqx_client:sub_request_topic(Responser, QoS, <<"request_topic">>), {ok, <<"ResponseTest">>} = emqx_client:request(Requester, <<"response_topic">>, <<"request_topic">>, <<"request_payload">>, QoS), ok = emqx_client:set_request_handler(Responser, fun(<<"request_payload">>) -> @@ -108,9 +114,15 @@ share_sub_request_topic_per_qos(QoS) -> {client_id, atom_to_binary(ClientId, utf8)}, {properties, Properties} ] end, - {ok, Requester, _} = emqx_client:start_link(Opts(requester)), - {ok, Responser1, _} = emqx_client:start_link([{request_handler, fun(Req) -> <<"1-", Req/binary>> end} | Opts(requester1)]), - {ok, Responser2, _} = emqx_client:start_link([{request_handler, fun(Req) -> <<"2-", Req/binary>> end} | Opts(requester2)]), + {ok, Requester} = emqx_client:start_link(Opts(requester)), + {ok, _} = emqx_client:connect(Requester), + + {ok, Responser1} = emqx_client:start_link([{request_handler, fun(Req) -> <<"1-", Req/binary>> end} | Opts(requester1)]), + {ok, _} = emqx_client:connect(Responser1), + + {ok, Responser2} = emqx_client:start_link([{request_handler, fun(Req) -> <<"2-", Req/binary>> end} | Opts(requester2)]), + {ok, _} = emqx_client:connect(Responser2), + ok = emqx_client:sub_request_topic(Responser1, QoS, ReqTopic, Group), ok = emqx_client:sub_request_topic(Responser2, QoS, ReqTopic, Group), %% Send a request, wait for response, validate response then return responser ID @@ -148,7 +160,9 @@ receive_messages(Count, Msgs) -> basic_test(_Config) -> Topic = nth(1, ?TOPICS), ct:print("Basic test starting"), - {ok, C, _} = emqx_client:start_link(), + {ok, C} = emqx_client:start_link(), + {ok, _} = emqx_client:connect(C), + {ok, _, [2]} = emqx_client:subscribe(C, Topic, qos2), {ok, _} = emqx_client:publish(C, Topic, <<"qos 2">>, 2), {ok, _} = emqx_client:publish(C, Topic, <<"qos 2">>, 2), @@ -157,11 +171,15 @@ basic_test(_Config) -> ok = emqx_client:disconnect(C). will_message_test(_Config) -> - {ok, C1, _} = emqx_client:start_link([{clean_start, true}, + {ok, C1} = emqx_client:start_link([{clean_start, true}, {will_topic, nth(3, ?TOPICS)}, {will_payload, <<"client disconnected">>}, {keepalive, 2}]), - {ok, C2, _} = emqx_client:start_link(), + {ok, _} = emqx_client:connect(C1), + + {ok, C2} = emqx_client:start_link(), + {ok, _} = emqx_client:connect(C2), + {ok, _, [2]} = emqx_client:subscribe(C2, nth(3, ?TOPICS), 2), timer:sleep(10), ok = emqx_client:stop(C1), @@ -171,26 +189,33 @@ will_message_test(_Config) -> ct:print("Will message test succeeded"). offline_message_queueing_test(_) -> - {ok, C1, _} = emqx_client:start_link([{clean_start, false}, - {client_id, <<"c1">>}]), + {ok, C1} = emqx_client:start_link([{clean_start, false}, + {client_id, <<"c1">>}]), + {ok, _} = emqx_client:connect(C1), + {ok, _, [2]} = emqx_client:subscribe(C1, nth(6, ?WILD_TOPICS), 2), ok = emqx_client:disconnect(C1), - {ok, C2, _} = emqx_client:start_link([{clean_start, true}, - {client_id, <<"c2">>}]), + {ok, C2} = emqx_client:start_link([{clean_start, true}, + {client_id, <<"c2">>}]), + {ok, _} = emqx_client:connect(C2), ok = emqx_client:publish(C2, nth(2, ?TOPICS), <<"qos 0">>, 0), {ok, _} = emqx_client:publish(C2, nth(3, ?TOPICS), <<"qos 1">>, 1), {ok, _} = emqx_client:publish(C2, nth(4, ?TOPICS), <<"qos 2">>, 2), timer:sleep(10), emqx_client:disconnect(C2), - {ok, C3, _} = emqx_client:start_link([{clean_start, false}, + {ok, C3} = emqx_client:start_link([{clean_start, false}, {client_id, <<"c1">>}]), + {ok, _} = emqx_client:connect(C3), + timer:sleep(10), emqx_client:disconnect(C3), ?assertEqual(3, length(receive_messages(3))). overlapping_subscriptions_test(_) -> - {ok, C, _} = emqx_client:start_link([]), + {ok, C} = emqx_client:start_link([]), + {ok, _} = emqx_client:connect(C), + {ok, _, [2, 1]} = emqx_client:subscribe(C, [{nth(7, ?WILD_TOPICS), 2}, {nth(1, ?WILD_TOPICS), 1}]), timer:sleep(10), @@ -228,8 +253,10 @@ overlapping_subscriptions_test(_) -> redelivery_on_reconnect_test(_) -> ct:print("Redelivery on reconnect test starting"), - {ok, C1, _} = emqx_client:start_link([{clean_start, false}, - {client_id, <<"c">>}]), + {ok, C1} = emqx_client:start_link([{clean_start, false}, + {client_id, <<"c">>}]), + {ok, _} = emqx_client:connect(C1), + {ok, _, [2]} = emqx_client:subscribe(C1, nth(7, ?WILD_TOPICS), 2), timer:sleep(10), ok = emqx_client:pause(C1), @@ -240,8 +267,10 @@ redelivery_on_reconnect_test(_) -> timer:sleep(10), ok = emqx_client:disconnect(C1), ?assertEqual(0, length(receive_messages(2))), - {ok, C2, _} = emqx_client:start_link([{clean_start, false}, + {ok, C2} = emqx_client:start_link([{clean_start, false}, {client_id, <<"c">>}]), + {ok, _} = emqx_client:connect(C2), + timer:sleep(10), ok = emqx_client:disconnect(C2), ?assertEqual(2, length(receive_messages(2))). @@ -255,8 +284,10 @@ redelivery_on_reconnect_test(_) -> dollar_topics_test(_) -> ct:print("$ topics test starting"), - {ok, C, _} = emqx_client:start_link([{clean_start, true}, - {keepalive, 0}]), + {ok, C} = emqx_client:start_link([{clean_start, true}, + {keepalive, 0}]), + {ok, _} = emqx_client:connect(C), + {ok, _, [1]} = emqx_client:subscribe(C, nth(6, ?WILD_TOPICS), 1), {ok, _} = emqx_client:publish(C, << <<"$">>/binary, (nth(2, ?TOPICS))/binary>>, <<"test">>, [{qos, 1}, {retain, false}]), diff --git a/test/emqx_frame_SUITE.erl b/test/emqx_frame_SUITE.erl index 1127f60d9..a4e64c7d0 100644 --- a/test/emqx_frame_SUITE.erl +++ b/test/emqx_frame_SUITE.erl @@ -100,7 +100,7 @@ serialize_parse_connect(_) -> ?assertEqual({ok, Packet1, <<>>}, parse_serialize(Packet1)), Packet2 = ?CONNECT_PACKET(#mqtt_packet_connect{ client_id = <<"clientId">>, - will_qos = ?QOS1, + will_qos = ?QOS_1, will_flag = true, will_retain = true, will_topic = <<"will">>, @@ -427,4 +427,3 @@ parse(Bin, Opts) when is_map(Opts) -> payload() -> iolist_to_binary(["payload." || _I <- lists:seq(1, 1000)]). - diff --git a/test/emqx_message_SUITE.erl b/test/emqx_message_SUITE.erl index 37207e40c..3bb06d14c 100644 --- a/test/emqx_message_SUITE.erl +++ b/test/emqx_message_SUITE.erl @@ -35,7 +35,7 @@ all() -> message_make(_) -> Msg = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>), ?assertEqual(0, Msg#message.qos), - Msg1 = emqx_message:make(<<"clientid">>, ?QOS2, <<"topic">>, <<"payload">>), + Msg1 = emqx_message:make(<<"clientid">>, ?QOS_2, <<"topic">>, <<"payload">>), ?assert(is_binary(Msg1#message.id)), ?assertEqual(2, Msg1#message.qos). diff --git a/test/emqx_mountpoint_SUITE.erl b/test/emqx_mountpoint_SUITE.erl index 61d8d3652..a77baf751 100644 --- a/test/emqx_mountpoint_SUITE.erl +++ b/test/emqx_mountpoint_SUITE.erl @@ -28,8 +28,8 @@ t_mount_unmount(_) -> Msg = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>), Msg2 = emqx_mountpoint:mount(<<"mount">>, Msg), ?assertEqual(<<"mounttopic">>, Msg2#message.topic), - TopicFilter = [{<<"mounttopic">>, #{qos => ?QOS2}}], - TopicFilter = emqx_mountpoint:mount(<<"mount">>, [{<<"topic">>, #{qos => ?QOS2}}]), + TopicFilter = [{<<"mounttopic">>, #{qos => ?QOS_2}}], + TopicFilter = emqx_mountpoint:mount(<<"mount">>, [{<<"topic">>, #{qos => ?QOS_2}}]), Msg = emqx_mountpoint:unmount(<<"mount">>, Msg2). t_replvar(_) -> diff --git a/test/emqx_mqueue_SUITE.erl b/test/emqx_mqueue_SUITE.erl index 8a1ca5201..9bb424fa1 100644 --- a/test/emqx_mqueue_SUITE.erl +++ b/test/emqx_mqueue_SUITE.erl @@ -28,57 +28,58 @@ all() -> [t_in, t_in_qos0, t_out, t_simple_mqueue, t_infinity_simple_mqueue, t_priority_mqueue, t_infinity_priority_mqueue]. t_in(_) -> - Opts = #{type => simple, max_len => 5, store_qos0 => true}, + Opts = #{max_len => 5, store_qos0 => true}, Q = ?Q:init(Opts), ?assert(?Q:is_empty(Q)), - Q1 = ?Q:in(#message{}, Q), + {_, Q1} = ?Q:in(#message{}, Q), ?assertEqual(1, ?Q:len(Q1)), - Q2 = ?Q:in(#message{qos = 1}, Q1), + {_, Q2} = ?Q:in(#message{qos = 1}, Q1), ?assertEqual(2, ?Q:len(Q2)), - Q3 = ?Q:in(#message{qos = 2}, Q2), - Q4 = ?Q:in(#message{}, Q3), - Q5 = ?Q:in(#message{}, Q4), + {_, Q3} = ?Q:in(#message{qos = 2}, Q2), + {_, Q4} = ?Q:in(#message{}, Q3), + {_, Q5} = ?Q:in(#message{}, Q4), ?assertEqual(5, ?Q:len(Q5)). t_in_qos0(_) -> - Opts = #{type => simple, max_len => 5, store_qos0 => false}, + Opts = #{max_len => 5, store_qos0 => false}, Q = ?Q:init(Opts), - Q1 = ?Q:in(#message{qos = 0}, Q), + {_, Q1} = ?Q:in(#message{qos = 0}, Q), ?assert(?Q:is_empty(Q1)), - Q2 = ?Q:in(#message{qos = 0}, Q1), + {_, Q2} = ?Q:in(#message{qos = 0}, Q1), ?assert(?Q:is_empty(Q2)). t_out(_) -> - Opts = #{type => simple, max_len => 5, store_qos0 => true}, + Opts = #{max_len => 5, store_qos0 => true}, Q = ?Q:init(Opts), {empty, Q} = ?Q:out(Q), - Q1 = ?Q:in(#message{}, Q), + {_, Q1} = ?Q:in(#message{}, Q), {Value, Q2} = ?Q:out(Q1), ?assertEqual(0, ?Q:len(Q2)), ?assertEqual({value, #message{}}, Value). t_simple_mqueue(_) -> - Opts = #{type => simple, max_len => 3, store_qos0 => false}, + Opts = #{max_len => 3, store_qos0 => false}, Q = ?Q:init(Opts), - ?assertEqual(simple, ?Q:type(Q)), ?assertEqual(3, ?Q:max_len(Q)), ?assert(?Q:is_empty(Q)), - Q1 = ?Q:in(#message{qos = 1, payload = <<"1">>}, Q), - Q2 = ?Q:in(#message{qos = 1, payload = <<"2">>}, Q1), - Q3 = ?Q:in(#message{qos = 1, payload = <<"3">>}, Q2), - Q4 = ?Q:in(#message{qos = 1, payload = <<"4">>}, Q3), + {_, Q1} = ?Q:in(#message{qos = 1, payload = <<"1">>}, Q), + {_, Q2} = ?Q:in(#message{qos = 1, payload = <<"2">>}, Q1), + {_, Q3} = ?Q:in(#message{qos = 1, payload = <<"3">>}, Q2), + {_, Q4} = ?Q:in(#message{qos = 1, payload = <<"4">>}, Q3), ?assertEqual(3, ?Q:len(Q4)), {{value, Msg}, Q5} = ?Q:out(Q4), ?assertEqual(<<"2">>, Msg#message.payload), ?assertEqual([{len, 2}, {max_len, 3}, {dropped, 1}], ?Q:stats(Q5)). t_infinity_simple_mqueue(_) -> - Opts = #{type => simple, max_len => 0, store_qos0 => false}, + Opts = #{max_len => 0, store_qos0 => false}, Q = ?Q:init(Opts), ?assert(?Q:is_empty(Q)), ?assertEqual(0, ?Q:max_len(Q)), - Qx = lists:foldl(fun(I, AccQ) -> - ?Q:in(#message{qos = 1, payload = iolist_to_binary([I])}, AccQ) + Qx = lists:foldl( + fun(I, AccQ) -> + {_, NewQ} = ?Q:in(#message{qos = 1, payload = iolist_to_binary([I])}, AccQ), + NewQ end, Q, lists:seq(1, 255)), ?assertEqual(255, ?Q:len(Qx)), ?assertEqual([{len, 255}, {max_len, 0}, {dropped, 0}], ?Q:stats(Qx)), @@ -86,45 +87,55 @@ t_infinity_simple_mqueue(_) -> ?assertEqual(<<1>>, V#message.payload). t_priority_mqueue(_) -> - Opts = #{type => priority, max_len => 3, priorities => [{<<"t1">>, 1}, {<<"t2">>, 2}, {<<"t3">>, 3}], store_qos0 => false}, + Opts = #{max_len => 3, + priorities => + #{<<"t1">> => 1, + <<"t2">> => 2, + <<"t3">> => 3 + }, + store_qos0 => false}, Q = ?Q:init(Opts), - ?assertEqual(priority, ?Q:type(Q)), ?assertEqual(3, ?Q:max_len(Q)), ?assert(?Q:is_empty(Q)), - Q1 = ?Q:in(#message{qos = 1, topic = <<"t2">>}, Q), - Q2 = ?Q:in(#message{qos = 1, topic = <<"t1">>}, Q1), - Q3 = ?Q:in(#message{qos = 1, topic = <<"t3">>}, Q2), + {_, Q1} = ?Q:in(#message{qos = 1, topic = <<"t2">>}, Q), + {_, Q2} = ?Q:in(#message{qos = 1, topic = <<"t1">>}, Q1), + {_, Q3} = ?Q:in(#message{qos = 1, topic = <<"t3">>}, Q2), ?assertEqual(3, ?Q:len(Q3)), - Q4 = ?Q:in(#message{qos = 1, topic = <<"t2">>}, Q3), + {_, Q4} = ?Q:in(#message{qos = 1, topic = <<"t2">>}, Q3), ?assertEqual(4, ?Q:len(Q4)), - Q5 = ?Q:in(#message{qos = 1, topic = <<"t2">>}, Q4), + {_, Q5} = ?Q:in(#message{qos = 1, topic = <<"t2">>}, Q4), ?assertEqual(5, ?Q:len(Q5)), - Q6 = ?Q:in(#message{qos = 1, topic = <<"t2">>}, Q5), + {_, Q6} = ?Q:in(#message{qos = 1, topic = <<"t2">>}, Q5), ?assertEqual(5, ?Q:len(Q6)), {{value, Msg}, Q7} = ?Q:out(Q6), ?assertEqual(4, ?Q:len(Q7)), ?assertEqual(<<"t3">>, Msg#message.topic). t_infinity_priority_mqueue(_) -> - Opts = #{type => priority, max_len => 0, priorities => [{<<"t">>, 1}, {<<"t1">>, 2}], store_qos0 => false}, + Opts = #{max_len => 0, + priorities => + #{<<"t">> => 1, + <<"t1">> => 2 + }, + store_qos0 => false}, Q = ?Q:init(Opts), ?assertEqual(0, ?Q:max_len(Q)), Qx = lists:foldl(fun(I, AccQ) -> - AccQ1 = - ?Q:in(#message{topic = <<"t1">>, qos = 1, payload = iolist_to_binary([I])}, AccQ), - ?Q:in(#message{topic = <<"t">>, qos = 1, payload = iolist_to_binary([I])}, AccQ1) - end, Q, lists:seq(1, 255)), + {undefined, AccQ1} = ?Q:in(#message{topic = <<"t1">>, qos = 1, payload = iolist_to_binary([I])}, AccQ), + {undefined, AccQ2} = ?Q:in(#message{topic = <<"t">>, qos = 1, payload = iolist_to_binary([I])}, AccQ1), + AccQ2 + end, Q, lists:seq(1, 255)), ?assertEqual(510, ?Q:len(Qx)), ?assertEqual([{len, 510}, {max_len, 0}, {dropped, 0}], ?Q:stats(Qx)). t_priority_mqueue2(_) -> - Opts = #{type => priority, max_length => 2, store_qos0 => false}, + Opts = #{max_length => 2, store_qos0 => false}, Q = ?Q:init("priority_queue2_test", Opts), 2 = ?Q:max_len(Q), - Q1 = ?Q:in(#message{topic = <<"x">>, qos = 1, payload = <<1>>}, Q), - Q2 = ?Q:in(#message{topic = <<"x">>, qos = 1, payload = <<2>>}, Q1), - Q3 = ?Q:in(#message{topic = <<"y">>, qos = 1, payload = <<3>>}, Q2), - Q4 = ?Q:in(#message{topic = <<"y">>, qos = 1, payload = <<4>>}, Q3), + {_, Q1} = ?Q:in(#message{topic = <<"x">>, qos = 1, payload = <<1>>}, Q), + {_, Q2} = ?Q:in(#message{topic = <<"x">>, qos = 1, payload = <<2>>}, Q1), + {_, Q3} = ?Q:in(#message{topic = <<"y">>, qos = 1, payload = <<3>>}, Q2), + {_, Q4} = ?Q:in(#message{topic = <<"y">>, qos = 1, payload = <<4>>}, Q3), 4 = ?Q:len(Q4), {{value, _Val}, Q5} = ?Q:out(Q4), 3 = ?Q:len(Q5). diff --git a/test/emqx_packet_SUITE.erl b/test/emqx_packet_SUITE.erl index dda27c45b..42fcbc9d7 100644 --- a/test/emqx_packet_SUITE.erl +++ b/test/emqx_packet_SUITE.erl @@ -44,7 +44,7 @@ packet_type_name(_) -> ?assertEqual('UNSUBSCRIBE', emqx_packet:type_name(?UNSUBSCRIBE)). packet_validate(_) -> - ?assert(emqx_packet:validate(?SUBSCRIBE_PACKET(15, #{'Subscription-Identifier' => 1}, [{<<"topic">>, #{qos => ?QOS0}}]))), + ?assert(emqx_packet:validate(?SUBSCRIBE_PACKET(15, #{'Subscription-Identifier' => 1}, [{<<"topic">>, #{qos => ?QOS_0}}]))), ?assert(emqx_packet:validate(?UNSUBSCRIBE_PACKET(89, [<<"topic">>]))), ?assert(emqx_packet:validate(?CONNECT_PACKET(#mqtt_packet_connect{}))), ?assert(emqx_packet:validate(?PUBLISH_PACKET(1, <<"topic">>, 1, #{'Response-Topic' => <<"responsetopic">>, 'Topic-Alias' => 1}, <<"payload">>))), @@ -52,7 +52,7 @@ packet_validate(_) -> ?assertError(subscription_identifier_invalid, emqx_packet:validate( ?SUBSCRIBE_PACKET(15, #{'Subscription-Identifier' => -1}, - [{<<"topic">>, #{qos => ?QOS0}}]))), + [{<<"topic">>, #{qos => ?QOS_0}}]))), ?assertError(topic_filters_invalid, emqx_packet:validate(?UNSUBSCRIBE_PACKET(1,[]))), ?assertError(topic_name_invalid, @@ -90,14 +90,14 @@ packet_validate(_) -> packet_message(_) -> Pkt = #mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH, - qos = ?QOS0, + qos = ?QOS_0, retain = false, dup = false}, variable = #mqtt_packet_publish{topic_name = <<"topic">>, packet_id = 10, properties = #{}}, payload = <<"payload">>}, - Msg = emqx_message:make(<<"clientid">>, ?QOS0, <<"topic">>, <<"payload">>), + Msg = emqx_message:make(<<"clientid">>, ?QOS_0, <<"topic">>, <<"payload">>), Msg2 = emqx_message:set_flag(retain, false, Msg), Pkt = emqx_packet:from_message(10, Msg2), Msg3 = emqx_message:set_header(username, "test", Msg2), @@ -112,8 +112,8 @@ packet_format(_) -> io:format("~s", [emqx_packet:format(?PUBLISH_PACKET(?QOS_2, <<"topic">>, 10, <<"payload">>))]), io:format("~s", [emqx_packet:format(?PUBACK_PACKET(?PUBACK, 98))]), io:format("~s", [emqx_packet:format(?PUBREL_PACKET(99))]), - io:format("~s", [emqx_packet:format(?SUBSCRIBE_PACKET(15, [{<<"topic">>, ?QOS0}, {<<"topic1">>, ?QOS1}]))]), - io:format("~s", [emqx_packet:format(?SUBACK_PACKET(40, [?QOS0, ?QOS1]))]), + io:format("~s", [emqx_packet:format(?SUBSCRIBE_PACKET(15, [{<<"topic">>, ?QOS_0}, {<<"topic1">>, ?QOS_1}]))]), + io:format("~s", [emqx_packet:format(?SUBACK_PACKET(40, [?QOS_0, ?QOS_1]))]), io:format("~s", [emqx_packet:format(?UNSUBSCRIBE_PACKET(89, [<<"t">>, <<"t2">>]))]), io:format("~s", [emqx_packet:format(?UNSUBACK_PACKET(90))]). @@ -122,7 +122,7 @@ packet_will_msg(_) -> client_id = <<"clientid">>, username = "test", will_retain = true, - will_qos = ?QOS2, + will_qos = ?QOS_2, will_topic = <<"topic">>, will_props = #{}, will_payload = <<"payload">>}, diff --git a/test/emqx_protocol_SUITE.erl.bk b/test/emqx_protocol_SUITE.erl.bk deleted file mode 100644 index f2d6d306a..000000000 --- a/test/emqx_protocol_SUITE.erl.bk +++ /dev/null @@ -1,147 +0,0 @@ -%%-------------------------------------------------------------------- -%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io) -%% -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%% http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%-------------------------------------------------------------------- - --module(emqx_protocol_SUITE). - --compile(export_all). --compile(nowarn_export_all). - --include("emqx.hrl"). - --include("emqx_mqtt.hrl"). - --include_lib("eunit/include/eunit.hrl"). - --import(emqx_serializer, [serialize/1]). - -all() -> - [{group, parser}, - {group, serializer}, - {group, packet}, - {group, message}]. - -groups() -> - [{parser, [], - [parse_connect, - parse_bridge, - parse_publish, - parse_puback, - parse_pubrec, - parse_pubrel, - parse_pubcomp, - parse_subscribe, - parse_unsubscribe, - parse_pingreq, - parse_disconnect]}, - {serializer, [], - [serialize_connect, - serialize_connack, - serialize_publish, - serialize_puback, - serialize_pubrel, - serialize_subscribe, - serialize_suback, - serialize_unsubscribe, - serialize_unsuback, - serialize_pingreq, - serialize_pingresp, - serialize_disconnect]}, - {packet, [], - [packet_proto_name, - packet_type_name, - packet_connack_name, - packet_format]}, - {message, [], - [message_make, - message_from_packet, - message_flag]}]. - - - -%%-------------------------------------------------------------------- -%% Packet Cases -%%-------------------------------------------------------------------- - -packet_proto_name(_) -> - ?assertEqual(<<"MQIsdp">>, emqx_packet:protocol_name(3)), - ?assertEqual(<<"MQTT">>, emqx_packet:protocol_name(4)). - -packet_type_name(_) -> - ?assertEqual('CONNECT', emqx_packet:type_name(?CONNECT)), - ?assertEqual('UNSUBSCRIBE', emqx_packet:type_name(?UNSUBSCRIBE)). - -packet_connack_name(_) -> - ?assertEqual('CONNACK_ACCEPT', emqx_packet:connack_name(?CONNACK_ACCEPT)), - ?assertEqual('CONNACK_PROTO_VER', emqx_packet:connack_name(?CONNACK_PROTO_VER)), - ?assertEqual('CONNACK_INVALID_ID', emqx_packet:connack_name(?CONNACK_INVALID_ID)), - ?assertEqual('CONNACK_SERVER', emqx_packet:connack_name(?CONNACK_SERVER)), - ?assertEqual('CONNACK_CREDENTIALS', emqx_packet:connack_name(?CONNACK_CREDENTIALS)), - ?assertEqual('CONNACK_AUTH', emqx_packet:connack_name(?CONNACK_AUTH)). - -packet_format(_) -> - io:format("~s", [emqx_packet:format(?CONNECT_PACKET(#mqtt_packet_connect{}))]), - io:format("~s", [emqx_packet:format(?CONNACK_PACKET(?CONNACK_SERVER))]), - io:format("~s", [emqx_packet:format(?PUBLISH_PACKET(?QOS_1, 1))]), - io:format("~s", [emqx_packet:format(?PUBLISH_PACKET(?QOS_2, <<"topic">>, 10, <<"payload">>))]), - io:format("~s", [emqx_packet:format(?PUBACK_PACKET(?PUBACK, 98))]), - io:format("~s", [emqx_packet:format(?PUBREL_PACKET(99))]), - io:format("~s", [emqx_packet:format(?SUBSCRIBE_PACKET(15, [{<<"topic">>, ?QOS0}, {<<"topic1">>, ?QOS1}]))]), - io:format("~s", [emqx_packet:format(?SUBACK_PACKET(40, [?QOS0, ?QOS1]))]), - io:format("~s", [emqx_packet:format(?UNSUBSCRIBE_PACKET(89, [<<"t">>, <<"t2">>]))]), - io:format("~s", [emqx_packet:format(?UNSUBACK_PACKET(90))]). - -%%-------------------------------------------------------------------- -%% Message Cases -%%-------------------------------------------------------------------- - -message_make(_) -> - Msg = emqx_message:make(<<"clientid">>, <<"topic">>, <<"payload">>), - ?assertEqual(0, Msg#mqtt_message.qos), - Msg1 = emqx_message:make(<<"clientid">>, qos2, <<"topic">>, <<"payload">>), - ?assert(is_binary(Msg1#mqtt_message.id)), - ?assertEqual(2, Msg1#mqtt_message.qos). - -message_from_packet(_) -> - Msg = emqx_message:from_packet(?PUBLISH_PACKET(1, <<"topic">>, 10, <<"payload">>)), - ?assertEqual(1, Msg#mqtt_message.qos), - ?assertEqual(10, Msg#mqtt_message.pktid), - ?assertEqual(<<"topic">>, Msg#mqtt_message.topic), - WillMsg = emqx_message:from_packet(#mqtt_packet_connect{will_flag = true, - will_topic = <<"WillTopic">>, - will_msg = <<"WillMsg">>}), - ?assertEqual(<<"WillTopic">>, WillMsg#mqtt_message.topic), - ?assertEqual(<<"WillMsg">>, WillMsg#mqtt_message.payload), - - Msg2 = emqx_message:from_packet(<<"username">>, <<"clientid">>, - ?PUBLISH_PACKET(1, <<"topic">>, 20, <<"payload">>)), - ?assertEqual({<<"clientid">>, <<"username">>}, Msg2#mqtt_message.from), - io:format("~s", [emqx_message:format(Msg2)]). - -message_flag(_) -> - Pkt = ?PUBLISH_PACKET(1, <<"t">>, 2, <<"payload">>), - Msg2 = emqx_message:from_packet(<<"clientid">>, Pkt), - Msg3 = emqx_message:set_flag(retain, Msg2), - Msg4 = emqx_message:set_flag(dup, Msg3), - ?assert(Msg4#mqtt_message.dup), - ?assert(Msg4#mqtt_message.retain), - Msg5 = emqx_message:set_flag(Msg4), - Msg6 = emqx_message:unset_flag(dup, Msg5), - Msg7 = emqx_message:unset_flag(retain, Msg6), - ?assertNot(Msg7#mqtt_message.dup), - ?assertNot(Msg7#mqtt_message.retain), - emqx_message:unset_flag(Msg7), - emqx_message:to_packet(Msg7). - diff --git a/test/ws_client.erl b/test/ws_client.erl index 25f38164d..39f01467a 100644 --- a/test/ws_client.erl +++ b/test/ws_client.erl @@ -52,10 +52,10 @@ init(_, _WSReq) -> {ok, #state{}}. websocket_handle(Frame, _, State = #state{waiting = undefined, buffer = Buffer}) -> - lager:info("Client received frame~p", [Frame]), + logger:info("Client received frame~p", [Frame]), {ok, State#state{buffer = [Frame|Buffer]}}; websocket_handle(Frame, _, State = #state{waiting = From}) -> - lager:info("Client received frame~p", [Frame]), + logger:info("Client received frame~p", [Frame]), From ! Frame, {ok, State#state{waiting = undefined}}.