diff --git a/.ci/build_packages/Dockerfile b/.ci/build_packages/Dockerfile index 3e6fa83c6..aa45c970b 100644 --- a/.ci/build_packages/Dockerfile +++ b/.ci/build_packages/Dockerfile @@ -1,4 +1,4 @@ -ARG BUILD_FROM=emqx/build-env:erl23.2.7.2-emqx-1-ubuntu20.04 +ARG BUILD_FROM=emqx/build-env:erl23.2.7.2-emqx-2-ubuntu20.04 FROM ${BUILD_FROM} ARG EMQX_NAME=emqx diff --git a/.ci/docker-compose-file/docker-compose.yaml b/.ci/docker-compose-file/docker-compose.yaml index 869153d86..a502aeb6e 100644 --- a/.ci/docker-compose-file/docker-compose.yaml +++ b/.ci/docker-compose-file/docker-compose.yaml @@ -3,7 +3,7 @@ version: '3.9' services: erlang: container_name: erlang - image: emqx/build-env:erl23.2.7.2-emqx-1-ubuntu20.04 + image: emqx/build-env:erl23.2.7.2-emqx-2-ubuntu20.04 env_file: - conf.env environment: diff --git a/.github/workflows/build_packages.yaml b/.github/workflows/build_packages.yaml index e833f0d61..697820299 100644 --- a/.github/workflows/build_packages.yaml +++ b/.github/workflows/build_packages.yaml @@ -15,7 +15,7 @@ on: jobs: prepare: runs-on: ubuntu-20.04 - container: emqx/build-env:erl23.2.7.2-emqx-1-ubuntu20.04 + container: emqx/build-env:erl23.2.7.2-emqx-2-ubuntu20.04 outputs: profiles: ${{ steps.set_profile.outputs.profiles}} @@ -255,8 +255,8 @@ jobs: old_vsns=($(git tag -l "v$pre_vsn.[0-9]" | sed "s/v$vsn//")) fi - mkdir -p tmp/relup_packages/$PROFILE - cd tmp/relup_packages/$PROFILE + mkdir -p _upgrade_base + cd _upgrade_base for tag in ${old_vsns[@]};do if [ ! -z "$(echo $(curl -I -m 10 -o /dev/null -s -w %{http_code} https://s3-${{ secrets.AWS_DEFAULT_REGION }}.amazonaws.com/${{ secrets.AWS_S3_BUCKET }}/$broker/$tag/$PROFILE-$SYSTEM-${tag#[e|v]}-$ARCH.zip) | grep -oE "^[23]+")" ];then wget https://s3-${{ secrets.AWS_DEFAULT_REGION }}.amazonaws.com/${{ secrets.AWS_S3_BUCKET }}/$broker/$tag/$PROFILE-$SYSTEM-${tag#[e|v]}-$ARCH.zip @@ -267,7 +267,7 @@ jobs: cd - - name: build emqx packages env: - ERL_OTP: erl23.2.7.2-emqx-1 + ERL_OTP: erl23.2.7.2-emqx-2 PROFILE: ${{ matrix.profile }} ARCH: ${{ matrix.arch }} SYSTEM: ${{ matrix.os }} @@ -406,11 +406,17 @@ jobs: aws s3 cp --recursive _packages/${{ matrix.profile }} s3://${{ secrets.AWS_S3_BUCKET }}/$broker/${{ env.version }} aws cloudfront create-invalidation --distribution-id ${{ secrets.AWS_CLOUDFRONT_ID }} --paths "/$broker/${{ env.version }}/*" - uses: Rory-Z/upload-release-asset@v1 - if: github.event_name == 'release' + if: github.event_name == 'release' && matrix.profile != 'emqx-ee' with: repo: emqx path: "_packages/${{ matrix.profile }}/emqx-*" token: ${{ github.token }} + - uses: Rory-Z/upload-release-asset@v1 + if: github.event_name == 'release' && matrix.profile == 'emqx-ee' + with: + repo: emqx-enterprise + path: "_packages/${{ matrix.profile }}/emqx-*" + token: ${{ github.token }} - name: update to emqx.io if: github.event_name == 'release' run: | diff --git a/.github/workflows/build_slim_packages.yaml b/.github/workflows/build_slim_packages.yaml index 7dad88eed..a78d93a56 100644 --- a/.github/workflows/build_slim_packages.yaml +++ b/.github/workflows/build_slim_packages.yaml @@ -11,7 +11,7 @@ jobs: strategy: matrix: erl_otp: - - erl23.2.7.2-emqx-1 + - erl23.2.7.2-emqx-2 os: - ubuntu20.04 - centos7 diff --git a/.github/workflows/check_deps_integrity.yaml b/.github/workflows/check_deps_integrity.yaml index 9b23f27d0..0aa8f7903 100644 --- a/.github/workflows/check_deps_integrity.yaml +++ b/.github/workflows/check_deps_integrity.yaml @@ -5,7 +5,7 @@ on: [pull_request] jobs: check_deps_integrity: runs-on: ubuntu-20.04 - container: emqx/build-env:erl23.2.7.2-emqx-1-ubuntu20.04 + container: emqx/build-env:erl23.2.7.2-emqx-2-ubuntu20.04 steps: - uses: actions/checkout@v2 diff --git a/.github/workflows/run_fvt_tests.yaml b/.github/workflows/run_fvt_tests.yaml index 8360d638d..68aec59a6 100644 --- a/.github/workflows/run_fvt_tests.yaml +++ b/.github/workflows/run_fvt_tests.yaml @@ -152,7 +152,7 @@ jobs: relup_test: runs-on: ubuntu-20.04 - container: emqx/build-env:erl23.2.7.2-emqx-1-ubuntu20.04 + container: emqx/build-env:erl23.2.7.2-emqx-2-ubuntu20.04 defaults: run: shell: bash @@ -213,9 +213,9 @@ jobs: pre_vsn="$(echo $vsn | grep -oE '^[0-9]+.[0-9]')" if [ $PROFILE = "emqx" ]; then - old_vsns="$(git tag -l "v$pre_vsn.[0-9]" | tr "\n" " " | sed "s/v$vsn//")" + old_vsns="$(git tag -l "v$pre_vsn.[0-9]" | xargs echo -n | sed "s/v$vsn//")" else - old_vsns="$(git tag -l "e$pre_vsn.[0-9]" | tr "\n" " " | sed "s/v$vsn//")" + old_vsns="$(git tag -l "e$pre_vsn.[0-9]" | xargs echo -n | sed "s/v$vsn//")" fi echo "OLD_VSNS=$old_vsns" >> $GITHUB_ENV - name: download emqx diff --git a/.github/workflows/run_test_cases.yaml b/.github/workflows/run_test_cases.yaml index 0b3f8da13..430e73594 100644 --- a/.github/workflows/run_test_cases.yaml +++ b/.github/workflows/run_test_cases.yaml @@ -13,7 +13,7 @@ on: jobs: run_static_analysis: runs-on: ubuntu-20.04 - container: emqx/build-env:erl23.2.7.2-emqx-1-ubuntu20.04 + container: emqx/build-env:erl23.2.7.2-emqx-2-ubuntu20.04 steps: - uses: actions/checkout@v2 @@ -30,7 +30,7 @@ jobs: run_proper_test: runs-on: ubuntu-20.04 - container: emqx/build-env:erl23.2.7.2-emqx-1-ubuntu20.04 + container: emqx/build-env:erl23.2.7.2-emqx-2-ubuntu20.04 steps: - uses: actions/checkout@v2 diff --git a/.gitignore b/.gitignore index c702d3955..218b22261 100644 --- a/.gitignore +++ b/.gitignore @@ -46,3 +46,4 @@ emqx_dialyzer_*_plt dist.zip scripts/git-token etc/*.seg +_upgrade_base/ diff --git a/Makefile b/Makefile index f0c5d0833..01fc40435 100644 --- a/Makefile +++ b/Makefile @@ -1,11 +1,11 @@ $(shell $(CURDIR)/scripts/git-hooks-init.sh) -REBAR_VERSION = 3.14.3-emqx-6 +REBAR_VERSION = 3.14.3-emqx-7 REBAR = $(CURDIR)/rebar3 BUILD = $(CURDIR)/build SCRIPTS = $(CURDIR)/scripts export PKG_VSN ?= $(shell $(CURDIR)/pkg-vsn.sh) export EMQX_DESC ?= EMQ X -export EMQX_CE_DASHBOARD_VERSION ?= v4.3.0 +export EMQX_CE_DASHBOARD_VERSION ?= v4.3.1 ifeq ($(OS),Windows_NT) export REBAR_COLOR=none endif @@ -111,30 +111,35 @@ xref: $(REBAR) dialyzer: $(REBAR) @$(REBAR) as check dialyzer -.PHONY: $(REL_PROFILES:%=relup-%) -$(REL_PROFILES:%=relup-%): $(REBAR) -ifneq ($(OS),Windows_NT) - @$(BUILD) $(@:relup-%=%) relup -endif +COMMON_DEPS := $(REBAR) get-dashboard $(CONF_SEGS) -.PHONY: $(REL_PROFILES:%=%-tar) $(PKG_PROFILES:%=%-tar) -$(REL_PROFILES:%=%-tar) $(PKG_PROFILES:%=%-tar): $(REBAR) get-dashboard $(CONF_SEGS) - @$(BUILD) $(subst -tar,,$(@)) tar +## rel target is to create release package without relup +.PHONY: $(REL_PROFILES:%=%-rel) $(PKG_PROFILES:%=%-rel) +$(REL_PROFILES:%=%-rel) $(PKG_PROFILES:%=%-rel): $(COMMON_DEPS) + @$(BUILD) $(subst -rel,,$(@)) rel -## zip targets depend on the corresponding relup and tar artifacts +## relup target is to create relup instructions +.PHONY: $(REL_PROFILES:%=%-relup) +define gen-relup-target +$1-relup: $(COMMON_DEPS) + @$(BUILD) $1 relup +endef +ALL_ZIPS = $(REL_PROFILES) +$(foreach zt,$(ALL_ZIPS),$(eval $(call gen-relup-target,$(zt)))) + +## zip target is to create a release package .zip with relup .PHONY: $(REL_PROFILES:%=%-zip) define gen-zip-target -$1-zip: relup-$1 $1-tar +$1-zip: $1-relup @$(BUILD) $1 zip endef -ALL_ZIPS = $(REL_PROFILES) $(PKG_PROFILES) +ALL_ZIPS = $(REL_PROFILES) $(foreach zt,$(ALL_ZIPS),$(eval $(call gen-zip-target,$(zt)))) -## A pkg target depend on a regular release profile zip to include relup, -## and also a -pkg suffixed profile tar (without relup) for making deb/rpm package +## A pkg target depend on a regular release .PHONY: $(PKG_PROFILES) define gen-pkg-target -$1: $(subst -pkg,,$1)-zip $1-tar +$1: $1-rel @$(BUILD) $1 pkg endef $(foreach pt,$(PKG_PROFILES),$(eval $(call gen-pkg-target,$(pt)))) diff --git a/apps/emqx_auth_http/etc/emqx_auth_http.conf b/apps/emqx_auth_http/etc/emqx_auth_http.conf index e8e28116b..d4e62ce9c 100644 --- a/apps/emqx_auth_http/etc/emqx_auth_http.conf +++ b/apps/emqx_auth_http/etc/emqx_auth_http.conf @@ -18,7 +18,8 @@ auth.http.auth_req.method = post ## The possible values of the Content-Type header: application/x-www-form-urlencoded, application/json ## ## Examples: auth.http.auth_req.headers.accept = */* -auth.http.auth_req.headers.content-type = "application/x-www-form-urlencoded" + +auth.http.auth_req.headers.content_type = "application/x-www-form-urlencoded" ## Parameters used to construct the request body or query string parameters ## When the request method is GET, these parameters will be converted into query string parameters diff --git a/apps/emqx_auth_http/src/emqx_auth_http_app.erl b/apps/emqx_auth_http/src/emqx_auth_http_app.erl index c5a7f2a77..7d4781f7c 100644 --- a/apps/emqx_auth_http/src/emqx_auth_http_app.erl +++ b/apps/emqx_auth_http/src/emqx_auth_http_app.erl @@ -96,7 +96,7 @@ translate_env(EnvName) -> {retry_timeout, 1000}] ++ MoreOpts, Method = proplists:get_value(method, Req), Headers = proplists:get_value(headers, Req), - NHeaders = ensure_content_type_header(Method, to_lower(Headers)), + NHeaders = ensure_content_type_header(Method, emqx_http_lib:normalise_headers(Headers)), NReq = lists:keydelete(headers, 1, Req), {ok, Timeout} = application:get_env(?APP, timeout), application:set_env(?APP, EnvName, [{path, Path}, @@ -145,9 +145,6 @@ unload_hooks() -> _ = ehttpc_sup:stop_pool('emqx_auth_http/acl_req'), ok. -to_lower(Headers) -> - [{string:to_lower(K), V} || {K, V} <- Headers]. - ensure_content_type_header(Method, Headers) when Method =:= post orelse Method =:= put -> Headers; diff --git a/apps/emqx_management/src/emqx_mgmt_data_backup.erl b/apps/emqx_management/src/emqx_mgmt_data_backup.erl index 75b929d08..c9df698ed 100644 --- a/apps/emqx_management/src/emqx_mgmt_data_backup.erl +++ b/apps/emqx_management/src/emqx_mgmt_data_backup.erl @@ -505,7 +505,8 @@ do_import_acl_mnesia(Acls) -> -ifdef(EMQX_ENTERPRISE). import_modules(Modules) -> case ets:info(emqx_modules) of - undefined -> []; + undefined -> + ok; _ -> lists:foreach(fun(#{<<"id">> := Id, <<"type">> := Type, @@ -649,9 +650,9 @@ do_import_data(Data, Version) -> -ifdef(EMQX_ENTERPRISE). do_import_extra_data(Data, _Version) -> - import_confs(maps:get(<<"configs">>, Data, []), maps:get(<<"listeners_state">>, Data, [])), - import_modules(maps:get(<<"modules">>, Data, [])), - import_schemas(maps:get(<<"schemas">>, Data, [])), + _ = import_confs(maps:get(<<"configs">>, Data, []), maps:get(<<"listeners_state">>, Data, [])), + _ = import_modules(maps:get(<<"modules">>, Data, [])), + _ = import_schemas(maps:get(<<"schemas">>, Data, [])), ok. -else. do_import_extra_data(_Data, _Version) -> ok. diff --git a/apps/emqx_sn/src/emqx_sn_gateway.erl b/apps/emqx_sn/src/emqx_sn_gateway.erl index f24860e6b..bfb2f28df 100644 --- a/apps/emqx_sn/src/emqx_sn_gateway.erl +++ b/apps/emqx_sn/src/emqx_sn_gateway.erl @@ -610,8 +610,9 @@ handle_call(_From, Req, State = #state{channel = Channel}) -> stop(Reason, Reply, State#state{channel = NChannel}) end. -handle_info(Info, State = #state{channel = Channel}) -> - handle_return(emqx_channel:handle_info(Info, Channel), State). +handle_info({sock_closed, Reason} = Info, State = #state{channel = Channel}) -> + maybe_send_will_msg(Reason, State), + handle_return(emqx_channel:handle_info(Info, Channel), State). handle_timeout(TRef, TMsg, State = #state{channel = Channel}) -> handle_return(emqx_channel:handle_timeout(TRef, TMsg, Channel), State). @@ -782,21 +783,21 @@ stop({shutdown, Reason}, State) -> stop(Reason, State); stop(Reason, State) -> ?LOG(stop_log_level(Reason), "stop due to ~p", [Reason]), - case Reason of - %% FIXME: The Will-Msg should publish when a Session terminated! - Reason when Reason =:= normal -> - ok; - _ -> - do_publish_will(State) - end, + maybe_send_will_msg(Reason, State), {stop, {shutdown, Reason}, State}. stop({shutdown, Reason}, Reply, State) -> stop(Reason, Reply, State); stop(Reason, Reply, State) -> ?LOG(stop_log_level(Reason), "stop due to ~p", [Reason]), + maybe_send_will_msg(Reason, State), {stop, {shutdown, Reason}, Reply, State}. +maybe_send_will_msg(normal, _State) -> + ok; +maybe_send_will_msg(_Reason, State) -> + do_publish_will(State). + stop_log_level(Reason) when ?is_non_error_reason(Reason) -> debug; stop_log_level(_) -> diff --git a/apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl b/apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl index 746d0f4a1..ad0c5f032 100644 --- a/apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl +++ b/apps/emqx_sn/test/emqx_sn_protocol_SUITE.erl @@ -941,6 +941,41 @@ t_will_test5(_) -> gen_udp:close(Socket). +t_will_case06(_) -> + QoS = 1, + Duration = 1, + WillMsg = <<10, 11, 12, 13, 14>>, + WillTopic = <<"abc">>, + {ok, Socket} = gen_udp:open(0, [binary]), + ClientId = <<"test">>, + + ok = emqx_broker:subscribe(WillTopic), + + send_connect_msg_with_will1(Socket, Duration, ClientId), + ?assertEqual(<<2, ?SN_WILLTOPICREQ>>, receive_response(Socket)), + + send_willtopic_msg(Socket, WillTopic, QoS), + ?assertEqual(<<2, ?SN_WILLMSGREQ>>, receive_response(Socket)), + + send_willmsg_msg(Socket, WillMsg), + ?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)), + + send_pingreq_msg(Socket, undefined), + ?assertEqual(<<2, ?SN_PINGRESP>>, receive_response(Socket)), + + % wait udp client keepalive timeout + timer:sleep(2000), + + receive + {deliver, WillTopic, #message{payload = WillMsg}} -> ok; + Msg -> ct:print("recevived --- unex: ~p", [Msg]) + after + 1000 -> ct:fail(wait_willmsg_timeout) + end, + send_disconnect_msg(Socket, undefined), + + gen_udp:close(Socket). + t_asleep_test01_timeout(_) -> QoS = 1, Duration = 1, @@ -1564,6 +1599,15 @@ send_connect_msg_with_will(Socket, Duration, ClientId) -> ?FNU:2, ProtocolId:8, Duration:16, ClientId/binary>>, ok = gen_udp:send(Socket, ?HOST, ?PORT, ConnectPacket). +send_connect_msg_with_will1(Socket, Duration, ClientId) -> + Length = 10, + Will = 1, + CleanSession = 0, + ProtocolId = 1, + ConnectPacket = <>, + ok = gen_udp:send(Socket, ?HOST, ?PORT, ConnectPacket). + send_willtopic_msg(Socket, Topic, QoS) -> Length = 3+byte_size(Topic), MsgType = ?SN_WILLTOPIC, diff --git a/apps/emqx_web_hook/test/emqx_web_hook_SUITE.erl b/apps/emqx_web_hook/test/emqx_web_hook_SUITE.erl index cd4fb4869..864e1b150 100644 --- a/apps/emqx_web_hook/test/emqx_web_hook_SUITE.erl +++ b/apps/emqx_web_hook/test/emqx_web_hook_SUITE.erl @@ -168,8 +168,8 @@ validate_params_and_headers(ClientState, ClientId) -> end catch throw : {unknown_client, Other} -> - ct:pal("ignored_event_from_other_client ~p~n~p~n~p", - [Other, Params, Headers]), + ct:pal("ignored_event_from_other_client ~p~nexpecting:~p~n~p~n~p", + [Other, ClientId, Params, Headers]), validate_params_and_headers(ClientState, ClientId) %% continue looping end after diff --git a/bin/node_dump b/bin/node_dump new file mode 100755 index 000000000..056b9a822 --- /dev/null +++ b/bin/node_dump @@ -0,0 +1,82 @@ +#!/bin/sh +set -eu + +ROOT_DIR="$(cd "$(dirname "$(readlink "$0" || echo "$0")")"/..; pwd -P)" + +echo "Running node dump in ${ROOT_DIR}" + +cd "${ROOT_DIR}" + +DUMP="log/node_dump_$(date +"%Y%m%d_%H%M%S").tar.gz" +CONF_DUMP="log/conf.dump" +SYSINFO="log/sysinfo.txt" + +LOG_MAX_AGE_DAYS=3 + +collect() { + echo "========================================================" + echo " $*" + echo "========================================================" + eval "$*" || echo "Unavailable" + echo +} + +show_help() { + echo "Collect information about the EMQ X node + +USAGE: + + bin/node_dump [-a DAYS] + +OPTIONS: + + -a n Set maximum age of collected log files in days (3 by default)" + exit 1 +} + +while getopts "a:h" opt; do + case "${opt}" in + a) LOG_MAX_AGE_DAYS="${OPTARG}" ;; + h) show_help ;; + *) ;; + esac +done + +# Collect system info: +{ + collect bin/emqx_ctl broker + collect bin/emqx eval "'emqx_node_dump:sys_info()'" + + collect uname -a + collect uptime + collect free + collect netstat -tnl + + collect bin/emqx_ctl plugins list + collect bin/emqx_ctl modules list + + collect bin/emqx_ctl vm all + collect bin/emqx_ctl listeners +} > "${SYSINFO}" + +# Collect information about the configuration: +{ + collect bin/emqx eval "'emqx_node_dump:app_env_dump()'" +} > "${CONF_DUMP}" + +# Pack files +{ + find log -mtime -"${LOG_MAX_AGE_DAYS}" \( -name '*.log.*' -or -name 'run_erl.log*' \) + echo "${SYSINFO}" + echo "${CONF_DUMP}" +} | tar czf "${DUMP}" -T - + +## Cleanup: +rm "${SYSINFO}" +#rm "${CONF_DUMP}" # Keep it for inspection + +echo "Created a node dump ${DUMP}" +echo +echo "WARNING: this script tries to obfuscate secrets, but make sure to +inspect log/conf.dump file manually before uploading the node dump +to a public location." diff --git a/build b/build index c4f57d4ea..af7ff9865 100755 --- a/build +++ b/build @@ -2,7 +2,7 @@ # This script helps to build release artifacts. # arg1: profile, e.g. emqx | emqx-edge | emqx-pkg | emqx-edge-pkg -# arg2: artifact, e.g. tar | relup | zip | pkg +# arg2: artifact, e.g. rel | relup | zip | pkg set -euo pipefail @@ -62,42 +62,48 @@ log() { echo "===< $msg" } -make_tar() { - ./rebar3 as "$PROFILE" tar +make_rel() { + # shellcheck disable=SC1010 + ./rebar3 as "$PROFILE" do release,tar } ## unzip previous version .zip files to _build/$PROFILE/rel/emqx/releases before making relup make_relup() { + local lib_dir="_build/$PROFILE/rel/emqx/lib" local releases_dir="_build/$PROFILE/rel/emqx/releases" + mkdir -p "$lib_dir" "$releases_dir" local releases=() if [ -d "$releases_dir" ]; then - while read -r dir; do - local version - version="$(basename "$dir")" - # skip current version - if [ "$version" != "$PKG_VSN" ]; then - releases+=( "$version" ) + while read -r zip; do + local base_vsn + base_vsn="$(echo "$zip" | grep -oE "[0-9]+\.[0-9]+\.[0-9]+(-[0-9a-e]{8})?")" + if [ ! -d "$releases_dir/$base_vsn" ]; then + local tmp_dir + tmp_dir="$(mktemp -d -t emqx.XXXXXXX)" + unzip -q "$zip" "emqx/releases/*" -d "$tmp_dir" + unzip -q "$zip" "emqx/lib/*" -d "$tmp_dir" + cp -r -n "$tmp_dir/emqx/releases"/* "$releases_dir" + cp -r -n "$tmp_dir/emqx/lib"/* "$lib_dir" + rm -rf "$tmp_dir" fi - done < <(find "_build/$PROFILE/rel/emqx/releases" -maxdepth 1 -name '*.*.*' -type d) + releases+=( "$base_vsn" ) + done < <(find _upgrade_base -maxdepth 1 -name "*$PROFILE-$SYSTEM*-$ARCH.zip" -type f) fi if [ ${#releases[@]} -eq 0 ]; then - log "No previous release found, relup ignored" + log "No upgrade base found, relup ignored" return 0 fi - if [ ${#releases[@]} -gt 1 ]; then - log "Found more than one previous versions in $releases_dir:" - log "${releases[@]}" - log "ERROR: So far we can not support multi base-version relup creation" - return 1 - fi - local base_version="${releases[0]}" - # TODO: comma separate base-versions when supported - ./rebar3 as "$PROFILE" relup --relname emqx --relvsn "${PKG_VSN}" --upfrom "$base_version" + RELX_BASE_VERSIONS="$(IFS=, ; echo "${releases[*]}")" + export RELX_BASE_VERSIONS + ./rebar3 as "$PROFILE" relup --relname emqx --relvsn "${PKG_VSN}" } ## make_zip turns .tar.gz into a .zip with a slightly different name. ## It assumes the .tar.gz has been built -- relies on Makefile dependency make_zip() { + # build the tarball again to ensure relup is included + make_rel + tard="/tmp/emqx_untar_${PKG_VSN}" rm -rf "${tard}" mkdir -p "${tard}/emqx" @@ -117,8 +123,8 @@ make_zip() { log "building artifact=$ARTIFACT for profile=$PROFILE" case "$ARTIFACT" in - tar) - make_tar + rel) + make_rel ;; relup) make_relup @@ -132,8 +138,6 @@ case "$ARTIFACT" in log "Skipped making deb/rpm package for $SYSTEM" exit 0 fi - # build the tar which is going to be used as the base of deb and rpm packages - make_tar make -C "deploy/packages/${PKGERDIR}" clean EMQX_REL="$(pwd)" EMQX_BUILD="${PROFILE}" SYSTEM="${SYSTEM}" make -C "deploy/packages/${PKGERDIR}" ;; diff --git a/deploy/docker/Dockerfile b/deploy/docker/Dockerfile index 1891c39a0..4e25a001d 100644 --- a/deploy/docker/Dockerfile +++ b/deploy/docker/Dockerfile @@ -1,4 +1,4 @@ -ARG BUILD_FROM=emqx/build-env:erl23.2.7.2-emqx-1-alpine-amd64 +ARG BUILD_FROM=emqx/build-env:erl23.2.7.2-emqx-2-alpine-amd64 ARG RUN_FROM=alpine:3.12 FROM ${BUILD_FROM} AS builder diff --git a/docker.mk b/docker.mk index 8178e0938..e2fe61d36 100644 --- a/docker.mk +++ b/docker.mk @@ -62,7 +62,7 @@ docker-build: @docker build --no-cache \ --build-arg PKG_VSN=$(PKG_VSN) \ - --build-arg BUILD_FROM=emqx/build-env:erl23.2.7.2-emqx-1-alpine-$(ARCH) \ + --build-arg BUILD_FROM=emqx/build-env:erl23.2.7.2-emqx-2-alpine-$(ARCH) \ --build-arg RUN_FROM=$(ARCH)/alpine:3.12 \ --build-arg EMQX_NAME=$(EMQX_NAME) \ --build-arg QEMU_ARCH=$(QEMU_ARCH) \ diff --git a/etc/emqx.conf b/etc/emqx.conf index 3e9d394b1..dca8811a9 100644 --- a/etc/emqx.conf +++ b/etc/emqx.conf @@ -451,9 +451,13 @@ log.file = emqx.log ## Default: No Limit #log.chars_limit = 8192 +## Log formatter +## Value: text | json +#log.formatter = text + ## Log to single line -## Value: boolean -#log.single_line = false +## Value: Boolean +#log.single_line = true ## Enables the log rotation. ## With this enabled, new log files will be created when the current diff --git a/etc/emqx_cloud/vm.args b/etc/emqx_cloud/vm.args index d18d2f20d..1e6b0b4cb 100644 --- a/etc/emqx_cloud/vm.args +++ b/etc/emqx_cloud/vm.args @@ -113,3 +113,6 @@ ## Specifies how long time (in milliseconds) to spend shutting down the system. ## See: http://erlang.org/doc/man/erl.html -shutdown_time 30000 + +## patches dir +-pa {{ platform_data_dir }}/patches diff --git a/etc/emqx_edge/vm.args b/etc/emqx_edge/vm.args index 9f722d1dd..ef9749738 100644 --- a/etc/emqx_edge/vm.args +++ b/etc/emqx_edge/vm.args @@ -112,3 +112,5 @@ ## See: http://erlang.org/doc/man/erl.html -shutdown_time 10000 +## patches dir +-pa {{ platform_data_dir }}/patches diff --git a/include/emqx_mqtt.hrl b/include/emqx_mqtt.hrl index 3e6bf6e3e..e6e9bffe5 100644 --- a/include/emqx_mqtt.hrl +++ b/include/emqx_mqtt.hrl @@ -96,23 +96,6 @@ -define(DISCONNECT, 14). %% Client or Server is disconnecting -define(AUTH, 15). %% Authentication exchange --define(TYPE_NAMES, [ - 'CONNECT', - 'CONNACK', - 'PUBLISH', - 'PUBACK', - 'PUBREC', - 'PUBREL', - 'PUBCOMP', - 'SUBSCRIBE', - 'SUBACK', - 'UNSUBSCRIBE', - 'UNSUBACK', - 'PINGREQ', - 'PINGRESP', - 'DISCONNECT', - 'AUTH']). - %%-------------------------------------------------------------------- %% MQTT V3.1.1 Connect Return Codes %%-------------------------------------------------------------------- diff --git a/include/emqx_release.hrl b/include/emqx_release.hrl index 2369dbd5b..cc5064c69 100644 --- a/include/emqx_release.hrl +++ b/include/emqx_release.hrl @@ -29,7 +29,7 @@ -ifndef(EMQX_ENTERPRISE). --define(EMQX_RELEASE, {opensource, "4.3-rc.5"}). +-define(EMQX_RELEASE, {opensource, "4.3.0"}). -else. diff --git a/priv/emqx.schema b/priv/emqx.schema index a108e6857..1d145d8f5 100644 --- a/priv/emqx.schema +++ b/priv/emqx.schema @@ -494,9 +494,28 @@ end}. {datatype, integer} ]}. +{mapping, "log.supervisor_reports", "kernel.logger", [ + {default, error}, + {datatype, {enum, [error, progress]}}, + hidden +]}. + +%% @doc Maximum depth in Erlang term log formatting +%% and message queue inspection. +{mapping, "log.max_depth", "kernel.error_logger_format_depth", [ + {default, 20}, + {datatype, [{enum, [unlimited]}, integer]} +]}. + +%% @doc format logs as JSON objects +{mapping, "log.formatter", "kernel.logger", [ + {default, text}, + {datatype, {enum, [text, json]}} +]}. + %% @doc format logs in a single line. {mapping, "log.single_line", "kernel.logger", [ - {default, false}, + {default, true}, {datatype, {enum, [true, false]}} ]}. @@ -617,7 +636,16 @@ end}. V -> V end, SingleLine = cuttlefish:conf_get("log.single_line", Conf), - Formatter = {logger_formatter, + FmtName = cuttlefish:conf_get("log.formatter", Conf), + Formatter = + case FmtName of + json -> + {emqx_logger_jsonfmt, + #{chars_limit => CharsLimit, + single_line => SingleLine + }}; + text -> + {emqx_logger_textfmt, #{template => [time," [",level,"] ", {clientid, @@ -630,7 +658,8 @@ end}. msg,"\n"], chars_limit => CharsLimit, single_line => SingleLine - }}, + }} + end, {BustLimitOn, {MaxBurstCount, TimeWindow}} = case string:tokens(cuttlefish:conf_get("log.burst_limit", Conf), ", ") of ["disabled"] -> {false, {20000, 1000}}; @@ -664,13 +693,21 @@ end}. BasicConf#{max_no_bytes => MaxNoBytes} end, + Filters = case cuttlefish:conf_get("log.supervisor_reports", Conf) of + error -> [{drop_progress_reports, {fun logger_filters:progress/2, stop}}]; + progress -> [] + end, + %% For the default logger that outputs to console DefaultHandler = if LogTo =:= console orelse LogTo =:= both -> [{handler, console, logger_std_h, #{level => LogLevel, config => #{type => standard_io}, - formatter => Formatter}}]; + formatter => Formatter, + filters => Filters + } + }]; true -> [{handler, default, undefined}] end, @@ -682,7 +719,9 @@ end}. #{level => LogLevel, config => FileConf(cuttlefish:conf_get("log.file", Conf)), formatter => Formatter, - filesync_repeat_interval => no_repeat}}]; + filesync_repeat_interval => no_repeat, + filters => Filters + }}]; true -> [] end, diff --git a/rebar.config b/rebar.config index 12bb8eb98..84603617a 100644 --- a/rebar.config +++ b/rebar.config @@ -28,8 +28,7 @@ {cover_export_enabled, true}. {cover_excl_mods, [emqx_exproto_pb, emqx_exhook_pb]}. -{provider_hooks,[{pre,[{release,{relup_helper,gen_appups}}]} - ]}. +{provider_hooks, [{pre, [{release, {relup_helper, gen_appups}}]}]}. {post_hooks,[]}. @@ -44,7 +43,7 @@ , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.8.0"}}} , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.8.1"}}} , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}} - , {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {branch, "hocon"}}} + , {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v4.0.0"}}} , {minirest, {git, "https://github.com/emqx/minirest", {tag, "0.3.5"}}} , {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.1"}}} , {replayq, {git, "https://github.com/emqx/replayq", {tag, "0.3.2"}}} diff --git a/rebar.config.erl b/rebar.config.erl index b908f8097..c4c0419b9 100644 --- a/rebar.config.erl +++ b/rebar.config.erl @@ -310,12 +310,14 @@ relx_overlay(ReleaseType) -> , {mkdir, "data/"} , {mkdir, "data/mnesia"} , {mkdir, "data/configs"} + , {mkdir, "data/patches"} , {mkdir, "data/scripts"} , {template, "data/loaded_plugins.tmpl", "data/loaded_plugins"} , {template, "data/loaded_modules.tmpl", "data/loaded_modules"} , {template, "data/emqx_vars", "releases/emqx_vars"} , {copy, "bin/emqx", "bin/emqx"} , {copy, "bin/emqx_ctl", "bin/emqx_ctl"} + , {copy, "bin/node_dump", "bin/node_dump"} , {copy, "bin/install_upgrade.escript", "bin/install_upgrade.escript"} , {copy, "bin/emqx", "bin/emqx-{{release_version}}"} %% for relup , {copy, "bin/emqx_ctl", "bin/emqx_ctl-{{release_version}}"} %% for relup diff --git a/src/emqx_frame.erl b/src/emqx_frame.erl index 1af2f010a..7e5aae788 100644 --- a/src/emqx_frame.erl +++ b/src/emqx_frame.erl @@ -156,7 +156,6 @@ parse_remaining_len(<<0:1, Len:7, Rest/binary>>, Header, Multiplier, Value, parse_frame(Bin, Header, 0, Options) -> {ok, packet(Header), Bin, ?none(Options)}; - parse_frame(Bin, Header, Length, Options) -> case Bin of <> -> diff --git a/src/emqx_http_lib.erl b/src/emqx_http_lib.erl index 9e86fe394..60f19e6bb 100644 --- a/src/emqx_http_lib.erl +++ b/src/emqx_http_lib.erl @@ -19,6 +19,7 @@ -export([ uri_encode/1 , uri_decode/1 , uri_parse/1 + , normalise_headers/1 ]). -export_type([uri_map/0]). @@ -91,6 +92,21 @@ do_parse(URI) -> normalise_parse_result(Map2) end. +%% @doc Return HTTP headers list with keys lower-cased and +%% underscores replaced with hyphens +%% NOTE: assuming the input Headers list is a proplists, +%% that is, when a key is duplicated, list header overrides tail +%% e.g. [{"Content_Type", "applicaiton/binary"}, {"content-type", "applicaiton/json"}] +%% results in: [{"content-type", "applicaiton/binary"}] +normalise_headers(Headers0) -> + F = fun({K0, V}) -> + K = re:replace(K0, "_", "-", [{return,list}]), + {string:lowercase(K), V} + end, + Headers = lists:map(F, Headers0), + Keys = proplists:get_keys(Headers), + [{K, proplists:get_value(K, Headers)} || K <- Keys]. + normalise_parse_result(#{host := Host, scheme := Scheme0} = Map) -> Scheme = atom_scheme(Scheme0), DefaultPort = case https =:= Scheme of diff --git a/src/emqx_logger_jsonfmt.erl b/src/emqx_logger_jsonfmt.erl new file mode 100644 index 000000000..7d0a1a328 --- /dev/null +++ b/src/emqx_logger_jsonfmt.erl @@ -0,0 +1,295 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +%% @doc This logger formatter tries format logs into JSON objects +%% +%% Due to the fact that the `Report' body of log entries are *NOT* +%% structured, we only try to JSON-ify `Meta', +%% +%% `Report' body format is pretty-printed and used as the `msg' +%% JSON field in the finale result. +%% +%% e.g. logger:log(info, _Data = #{foo => bar}, _Meta = #{metaf1 => 2}) +%% will results in a JSON object look like below: +%% +%% {"time": 1620226963427808, "level": "info", "msg": "foo: bar", "metaf1": 2} + +-module(emqx_logger_jsonfmt). + +-export([format/2]). + +-ifdef(TEST). +-export([report_cb_1/1, report_cb_2/2, report_cb_crash/2]). +-endif. + +-export_type([config/0]). + +-elvis([{elvis_style, no_nested_try_catch, #{ ignore => [emqx_logger_jsonfmt]}}]). + +-type config() :: #{depth => pos_integer() | unlimited, + report_cb => logger:report_cb(), + single_line => boolean()}. + +-define(IS_STRING(String), (is_list(String) orelse is_binary(String))). + +-spec format(logger:log_event(), config()) -> iodata(). +format(#{level := Level, msg := Msg, meta := Meta}, Config0) when is_map(Config0) -> + Config = add_default_config(Config0), + format(Msg, Meta#{level => Level}, Config). + +format(Msg, Meta, Config) -> + Data0 = + try Meta#{msg => format_msg(Msg, Meta, Config)} + catch + C:R:S -> + Meta#{ msg => "emqx_logger_jsonfmt_format_error" + , fmt_raw_input => Msg + , fmt_error => C + , fmt_reason => R + , fmt_stacktrace => S + } + end, + Data = maps:without([report_cb], Data0), + jiffy:encode(json_obj(Data, Config)). + +format_msg({string, Chardata}, Meta, Config) -> + format_msg({"~ts", [Chardata]}, Meta, Config); +format_msg({report, _} = Msg, Meta, #{report_cb := Fun} = Config) + when is_function(Fun,1); is_function(Fun,2) -> + format_msg(Msg, Meta#{report_cb => Fun}, maps:remove(report_cb, Config)); +format_msg({report, Report}, #{report_cb := Fun} = Meta, Config) when is_function(Fun, 1) -> + case Fun(Report) of + {Format, Args} when is_list(Format), is_list(Args) -> + format_msg({Format, Args}, maps:remove(report_cb, Meta), Config); + Other -> + #{ msg => "report_cb_bad_return" + , report_cb_fun => Fun + , report_cb_return => Other + } + end; +format_msg({report, Report}, #{report_cb := Fun}, Config) when is_function(Fun, 2) -> + case Fun(Report, maps:with([depth, single_line], Config)) of + Chardata when ?IS_STRING(Chardata) -> + try + unicode:characters_to_binary(Chardata, utf8) + catch + _:_ -> + #{ msg => "report_cb_bad_return" + , report_cb_fun => Fun + , report_cb_return => Chardata + } + end; + Other -> + #{ msg => "report_cb_bad_return" + , report_cb_fun => Fun + , report_cb_return => Other + } + end; +format_msg({Fmt, Args}, _Meta, Config) -> + do_format_msg(Fmt, Args, Config). + +do_format_msg(Format0, Args, #{depth := Depth, + single_line := SingleLine + }) -> + Format1 = io_lib:scan_format(Format0, Args), + Format = reformat(Format1, Depth, SingleLine), + Text0 = io_lib:build_text(Format, []), + Text = case SingleLine of + true -> re:replace(Text0, ",?\r?\n\s*",", ", [{return, list}, global, unicode]); + false -> Text0 + end, + trim(unicode:characters_to_binary(Text, utf8)). + +%% Get rid of the leading spaces. +%% leave alone the trailing spaces. +trim(<<$\s, Rest/binary>>) -> trim(Rest); +trim(Bin) -> Bin. + +reformat(Format, unlimited, false) -> + Format; +reformat([#{control_char := C} = M | T], Depth, true) when C =:= $p -> + [limit_depth(M#{width => 0}, Depth) | reformat(T, Depth, true)]; +reformat([#{control_char := C} = M | T], Depth, true) when C =:= $P -> + [M#{width => 0} | reformat(T, Depth, true)]; +reformat([#{control_char := C}=M | T], Depth, Single) when C =:= $p; C =:= $w -> + [limit_depth(M, Depth) | reformat(T, Depth, Single)]; +reformat([H | T], Depth, Single) -> + [H | reformat(T, Depth, Single)]; +reformat([], _, _) -> + []. + +limit_depth(M0, unlimited) -> M0; +limit_depth(#{control_char:=C0, args:=Args}=M0, Depth) -> + C = C0 - ($a - $A), %To uppercase. + M0#{control_char := C, args := Args ++ [Depth]}. + +add_default_config(Config0) -> + Default = #{single_line => true}, + Depth = get_depth(maps:get(depth, Config0, undefined)), + maps:merge(Default, Config0#{depth => Depth}). + +get_depth(undefined) -> error_logger:get_format_depth(); +get_depth(S) -> max(5, S). + +best_effort_unicode(Input, Config) -> + try unicode:characters_to_binary(Input, utf8) of + B when is_binary(B) -> B; + _ -> do_format_msg("~p", [Input], Config) + catch + _ : _ -> + do_format_msg("~p", [Input], Config) + end. + +best_effort_json_obj(List, Config) when is_list(List) -> + try + json_obj(maps:from_list(List), Config) + catch + _ : _ -> + [json(I, Config) || I <- List] + end; +best_effort_json_obj(Map, Config) -> + try + json_obj(Map, Config) + catch + _ : _ -> + do_format_msg("~p", [Map], Config) + end. + +json([], _) -> "[]"; +json(<<"">>, _) -> "\"\""; +json(A, _) when is_atom(A) -> atom_to_binary(A, utf8); +json(I, _) when is_integer(I) -> I; +json(F, _) when is_float(F) -> F; +json(P, C) when is_pid(P) -> json(pid_to_list(P), C); +json(P, C) when is_port(P) -> json(port_to_list(P), C); +json(F, C) when is_function(F) -> json(erlang:fun_to_list(F), C); +json(B, Config) when is_binary(B) -> + best_effort_unicode(B, Config); +json(L, Config) when is_list(L), is_integer(hd(L))-> + best_effort_unicode(L, Config); +json(M, Config) when is_list(M), is_tuple(hd(M)), tuple_size(hd(M)) =:= 2 -> + best_effort_json_obj(M, Config); +json(L, Config) when is_list(L) -> + [json(I, Config) || I <- L]; +json(Map, Config) when is_map(Map) -> + best_effort_json_obj(Map, Config); +json(Term, Config) -> + do_format_msg("~p", [Term], Config). + +json_obj(Data, Config) -> + maps:fold(fun (K, V, D) -> + json_kv(K, V, D, Config) + end, maps:new(), Data). + +json_kv(mfa, {M, F, A}, Data, _Config) -> %% emqx/snabbkaffe + maps:put(mfa, <<(atom_to_binary(M, utf8))/binary, $:, + (atom_to_binary(F, utf8))/binary, $/, + (integer_to_binary(A))/binary>>, Data); +json_kv('$kind', Kind, Data, Config) -> %% snabbkaffe + maps:put(msg, json(Kind, Config), Data); +json_kv(K0, V, Data, Config) -> + K = json_key(K0), + case is_map(V) of + true -> maps:put(json(K, Config), best_effort_json_obj(V, Config), Data); + false -> maps:put(json(K, Config), json(V, Config), Data) + end. + +json_key(A) when is_atom(A) -> json_key(atom_to_binary(A, utf8)); +json_key(Term) -> + try unicode:characters_to_binary(Term, utf8) of + OK when is_binary(OK) andalso OK =/= <<>> -> + OK; + _ -> + throw({badkey, Term}) + catch + _:_ -> + throw({badkey, Term}) + end. + +-ifdef(TEST). +-include_lib("proper/include/proper.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +no_crash_test_() -> + Opts = [{numtests, 1000}, {to_file, user}], + {timeout, 30, + fun() -> ?assert(proper:quickcheck(t_no_crash(), Opts)) end}. + +t_no_crash() -> + ?FORALL({Level, Report, Meta, Config}, + {p_level(), p_report(), p_meta(), p_config()}, + t_no_crash_run(Level, Report, Meta, Config)). + +t_no_crash_run(Level, Report, {undefined, Meta}, Config) -> + t_no_crash_run(Level, Report, maps:from_list(Meta), Config); +t_no_crash_run(Level, Report, {ReportCb, Meta}, Config) -> + t_no_crash_run(Level, Report, maps:from_list([{report_cb, ReportCb} | Meta]), Config); +t_no_crash_run(Level, Report, Meta, Config) -> + Input = #{ level => Level + , msg => {report, Report} + , meta => filter(Meta) + }, + _ = format(Input, maps:from_list(Config)), + true. + +%% assume top level Report and Meta are sane +filter(Map) -> + Keys = lists:filter( + fun(K) -> + try json_key(K), true + catch throw : {badkey, _} -> false + end + end, maps:keys(Map)), + maps:with(Keys, Map). + +p_report_cb() -> + proper_types:oneof([ fun ?MODULE:report_cb_1/1 + , fun ?MODULE:report_cb_2/2 + , fun ?MODULE:report_cb_crash/2 + , fun logger:format_otp_report/1 + , fun logger:format_report/1 + , format_report_undefined + ]). + +report_cb_1(Input) -> {"~p", [Input]}. + +report_cb_2(Input, _Config) -> io_lib:format("~p", [Input]). + +report_cb_crash(_Input, _Config) -> error(report_cb_crash). + +p_kvlist() -> + proper_types:list({ + proper_types:oneof([proper_types:atom(), + proper_types:binary() + ]), proper_types:term()}). + +%% meta type is 2-tuple, report_cb type, and some random key value pairs +p_meta() -> + {p_report_cb(), p_kvlist()}. + +p_report() -> p_kvlist(). + +p_limit() -> proper_types:oneof([proper_types:pos_integer(), unlimited]). + +p_level() -> proper_types:oneof([info, debug, error, warning, foobar]). + +p_config() -> + proper_types:shrink_list( + [ {depth, p_limit()} + , {single_line, proper_types:boolean()} + ]). + +-endif. diff --git a/src/emqx_logger_textfmt.erl b/src/emqx_logger_textfmt.erl new file mode 100644 index 000000000..3bc9f185a --- /dev/null +++ b/src/emqx_logger_textfmt.erl @@ -0,0 +1,49 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_logger_textfmt). + +-export([format/2]). +-export([check_config/1]). + +%% metadata fields which we do not wish to merge into log data +-define(WITHOUT_MERGE, + [ report_cb % just a callback + , time % formatted as a part of templated message + , peername % formatted as a part of templated message + , clientid % formatted as a part of templated message + , gl % not interesting + ]). + +check_config(X) -> logger_formatter:check_config(X). + +format(#{msg := Msg0, meta := Meta} = Event, Config) -> + Msg = maybe_merge(Msg0, Meta), + logger_formatter:format(Event#{msg := Msg}, Config). + +maybe_merge({report, Report}, Meta) when is_map(Report) -> + {report, maps:merge(rename(Report), filter(Meta))}; +maybe_merge(Report, _Meta) -> + Report. + +filter(Meta) -> + maps:without(?WITHOUT_MERGE, Meta). + +rename(#{'$kind' := Kind} = Meta0) -> % snabbkaffe + Meta = maps:remove('$kind', Meta0), + Meta#{msg => Kind}; +rename(Meta) -> + Meta. diff --git a/src/emqx_node_dump.erl b/src/emqx_node_dump.erl new file mode 100644 index 000000000..7134684e1 --- /dev/null +++ b/src/emqx_node_dump.erl @@ -0,0 +1,75 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +%% Collection of functions for creating node dumps +-module(emqx_node_dump). + +-export([ sys_info/0 + , app_env_dump/0 + ]). + +sys_info() -> + #{ release => emqx_app:get_release() + , otp_version => emqx_vm:get_otp_version() + }. + +app_env_dump() -> + censor(ets:tab2list(ac_tab)). + +censor([]) -> + []; +censor([{{env, App, Key}, Val} | Rest]) -> + [{{env, App, Key}, censor([Key, App], Val)} | censor(Rest)]; +censor([_ | Rest]) -> + censor(Rest). + +censor(Path, {Key, Val}) when is_atom(Key) -> + {Key, censor([Key|Path], Val)}; +censor(Path, M) when is_map(M) -> + Fun = fun(Key, Val) -> + censor([Key|Path], Val) + end, + maps:map(Fun, M); +censor(Path, L = [Fst|_]) when is_tuple(Fst) -> + [censor(Path, I) || I <- L]; +censor(Path, Val) -> + case Path of + [password|_] -> + obfuscate_value(Val); + [secret|_] -> + obfuscate_value(Val); + _ -> + Val + end. + +obfuscate_value(Val) when is_binary(Val) -> + <<"********">>; +obfuscate_value(_Val) -> + "********". + +-ifdef(TEST). + +-include_lib("eunit/include/eunit.hrl"). + +censor_test() -> + ?assertMatch( [{{env, emqx, listeners}, #{password := <<"********">>}}] + , censor([foo, {{env, emqx, listeners}, #{password => <<"secret">>}}, {app, bar}]) + ), + ?assertMatch( [{{env, emqx, listeners}, [{foo, 1}, {password, "********"}]}] + , censor([{{env, emqx, listeners}, [{foo, 1}, {password, "secret"}]}]) + ). + +-endif. %% TEST diff --git a/src/emqx_packet.erl b/src/emqx_packet.erl index 7f154b3da..a4d440ba1 100644 --- a/src/emqx_packet.erl +++ b/src/emqx_packet.erl @@ -46,6 +46,24 @@ -export([format/1]). +-define(TYPE_NAMES, + { 'CONNECT' + , 'CONNACK' + , 'PUBLISH' + , 'PUBACK' + , 'PUBREC' + , 'PUBREL' + , 'PUBCOMP' + , 'SUBSCRIBE' + , 'SUBACK' + , 'UNSUBSCRIBE' + , 'UNSUBACK' + , 'PINGREQ' + , 'PINGRESP' + , 'DISCONNECT' + , 'AUTH' + }). + -type(connect() :: #mqtt_packet_connect{}). -type(publish() :: #mqtt_packet_publish{}). -type(subscribe() :: #mqtt_packet_subscribe{}). @@ -61,9 +79,13 @@ type(#mqtt_packet{header = #mqtt_packet_header{type = Type}}) -> Type. %% @doc Name of MQTT packet type. --spec(type_name(emqx_types:packet()) -> atom()). -type_name(Packet) when is_record(Packet, mqtt_packet) -> - lists:nth(type(Packet), ?TYPE_NAMES). +-spec(type_name(emqx_types:packet() | non_neg_integer()) -> atom() | string()). +type_name(#mqtt_packet{} = Packet) -> + type_name(type(Packet)); +type_name(0) -> 'FORBIDDEN'; +type_name(Type) when Type > 0 andalso Type =< tuple_size(?TYPE_NAMES) -> + element(Type, ?TYPE_NAMES); +type_name(Type) -> "UNKNOWN("++ integer_to_list(Type) ++")". %% @doc Dup flag of MQTT packet. -spec(dup(emqx_types:packet()) -> boolean()). @@ -229,13 +251,16 @@ set_props(Props, #mqtt_packet_auth{} = Pkt) -> %% @doc Check PubSub Packet. -spec(check(emqx_types:packet()|publish()|subscribe()|unsubscribe()) -> ok | {error, emqx_types:reason_code()}). -check(#mqtt_packet{variable = PubPkt}) when is_record(PubPkt, mqtt_packet_publish) -> +check(#mqtt_packet{header = #mqtt_packet_header{type = ?PUBLISH}, + variable = PubPkt}) when not is_tuple(PubPkt) -> + %% publish without any data + %% disconnect instead of crash + {error, ?RC_PROTOCOL_ERROR}; +check(#mqtt_packet{variable = #mqtt_packet_publish{} = PubPkt}) -> check(PubPkt); - -check(#mqtt_packet{variable = SubPkt}) when is_record(SubPkt, mqtt_packet_subscribe) -> +check(#mqtt_packet{variable = #mqtt_packet_subscribe{} = SubPkt}) -> check(SubPkt); - -check(#mqtt_packet{variable = UnsubPkt}) when is_record(UnsubPkt, mqtt_packet_unsubscribe) -> +check(#mqtt_packet{variable = #mqtt_packet_unsubscribe{} = UnsubPkt}) -> check(UnsubPkt); %% A Topic Alias of 0 is not permitted. @@ -417,12 +442,11 @@ format_header(#mqtt_packet_header{type = Type, dup = Dup, qos = QoS, retain = Retain}, S) -> - S1 = if - S == undefined -> <<>>; - true -> [", ", S] + S1 = case S == undefined of + true -> <<>>; + false -> [", ", S] end, - io_lib:format("~s(Q~p, R~p, D~p~s)", - [lists:nth(Type, ?TYPE_NAMES), QoS, i(Retain), i(Dup), S1]). + io_lib:format("~s(Q~p, R~p, D~p~s)", [type_name(Type), QoS, i(Retain), i(Dup), S1]). format_variable(undefined, _) -> undefined; diff --git a/src/emqx_router.erl b/src/emqx_router.erl index 7288d2e19..75b567666 100644 --- a/src/emqx_router.erl +++ b/src/emqx_router.erl @@ -266,28 +266,38 @@ maybe_trans(Fun, Args) -> end, []) end. +%% The created fun only terminates with explicit exception +-dialyzer({nowarn_function, [trans/2]}). + -spec(trans(function(), list(any())) -> ok | {error, term()}). trans(Fun, Args) -> - %% trigger selective receive optimization of compiler, - %% ideal for handling bursty traffic. - Ref = erlang:make_ref(), - Owner = self(), - {WPid, RefMon} = spawn_monitor( - fun() -> - Res = case mnesia:transaction(Fun, Args) of - {atomic, Ok} -> Ok; - {aborted, Reason} -> {error, Reason} - end, - Owner ! {Ref, Res} - end), + {WPid, RefMon} = + spawn_monitor( + %% NOTE: this is under the assumption that crashes in Fun + %% are caught by mnesia:transaction/2. + %% Future changes should keep in mind that this process + %% always exit with database write result. + fun() -> + Res = case mnesia:transaction(Fun, Args) of + {atomic, Ok} -> Ok; + {aborted, Reason} -> {error, Reason} + end, + exit({shutdown, Res}) + end), + %% Receive a 'shutdown' exit to pass result from the short-lived process. + %% so the receive below can be receive-mark optimized by the compiler. + %% + %% If the result is sent as a regular message, we'll have to + %% either demonitor (with flush which is essentially a 'receive' since + %% the process is no longer alive after the result has been received), + %% or use a plain 'receive' to drain the normal 'DOWN' message. + %% However the compiler does not optimize this second 'receive'. receive - {Ref, TransRes} -> - receive - {'DOWN', RefMon, process, WPid, normal} -> ok - end, - TransRes; {'DOWN', RefMon, process, WPid, Info} -> - {error, {trans_crash, Info}} + case Info of + {shutdown, Result} -> Result; + _ -> {error, {trans_crash, Info}} + end end. lock_router() -> diff --git a/src/emqx_trie.erl b/src/emqx_trie.erl index e8a51b8ef..8ece333b0 100644 --- a/src/emqx_trie.erl +++ b/src/emqx_trie.erl @@ -122,7 +122,7 @@ match(Topic) when is_binary(Topic) -> %% @doc Is the trie empty? -spec(empty() -> boolean()). -empty() -> ets:info(?TRIE, size) == 0. +empty() -> ets:first(?TRIE) =:= '$end_of_table'. -spec lock_tables() -> ok. lock_tables() -> diff --git a/test/emqx_http_lib_tests.erl b/test/emqx_http_lib_tests.erl index 393d1ff86..a850da8f5 100644 --- a/test/emqx_http_lib_tests.erl +++ b/test/emqx_http_lib_tests.erl @@ -77,3 +77,8 @@ uri_parse_test_() -> end } ]. + +normalise_headers_test() -> + ?assertEqual([{"content-type", "applicaiton/binary"}], + emqx_http_lib:normalise_headers([{"Content_Type", "applicaiton/binary"}, + {"content-type", "applicaiton/json"}])). diff --git a/test/emqx_packet_SUITE.erl b/test/emqx_packet_SUITE.erl index 4c3221107..ae7b16a8a 100644 --- a/test/emqx_packet_SUITE.erl +++ b/test/emqx_packet_SUITE.erl @@ -309,3 +309,7 @@ t_format(_) -> io:format("~s", [emqx_packet:format(?UNSUBACK_PACKET(90))]), io:format("~s", [emqx_packet:format(?DISCONNECT_PACKET(128))]). +t_parse_empty_publish(_) -> + %% 52: 0011(type=PUBLISH) 0100 (QoS=2) + {ok, Packet, <<>>, {none, _}} = emqx_frame:parse(<<52, 0>>), + ?assertEqual({error, ?RC_PROTOCOL_ERROR}, emqx_packet:check(Packet)).