Merge pull request #4727 from emqx/resolve-conflict-master-to-v5.0

Auto-pull-request-on-2021-05-01
This commit is contained in:
Zaiming (Stone) Shi 2021-05-07 22:59:23 +02:00 committed by GitHub
commit 3a9b15b8d5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
37 changed files with 788 additions and 137 deletions

View File

@ -1,4 +1,4 @@
ARG BUILD_FROM=emqx/build-env:erl23.2.7.2-emqx-1-ubuntu20.04
ARG BUILD_FROM=emqx/build-env:erl23.2.7.2-emqx-2-ubuntu20.04
FROM ${BUILD_FROM}
ARG EMQX_NAME=emqx

View File

@ -3,7 +3,7 @@ version: '3.9'
services:
erlang:
container_name: erlang
image: emqx/build-env:erl23.2.7.2-emqx-1-ubuntu20.04
image: emqx/build-env:erl23.2.7.2-emqx-2-ubuntu20.04
env_file:
- conf.env
environment:

View File

@ -15,7 +15,7 @@ on:
jobs:
prepare:
runs-on: ubuntu-20.04
container: emqx/build-env:erl23.2.7.2-emqx-1-ubuntu20.04
container: emqx/build-env:erl23.2.7.2-emqx-2-ubuntu20.04
outputs:
profiles: ${{ steps.set_profile.outputs.profiles}}
@ -255,8 +255,8 @@ jobs:
old_vsns=($(git tag -l "v$pre_vsn.[0-9]" | sed "s/v$vsn//"))
fi
mkdir -p tmp/relup_packages/$PROFILE
cd tmp/relup_packages/$PROFILE
mkdir -p _upgrade_base
cd _upgrade_base
for tag in ${old_vsns[@]};do
if [ ! -z "$(echo $(curl -I -m 10 -o /dev/null -s -w %{http_code} https://s3-${{ secrets.AWS_DEFAULT_REGION }}.amazonaws.com/${{ secrets.AWS_S3_BUCKET }}/$broker/$tag/$PROFILE-$SYSTEM-${tag#[e|v]}-$ARCH.zip) | grep -oE "^[23]+")" ];then
wget https://s3-${{ secrets.AWS_DEFAULT_REGION }}.amazonaws.com/${{ secrets.AWS_S3_BUCKET }}/$broker/$tag/$PROFILE-$SYSTEM-${tag#[e|v]}-$ARCH.zip
@ -267,7 +267,7 @@ jobs:
cd -
- name: build emqx packages
env:
ERL_OTP: erl23.2.7.2-emqx-1
ERL_OTP: erl23.2.7.2-emqx-2
PROFILE: ${{ matrix.profile }}
ARCH: ${{ matrix.arch }}
SYSTEM: ${{ matrix.os }}
@ -406,11 +406,17 @@ jobs:
aws s3 cp --recursive _packages/${{ matrix.profile }} s3://${{ secrets.AWS_S3_BUCKET }}/$broker/${{ env.version }}
aws cloudfront create-invalidation --distribution-id ${{ secrets.AWS_CLOUDFRONT_ID }} --paths "/$broker/${{ env.version }}/*"
- uses: Rory-Z/upload-release-asset@v1
if: github.event_name == 'release'
if: github.event_name == 'release' && matrix.profile != 'emqx-ee'
with:
repo: emqx
path: "_packages/${{ matrix.profile }}/emqx-*"
token: ${{ github.token }}
- uses: Rory-Z/upload-release-asset@v1
if: github.event_name == 'release' && matrix.profile == 'emqx-ee'
with:
repo: emqx-enterprise
path: "_packages/${{ matrix.profile }}/emqx-*"
token: ${{ github.token }}
- name: update to emqx.io
if: github.event_name == 'release'
run: |

View File

@ -11,7 +11,7 @@ jobs:
strategy:
matrix:
erl_otp:
- erl23.2.7.2-emqx-1
- erl23.2.7.2-emqx-2
os:
- ubuntu20.04
- centos7

View File

@ -5,7 +5,7 @@ on: [pull_request]
jobs:
check_deps_integrity:
runs-on: ubuntu-20.04
container: emqx/build-env:erl23.2.7.2-emqx-1-ubuntu20.04
container: emqx/build-env:erl23.2.7.2-emqx-2-ubuntu20.04
steps:
- uses: actions/checkout@v2

View File

@ -152,7 +152,7 @@ jobs:
relup_test:
runs-on: ubuntu-20.04
container: emqx/build-env:erl23.2.7.2-emqx-1-ubuntu20.04
container: emqx/build-env:erl23.2.7.2-emqx-2-ubuntu20.04
defaults:
run:
shell: bash
@ -213,9 +213,9 @@ jobs:
pre_vsn="$(echo $vsn | grep -oE '^[0-9]+.[0-9]')"
if [ $PROFILE = "emqx" ]; then
old_vsns="$(git tag -l "v$pre_vsn.[0-9]" | tr "\n" " " | sed "s/v$vsn//")"
old_vsns="$(git tag -l "v$pre_vsn.[0-9]" | xargs echo -n | sed "s/v$vsn//")"
else
old_vsns="$(git tag -l "e$pre_vsn.[0-9]" | tr "\n" " " | sed "s/v$vsn//")"
old_vsns="$(git tag -l "e$pre_vsn.[0-9]" | xargs echo -n | sed "s/v$vsn//")"
fi
echo "OLD_VSNS=$old_vsns" >> $GITHUB_ENV
- name: download emqx

View File

@ -13,7 +13,7 @@ on:
jobs:
run_static_analysis:
runs-on: ubuntu-20.04
container: emqx/build-env:erl23.2.7.2-emqx-1-ubuntu20.04
container: emqx/build-env:erl23.2.7.2-emqx-2-ubuntu20.04
steps:
- uses: actions/checkout@v2
@ -30,7 +30,7 @@ jobs:
run_proper_test:
runs-on: ubuntu-20.04
container: emqx/build-env:erl23.2.7.2-emqx-1-ubuntu20.04
container: emqx/build-env:erl23.2.7.2-emqx-2-ubuntu20.04
steps:
- uses: actions/checkout@v2

1
.gitignore vendored
View File

@ -46,3 +46,4 @@ emqx_dialyzer_*_plt
dist.zip
scripts/git-token
etc/*.seg
_upgrade_base/

View File

@ -1,11 +1,11 @@
$(shell $(CURDIR)/scripts/git-hooks-init.sh)
REBAR_VERSION = 3.14.3-emqx-6
REBAR_VERSION = 3.14.3-emqx-7
REBAR = $(CURDIR)/rebar3
BUILD = $(CURDIR)/build
SCRIPTS = $(CURDIR)/scripts
export PKG_VSN ?= $(shell $(CURDIR)/pkg-vsn.sh)
export EMQX_DESC ?= EMQ X
export EMQX_CE_DASHBOARD_VERSION ?= v4.3.0
export EMQX_CE_DASHBOARD_VERSION ?= v4.3.1
ifeq ($(OS),Windows_NT)
export REBAR_COLOR=none
endif
@ -111,30 +111,35 @@ xref: $(REBAR)
dialyzer: $(REBAR)
@$(REBAR) as check dialyzer
.PHONY: $(REL_PROFILES:%=relup-%)
$(REL_PROFILES:%=relup-%): $(REBAR)
ifneq ($(OS),Windows_NT)
@$(BUILD) $(@:relup-%=%) relup
endif
COMMON_DEPS := $(REBAR) get-dashboard $(CONF_SEGS)
.PHONY: $(REL_PROFILES:%=%-tar) $(PKG_PROFILES:%=%-tar)
$(REL_PROFILES:%=%-tar) $(PKG_PROFILES:%=%-tar): $(REBAR) get-dashboard $(CONF_SEGS)
@$(BUILD) $(subst -tar,,$(@)) tar
## rel target is to create release package without relup
.PHONY: $(REL_PROFILES:%=%-rel) $(PKG_PROFILES:%=%-rel)
$(REL_PROFILES:%=%-rel) $(PKG_PROFILES:%=%-rel): $(COMMON_DEPS)
@$(BUILD) $(subst -rel,,$(@)) rel
## zip targets depend on the corresponding relup and tar artifacts
## relup target is to create relup instructions
.PHONY: $(REL_PROFILES:%=%-relup)
define gen-relup-target
$1-relup: $(COMMON_DEPS)
@$(BUILD) $1 relup
endef
ALL_ZIPS = $(REL_PROFILES)
$(foreach zt,$(ALL_ZIPS),$(eval $(call gen-relup-target,$(zt))))
## zip target is to create a release package .zip with relup
.PHONY: $(REL_PROFILES:%=%-zip)
define gen-zip-target
$1-zip: relup-$1 $1-tar
$1-zip: $1-relup
@$(BUILD) $1 zip
endef
ALL_ZIPS = $(REL_PROFILES) $(PKG_PROFILES)
ALL_ZIPS = $(REL_PROFILES)
$(foreach zt,$(ALL_ZIPS),$(eval $(call gen-zip-target,$(zt))))
## A pkg target depend on a regular release profile zip to include relup,
## and also a -pkg suffixed profile tar (without relup) for making deb/rpm package
## A pkg target depend on a regular release
.PHONY: $(PKG_PROFILES)
define gen-pkg-target
$1: $(subst -pkg,,$1)-zip $1-tar
$1: $1-rel
@$(BUILD) $1 pkg
endef
$(foreach pt,$(PKG_PROFILES),$(eval $(call gen-pkg-target,$(pt))))

View File

@ -18,7 +18,8 @@ auth.http.auth_req.method = post
## The possible values of the Content-Type header: application/x-www-form-urlencoded, application/json
##
## Examples: auth.http.auth_req.headers.accept = */*
auth.http.auth_req.headers.content-type = "application/x-www-form-urlencoded"
auth.http.auth_req.headers.content_type = "application/x-www-form-urlencoded"
## Parameters used to construct the request body or query string parameters
## When the request method is GET, these parameters will be converted into query string parameters

View File

@ -96,7 +96,7 @@ translate_env(EnvName) ->
{retry_timeout, 1000}] ++ MoreOpts,
Method = proplists:get_value(method, Req),
Headers = proplists:get_value(headers, Req),
NHeaders = ensure_content_type_header(Method, to_lower(Headers)),
NHeaders = ensure_content_type_header(Method, emqx_http_lib:normalise_headers(Headers)),
NReq = lists:keydelete(headers, 1, Req),
{ok, Timeout} = application:get_env(?APP, timeout),
application:set_env(?APP, EnvName, [{path, Path},
@ -145,9 +145,6 @@ unload_hooks() ->
_ = ehttpc_sup:stop_pool('emqx_auth_http/acl_req'),
ok.
to_lower(Headers) ->
[{string:to_lower(K), V} || {K, V} <- Headers].
ensure_content_type_header(Method, Headers)
when Method =:= post orelse Method =:= put ->
Headers;

View File

@ -505,7 +505,8 @@ do_import_acl_mnesia(Acls) ->
-ifdef(EMQX_ENTERPRISE).
import_modules(Modules) ->
case ets:info(emqx_modules) of
undefined -> [];
undefined ->
ok;
_ ->
lists:foreach(fun(#{<<"id">> := Id,
<<"type">> := Type,
@ -649,9 +650,9 @@ do_import_data(Data, Version) ->
-ifdef(EMQX_ENTERPRISE).
do_import_extra_data(Data, _Version) ->
import_confs(maps:get(<<"configs">>, Data, []), maps:get(<<"listeners_state">>, Data, [])),
import_modules(maps:get(<<"modules">>, Data, [])),
import_schemas(maps:get(<<"schemas">>, Data, [])),
_ = import_confs(maps:get(<<"configs">>, Data, []), maps:get(<<"listeners_state">>, Data, [])),
_ = import_modules(maps:get(<<"modules">>, Data, [])),
_ = import_schemas(maps:get(<<"schemas">>, Data, [])),
ok.
-else.
do_import_extra_data(_Data, _Version) -> ok.

View File

@ -610,8 +610,9 @@ handle_call(_From, Req, State = #state{channel = Channel}) ->
stop(Reason, Reply, State#state{channel = NChannel})
end.
handle_info(Info, State = #state{channel = Channel}) ->
handle_return(emqx_channel:handle_info(Info, Channel), State).
handle_info({sock_closed, Reason} = Info, State = #state{channel = Channel}) ->
maybe_send_will_msg(Reason, State),
handle_return(emqx_channel:handle_info(Info, Channel), State).
handle_timeout(TRef, TMsg, State = #state{channel = Channel}) ->
handle_return(emqx_channel:handle_timeout(TRef, TMsg, Channel), State).
@ -782,21 +783,21 @@ stop({shutdown, Reason}, State) ->
stop(Reason, State);
stop(Reason, State) ->
?LOG(stop_log_level(Reason), "stop due to ~p", [Reason]),
case Reason of
%% FIXME: The Will-Msg should publish when a Session terminated!
Reason when Reason =:= normal ->
ok;
_ ->
do_publish_will(State)
end,
maybe_send_will_msg(Reason, State),
{stop, {shutdown, Reason}, State}.
stop({shutdown, Reason}, Reply, State) ->
stop(Reason, Reply, State);
stop(Reason, Reply, State) ->
?LOG(stop_log_level(Reason), "stop due to ~p", [Reason]),
maybe_send_will_msg(Reason, State),
{stop, {shutdown, Reason}, Reply, State}.
maybe_send_will_msg(normal, _State) ->
ok;
maybe_send_will_msg(_Reason, State) ->
do_publish_will(State).
stop_log_level(Reason) when ?is_non_error_reason(Reason) ->
debug;
stop_log_level(_) ->

View File

@ -941,6 +941,41 @@ t_will_test5(_) ->
gen_udp:close(Socket).
t_will_case06(_) ->
QoS = 1,
Duration = 1,
WillMsg = <<10, 11, 12, 13, 14>>,
WillTopic = <<"abc">>,
{ok, Socket} = gen_udp:open(0, [binary]),
ClientId = <<"test">>,
ok = emqx_broker:subscribe(WillTopic),
send_connect_msg_with_will1(Socket, Duration, ClientId),
?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)),
send_willtopic_msg(Socket, WillTopic, QoS),
?assertEqual(<<2, ?SN_WILLMSGREQ>>, receive_response(Socket)),
send_willmsg_msg(Socket, WillMsg),
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
send_pingreq_msg(Socket, undefined),
?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)),
% wait udp client keepalive timeout
timer:sleep(2000),
receive
{deliver, WillTopic, #message{payload = WillMsg}} -> ok;
Msg -> ct:print("recevived --- unex: ~p", [Msg])
after
1000 -> ct:fail(wait_willmsg_timeout)
end,
send_disconnect_msg(Socket, undefined),
gen_udp:close(Socket).
t_asleep_test01_timeout(_) ->
QoS = 1,
Duration = 1,
@ -1564,6 +1599,15 @@ send_connect_msg_with_will(Socket, Duration, ClientId) ->
?FNU:2, ProtocolId:8, Duration:16, ClientId/binary>>,
ok = gen_udp:send(Socket, ?HOST, ?PORT, ConnectPacket).
send_connect_msg_with_will1(Socket, Duration, ClientId) ->
Length = 10,
Will = 1,
CleanSession = 0,
ProtocolId = 1,
ConnectPacket = <<Length:8, ?SN_CONNECT:8, ?FNU:4, Will:1, CleanSession:1,
?FNU:2, ProtocolId:8, Duration:16, ClientId/binary>>,
ok = gen_udp:send(Socket, ?HOST, ?PORT, ConnectPacket).
send_willtopic_msg(Socket, Topic, QoS) ->
Length = 3+byte_size(Topic),
MsgType = ?SN_WILLTOPIC,

View File

@ -168,8 +168,8 @@ validate_params_and_headers(ClientState, ClientId) ->
end
catch
throw : {unknown_client, Other} ->
ct:pal("ignored_event_from_other_client ~p~n~p~n~p",
[Other, Params, Headers]),
ct:pal("ignored_event_from_other_client ~p~nexpecting:~p~n~p~n~p",
[Other, ClientId, Params, Headers]),
validate_params_and_headers(ClientState, ClientId) %% continue looping
end
after

82
bin/node_dump Executable file
View File

@ -0,0 +1,82 @@
#!/bin/sh
set -eu
ROOT_DIR="$(cd "$(dirname "$(readlink "$0" || echo "$0")")"/..; pwd -P)"
echo "Running node dump in ${ROOT_DIR}"
cd "${ROOT_DIR}"
DUMP="log/node_dump_$(date +"%Y%m%d_%H%M%S").tar.gz"
CONF_DUMP="log/conf.dump"
SYSINFO="log/sysinfo.txt"
LOG_MAX_AGE_DAYS=3
collect() {
echo "========================================================"
echo " $*"
echo "========================================================"
eval "$*" || echo "Unavailable"
echo
}
show_help() {
echo "Collect information about the EMQ X node
USAGE:
bin/node_dump [-a DAYS]
OPTIONS:
-a n Set maximum age of collected log files in days (3 by default)"
exit 1
}
while getopts "a:h" opt; do
case "${opt}" in
a) LOG_MAX_AGE_DAYS="${OPTARG}" ;;
h) show_help ;;
*) ;;
esac
done
# Collect system info:
{
collect bin/emqx_ctl broker
collect bin/emqx eval "'emqx_node_dump:sys_info()'"
collect uname -a
collect uptime
collect free
collect netstat -tnl
collect bin/emqx_ctl plugins list
collect bin/emqx_ctl modules list
collect bin/emqx_ctl vm all
collect bin/emqx_ctl listeners
} > "${SYSINFO}"
# Collect information about the configuration:
{
collect bin/emqx eval "'emqx_node_dump:app_env_dump()'"
} > "${CONF_DUMP}"
# Pack files
{
find log -mtime -"${LOG_MAX_AGE_DAYS}" \( -name '*.log.*' -or -name 'run_erl.log*' \)
echo "${SYSINFO}"
echo "${CONF_DUMP}"
} | tar czf "${DUMP}" -T -
## Cleanup:
rm "${SYSINFO}"
#rm "${CONF_DUMP}" # Keep it for inspection
echo "Created a node dump ${DUMP}"
echo
echo "WARNING: this script tries to obfuscate secrets, but make sure to
inspect log/conf.dump file manually before uploading the node dump
to a public location."

52
build
View File

@ -2,7 +2,7 @@
# This script helps to build release artifacts.
# arg1: profile, e.g. emqx | emqx-edge | emqx-pkg | emqx-edge-pkg
# arg2: artifact, e.g. tar | relup | zip | pkg
# arg2: artifact, e.g. rel | relup | zip | pkg
set -euo pipefail
@ -62,42 +62,48 @@ log() {
echo "===< $msg"
}
make_tar() {
./rebar3 as "$PROFILE" tar
make_rel() {
# shellcheck disable=SC1010
./rebar3 as "$PROFILE" do release,tar
}
## unzip previous version .zip files to _build/$PROFILE/rel/emqx/releases before making relup
make_relup() {
local lib_dir="_build/$PROFILE/rel/emqx/lib"
local releases_dir="_build/$PROFILE/rel/emqx/releases"
mkdir -p "$lib_dir" "$releases_dir"
local releases=()
if [ -d "$releases_dir" ]; then
while read -r dir; do
local version
version="$(basename "$dir")"
# skip current version
if [ "$version" != "$PKG_VSN" ]; then
releases+=( "$version" )
while read -r zip; do
local base_vsn
base_vsn="$(echo "$zip" | grep -oE "[0-9]+\.[0-9]+\.[0-9]+(-[0-9a-e]{8})?")"
if [ ! -d "$releases_dir/$base_vsn" ]; then
local tmp_dir
tmp_dir="$(mktemp -d -t emqx.XXXXXXX)"
unzip -q "$zip" "emqx/releases/*" -d "$tmp_dir"
unzip -q "$zip" "emqx/lib/*" -d "$tmp_dir"
cp -r -n "$tmp_dir/emqx/releases"/* "$releases_dir"
cp -r -n "$tmp_dir/emqx/lib"/* "$lib_dir"
rm -rf "$tmp_dir"
fi
done < <(find "_build/$PROFILE/rel/emqx/releases" -maxdepth 1 -name '*.*.*' -type d)
releases+=( "$base_vsn" )
done < <(find _upgrade_base -maxdepth 1 -name "*$PROFILE-$SYSTEM*-$ARCH.zip" -type f)
fi
if [ ${#releases[@]} -eq 0 ]; then
log "No previous release found, relup ignored"
log "No upgrade base found, relup ignored"
return 0
fi
if [ ${#releases[@]} -gt 1 ]; then
log "Found more than one previous versions in $releases_dir:"
log "${releases[@]}"
log "ERROR: So far we can not support multi base-version relup creation"
return 1
fi
local base_version="${releases[0]}"
# TODO: comma separate base-versions when supported
./rebar3 as "$PROFILE" relup --relname emqx --relvsn "${PKG_VSN}" --upfrom "$base_version"
RELX_BASE_VERSIONS="$(IFS=, ; echo "${releases[*]}")"
export RELX_BASE_VERSIONS
./rebar3 as "$PROFILE" relup --relname emqx --relvsn "${PKG_VSN}"
}
## make_zip turns .tar.gz into a .zip with a slightly different name.
## It assumes the .tar.gz has been built -- relies on Makefile dependency
make_zip() {
# build the tarball again to ensure relup is included
make_rel
tard="/tmp/emqx_untar_${PKG_VSN}"
rm -rf "${tard}"
mkdir -p "${tard}/emqx"
@ -117,8 +123,8 @@ make_zip() {
log "building artifact=$ARTIFACT for profile=$PROFILE"
case "$ARTIFACT" in
tar)
make_tar
rel)
make_rel
;;
relup)
make_relup
@ -132,8 +138,6 @@ case "$ARTIFACT" in
log "Skipped making deb/rpm package for $SYSTEM"
exit 0
fi
# build the tar which is going to be used as the base of deb and rpm packages
make_tar
make -C "deploy/packages/${PKGERDIR}" clean
EMQX_REL="$(pwd)" EMQX_BUILD="${PROFILE}" SYSTEM="${SYSTEM}" make -C "deploy/packages/${PKGERDIR}"
;;

View File

@ -1,4 +1,4 @@
ARG BUILD_FROM=emqx/build-env:erl23.2.7.2-emqx-1-alpine-amd64
ARG BUILD_FROM=emqx/build-env:erl23.2.7.2-emqx-2-alpine-amd64
ARG RUN_FROM=alpine:3.12
FROM ${BUILD_FROM} AS builder

View File

@ -62,7 +62,7 @@ docker-build:
@docker build --no-cache \
--build-arg PKG_VSN=$(PKG_VSN) \
--build-arg BUILD_FROM=emqx/build-env:erl23.2.7.2-emqx-1-alpine-$(ARCH) \
--build-arg BUILD_FROM=emqx/build-env:erl23.2.7.2-emqx-2-alpine-$(ARCH) \
--build-arg RUN_FROM=$(ARCH)/alpine:3.12 \
--build-arg EMQX_NAME=$(EMQX_NAME) \
--build-arg QEMU_ARCH=$(QEMU_ARCH) \

View File

@ -451,9 +451,13 @@ log.file = emqx.log
## Default: No Limit
#log.chars_limit = 8192
## Log formatter
## Value: text | json
#log.formatter = text
## Log to single line
## Value: boolean
#log.single_line = false
## Value: Boolean
#log.single_line = true
## Enables the log rotation.
## With this enabled, new log files will be created when the current

View File

@ -113,3 +113,6 @@
## Specifies how long time (in milliseconds) to spend shutting down the system.
## See: http://erlang.org/doc/man/erl.html
-shutdown_time 30000
## patches dir
-pa {{ platform_data_dir }}/patches

View File

@ -112,3 +112,5 @@
## See: http://erlang.org/doc/man/erl.html
-shutdown_time 10000
## patches dir
-pa {{ platform_data_dir }}/patches

View File

@ -96,23 +96,6 @@
-define(DISCONNECT, 14). %% Client or Server is disconnecting
-define(AUTH, 15). %% Authentication exchange
-define(TYPE_NAMES, [
'CONNECT',
'CONNACK',
'PUBLISH',
'PUBACK',
'PUBREC',
'PUBREL',
'PUBCOMP',
'SUBSCRIBE',
'SUBACK',
'UNSUBSCRIBE',
'UNSUBACK',
'PINGREQ',
'PINGRESP',
'DISCONNECT',
'AUTH']).
%%--------------------------------------------------------------------
%% MQTT V3.1.1 Connect Return Codes
%%--------------------------------------------------------------------

View File

@ -29,7 +29,7 @@
-ifndef(EMQX_ENTERPRISE).
-define(EMQX_RELEASE, {opensource, "4.3-rc.5"}).
-define(EMQX_RELEASE, {opensource, "4.3.0"}).
-else.

View File

@ -494,9 +494,28 @@ end}.
{datatype, integer}
]}.
{mapping, "log.supervisor_reports", "kernel.logger", [
{default, error},
{datatype, {enum, [error, progress]}},
hidden
]}.
%% @doc Maximum depth in Erlang term log formatting
%% and message queue inspection.
{mapping, "log.max_depth", "kernel.error_logger_format_depth", [
{default, 20},
{datatype, [{enum, [unlimited]}, integer]}
]}.
%% @doc format logs as JSON objects
{mapping, "log.formatter", "kernel.logger", [
{default, text},
{datatype, {enum, [text, json]}}
]}.
%% @doc format logs in a single line.
{mapping, "log.single_line", "kernel.logger", [
{default, false},
{default, true},
{datatype, {enum, [true, false]}}
]}.
@ -617,7 +636,16 @@ end}.
V -> V
end,
SingleLine = cuttlefish:conf_get("log.single_line", Conf),
Formatter = {logger_formatter,
FmtName = cuttlefish:conf_get("log.formatter", Conf),
Formatter =
case FmtName of
json ->
{emqx_logger_jsonfmt,
#{chars_limit => CharsLimit,
single_line => SingleLine
}};
text ->
{emqx_logger_textfmt,
#{template =>
[time," [",level,"] ",
{clientid,
@ -630,7 +658,8 @@ end}.
msg,"\n"],
chars_limit => CharsLimit,
single_line => SingleLine
}},
}}
end,
{BustLimitOn, {MaxBurstCount, TimeWindow}} =
case string:tokens(cuttlefish:conf_get("log.burst_limit", Conf), ", ") of
["disabled"] -> {false, {20000, 1000}};
@ -664,13 +693,21 @@ end}.
BasicConf#{max_no_bytes => MaxNoBytes}
end,
Filters = case cuttlefish:conf_get("log.supervisor_reports", Conf) of
error -> [{drop_progress_reports, {fun logger_filters:progress/2, stop}}];
progress -> []
end,
%% For the default logger that outputs to console
DefaultHandler =
if LogTo =:= console orelse LogTo =:= both ->
[{handler, console, logger_std_h,
#{level => LogLevel,
config => #{type => standard_io},
formatter => Formatter}}];
formatter => Formatter,
filters => Filters
}
}];
true ->
[{handler, default, undefined}]
end,
@ -682,7 +719,9 @@ end}.
#{level => LogLevel,
config => FileConf(cuttlefish:conf_get("log.file", Conf)),
formatter => Formatter,
filesync_repeat_interval => no_repeat}}];
filesync_repeat_interval => no_repeat,
filters => Filters
}}];
true -> []
end,

View File

@ -28,8 +28,7 @@
{cover_export_enabled, true}.
{cover_excl_mods, [emqx_exproto_pb, emqx_exhook_pb]}.
{provider_hooks,[{pre,[{release,{relup_helper,gen_appups}}]}
]}.
{provider_hooks, [{pre, [{release, {relup_helper, gen_appups}}]}]}.
{post_hooks,[]}.
@ -44,7 +43,7 @@
, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.8.0"}}}
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.8.1"}}}
, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}}
, {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {branch, "hocon"}}}
, {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v4.0.0"}}}
, {minirest, {git, "https://github.com/emqx/minirest", {tag, "0.3.5"}}}
, {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.1"}}}
, {replayq, {git, "https://github.com/emqx/replayq", {tag, "0.3.2"}}}

View File

@ -310,12 +310,14 @@ relx_overlay(ReleaseType) ->
, {mkdir, "data/"}
, {mkdir, "data/mnesia"}
, {mkdir, "data/configs"}
, {mkdir, "data/patches"}
, {mkdir, "data/scripts"}
, {template, "data/loaded_plugins.tmpl", "data/loaded_plugins"}
, {template, "data/loaded_modules.tmpl", "data/loaded_modules"}
, {template, "data/emqx_vars", "releases/emqx_vars"}
, {copy, "bin/emqx", "bin/emqx"}
, {copy, "bin/emqx_ctl", "bin/emqx_ctl"}
, {copy, "bin/node_dump", "bin/node_dump"}
, {copy, "bin/install_upgrade.escript", "bin/install_upgrade.escript"}
, {copy, "bin/emqx", "bin/emqx-{{release_version}}"} %% for relup
, {copy, "bin/emqx_ctl", "bin/emqx_ctl-{{release_version}}"} %% for relup

View File

@ -156,7 +156,6 @@ parse_remaining_len(<<0:1, Len:7, Rest/binary>>, Header, Multiplier, Value,
parse_frame(Bin, Header, 0, Options) ->
{ok, packet(Header), Bin, ?none(Options)};
parse_frame(Bin, Header, Length, Options) ->
case Bin of
<<FrameBin:Length/binary, Rest/binary>> ->

View File

@ -19,6 +19,7 @@
-export([ uri_encode/1
, uri_decode/1
, uri_parse/1
, normalise_headers/1
]).
-export_type([uri_map/0]).
@ -91,6 +92,21 @@ do_parse(URI) ->
normalise_parse_result(Map2)
end.
%% @doc Return HTTP headers list with keys lower-cased and
%% underscores replaced with hyphens
%% NOTE: assuming the input Headers list is a proplists,
%% that is, when a key is duplicated, list header overrides tail
%% e.g. [{"Content_Type", "applicaiton/binary"}, {"content-type", "applicaiton/json"}]
%% results in: [{"content-type", "applicaiton/binary"}]
normalise_headers(Headers0) ->
F = fun({K0, V}) ->
K = re:replace(K0, "_", "-", [{return,list}]),
{string:lowercase(K), V}
end,
Headers = lists:map(F, Headers0),
Keys = proplists:get_keys(Headers),
[{K, proplists:get_value(K, Headers)} || K <- Keys].
normalise_parse_result(#{host := Host, scheme := Scheme0} = Map) ->
Scheme = atom_scheme(Scheme0),
DefaultPort = case https =:= Scheme of

295
src/emqx_logger_jsonfmt.erl Normal file
View File

@ -0,0 +1,295 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% @doc This logger formatter tries format logs into JSON objects
%%
%% Due to the fact that the `Report' body of log entries are *NOT*
%% structured, we only try to JSON-ify `Meta',
%%
%% `Report' body format is pretty-printed and used as the `msg'
%% JSON field in the finale result.
%%
%% e.g. logger:log(info, _Data = #{foo => bar}, _Meta = #{metaf1 => 2})
%% will results in a JSON object look like below:
%%
%% {"time": 1620226963427808, "level": "info", "msg": "foo: bar", "metaf1": 2}
-module(emqx_logger_jsonfmt).
-export([format/2]).
-ifdef(TEST).
-export([report_cb_1/1, report_cb_2/2, report_cb_crash/2]).
-endif.
-export_type([config/0]).
-elvis([{elvis_style, no_nested_try_catch, #{ ignore => [emqx_logger_jsonfmt]}}]).
-type config() :: #{depth => pos_integer() | unlimited,
report_cb => logger:report_cb(),
single_line => boolean()}.
-define(IS_STRING(String), (is_list(String) orelse is_binary(String))).
-spec format(logger:log_event(), config()) -> iodata().
format(#{level := Level, msg := Msg, meta := Meta}, Config0) when is_map(Config0) ->
Config = add_default_config(Config0),
format(Msg, Meta#{level => Level}, Config).
format(Msg, Meta, Config) ->
Data0 =
try Meta#{msg => format_msg(Msg, Meta, Config)}
catch
C:R:S ->
Meta#{ msg => "emqx_logger_jsonfmt_format_error"
, fmt_raw_input => Msg
, fmt_error => C
, fmt_reason => R
, fmt_stacktrace => S
}
end,
Data = maps:without([report_cb], Data0),
jiffy:encode(json_obj(Data, Config)).
format_msg({string, Chardata}, Meta, Config) ->
format_msg({"~ts", [Chardata]}, Meta, Config);
format_msg({report, _} = Msg, Meta, #{report_cb := Fun} = Config)
when is_function(Fun,1); is_function(Fun,2) ->
format_msg(Msg, Meta#{report_cb => Fun}, maps:remove(report_cb, Config));
format_msg({report, Report}, #{report_cb := Fun} = Meta, Config) when is_function(Fun, 1) ->
case Fun(Report) of
{Format, Args} when is_list(Format), is_list(Args) ->
format_msg({Format, Args}, maps:remove(report_cb, Meta), Config);
Other ->
#{ msg => "report_cb_bad_return"
, report_cb_fun => Fun
, report_cb_return => Other
}
end;
format_msg({report, Report}, #{report_cb := Fun}, Config) when is_function(Fun, 2) ->
case Fun(Report, maps:with([depth, single_line], Config)) of
Chardata when ?IS_STRING(Chardata) ->
try
unicode:characters_to_binary(Chardata, utf8)
catch
_:_ ->
#{ msg => "report_cb_bad_return"
, report_cb_fun => Fun
, report_cb_return => Chardata
}
end;
Other ->
#{ msg => "report_cb_bad_return"
, report_cb_fun => Fun
, report_cb_return => Other
}
end;
format_msg({Fmt, Args}, _Meta, Config) ->
do_format_msg(Fmt, Args, Config).
do_format_msg(Format0, Args, #{depth := Depth,
single_line := SingleLine
}) ->
Format1 = io_lib:scan_format(Format0, Args),
Format = reformat(Format1, Depth, SingleLine),
Text0 = io_lib:build_text(Format, []),
Text = case SingleLine of
true -> re:replace(Text0, ",?\r?\n\s*",", ", [{return, list}, global, unicode]);
false -> Text0
end,
trim(unicode:characters_to_binary(Text, utf8)).
%% Get rid of the leading spaces.
%% leave alone the trailing spaces.
trim(<<$\s, Rest/binary>>) -> trim(Rest);
trim(Bin) -> Bin.
reformat(Format, unlimited, false) ->
Format;
reformat([#{control_char := C} = M | T], Depth, true) when C =:= $p ->
[limit_depth(M#{width => 0}, Depth) | reformat(T, Depth, true)];
reformat([#{control_char := C} = M | T], Depth, true) when C =:= $P ->
[M#{width => 0} | reformat(T, Depth, true)];
reformat([#{control_char := C}=M | T], Depth, Single) when C =:= $p; C =:= $w ->
[limit_depth(M, Depth) | reformat(T, Depth, Single)];
reformat([H | T], Depth, Single) ->
[H | reformat(T, Depth, Single)];
reformat([], _, _) ->
[].
limit_depth(M0, unlimited) -> M0;
limit_depth(#{control_char:=C0, args:=Args}=M0, Depth) ->
C = C0 - ($a - $A), %To uppercase.
M0#{control_char := C, args := Args ++ [Depth]}.
add_default_config(Config0) ->
Default = #{single_line => true},
Depth = get_depth(maps:get(depth, Config0, undefined)),
maps:merge(Default, Config0#{depth => Depth}).
get_depth(undefined) -> error_logger:get_format_depth();
get_depth(S) -> max(5, S).
best_effort_unicode(Input, Config) ->
try unicode:characters_to_binary(Input, utf8) of
B when is_binary(B) -> B;
_ -> do_format_msg("~p", [Input], Config)
catch
_ : _ ->
do_format_msg("~p", [Input], Config)
end.
best_effort_json_obj(List, Config) when is_list(List) ->
try
json_obj(maps:from_list(List), Config)
catch
_ : _ ->
[json(I, Config) || I <- List]
end;
best_effort_json_obj(Map, Config) ->
try
json_obj(Map, Config)
catch
_ : _ ->
do_format_msg("~p", [Map], Config)
end.
json([], _) -> "[]";
json(<<"">>, _) -> "\"\"";
json(A, _) when is_atom(A) -> atom_to_binary(A, utf8);
json(I, _) when is_integer(I) -> I;
json(F, _) when is_float(F) -> F;
json(P, C) when is_pid(P) -> json(pid_to_list(P), C);
json(P, C) when is_port(P) -> json(port_to_list(P), C);
json(F, C) when is_function(F) -> json(erlang:fun_to_list(F), C);
json(B, Config) when is_binary(B) ->
best_effort_unicode(B, Config);
json(L, Config) when is_list(L), is_integer(hd(L))->
best_effort_unicode(L, Config);
json(M, Config) when is_list(M), is_tuple(hd(M)), tuple_size(hd(M)) =:= 2 ->
best_effort_json_obj(M, Config);
json(L, Config) when is_list(L) ->
[json(I, Config) || I <- L];
json(Map, Config) when is_map(Map) ->
best_effort_json_obj(Map, Config);
json(Term, Config) ->
do_format_msg("~p", [Term], Config).
json_obj(Data, Config) ->
maps:fold(fun (K, V, D) ->
json_kv(K, V, D, Config)
end, maps:new(), Data).
json_kv(mfa, {M, F, A}, Data, _Config) -> %% emqx/snabbkaffe
maps:put(mfa, <<(atom_to_binary(M, utf8))/binary, $:,
(atom_to_binary(F, utf8))/binary, $/,
(integer_to_binary(A))/binary>>, Data);
json_kv('$kind', Kind, Data, Config) -> %% snabbkaffe
maps:put(msg, json(Kind, Config), Data);
json_kv(K0, V, Data, Config) ->
K = json_key(K0),
case is_map(V) of
true -> maps:put(json(K, Config), best_effort_json_obj(V, Config), Data);
false -> maps:put(json(K, Config), json(V, Config), Data)
end.
json_key(A) when is_atom(A) -> json_key(atom_to_binary(A, utf8));
json_key(Term) ->
try unicode:characters_to_binary(Term, utf8) of
OK when is_binary(OK) andalso OK =/= <<>> ->
OK;
_ ->
throw({badkey, Term})
catch
_:_ ->
throw({badkey, Term})
end.
-ifdef(TEST).
-include_lib("proper/include/proper.hrl").
-include_lib("eunit/include/eunit.hrl").
no_crash_test_() ->
Opts = [{numtests, 1000}, {to_file, user}],
{timeout, 30,
fun() -> ?assert(proper:quickcheck(t_no_crash(), Opts)) end}.
t_no_crash() ->
?FORALL({Level, Report, Meta, Config},
{p_level(), p_report(), p_meta(), p_config()},
t_no_crash_run(Level, Report, Meta, Config)).
t_no_crash_run(Level, Report, {undefined, Meta}, Config) ->
t_no_crash_run(Level, Report, maps:from_list(Meta), Config);
t_no_crash_run(Level, Report, {ReportCb, Meta}, Config) ->
t_no_crash_run(Level, Report, maps:from_list([{report_cb, ReportCb} | Meta]), Config);
t_no_crash_run(Level, Report, Meta, Config) ->
Input = #{ level => Level
, msg => {report, Report}
, meta => filter(Meta)
},
_ = format(Input, maps:from_list(Config)),
true.
%% assume top level Report and Meta are sane
filter(Map) ->
Keys = lists:filter(
fun(K) ->
try json_key(K), true
catch throw : {badkey, _} -> false
end
end, maps:keys(Map)),
maps:with(Keys, Map).
p_report_cb() ->
proper_types:oneof([ fun ?MODULE:report_cb_1/1
, fun ?MODULE:report_cb_2/2
, fun ?MODULE:report_cb_crash/2
, fun logger:format_otp_report/1
, fun logger:format_report/1
, format_report_undefined
]).
report_cb_1(Input) -> {"~p", [Input]}.
report_cb_2(Input, _Config) -> io_lib:format("~p", [Input]).
report_cb_crash(_Input, _Config) -> error(report_cb_crash).
p_kvlist() ->
proper_types:list({
proper_types:oneof([proper_types:atom(),
proper_types:binary()
]), proper_types:term()}).
%% meta type is 2-tuple, report_cb type, and some random key value pairs
p_meta() ->
{p_report_cb(), p_kvlist()}.
p_report() -> p_kvlist().
p_limit() -> proper_types:oneof([proper_types:pos_integer(), unlimited]).
p_level() -> proper_types:oneof([info, debug, error, warning, foobar]).
p_config() ->
proper_types:shrink_list(
[ {depth, p_limit()}
, {single_line, proper_types:boolean()}
]).
-endif.

View File

@ -0,0 +1,49 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_logger_textfmt).
-export([format/2]).
-export([check_config/1]).
%% metadata fields which we do not wish to merge into log data
-define(WITHOUT_MERGE,
[ report_cb % just a callback
, time % formatted as a part of templated message
, peername % formatted as a part of templated message
, clientid % formatted as a part of templated message
, gl % not interesting
]).
check_config(X) -> logger_formatter:check_config(X).
format(#{msg := Msg0, meta := Meta} = Event, Config) ->
Msg = maybe_merge(Msg0, Meta),
logger_formatter:format(Event#{msg := Msg}, Config).
maybe_merge({report, Report}, Meta) when is_map(Report) ->
{report, maps:merge(rename(Report), filter(Meta))};
maybe_merge(Report, _Meta) ->
Report.
filter(Meta) ->
maps:without(?WITHOUT_MERGE, Meta).
rename(#{'$kind' := Kind} = Meta0) -> % snabbkaffe
Meta = maps:remove('$kind', Meta0),
Meta#{msg => Kind};
rename(Meta) ->
Meta.

75
src/emqx_node_dump.erl Normal file
View File

@ -0,0 +1,75 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
%% Collection of functions for creating node dumps
-module(emqx_node_dump).
-export([ sys_info/0
, app_env_dump/0
]).
sys_info() ->
#{ release => emqx_app:get_release()
, otp_version => emqx_vm:get_otp_version()
}.
app_env_dump() ->
censor(ets:tab2list(ac_tab)).
censor([]) ->
[];
censor([{{env, App, Key}, Val} | Rest]) ->
[{{env, App, Key}, censor([Key, App], Val)} | censor(Rest)];
censor([_ | Rest]) ->
censor(Rest).
censor(Path, {Key, Val}) when is_atom(Key) ->
{Key, censor([Key|Path], Val)};
censor(Path, M) when is_map(M) ->
Fun = fun(Key, Val) ->
censor([Key|Path], Val)
end,
maps:map(Fun, M);
censor(Path, L = [Fst|_]) when is_tuple(Fst) ->
[censor(Path, I) || I <- L];
censor(Path, Val) ->
case Path of
[password|_] ->
obfuscate_value(Val);
[secret|_] ->
obfuscate_value(Val);
_ ->
Val
end.
obfuscate_value(Val) when is_binary(Val) ->
<<"********">>;
obfuscate_value(_Val) ->
"********".
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
censor_test() ->
?assertMatch( [{{env, emqx, listeners}, #{password := <<"********">>}}]
, censor([foo, {{env, emqx, listeners}, #{password => <<"secret">>}}, {app, bar}])
),
?assertMatch( [{{env, emqx, listeners}, [{foo, 1}, {password, "********"}]}]
, censor([{{env, emqx, listeners}, [{foo, 1}, {password, "secret"}]}])
).
-endif. %% TEST

View File

@ -46,6 +46,24 @@
-export([format/1]).
-define(TYPE_NAMES,
{ 'CONNECT'
, 'CONNACK'
, 'PUBLISH'
, 'PUBACK'
, 'PUBREC'
, 'PUBREL'
, 'PUBCOMP'
, 'SUBSCRIBE'
, 'SUBACK'
, 'UNSUBSCRIBE'
, 'UNSUBACK'
, 'PINGREQ'
, 'PINGRESP'
, 'DISCONNECT'
, 'AUTH'
}).
-type(connect() :: #mqtt_packet_connect{}).
-type(publish() :: #mqtt_packet_publish{}).
-type(subscribe() :: #mqtt_packet_subscribe{}).
@ -61,9 +79,13 @@ type(#mqtt_packet{header = #mqtt_packet_header{type = Type}}) ->
Type.
%% @doc Name of MQTT packet type.
-spec(type_name(emqx_types:packet()) -> atom()).
type_name(Packet) when is_record(Packet, mqtt_packet) ->
lists:nth(type(Packet), ?TYPE_NAMES).
-spec(type_name(emqx_types:packet() | non_neg_integer()) -> atom() | string()).
type_name(#mqtt_packet{} = Packet) ->
type_name(type(Packet));
type_name(0) -> 'FORBIDDEN';
type_name(Type) when Type > 0 andalso Type =< tuple_size(?TYPE_NAMES) ->
element(Type, ?TYPE_NAMES);
type_name(Type) -> "UNKNOWN("++ integer_to_list(Type) ++")".
%% @doc Dup flag of MQTT packet.
-spec(dup(emqx_types:packet()) -> boolean()).
@ -229,13 +251,16 @@ set_props(Props, #mqtt_packet_auth{} = Pkt) ->
%% @doc Check PubSub Packet.
-spec(check(emqx_types:packet()|publish()|subscribe()|unsubscribe())
-> ok | {error, emqx_types:reason_code()}).
check(#mqtt_packet{variable = PubPkt}) when is_record(PubPkt, mqtt_packet_publish) ->
check(#mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH},
variable = PubPkt}) when not is_tuple(PubPkt) ->
%% publish without any data
%% disconnect instead of crash
{error, ?RC_PROTOCOL_ERROR};
check(#mqtt_packet{variable = #mqtt_packet_publish{} = PubPkt}) ->
check(PubPkt);
check(#mqtt_packet{variable = SubPkt}) when is_record(SubPkt, mqtt_packet_subscribe) ->
check(#mqtt_packet{variable = #mqtt_packet_subscribe{} = SubPkt}) ->
check(SubPkt);
check(#mqtt_packet{variable = UnsubPkt}) when is_record(UnsubPkt, mqtt_packet_unsubscribe) ->
check(#mqtt_packet{variable = #mqtt_packet_unsubscribe{} = UnsubPkt}) ->
check(UnsubPkt);
%% A Topic Alias of 0 is not permitted.
@ -417,12 +442,11 @@ format_header(#mqtt_packet_header{type = Type,
dup = Dup,
qos = QoS,
retain = Retain}, S) ->
S1 = if
S == undefined -> <<>>;
true -> [", ", S]
S1 = case S == undefined of
true -> <<>>;
false -> [", ", S]
end,
io_lib:format("~s(Q~p, R~p, D~p~s)",
[lists:nth(Type, ?TYPE_NAMES), QoS, i(Retain), i(Dup), S1]).
io_lib:format("~s(Q~p, R~p, D~p~s)", [type_name(Type), QoS, i(Retain), i(Dup), S1]).
format_variable(undefined, _) ->
undefined;

View File

@ -266,28 +266,38 @@ maybe_trans(Fun, Args) ->
end, [])
end.
%% The created fun only terminates with explicit exception
-dialyzer({nowarn_function, [trans/2]}).
-spec(trans(function(), list(any())) -> ok | {error, term()}).
trans(Fun, Args) ->
%% trigger selective receive optimization of compiler,
%% ideal for handling bursty traffic.
Ref = erlang:make_ref(),
Owner = self(),
{WPid, RefMon} = spawn_monitor(
fun() ->
Res = case mnesia:transaction(Fun, Args) of
{atomic, Ok} -> Ok;
{aborted, Reason} -> {error, Reason}
end,
Owner ! {Ref, Res}
end),
{WPid, RefMon} =
spawn_monitor(
%% NOTE: this is under the assumption that crashes in Fun
%% are caught by mnesia:transaction/2.
%% Future changes should keep in mind that this process
%% always exit with database write result.
fun() ->
Res = case mnesia:transaction(Fun, Args) of
{atomic, Ok} -> Ok;
{aborted, Reason} -> {error, Reason}
end,
exit({shutdown, Res})
end),
%% Receive a 'shutdown' exit to pass result from the short-lived process.
%% so the receive below can be receive-mark optimized by the compiler.
%%
%% If the result is sent as a regular message, we'll have to
%% either demonitor (with flush which is essentially a 'receive' since
%% the process is no longer alive after the result has been received),
%% or use a plain 'receive' to drain the normal 'DOWN' message.
%% However the compiler does not optimize this second 'receive'.
receive
{Ref, TransRes} ->
receive
{'DOWN', RefMon, process, WPid, normal} -> ok
end,
TransRes;
{'DOWN', RefMon, process, WPid, Info} ->
{error, {trans_crash, Info}}
case Info of
{shutdown, Result} -> Result;
_ -> {error, {trans_crash, Info}}
end
end.
lock_router() ->

View File

@ -122,7 +122,7 @@ match(Topic) when is_binary(Topic) ->
%% @doc Is the trie empty?
-spec(empty() -> boolean()).
empty() -> ets:info(?TRIE, size) == 0.
empty() -> ets:first(?TRIE) =:= '$end_of_table'.
-spec lock_tables() -> ok.
lock_tables() ->

View File

@ -77,3 +77,8 @@ uri_parse_test_() ->
end
}
].
normalise_headers_test() ->
?assertEqual([{"content-type", "applicaiton/binary"}],
emqx_http_lib:normalise_headers([{"Content_Type", "applicaiton/binary"},
{"content-type", "applicaiton/json"}])).

View File

@ -309,3 +309,7 @@ t_format(_) ->
io:format("~s", [emqx_packet:format(?UNSUBACK_PACKET(90))]),
io:format("~s", [emqx_packet:format(?DISCONNECT_PACKET(128))]).
t_parse_empty_publish(_) ->
%% 52: 0011(type=PUBLISH) 0100 (QoS=2)
{ok, Packet, <<>>, {none, _}} = emqx_frame:parse(<<52, 0>>),
?assertEqual({error, ?RC_PROTOCOL_ERROR}, emqx_packet:check(Packet)).