Merge remote-tracking branch 'origin/release-50' into 1206-chore-merge-ee50-to-release-50

This commit is contained in:
Zaiming (Stone) Shi 2022-12-06 16:35:56 +01:00
commit 1c2fc4b6c3
56 changed files with 4281 additions and 3002 deletions

View File

@ -26,29 +26,39 @@ inputs:
runs:
using: composite
steps:
- name: prepare
- id: prepare
shell: bash
run: |
brew update
brew install curl zip unzip kerl coreutils openssl@1.1
brew install curl zip unzip coreutils openssl@1.1
echo "/usr/local/opt/bison/bin" >> $GITHUB_PATH
echo "/usr/local/bin" >> $GITHUB_PATH
echo "emqx_name=${emqx_name}" >> $GITHUB_OUTPUT
OTP_SOURCE_PATH="$HOME/src/otp-${{ inputs.otp }}"
OTP_INSTALL_PATH="$HOME/otp/${{ inputs.otp }}"
echo "OTP_SOURCE_PATH=$OTP_SOURCE_PATH" >> $GITHUB_OUTPUT
echo "OTP_INSTALL_PATH=$OTP_INSTALL_PATH" >> $GITHUB_OUTPUT
mkdir -p "$OTP_SOURCE_PATH" "$OTP_INSTALL_PATH"
- uses: actions/cache@v3
id: cache
with:
path: ~/.kerl/${{ inputs.otp }}
path: ${{ steps.prepare.outputs.OTP_INSTALL_PATH }}
key: otp-install-${{ inputs.otp }}-${{ inputs.os }}-static-ssl-disable-hipe-disable-jit
- name: build erlang
if: steps.cache.outputs.cache-hit != 'true'
shell: bash
env:
KERL_BUILD_BACKEND: git
OTP_GITHUB_URL: https://github.com/emqx/otp
KERL_CONFIGURE_OPTIONS: --disable-dynamic-ssl-lib --with-ssl=/usr/local/opt/openssl@1.1 --disable-hipe --disable-jit
run: |
kerl update releases
kerl build ${{ inputs.otp }}
kerl install ${{ inputs.otp }} $HOME/.kerl/${{ inputs.otp }}
OTP_SOURCE_PATH="${{ steps.prepare.outputs.OTP_SOURCE_PATH }}"
OTP_INSTALL_PATH="${{ steps.prepare.outputs.OTP_INSTALL_PATH }}"
if [ -d "$OTP_SOURCE_PATH" ]; then
rm -rf "$OTP_SOURCE_PATH"
fi
git clone --depth 1 --branch OTP-${{ inputs.otp }} https://github.com/emqx/otp.git "$OTP_SOURCE_PATH"
cd "$OTP_SOURCE_PATH"
./configure --disable-dynamic-ssl-lib --with-ssl=$(brew --prefix openssl@1.1) --disable-hipe --disable-jit --prefix="$OTP_INSTALL_PATH"
make -j$(nproc)
rm -rf "$OTP_INSTALL_PATH"
make install
- name: build ${{ inputs.profile }}
env:
AUTO_INSTALL_BUILD_DEPS: 1
@ -61,13 +71,16 @@ runs:
APPLE_DEVELOPER_ID_BUNDLE_PASSWORD: ${{ inputs.apple_developer_id_bundle_password }}
shell: bash
run: |
. $HOME/.kerl/${{ inputs.otp }}/activate
export PATH="${{ steps.prepare.outputs.OTP_INSTALL_PATH }}/bin:$PATH"
make ensure-rebar3
sudo cp rebar3 /usr/local/bin/rebar3
mkdir -p $HOME/bin
cp rebar3 $HOME/bin/rebar3
export PATH="$HOME/bin:$PATH"
make ${{ inputs.profile }}-tgz
- name: test ${{ inputs.profile }}
shell: bash
run: |
export PATH="${{ steps.prepare.outputs.OTP_INSTALL_PATH }}/bin:$PATH"
pkg_name=$(find _packages/${{ inputs.profile }} -mindepth 1 -maxdepth 1 -iname \*.zip)
mkdir emqx
unzip -d emqx $pkg_name > /dev/null

View File

@ -74,7 +74,7 @@ jobs:
esac
;;
esac
echo "::set-output name=BUILD_PROFILE::$PROFILE"
echo "BUILD_PROFILE=$PROFILE" >> $GITHUB_OUTPUT
- name: get_all_deps
run: |
make -C source deps-all
@ -142,8 +142,10 @@ jobs:
- 24.3.4.2-1
os:
- macos-11
- macos-12-arm64
runs-on: ${{ matrix.os }}
steps:
- uses: emqx/self-hosted-cleanup-action@v1.0.3
- uses: actions/download-artifact@v3
with:
name: source
@ -151,11 +153,8 @@ jobs:
- name: unzip source code
run: |
ln -s . source
unzip -q source.zip
unzip -o -q source.zip
rm source source.zip
- name: prepare
run: |
git config --global credential.helper store
- uses: ./.github/actions/package-macos
with:
profile: ${{ matrix.profile }}

View File

@ -135,6 +135,7 @@ jobs:
- 24.3.4.2-1
os:
- macos-11
- macos-12-arm64
runs-on: ${{ matrix.os }}
@ -143,7 +144,6 @@ jobs:
- name: prepare
run: |
echo "EMQX_NAME=${{ matrix.profile }}" >> $GITHUB_ENV
echo "BUILD_WITH_QUIC=1" >> $GITHUB_ENV
- uses: ./.github/actions/package-macos
with:
profile: ${{ matrix.profile }}
@ -155,7 +155,7 @@ jobs:
apple_developer_id_bundle_password: ${{ secrets.APPLE_DEVELOPER_ID_BUNDLE_PASSWORD }}
- uses: actions/upload-artifact@v3
with:
name: macos
name: ${{ matrix.os }}
path: _packages/**/*
spellcheck:

View File

@ -1,45 +0,0 @@
name: Sync to enterprise
on:
schedule:
- cron: '0 */6 * * *'
push:
branches:
- main-v*
jobs:
sync_to_enterprise:
runs-on: ubuntu-20.04
if: github.repository == 'emqx/emqx'
steps:
- name: git-sync
uses: Rory-Z/git-sync@v3.0.1
with:
source_repo: ${{ github.repository }}
source_branch: ${{ github.ref }}
destination_repo: "${{ github.repository_owner }}/emqx-enterprise"
destination_branch: ${{ github.ref }}
destination_ssh_private_key: "${{ secrets.CI_SSH_PRIVATE_KEY }}"
- name: create pull request
id: create_pull_request
run: |
set -euo pipefail
EE_REF="${GITHUB_REF}-enterprise"
R=$(curl --silent --show-error \
-H "Accept: application/vnd.github.v3+json" \
-H "Authorization: token ${{ secrets.CI_GIT_TOKEN }}" \
-X POST \
-d "{\"title\": \"Sync code from opensource $GITHUB_REF to entperprise $EE_REF\", \"head\": \"$GITHUB_REF\", \"base\":\"$EE_REF\"}" \
https://api.github.com/repos/${{ github.repository_owner }}/emqx-enterprise/pulls)
echo $R | jq
echo "::set-output name=url::$(echo $R | jq '.url')"
- name: request reviewers for a pull request
if: steps.create_pull_request.outputs.url != 'null'
run: |
set -euo pipefail
curl --silent --show-error \
-H "Accept: application/vnd.github.v3+json" \
-H "Authorization: token ${{ secrets.CI_GIT_TOKEN }}" \
-X POST \
-d '{"team_reviewers":["emqx-devs"]}' \
${{ steps.create_pull_request.outputs.url }}/requested_reviewers

View File

@ -90,12 +90,3 @@ jobs:
-d "{\"ref\":\"v1.0.4\",\"inputs\":{\"version\": \"${{ github.ref_name }}\"}}" \
"https://api.github.com/repos/emqx/emqx-ci-helper/actions/workflows/update_emqx_homebrew.yaml/dispatches"
fi
- uses: emqx/push-helm-action@v1
if: github.event_name == 'release' || inputs.publish_release_artefacts
with:
charts_dir: "${{ github.workspace }}/deploy/charts/${{ steps.profile.outputs.profile }}"
version: ${{ steps.profile.outputs.version }}
aws_access_key_id: ${{ secrets.AWS_ACCESS_KEY_ID }}
aws_secret_access_key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
aws_region: "us-west-2"
aws_bucket_name: "repos-emqx-io"

View File

@ -132,8 +132,6 @@ jobs:
fail-fast: false
matrix:
app: ${{ fromJson(needs.prepare.outputs.docker_ct_apps) }}
otp_release:
- "erlang24"
runs-on: aws-amd64
defaults:
@ -169,7 +167,7 @@ jobs:
- uses: actions/upload-artifact@v3
if: failure()
with:
name: logs_${{ matrix.otp_release }}-${{ matrix.app[0] }}-${{ matrix.app[1] }}
name: logs-${{ matrix.profile }}-${{ matrix.app[0] }}-${{ matrix.app[1] }}
path: source/_build/test/logs
ct:
@ -211,7 +209,7 @@ jobs:
- uses: actions/upload-artifact@v3
if: failure()
with:
name: logs_${{ matrix.otp_release }}-${{ matrix.app[0] }}-${{ matrix.app[1] }}
name: logs-${{ matrix.profile }}-${{ matrix.app[0] }}-${{ matrix.app[1] }}
path: source/_build/test/logs
make_cover:

View File

@ -0,0 +1,52 @@
name: Upload helm charts
on:
release:
types:
- published
workflow_dispatch:
inputs:
tag:
type: string
required: true
jobs:
upload:
runs-on: ubuntu-20.04
strategy:
fail-fast: false
steps:
- uses: aws-actions/configure-aws-credentials@v1-node16
with:
aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
aws-region: ${{ secrets.AWS_DEFAULT_REGION }}
- uses: actions/checkout@v3
with:
ref: ${{ github.event.inputs.tag }}
- name: Detect profile
id: profile
run: |
if git describe --tags --match '[v|e]*' --exact; then
REF=$(git describe --tags --match '[v|e]*' --exact)
else
echo "Only release tags matching '[v|e]*' are supported"
exit 1
fi
case "$REF" in
v*)
echo "profile=emqx" >> $GITHUB_OUTPUT
echo "version=$(./pkg-vsn.sh emqx)" >> $GITHUB_OUTPUT
;;
e*)
echo "profile=emqx-enterprise" >> $GITHUB_OUTPUT
echo "version=$(./pkg-vsn.sh emqx-enterprise)" >> $GITHUB_OUTPUT
;;
esac
- uses: emqx/push-helm-action@v1
with:
charts_dir: "${{ github.workspace }}/deploy/charts/${{ steps.profile.outputs.profile }}"
version: ${{ steps.profile.outputs.version }}
aws_access_key_id: ${{ secrets.AWS_ACCESS_KEY_ID }}
aws_secret_access_key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
aws_region: "us-west-2"
aws_bucket_name: "repos-emqx-io"

View File

@ -40,15 +40,12 @@
-define(ERTS_MINIMUM_REQUIRED, "10.0").
%%--------------------------------------------------------------------
%% Topics' prefix: $SYS | $queue | $share
%% Topics' prefix: $SYS | $share
%%--------------------------------------------------------------------
%% System topic
-define(SYSTOP, <<"$SYS/">>).
%% Queue topic
-define(QUEUE, <<"$queue/">>).
%%--------------------------------------------------------------------
%% alarms
%%--------------------------------------------------------------------

View File

@ -601,8 +601,7 @@ handle_msg({event, disconnected}, State = #state{channel = Channel}) ->
{ok, State};
handle_msg({event, _Other}, State = #state{channel = Channel}) ->
ClientId = emqx_channel:info(clientid, Channel),
emqx_cm:set_chan_info(ClientId, info(State)),
emqx_cm:set_chan_stats(ClientId, stats(State)),
emqx_cm:insert_channel_info(ClientId, info(State), stats(State)),
{ok, State};
handle_msg({timeout, TRef, TMsg}, State) ->
handle_timeout(TRef, TMsg, State);

View File

@ -103,7 +103,13 @@ cast(Msg) -> gen_server:cast(?SERVER, Msg).
run_command([]) ->
run_command(help, []);
run_command([Cmd | Args]) ->
run_command(list_to_atom(Cmd), Args).
case emqx_misc:safe_to_existing_atom(Cmd) of
{ok, Cmd1} ->
run_command(Cmd1, Args);
_ ->
help(),
{error, cmd_not_found}
end.
-spec run_command(cmd(), list(string())) -> ok | {error, term()}.
run_command(help, []) ->
@ -220,12 +226,13 @@ init([]) ->
handle_call({register_command, Cmd, MF, Opts}, _From, State = #state{seq = Seq}) ->
case ets:match(?CMD_TAB, {{'$1', Cmd}, '_', '_'}) of
[] ->
ets:insert(?CMD_TAB, {{Seq, Cmd}, MF, Opts});
ets:insert(?CMD_TAB, {{Seq, Cmd}, MF, Opts}),
{reply, ok, next_seq(State)};
[[OriginSeq] | _] ->
?SLOG(warning, #{msg => "CMD_overidden", cmd => Cmd, mf => MF}),
true = ets:insert(?CMD_TAB, {{OriginSeq, Cmd}, MF, Opts})
end,
{reply, ok, next_seq(State)};
true = ets:insert(?CMD_TAB, {{OriginSeq, Cmd}, MF, Opts}),
{reply, ok, State}
end;
handle_call(Req, _From, State) ->
?SLOG(error, #{msg => "unexpected_call", call => Req}),
{reply, ignored, State}.

View File

@ -182,8 +182,19 @@ parse_remaining_len(
Packet = packet(Header, #mqtt_packet_disconnect{reason_code = ?RC_SUCCESS}),
{ok, Packet, Rest, ?NONE(Options)};
%% Match PINGREQ.
parse_remaining_len(<<0:8, Rest/binary>>, Header, 1, 0, Options) ->
parse_remaining_len(
<<0:8, Rest/binary>>, Header = #mqtt_packet_header{type = ?PINGREQ}, 1, 0, Options
) ->
parse_frame(Rest, Header, 0, Options);
parse_remaining_len(
<<0:8, _Rest/binary>>, _Header = #mqtt_packet_header{type = ?PINGRESP}, 1, 0, _Options
) ->
?PARSE_ERR(#{hint => unexpected_packet, header_type => 'PINGRESP'});
%% All other types of messages should not have a zero remaining length.
parse_remaining_len(
<<0:8, _Rest/binary>>, Header, 1, 0, _Options
) ->
?PARSE_ERR(#{hint => zero_remaining_len, header_type => Header#mqtt_packet_header.type});
%% Match PUBACK, PUBREC, PUBREL, PUBCOMP, UNSUBACK...
parse_remaining_len(<<0:1, 2:7, Rest/binary>>, Header, 1, 0, Options) ->
parse_frame(Rest, Header, 2, Options);
@ -255,20 +266,33 @@ packet(Header, Variable) ->
packet(Header, Variable, Payload) ->
#mqtt_packet{header = Header, variable = Variable, payload = Payload}.
parse_packet(
#mqtt_packet_header{type = ?CONNECT},
FrameBin,
#{strict_mode := StrictMode}
) ->
{ProtoName, Rest} = parse_utf8_string(FrameBin, StrictMode),
<<BridgeTag:4, ProtoVer:4, Rest1/binary>> = Rest,
% Note: Crash when reserved flag doesn't equal to 0, there is no strict
% compliance with the MQTT5.0.
<<UsernameFlag:1, PasswordFlag:1, WillRetain:1, WillQoS:2, WillFlag:1, CleanStart:1, 0:1,
KeepAlive:16/big, Rest2/binary>> = Rest1,
parse_connect(FrameBin, StrictMode) ->
{ProtoName, Rest} = parse_utf8_string_with_hint(FrameBin, StrictMode, invalid_proto_name),
case ProtoName of
<<"MQTT">> ->
ok;
<<"MQIsdp">> ->
ok;
_ ->
%% from spec: the server MAY send disconnect with reason code 0x84
%% we chose to close socket because the client is likely not talking MQTT anyway
?PARSE_ERR(#{
hint => invalid_proto_name,
expected => <<"'MQTT' or 'MQIsdp'">>,
received => ProtoName
})
end,
parse_connect2(ProtoName, Rest, StrictMode).
% Note: return malformed if reserved flag is not 0.
parse_connect2(
ProtoName,
<<BridgeTag:4, ProtoVer:4, UsernameFlag:1, PasswordFlag:1, WillRetain:1, WillQoS:2, WillFlag:1,
CleanStart:1, 0:1, KeepAlive:16/big, Rest2/binary>>,
StrictMode
) ->
{Properties, Rest3} = parse_properties(Rest2, ProtoVer, StrictMode),
{ClientId, Rest4} = parse_utf8_string(Rest3, StrictMode),
{ClientId, Rest4} = parse_utf8_string_with_hint(Rest3, StrictMode, invalid_username),
ConnPacket = #mqtt_packet_connect{
proto_name = ProtoName,
proto_ver = ProtoVer,
@ -282,26 +306,56 @@ parse_packet(
clientid = ClientId
},
{ConnPacket1, Rest5} = parse_will_message(ConnPacket, Rest4, StrictMode),
{Username, Rest6} = parse_utf8_string(Rest5, StrictMode, bool(UsernameFlag)),
{Password, <<>>} = parse_utf8_string(Rest6, StrictMode, bool(PasswordFlag)),
ConnPacket1#mqtt_packet_connect{username = Username, password = Password};
{Username, Rest6} = parse_optional(
Rest5,
fun(Bin) ->
parse_utf8_string_with_hint(Bin, StrictMode, invalid_username)
end,
bool(UsernameFlag)
),
{Password, Rest7} = parse_optional(
Rest6,
fun(Bin) ->
parse_utf8_string_with_hint(Bin, StrictMode, invalid_password)
end,
bool(PasswordFlag)
),
case Rest7 of
<<>> ->
ConnPacket1#mqtt_packet_connect{username = Username, password = Password};
_ ->
?PARSE_ERR(malformed_connect_payload)
end;
parse_connect2(_ProtoName, _, _) ->
?PARSE_ERR(malformed_connect_header).
parse_packet(
#mqtt_packet_header{type = ?CONNECT},
FrameBin,
#{strict_mode := StrictMode}
) ->
parse_connect(FrameBin, StrictMode);
parse_packet(
#mqtt_packet_header{type = ?CONNACK},
<<AckFlags:8, ReasonCode:8, Rest/binary>>,
#{version := Ver, strict_mode := StrictMode}
) ->
{Properties, <<>>} = parse_properties(Rest, Ver, StrictMode),
#mqtt_packet_connack{
ack_flags = AckFlags,
reason_code = ReasonCode,
properties = Properties
};
case parse_properties(Rest, Ver, StrictMode) of
{Properties, <<>>} ->
#mqtt_packet_connack{
ack_flags = AckFlags,
reason_code = ReasonCode,
properties = Properties
};
_ ->
?PARSE_ERR(malformed_properties)
end;
parse_packet(
#mqtt_packet_header{type = ?PUBLISH, qos = QoS},
Bin,
#{strict_mode := StrictMode, version := Ver}
) ->
{TopicName, Rest} = parse_utf8_string(Bin, StrictMode),
{TopicName, Rest} = parse_utf8_string_with_hint(Bin, StrictMode, invalid_topic),
{PacketId, Rest1} =
case QoS of
?QOS_0 -> {undefined, Rest};
@ -411,7 +465,9 @@ parse_packet(
#{strict_mode := StrictMode, version := ?MQTT_PROTO_V5}
) ->
{Properties, <<>>} = parse_properties(Rest, ?MQTT_PROTO_V5, StrictMode),
#mqtt_packet_auth{reason_code = ReasonCode, properties = Properties}.
#mqtt_packet_auth{reason_code = ReasonCode, properties = Properties};
parse_packet(_Header, _FrameBin, _Options) ->
?PARSE_ERR(malformed_packet).
parse_will_message(
Packet = #mqtt_packet_connect{
@ -422,7 +478,7 @@ parse_will_message(
StrictMode
) ->
{Props, Rest} = parse_properties(Bin, Ver, StrictMode),
{Topic, Rest1} = parse_utf8_string(Rest, StrictMode),
{Topic, Rest1} = parse_utf8_string_with_hint(Rest, StrictMode, invalid_topic),
{Payload, Rest2} = parse_binary_data(Rest1),
{
Packet#mqtt_packet_connect{
@ -437,7 +493,9 @@ parse_will_message(Packet, Bin, _StrictMode) ->
-compile({inline, [parse_packet_id/1]}).
parse_packet_id(<<PacketId:16/big, Rest/binary>>) ->
{PacketId, Rest}.
{PacketId, Rest};
parse_packet_id(_) ->
?PARSE_ERR(invalid_packet_id).
parse_properties(Bin, Ver, _StrictMode) when Ver =/= ?MQTT_PROTO_V5 ->
{#{}, Bin};
@ -458,10 +516,10 @@ parse_property(<<16#01, Val, Bin/binary>>, Props, StrictMode) ->
parse_property(<<16#02, Val:32/big, Bin/binary>>, Props, StrictMode) ->
parse_property(Bin, Props#{'Message-Expiry-Interval' => Val}, StrictMode);
parse_property(<<16#03, Bin/binary>>, Props, StrictMode) ->
{Val, Rest} = parse_utf8_string(Bin, StrictMode),
{Val, Rest} = parse_utf8_string_with_hint(Bin, StrictMode, invalid_content_type),
parse_property(Rest, Props#{'Content-Type' => Val}, StrictMode);
parse_property(<<16#08, Bin/binary>>, Props, StrictMode) ->
{Val, Rest} = parse_utf8_string(Bin, StrictMode),
{Val, Rest} = parse_utf8_string_with_hint(Bin, StrictMode, invalid_response_topic),
parse_property(Rest, Props#{'Response-Topic' => Val}, StrictMode);
parse_property(<<16#09, Len:16/big, Val:Len/binary, Bin/binary>>, Props, StrictMode) ->
parse_property(Bin, Props#{'Correlation-Data' => Val}, StrictMode);
@ -471,12 +529,12 @@ parse_property(<<16#0B, Bin/binary>>, Props, StrictMode) ->
parse_property(<<16#11, Val:32/big, Bin/binary>>, Props, StrictMode) ->
parse_property(Bin, Props#{'Session-Expiry-Interval' => Val}, StrictMode);
parse_property(<<16#12, Bin/binary>>, Props, StrictMode) ->
{Val, Rest} = parse_utf8_string(Bin, StrictMode),
{Val, Rest} = parse_utf8_string_with_hint(Bin, StrictMode, invalid_assigned_client_id),
parse_property(Rest, Props#{'Assigned-Client-Identifier' => Val}, StrictMode);
parse_property(<<16#13, Val:16, Bin/binary>>, Props, StrictMode) ->
parse_property(Bin, Props#{'Server-Keep-Alive' => Val}, StrictMode);
parse_property(<<16#15, Bin/binary>>, Props, StrictMode) ->
{Val, Rest} = parse_utf8_string(Bin, StrictMode),
{Val, Rest} = parse_utf8_string_with_hint(Bin, StrictMode, invalid_authn_method),
parse_property(Rest, Props#{'Authentication-Method' => Val}, StrictMode);
parse_property(<<16#16, Len:16/big, Val:Len/binary, Bin/binary>>, Props, StrictMode) ->
parse_property(Bin, Props#{'Authentication-Data' => Val}, StrictMode);
@ -487,13 +545,13 @@ parse_property(<<16#18, Val:32, Bin/binary>>, Props, StrictMode) ->
parse_property(<<16#19, Val, Bin/binary>>, Props, StrictMode) ->
parse_property(Bin, Props#{'Request-Response-Information' => Val}, StrictMode);
parse_property(<<16#1A, Bin/binary>>, Props, StrictMode) ->
{Val, Rest} = parse_utf8_string(Bin, StrictMode),
{Val, Rest} = parse_utf8_string_with_hint(Bin, StrictMode, invalid_response_info),
parse_property(Rest, Props#{'Response-Information' => Val}, StrictMode);
parse_property(<<16#1C, Bin/binary>>, Props, StrictMode) ->
{Val, Rest} = parse_utf8_string(Bin, StrictMode),
{Val, Rest} = parse_utf8_string_with_hint(Bin, StrictMode, invalid_server_reference),
parse_property(Rest, Props#{'Server-Reference' => Val}, StrictMode);
parse_property(<<16#1F, Bin/binary>>, Props, StrictMode) ->
{Val, Rest} = parse_utf8_string(Bin, StrictMode),
{Val, Rest} = parse_utf8_string_with_hint(Bin, StrictMode, invalid_reason_string),
parse_property(Rest, Props#{'Reason-String' => Val}, StrictMode);
parse_property(<<16#21, Val:16/big, Bin/binary>>, Props, StrictMode) ->
parse_property(Bin, Props#{'Receive-Maximum' => Val}, StrictMode);
@ -584,10 +642,18 @@ parse_utf8_pair(Bin, _StrictMode) when
total_bytes => byte_size(Bin)
}).
parse_utf8_string(Bin, _StrictMode, false) ->
{undefined, Bin};
parse_utf8_string(Bin, StrictMode, true) ->
parse_utf8_string(Bin, StrictMode).
parse_utf8_string_with_hint(Bin, StrictMode, Hint) ->
try
parse_utf8_string(Bin, StrictMode)
catch
throw:{?FRAME_PARSE_ERROR, Reason} when is_map(Reason) ->
?PARSE_ERR(Reason#{hint => Hint})
end.
parse_optional(Bin, F, true) ->
F(Bin);
parse_optional(Bin, _F, false) ->
{undefined, Bin}.
parse_utf8_string(<<Len:16/big, Str:Len/binary, Rest/binary>>, true) ->
{validate_utf8(Str), Rest};
@ -604,7 +670,7 @@ parse_utf8_string(<<Len:16/big, Rest/binary>>, _) when
parse_utf8_string(Bin, _) when
2 > byte_size(Bin)
->
?PARSE_ERR(malformed_utf8_string_length).
?PARSE_ERR(#{reason => malformed_utf8_string_length}).
parse_binary_data(<<Len:16/big, Data:Len/binary, Rest/binary>>) ->
{Data, Rest};

View File

@ -210,12 +210,8 @@ parse({TopicFilter, Options}) when is_binary(TopicFilter) ->
parse(TopicFilter, Options).
-spec parse(topic(), map()) -> {topic(), map()}.
parse(TopicFilter = <<"$queue/", _/binary>>, #{share := _Group}) ->
error({invalid_topic_filter, TopicFilter});
parse(TopicFilter = <<"$share/", _/binary>>, #{share := _Group}) ->
error({invalid_topic_filter, TopicFilter});
parse(<<"$queue/", TopicFilter/binary>>, Options) ->
parse(TopicFilter, Options#{share => <<"$queue">>});
parse(TopicFilter = <<"$share/", Rest/binary>>, Options) ->
case binary:split(Rest, <<"/">>) of
[_Any] ->

View File

@ -378,10 +378,14 @@ get_peer_info(Type, Listener, Req, Opts) ->
of
#{src_address := SrcAddr, src_port := SrcPort, ssl := SSL} ->
SourceName = {SrcAddr, SrcPort},
%% Notice: Only CN is available in Proxy Protocol V2 additional info
%% Notice: CN is only available in Proxy Protocol V2 additional info.
%% `CN` is unsupported in Proxy Protocol V1
%% `pp2_ssl_cn` is required by config `peer_cert_as_username` or `peer_cert_as_clientid`.
%% It will be parsed by esockd.
%% See also `emqx_channel:set_peercert_infos/3` and `esockd_peercert:common_name/1`
SourceSSL =
case maps:get(cn, SSL, undefined) of
undeined -> nossl;
undefined -> undefined;
CN -> [{pp2_ssl_cn, CN}]
end,
{SourceName, SourceSSL};

View File

@ -576,7 +576,12 @@ t_serialize_parse_pingreq(_) ->
t_serialize_parse_pingresp(_) ->
PingResp = ?PACKET(?PINGRESP),
?assertEqual(PingResp, parse_serialize(PingResp)).
Packet = serialize_to_binary(PingResp),
?assertException(
throw,
{frame_parse_error, #{hint := unexpected_packet, header_type := 'PINGRESP'}},
emqx_frame:parse(Packet)
).
t_parse_disconnect(_) ->
Packet = ?DISCONNECT_PACKET(?RC_SUCCESS),
@ -619,6 +624,32 @@ t_serialize_parse_auth_v5(_) ->
})
).
t_parse_invalid_remaining_len(_) ->
?assertException(
throw, {frame_parse_error, #{hint := zero_remaining_len}}, emqx_frame:parse(<<?CONNECT, 0>>)
).
t_parse_malformed_properties(_) ->
?assertException(
throw,
{frame_parse_error, malformed_properties},
emqx_frame:parse(<<2:4, 0:4, 3:8, 1:8, 0:8, 0:8>>)
).
t_parse_malformed_connect(_) ->
?assertException(
throw,
{frame_parse_error, malformed_connect_header},
emqx_frame:parse(<<16, 11, 0, 6, 77, 81, 73, 115, 110, 112, 3, 130, 1, 6>>)
),
?assertException(
throw,
{frame_parse_error, malformed_connect_payload},
emqx_frame:parse(
<<16, 21, 0, 6, 77, 81, 73, 115, 110, 112, 3, 130, 1, 6, 0, 0, 2, 67, 49.49>>
)
).
parse_serialize(Packet) ->
parse_serialize(Packet, #{strict_mode => true}).

View File

@ -212,7 +212,9 @@ t_check_publish(_) ->
?PUBLISH_PACKET(1, <<"topic">>, 1, #{'Topic-Alias' => 0}, <<"payload">>)
),
%% TODO::
%% {error, ?RC_PROTOCOL_ERROR} = emqx_packet:check(?PUBLISH_PACKET(1, <<"topic">>, 1, #{'Subscription-Identifier' => 10}, <<"payload">>)),
%% {error, ?RC_PROTOCOL_ERROR} = emqx_packet:check(
%% ?PUBLISH_PACKET(1, <<"topic">>, 1, #{'Subscription-Identifier' => 10}, <<"payload">>)
%%),
ok = emqx_packet:check(
?PUBLISH_PACKET(1, <<"topic">>, 1, #{'Subscription-Identifier' => 10}, <<"payload">>)
),
@ -414,5 +416,5 @@ t_format(_) ->
t_parse_empty_publish(_) ->
%% 52: 0011(type=PUBLISH) 0100 (QoS=2)
{ok, Packet, <<>>, {none, _}} = emqx_frame:parse(<<52, 0>>),
Packet = #mqtt_packet_publish{topic_name = <<>>},
?assertEqual({error, ?RC_PROTOCOL_ERROR}, emqx_packet:check(Packet)).

View File

@ -424,7 +424,7 @@ systopic_mon() ->
sharetopic() ->
?LET(
{Type, Grp, T},
{oneof([<<"$queue">>, <<"$share">>]), list(latin_char()), normal_topic()},
{oneof([<<"$share">>]), list(latin_char()), normal_topic()},
<<Type/binary, "/", (iolist_to_binary(Grp))/binary, "/", T/binary>>
).

View File

@ -186,10 +186,6 @@ t_systop(_) ->
?assertEqual(SysTop2, systop(<<"abc">>)).
t_feed_var(_) ->
?assertEqual(
<<"$queue/client/clientId">>,
feed_var(<<"$c">>, <<"clientId">>, <<"$queue/client/$c">>)
),
?assertEqual(
<<"username/test/client/x">>,
feed_var(
@ -211,10 +207,6 @@ long_topic() ->
iolist_to_binary([[integer_to_list(I), "/"] || I <- lists:seq(0, 66666)]).
t_parse(_) ->
?assertError(
{invalid_topic_filter, <<"$queue/t">>},
parse(<<"$queue/t">>, #{share => <<"g">>})
),
?assertError(
{invalid_topic_filter, <<"$share/g/t">>},
parse(<<"$share/g/t">>, #{share => <<"g">>})
@ -229,11 +221,9 @@ t_parse(_) ->
),
?assertEqual({<<"a/b/+/#">>, #{}}, parse(<<"a/b/+/#">>)),
?assertEqual({<<"a/b/+/#">>, #{qos => 1}}, parse({<<"a/b/+/#">>, #{qos => 1}})),
?assertEqual({<<"topic">>, #{share => <<"$queue">>}}, parse(<<"$queue/topic">>)),
?assertEqual({<<"topic">>, #{share => <<"group">>}}, parse(<<"$share/group/topic">>)),
%% The '$local' and '$fastlane' topics have been deprecated.
?assertEqual({<<"$local/topic">>, #{}}, parse(<<"$local/topic">>)),
?assertEqual({<<"$local/$queue/topic">>, #{}}, parse(<<"$local/$queue/topic">>)),
?assertEqual({<<"$local/$share/group/a/b/c">>, #{}}, parse(<<"$local/$share/group/a/b/c">>)),
?assertEqual({<<"$fastlane/topic">>, #{}}, parse(<<"$fastlane/topic">>)).

View File

@ -535,7 +535,7 @@ t_parse_incoming(_) ->
t_parse_incoming_frame_error(_) ->
{Packets, _St} = ?ws_conn:parse_incoming(<<3, 2, 1, 0>>, [], st()),
FrameError = {frame_error, function_clause},
FrameError = {frame_error, malformed_packet},
[{incoming, FrameError}] = Packets.
t_handle_incomming_frame_error(_) ->

View File

@ -84,14 +84,14 @@ emqx_authn_api {
}
}
authentication_id_move_post {
authentication_id_position_put {
desc {
en: """Move authenticator in global authentication chain."""
zh: """更改全局认证链上指定认证器的顺序。"""
}
}
listeners_listener_id_authentication_id_move_post {
listeners_listener_id_authentication_id_position_put {
desc {
en: """Move authenticator in listener authentication chain."""
zh: """更改监听器认证链上指定认证器的顺序。"""
@ -182,7 +182,6 @@ emqx_authn_api {
}
}
param_user_id {
desc {
en: """User ID."""
@ -190,6 +189,13 @@ emqx_authn_api {
}
}
param_position {
desc {
en: """Position of authenticator in chain. Possible values are 'front', 'rear', 'before:{other_authenticator}', 'after:{other_authenticator}'."""
zn: """认证者在链中的位置。可能的值是 'front', 'rear', 'before:{other_authenticator}', 'after:{other_authenticator}'"""
}
}
like_user_id {
desc {
en: """Fuzzy search user_id (username or clientid)."""

View File

@ -55,8 +55,8 @@
listener_authenticators/2,
listener_authenticator/2,
listener_authenticator_status/2,
authenticator_move/2,
listener_authenticator_move/2,
authenticator_position/2,
listener_authenticator_position/2,
authenticator_users/2,
authenticator_user/2,
listener_authenticator_users/2,
@ -67,7 +67,6 @@
-export([
authenticator_examples/0,
request_move_examples/0,
request_user_create_examples/0,
request_user_update_examples/0,
response_user_examples/0,
@ -99,14 +98,14 @@ paths() ->
"/authentication",
"/authentication/:id",
"/authentication/:id/status",
"/authentication/:id/move",
"/authentication/:id/position/:position",
"/authentication/:id/users",
"/authentication/:id/users/:user_id",
"/listeners/:listener_id/authentication",
"/listeners/:listener_id/authentication/:id",
"/listeners/:listener_id/authentication/:id/status",
"/listeners/:listener_id/authentication/:id/move",
"/listeners/:listener_id/authentication/:id/position/:position",
"/listeners/:listener_id/authentication/:id/users",
"/listeners/:listener_id/authentication/:id/users/:user_id"
].
@ -115,7 +114,6 @@ roots() ->
[
request_user_create,
request_user_update,
request_move,
response_user,
response_users
].
@ -130,8 +128,6 @@ fields(request_user_update) ->
{password, mk(binary(), #{required => true})},
{is_superuser, mk(boolean(), #{default => false, required => false})}
];
fields(request_move) ->
[{position, mk(binary(), #{required => true})}];
fields(response_user) ->
[
{user_id, mk(binary(), #{required => true})},
@ -321,17 +317,13 @@ schema("/listeners/:listener_id/authentication/:id/status") ->
}
}
};
schema("/authentication/:id/move") ->
schema("/authentication/:id/position/:position") ->
#{
'operationId' => authenticator_move,
post => #{
'operationId' => authenticator_position,
put => #{
tags => ?API_TAGS_GLOBAL,
description => ?DESC(authentication_id_move_post),
parameters => [param_auth_id()],
'requestBody' => emqx_dashboard_swagger:schema_with_examples(
ref(request_move),
request_move_examples()
),
description => ?DESC(authentication_id_position_put),
parameters => [param_auth_id(), param_position()],
responses => #{
204 => <<"Authenticator moved">>,
400 => error_codes([?BAD_REQUEST], <<"Bad Request">>),
@ -339,17 +331,13 @@ schema("/authentication/:id/move") ->
}
}
};
schema("/listeners/:listener_id/authentication/:id/move") ->
schema("/listeners/:listener_id/authentication/:id/position/:position") ->
#{
'operationId' => listener_authenticator_move,
post => #{
'operationId' => listener_authenticator_position,
put => #{
tags => ?API_TAGS_SINGLE,
description => ?DESC(listeners_listener_id_authentication_id_move_post),
parameters => [param_listener_id(), param_auth_id()],
'requestBody' => emqx_dashboard_swagger:schema_with_examples(
ref(request_move),
request_move_examples()
),
description => ?DESC(listeners_listener_id_authentication_id_position_put),
parameters => [param_listener_id(), param_auth_id(), param_position()],
responses => #{
204 => <<"Authenticator moved">>,
400 => error_codes([?BAD_REQUEST], <<"Bad Request">>),
@ -556,6 +544,17 @@ param_listener_id() ->
})
}.
param_position() ->
{
position,
mk(binary(), #{
in => path,
desc => ?DESC(param_position),
required => true,
example => "before:password_based:built_in_database"
})
}.
param_user_id() ->
{
user_id,
@ -662,23 +661,15 @@ listener_authenticator_status(
end
).
authenticator_move(
post,
#{
bindings := #{id := AuthenticatorID},
body := #{<<"position">> := Position}
}
authenticator_position(
put,
#{bindings := #{id := AuthenticatorID, position := Position}}
) ->
move_authenticator([authentication], ?GLOBAL, AuthenticatorID, Position);
authenticator_move(post, #{bindings := #{id := _}, body := _}) ->
serialize_error({missing_parameter, position}).
move_authenticator([authentication], ?GLOBAL, AuthenticatorID, Position).
listener_authenticator_move(
post,
#{
bindings := #{listener_id := ListenerID, id := AuthenticatorID},
body := #{<<"position">> := Position}
}
listener_authenticator_position(
put,
#{bindings := #{listener_id := ListenerID, id := AuthenticatorID, position := Position}}
) ->
with_listener(
ListenerID,
@ -690,9 +681,7 @@ listener_authenticator_move(
Position
)
end
);
listener_authenticator_move(post, #{bindings := #{listener_id := _, id := _}, body := _}) ->
serialize_error({missing_parameter, position}).
).
authenticator_users(post, #{bindings := #{id := AuthenticatorID}, body := UserInfo}) ->
add_user(?GLOBAL, AuthenticatorID, UserInfo);
@ -1475,28 +1464,6 @@ request_user_update_examples() ->
}
}.
request_move_examples() ->
#{
move_to_front => #{
summary => <<"Move authenticator to the beginning of the chain">>,
value => #{
position => <<"front">>
}
},
move_to_rear => #{
summary => <<"Move authenticator to the end of the chain">>,
value => #{
position => <<"rear">>
}
},
'move_before_password_based:built_in_database' => #{
summary => <<"Move authenticator to the position preceding some other authenticator">>,
value => #{
position => <<"before:password_based:built_in_database">>
}
}
}.
response_user_examples() ->
#{
regular_user => #{

View File

@ -120,8 +120,8 @@ t_authenticator_users(_) ->
t_authenticator_user(_) ->
test_authenticator_user([]).
t_authenticator_move(_) ->
test_authenticator_move([]).
t_authenticator_position(_) ->
test_authenticator_position([]).
t_authenticator_import_users(_) ->
test_authenticator_import_users([]).
@ -138,8 +138,8 @@ t_listener_authenticator_users(_) ->
t_listener_authenticator_user(_) ->
test_authenticator_user(["listeners", ?TCP_DEFAULT]).
t_listener_authenticator_move(_) ->
test_authenticator_move(["listeners", ?TCP_DEFAULT]).
t_listener_authenticator_position(_) ->
test_authenticator_position(["listeners", ?TCP_DEFAULT]).
t_listener_authenticator_import_users(_) ->
test_authenticator_import_users(["listeners", ?TCP_DEFAULT]).
@ -539,7 +539,7 @@ test_authenticator_user(PathPrefix) ->
{ok, 404, _} = request(delete, UsersUri ++ "/u123"),
{ok, 204, _} = request(delete, UsersUri ++ "/u1").
test_authenticator_move(PathPrefix) ->
test_authenticator_position(PathPrefix) ->
AuthenticatorConfs = [
emqx_authn_test_lib:http_example(),
emqx_authn_test_lib:jwt_example(),
@ -569,42 +569,31 @@ test_authenticator_move(PathPrefix) ->
%% Invalid moves
{ok, 400, _} = request(
post,
uri(PathPrefix ++ [?CONF_NS, "jwt", "move"]),
#{position => <<"up">>}
),
{ok, 400, _} = request(
post,
uri(PathPrefix ++ [?CONF_NS, "jwt", "move"]),
#{}
put,
uri(PathPrefix ++ [?CONF_NS, "jwt", "position", "up"])
),
{ok, 404, _} = request(
post,
uri(PathPrefix ++ [?CONF_NS, "jwt", "move"]),
#{position => <<"before:invalid">>}
put,
uri(PathPrefix ++ [?CONF_NS, "jwt", "position"])
),
{ok, 404, _} = request(
post,
uri(PathPrefix ++ [?CONF_NS, "jwt", "move"]),
#{position => <<"before:password_based:redis">>}
put,
uri(PathPrefix ++ [?CONF_NS, "jwt", "position", "before:invalid"])
),
{ok, 404, _} = request(
post,
uri(PathPrefix ++ [?CONF_NS, "jwt", "move"]),
#{position => <<"before:password_based:redis">>}
put,
uri(PathPrefix ++ [?CONF_NS, "jwt", "position", "before:password_based:redis"])
),
%% Valid moves
%% test front
{ok, 204, _} = request(
post,
uri(PathPrefix ++ [?CONF_NS, "jwt", "move"]),
#{position => <<"front">>}
put,
uri(PathPrefix ++ [?CONF_NS, "jwt", "position", "front"])
),
?assertAuthenticatorsMatch(
@ -618,9 +607,8 @@ test_authenticator_move(PathPrefix) ->
%% test rear
{ok, 204, _} = request(
post,
uri(PathPrefix ++ [?CONF_NS, "jwt", "move"]),
#{position => <<"rear">>}
put,
uri(PathPrefix ++ [?CONF_NS, "jwt", "position", "rear"])
),
?assertAuthenticatorsMatch(
@ -634,9 +622,8 @@ test_authenticator_move(PathPrefix) ->
%% test before
{ok, 204, _} = request(
post,
uri(PathPrefix ++ [?CONF_NS, "jwt", "move"]),
#{position => <<"before:password_based:built_in_database">>}
put,
uri(PathPrefix ++ [?CONF_NS, "jwt", "position", "before:password_based:built_in_database"])
),
?assertAuthenticatorsMatch(
@ -650,9 +637,16 @@ test_authenticator_move(PathPrefix) ->
%% test after
{ok, 204, _} = request(
post,
uri(PathPrefix ++ [?CONF_NS, "password_based%3Abuilt_in_database", "move"]),
#{position => <<"after:password_based:http">>}
put,
uri(
PathPrefix ++
[
?CONF_NS,
"password_based%3Abuilt_in_database",
"position",
"after:password_based:http"
]
)
),
?assertAuthenticatorsMatch(

View File

@ -471,8 +471,8 @@ significant: later configuration files override the previous ones.
node_global_gc_interval {
desc {
en: """Periodic garbage collection interval."""
zh: """系统调优参数,设置节点运行多久强制进行一次全局垃圾回收。"""
en: """Periodic garbage collection interval. Set to <code>disabled</code> to have it disabled."""
zh: """系统调优参数,设置节点运行多久强制进行一次全局垃圾回收。禁用设置为 <code>disabled</code>。"""
}
label {
en: "Global GC Interval"

View File

@ -467,7 +467,7 @@ fields("node") ->
)},
{"global_gc_interval",
sc(
emqx_schema:duration(),
hoconsc:union([disabled, emqx_schema:duration()]),
#{
mapping => "emqx_machine.global_gc_interval",
default => "15m",

View File

@ -29,4 +29,8 @@ t_run_gc(_) ->
ok = timer:sleep(1500),
{ok, MilliSecs} = emqx_global_gc:run(),
ct:print("Global GC: ~w(ms)~n", [MilliSecs]),
emqx_global_gc:stop().
emqx_global_gc:stop(),
ok = emqx_config:put([node, global_gc_interval], disabled),
{ok, Pid} = emqx_global_gc:start_link(),
?assertMatch(#{timer := undefined}, sys:get_state(Pid)),
ok.

View File

@ -76,7 +76,7 @@ normalize_key_to_bin(undefined) ->
normalize_key_to_bin(Map) when is_map(Map) ->
emqx_map_lib:binary_key_map(Map).
try_map_get(_Key, undefined, Default) ->
Default;
try_map_get(Key, Map, Default) when is_map(Map) ->
maps:get(Key, Map, Default).
maps:get(Key, Map, Default);
try_map_get(_Key, undefined, Default) ->
Default.

View File

@ -19,6 +19,7 @@ rebar3.crashdump
*~
rebar.lock
data/
!test/data/
*.conf.rendered
*.pyc
.DS_Store

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_exhook, [
{description, "EMQX Extension for Hook"},
{vsn, "5.0.6"},
{vsn, "5.0.7"},
{modules, []},
{registered, []},
{mod, {emqx_exhook_app, []}},

View File

@ -183,8 +183,10 @@ pre_config_update(_, {enable, Name, Enable}, OldConf) ->
NewConf -> {ok, lists:map(fun maybe_write_certs/1, NewConf)}
end.
post_config_update(_KeyPath, UpdateReq, NewConf, _OldConf, _AppEnvs) ->
{ok, call({update_config, UpdateReq, NewConf})}.
post_config_update(_KeyPath, UpdateReq, NewConf, OldConf, _AppEnvs) ->
Result = call({update_config, UpdateReq, NewConf}),
try_clear_ssl_files(UpdateReq, NewConf, OldConf),
{ok, Result}.
%%=====================================================================
@ -600,3 +602,34 @@ new_ssl_source(Source, undefined) ->
Source;
new_ssl_source(Source, SSL) ->
Source#{<<"ssl">> => SSL}.
try_clear_ssl_files({delete, Name}, _NewConf, OldConfs) ->
OldSSL = find_server_ssl_cfg(Name, OldConfs),
emqx_tls_lib:delete_ssl_files(ssl_file_path(Name), undefined, OldSSL);
try_clear_ssl_files({Op, Name, _}, NewConfs, OldConfs) when
Op =:= update; Op =:= enable
->
NewSSL = find_server_ssl_cfg(Name, NewConfs),
OldSSL = find_server_ssl_cfg(Name, OldConfs),
emqx_tls_lib:delete_ssl_files(ssl_file_path(Name), NewSSL, OldSSL);
try_clear_ssl_files(_Req, _NewConf, _OldConf) ->
ok.
search_server_cfg(Name, Confs) ->
lists:search(
fun
(#{name := SvrName}) when SvrName =:= Name ->
true;
(_) ->
false
end,
Confs
).
find_server_ssl_cfg(Name, Confs) ->
case search_server_cfg(Name, Confs) of
{value, Value} ->
maps:get(ssl, Value, undefined);
false ->
undefined
end.

View File

@ -0,0 +1,29 @@
-----BEGIN CERTIFICATE-----
MIIE5DCCAswCCQCF3o0gIdaNDjANBgkqhkiG9w0BAQsFADA0MRIwEAYDVQQKDAlF
TVFYIFRlc3QxHjAcBgNVBAMMFUNlcnRpZmljYXRlIEF1dGhvcml0eTAeFw0yMTEy
MzAwODQxMTFaFw00OTA1MTcwODQxMTFaMDQxEjAQBgNVBAoMCUVNUVggVGVzdDEe
MBwGA1UEAwwVQ2VydGlmaWNhdGUgQXV0aG9yaXR5MIICIjANBgkqhkiG9w0BAQEF
AAOCAg8AMIICCgKCAgEAqmqSrxyH16j63QhqGLT1UO8I+m6BM3HfnJQM8laQdtJ0
WgHqCh0/OphH3S7v4SfF4fNJDEJWMWuuzJzU9cTqHPLzhvo3+ZHcMIENgtY2p2Cf
7AQjEqFViEDyv2ZWNEe76BJeShntdY5NZr4gIPar99YGG/Ln8YekspleV+DU38rE
EX9WzhgBr02NN9z4NzIxeB+jdvPnxcXs3WpUxzfnUjOQf/T1tManvSdRbFmKMbxl
A8NLYK3oAYm8EbljWUINUNN6loqYhbigKv8bvo5S4xvRqmX86XB7sc0SApngtNcg
O0EKn8z/KVPDskE+8lMfGMiU2e2Tzw6Rph57mQPOPtIp5hPiKRik7ST9n0p6piXW
zRLplJEzSjf40I1u+VHmpXlWI/Fs8b1UkDSMiMVJf0LyWb4ziBSZOY2LtZzWHbWj
LbNgxQcwSS29tKgUwfEFmFcm+iOM59cPfkl2IgqVLh5h4zmKJJbfQKSaYb5fcKRf
50b1qsN40VbR3Pk/0lJ0/WqgF6kZCExmT1qzD5HJES/5grjjKA4zIxmHOVU86xOF
ouWvtilVR4PGkzmkFvwK5yRhBUoGH/A9BurhqOc0QCGay1kqHQFA6se4JJS+9KOS
x8Rn1Nm6Pi7sd6Le3cKmHTlyl5a/ofKqTCX2Qh+v/7y62V1V1wnoh3ipRjdPTnMC
AwEAATANBgkqhkiG9w0BAQsFAAOCAgEARCqaocvlMFUQjtFtepO2vyG1krn11xJ0
e7md26i+g8SxCCYqQ9IqGmQBg0Im8fyNDKRN/LZoj5+A4U4XkG1yya91ZIrPpWyF
KUiRAItchNj3g1kHmI2ckl1N//6Kpx3DPaS7qXZaN3LTExf6Ph+StE1FnS0wVF+s
tsNIf6EaQ+ZewW3pjdlLeAws3jvWKUkROc408Ngvx74zbbKo/zAC4tz8oH9ZcpsT
WD8enVVEeUQKI6ItcpZ9HgTI9TFWgfZ1vYwvkoRwNIeabYI62JKmLEo2vGfGwWKr
c+GjnJ/tlVI2DpPljfWOnQ037/7yyJI/zo65+HPRmGRD6MuW/BdPDYOvOZUTcQKh
kANi5THSbJJgZcG3jb1NLebaUQ1H0zgVjn0g3KhUV+NJQYk8RQ7rHtB+MySqTKlM
kRkRjfTfR0Ykxpks7Mjvsb6NcZENf08ZFPd45+e/ptsxpiKu4e4W4bV7NZDvNKf9
0/aD3oGYNMiP7s+KJ1lRSAjnBuG21Yk8FpzG+yr8wvJhV8aFgNQ5wIH86SuUTmN0
5bVzFEIcUejIwvGoQEctNHBlOwHrb7zmB6OwyZeMapdXBQ+9UDhYg8ehDqdDOdfn
wsBcnjD2MwNhlE1hjL+tZWLNwSHiD6xx3LvNoXZu2HK8Cp3SOrkE69cFghYMIZZb
T+fp6tNL6LE=
-----END CERTIFICATE-----

View File

@ -0,0 +1,24 @@
-----BEGIN CERTIFICATE-----
MIID/jCCAeagAwIBAgIJAKTICmq1Lg6dMA0GCSqGSIb3DQEBCwUAMDQxEjAQBgNV
BAoMCUVNUVggVGVzdDEeMBwGA1UEAwwVQ2VydGlmaWNhdGUgQXV0aG9yaXR5MB4X
DTIxMTIzMDA4NDExMloXDTQ5MDUxNzA4NDExMlowJTESMBAGA1UECgwJRU1RWCBU
ZXN0MQ8wDQYDVQQDDAZjbGllbnQwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEK
AoIBAQDzrujfx6XZTH0MWqLO6kNAeHndUZ+OGaURXvxKMPMF5dA40lxNG6cEzzlq
0Rm61adlv8tF4kRJrs6EnRjEVoMImrdh07vGFdOTYqP01LjiBhErAzyRtSn2X8FT
Te8ExoCRs3x61SPebGY2hOvFxuO6YDPVOSDvbbxvRgqIlM1ZXC8dOvPSSGZ+P8hV
56EPayRthfu1FVptnkW9CyZCRI0gg95Hv8RC7bGG+tuWpkN9ZrRvohhgGR1+bDUi
BNBpncEsSh+UgWaj8KRN8D16H6m/Im6ty467j0at49FvPx5nACL48/ghtYvzgKLc
uKHtokKUuuzebDK/hQxN3mUSAJStAgMBAAGjIjAgMAsGA1UdDwQEAwIFoDARBglg
hkgBhvhCAQEEBAMCB4AwDQYJKoZIhvcNAQELBQADggIBAIlVyPhOpkz3MNzQmjX7
xgJ3vGPK5uK11n/wfjRwe2qXwZbrI2sYLVtTpUgvLDuP0gB73Vwfu7xAMdue6TRm
CKr9z0lkQsVBtgoqzZCjd4PYLfHm4EhsOMi98OGKU5uOGD4g3yLwQWXHhbYtiZMO
Jsj0hebYveYJt/BYTd1syGQcIcYCyVExWvSWjidfpAqjT6EF7whdubaFtuF2kaGF
IO9yn9rWtXB5yK99uCguEmKhx3fAQxomzqweTu3WRvy9axsUH3WAUW9a4DIBSz2+
ZSJNheFn5GktgggygJUGYqpSZHooUJW0UBs/8vX6AP+8MtINmqOGZUawmNwLWLOq
wHyVt2YGD5TXjzzsWNSQ4mqXxM6AXniZVZK0yYNjA4ATikX1AtwunyWBR4IjyE/D
FxYPORdZCOtywRFE1R5KLTUq/C8BNGCkYnoO78DJBO+pT0oagkQGQb0CnmC6C1db
4lWzA9K0i4B0PyooZA+gp+5FFgaLuX1DkyeaY1J204QhHR1z/Vcyl5dpqR9hqnYP
t8raLk9ogMDKqKA9iG0wc3CBNckD4sjVWAEeovXhElG55fD21wwhF+AnDCvX8iVK
cBfKV6z6uxfKjGIxc2I643I5DiIn+V3DnPxYyY74Ln1lWFYmt5JREhAxPu42zq74
e6+eIMYFszB+5gKgt6pa6ZNI
-----END CERTIFICATE-----

View File

@ -0,0 +1,27 @@
-----BEGIN RSA PRIVATE KEY-----
MIIEpAIBAAKCAQEA867o38el2Ux9DFqizupDQHh53VGfjhmlEV78SjDzBeXQONJc
TRunBM85atEZutWnZb/LReJESa7OhJ0YxFaDCJq3YdO7xhXTk2Kj9NS44gYRKwM8
kbUp9l/BU03vBMaAkbN8etUj3mxmNoTrxcbjumAz1Tkg7228b0YKiJTNWVwvHTrz
0khmfj/IVeehD2skbYX7tRVabZ5FvQsmQkSNIIPeR7/EQu2xhvrblqZDfWa0b6IY
YBkdfmw1IgTQaZ3BLEoflIFmo/CkTfA9eh+pvyJurcuOu49GrePRbz8eZwAi+PP4
IbWL84Ci3Lih7aJClLrs3mwyv4UMTd5lEgCUrQIDAQABAoIBAQDwEbBgznrIwn8r
jZt5x/brbAV7Ea/kOcWSgIaCvQifFdJ2OGAwov5/UXwajNgRZe2d4z7qoUhvYuUY
ZwCAZU6ASpRBr2v9cYFYYURvrqZaHmoJew3P6q/lhl6aqFvC06DUagRHqvXEafyk
13zEAvZVpfNKrBaTawPKiDFWb2qDDc9D6hC07EuJ/DNeehiHvzHrSZSDVV5Ut7Bw
YDm33XygheUPAlHfeCnaixzcs3osiVyFEmVjxcIaM0ZS1NgcSaohSpJHMzvEaohX
e+v9vccraSVlw01AlvFwI2vHYUV8jT6HwglTPKKGOCzK/ace3wPdYSU9qLcqfuHn
EFhNc3tNAoGBAPugLMgbReJg2gpbIPUkYyoMMAAU7llFU1WvPWwXzo1a9EBjBACw
WfCZISNtANXR38zIYXzoH547uXi4YPks1Nne3sYuCDpvuX+iz7fIo4zHf1nFmxH7
eE6GtQr2ubmuuipTc28S0wBMGT1/KybH0e2NKL6GaOkNDmAI0IbEMBrvAoGBAPfr
Y1QYLhPhan6m5g/5s+bQpKtHfNH9TNkk13HuYu72zNuY3qL2GC7oSadR8vTbRXZg
KQqfaO0IGRcdkSFTq/AEhSSqr2Ld5nPadMbKvSGrSCc1s8rFH97jRVQY56yhM7ti
IW4+6cE8ylCMbdYB6wuduK/GIgNpqoF4xs1i2XojAoGACacBUMPLEH4Kny8TupOk
wi4pgTdMVVxVcAoC3yyincWJbRbfRm99Y79cCBHcYFdmsGJXawU0gUtlN/5KqgRQ
PfNQtGV7p1I12XGTakdmDrZwai8sXao52TlNpJgGU9siBRGicfZU5cQFi9he/WPY
57XshDJ/v8DidkigRysrdT0CgYEA5iuO22tblC+KvK1dGOXeZWO+DhrfwuGlcFBp
CaimB2/w/8vsn2VVTG9yujo2E6hj1CQw1mDrfG0xRim4LTXOgpbfugwRqvuTUmo2
Ur21XEX2RhjwpEfhcACWxB4fMUG0krrniMA2K6axupi1/KNpQi6bYe3UdFCs8Wld
QSAOAvsCgYBk/X5PmD44DvndE5FShM2w70YOoMr3Cgl5sdwAFUFE9yDuC14UhVxk
oxnYxwtVI9uVVirET+LczP9JEvcvxnN/Xg3tH/qm0WlIxmTxyYrFFIK9j0rqeu9z
blPu56OzNI2VMrR1GbOBLxQINLTIpaacjNJAlr8XOlegdUJsW/Jwqw==
-----END RSA PRIVATE KEY-----

View File

@ -0,0 +1,27 @@
-----BEGIN RSA PRIVATE KEY-----
MIIEpAIBAAKCAQEAzLiGiSwpxkENtjrzS7pNLblTnWe4HUUFwYyUX0H+3TnvA86X
EX85yZvFjkzB6lLjUkMY+C6UTVXt+mxeSJbUtSKZhX+2yoF/KYh7SaVjug5FqEqO
LvMpDZQEhUWF2W9DG6eUgOfDoX2milSDIe10yG2WBkryipHAfE7l1t+i6Rh3on+v
561LmrbqyBWR/cLp23RN3sHbkf2pb5/ugtU9twdgJr6Lve73rvSeulewL5BzszKD
BrYqr+PBT5+3ItCc55bTsO7M7CzOIL99BlqdvFH7xT0U1+2BFwLe4/8kwphSqyJE
C5oOiQBFnFVNXmFQSV+k7rPr80i1IO++HeJ6KQIDAQABAoIBAGWgvPjfuaU3qizq
uti/FY07USz0zkuJdkANH6LiSjlchzDmn8wJ0pApCjuIE0PV/g9aS8z4opp5q/gD
UBLM/a8mC/xf2EhTXOMrY7i9p/I3H5FZ4ZehEqIw9sWKK9YzC6dw26HabB2BGOnW
5nozPSQ6cp2RGzJ7BIkxSZwPzPnVTgy3OAuPOiJytvK+hGLhsNaT+Y9bNDvplVT2
ZwYTV8GlHZC+4b2wNROILm0O86v96O+Qd8nn3fXjGHbMsAnONBq10bZS16L4fvkH
5G+W/1PeSXmtZFppdRRDxIW+DWcXK0D48WRliuxcV4eOOxI+a9N2ZJZZiNLQZGwg
w3A8+mECgYEA8HuJFrlRvdoBe2U/EwUtG74dcyy30L4yEBnN5QscXmEEikhaQCfX
Wm6EieMcIB/5I5TQmSw0cmBMeZjSXYoFdoI16/X6yMMuATdxpvhOZGdUGXxhAH+x
xoTUavWZnEqW3fkUU71kT5E2f2i+0zoatFESXHeslJyz85aAYpP92H0CgYEA2e5A
Yozt5eaA1Gyhd8SeptkEU4xPirNUnVQHStpMWUb1kzTNXrPmNWccQ7JpfpG6DcYl
zUF6p6mlzY+zkMiyPQjwEJlhiHM2NlL1QS7td0R8ewgsFoyn8WsBI4RejWrEG9td
EDniuIw+pBFkcWthnTLHwECHdzgquToyTMjrBB0CgYEA28tdGbrZXhcyAZEhHAZA
Gzog+pKlkpEzeonLKIuGKzCrEKRecIK5jrqyQsCjhS0T7ZRnL4g6i0s+umiV5M5w
fcc292pEA1h45L3DD6OlKplSQVTv55/OYS4oY3YEJtf5mfm8vWi9lQeY8sxOlQpn
O+VZTdBHmTC8PGeTAgZXHZUCgYA6Tyv88lYowB7SN2qQgBQu8jvdGtqhcs/99GCr
H3N0I69LPsKAR0QeH8OJPXBKhDUywESXAaEOwS5yrLNP1tMRz5Vj65YUCzeDG3kx
gpvY4IMp7ArX0bSRvJ6mYSFnVxy3k174G3TVCfksrtagHioVBGQ7xUg5ltafjrms
n8l55QKBgQDVzU8tQvBVqY8/1lnw11Vj4fkE/drZHJ5UkdC1eenOfSWhlSLfUJ8j
ds7vEWpRPPoVuPZYeR1y78cyxKe1GBx6Wa2lF5c7xjmiu0xbRnrxYeLolce9/ntp
asClqpnHT8/VJYTD7Kqj0fouTTZf0zkig/y+2XERppd8k+pSKjUCPQ==
-----END RSA PRIVATE KEY-----

View File

@ -348,6 +348,55 @@ t_stop_timeout(_) ->
snabbkaffe:stop(),
meck:unload(emqx_exhook_demo_svr).
t_ssl_clear(_) ->
SvrName = <<"ssl_test">>,
SSLConf = #{
<<"cacertfile">> => cert_file("cafile"),
<<"certfile">> => cert_file("certfile"),
<<"enable">> => true,
<<"keyfile">> => cert_file("keyfile"),
<<"verify">> => <<"verify_peer">>
},
AddConf = #{
<<"auto_reconnect">> => <<"60s">>,
<<"enable">> => false,
<<"failed_action">> => <<"deny">>,
<<"name">> => <<"ssl_test">>,
<<"pool_size">> => 16,
<<"request_timeout">> => <<"5s">>,
<<"ssl">> => SSLConf,
<<"url">> => <<"http://127.0.0.1:9000">>
},
emqx_exhook_mgr:update_config([exhook, servers], {add, AddConf}),
ListResult1 = list_pem_dir(SvrName),
?assertMatch({ok, [_, _, _]}, ListResult1),
{ok, ResultList1} = ListResult1,
UpdateConf = AddConf#{<<"ssl">> => SSLConf#{<<"keyfile">> => cert_file("keyfile2")}},
emqx_exhook_mgr:update_config([exhook, servers], {update, SvrName, UpdateConf}),
ListResult2 = list_pem_dir(SvrName),
?assertMatch({ok, [_, _, _]}, ListResult2),
{ok, ResultList2} = ListResult2,
FindKeyFile = fun(List) ->
case lists:search(fun(E) -> lists:prefix("key", E) end, List) of
{value, Value} ->
Value;
_ ->
?assert(false, "Can't find keyfile")
end
end,
?assertNotEqual(FindKeyFile(ResultList1), FindKeyFile(ResultList2)),
emqx_exhook_mgr:update_config([exhook, servers], {delete, SvrName}),
?assertMatch({error, not_dir}, list_pem_dir(SvrName)),
ok.
%%--------------------------------------------------------------------
%% Cases Helpers
%%--------------------------------------------------------------------
@ -414,3 +463,20 @@ loaded_exhook_hookpoints() ->
is_exhook_callback(Cb) ->
Action = element(2, Cb),
emqx_exhook_handler == element(1, Action).
list_pem_dir(Name) ->
Dir = filename:join([emqx:mutable_certs_dir(), "exhook", Name]),
case filelib:is_dir(Dir) of
true ->
file:list_dir(Dir);
_ ->
{error, not_dir}
end.
data_file(Name) ->
Dir = code:lib_dir(emqx_exhook, test),
{ok, Bin} = file:read_file(filename:join([Dir, "data", Name])),
Bin.
cert_file(Name) ->
data_file(filename:join(["certs", Name])).

View File

@ -16,7 +16,7 @@
-module(emqx_gateway_impl).
-include("include/emqx_gateway.hrl").
-include("emqx_gateway.hrl").
-type state() :: map().
-type reason() :: any().

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_gateway, [
{description, "The Gateway management application"},
{vsn, "0.1.8"},
{vsn, "0.1.9"},
{registered, []},
{mod, {emqx_gateway_app, []}},
{applications, [kernel, stdlib, grpc, emqx, emqx_authn]},

View File

@ -82,10 +82,10 @@ code_change(_OldVsn, State, _Extra) ->
%%--------------------------------------------------------------------
ensure_timer(State) ->
case application:get_env(emqx_machine, global_gc_interval) of
undefined ->
case application:get_env(emqx_machine, global_gc_interval, disabled) of
disabled ->
State;
{ok, Interval} ->
Interval when is_integer(Interval) ->
TRef = emqx_misc:start_timer(Interval, run),
State#{timer := TRef}
end.

View File

@ -62,7 +62,11 @@ schema("/subscriptions") ->
tags => [<<"Subscriptions">>],
parameters => parameters(),
responses => #{
200 => hoconsc:mk(hoconsc:array(hoconsc:ref(?MODULE, subscription)), #{})
200 => hoconsc:mk(hoconsc:array(hoconsc:ref(?MODULE, subscription)), #{}),
400 => emqx_dashboard_swagger:error_codes(
['INVALID_PARAMETER'], <<"Invalid parameter">>
),
500 => emqx_dashboard_swagger:error_codes(['NODE_DOWN'], <<"Bad RPC">>)
}
}
}.
@ -179,10 +183,8 @@ format(WhichNode, {{_Subscriber, Topic}, Options}) ->
maps:with([qos, nl, rap, rh], Options)
).
get_topic(Topic, #{share := <<"$queue">> = Group}) ->
filename:join([Group, Topic]);
get_topic(Topic, #{share := Group}) ->
filename:join([<<"$share">>, Group, Topic]);
emqx_topic:join([<<"$share">>, Group, Topic]);
get_topic(Topic, _) ->
Topic.

View File

@ -72,7 +72,7 @@ schema("/trace") ->
description => "List all trace",
tags => ?TAGS,
responses => #{
200 => hoconsc:ref(trace)
200 => hoconsc:array(hoconsc:ref(trace))
}
},
post => #{
@ -140,7 +140,8 @@ schema("/trace/:name/download") ->
'application/octet-stream' =>
#{schema => #{type => "string", format => "binary"}}
}
}
},
404 => emqx_dashboard_swagger:error_codes(['NOT_FOUND'], <<"Trace Name Not Found">>)
}
}
};
@ -172,9 +173,12 @@ schema("/trace/:name/log") ->
responses => #{
200 =>
[
{items, hoconsc:mk(binary(), #{example => "TEXT-LOG-ITEMS"})}
| fields(bytes) ++ fields(position)
]
{items, hoconsc:mk(binary(), #{example => "TEXT-LOG-ITEMS"})},
{meta, fields(bytes) ++ fields(position)}
],
400 => emqx_dashboard_swagger:error_codes(
['READ_FILE_ERROR', 'RPC_ERROR', 'NODE_ERROR'], <<"Trace Log Failed">>
)
}
}
}.

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_modules, [
{description, "EMQX Modules"},
{vsn, "5.0.7"},
{vsn, "5.0.8"},
{modules, []},
{applications, [kernel, stdlib, emqx]},
{mod, {emqx_modules_app, []}},

View File

@ -36,6 +36,7 @@
]).
-define(BAD_REQUEST, 'BAD_REQUEST').
-define(NOT_FOUND, 'NOT_FOUND').
api_spec() ->
emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}).
@ -63,7 +64,8 @@ schema("/telemetry/status") ->
'requestBody' => status_schema(?DESC(update_telemetry_status_api)),
responses =>
#{
200 => status_schema(?DESC(update_telemetry_status_api))
200 => status_schema(?DESC(update_telemetry_status_api)),
400 => emqx_dashboard_swagger:error_codes([?BAD_REQUEST], <<"Bad Request">>)
}
}
};
@ -75,7 +77,12 @@ schema("/telemetry/data") ->
description => ?DESC(get_telemetry_data_api),
tags => [<<"Telemetry">>],
responses =>
#{200 => mk(ref(?MODULE, telemetry), #{desc => ?DESC(get_telemetry_data_api)})}
#{
200 => mk(ref(?MODULE, telemetry), #{desc => ?DESC(get_telemetry_data_api)}),
404 => emqx_dashboard_swagger:error_codes(
[?NOT_FOUND], <<"Telemetry is not enabled">>
)
}
}
}.
@ -220,21 +227,29 @@ status(put, #{body := Body}) ->
true -> <<"Telemetry status is already enabled">>;
false -> <<"Telemetry status is already disabled">>
end,
{400, #{code => 'BAD_REQUEST', message => Reason}};
{400, #{code => ?BAD_REQUEST, message => Reason}};
false ->
case enable_telemetry(Enable) of
ok ->
{200, get_telemetry_status()};
{error, Reason} ->
{400, #{
code => 'BAD_REQUEST',
code => ?BAD_REQUEST,
message => Reason
}}
end
end.
data(get, _Request) ->
{200, emqx_json:encode(get_telemetry_data())}.
case emqx_modules_conf:is_telemetry_enabled() of
true ->
{200, emqx_json:encode(get_telemetry_data())};
false ->
{404, #{
code => ?NOT_FOUND,
message => <<"Telemetry is not enabled">>
}}
end.
%%--------------------------------------------------------------------
%% Internal functions

View File

@ -61,8 +61,20 @@ init_per_testcase(t_status_non_official, Config) ->
init_per_testcase(t_status, Config) ->
meck:new(emqx_telemetry, [non_strict, passthrough]),
meck:expect(emqx_telemetry, enable, fun() -> ok end),
{ok, _, _} =
request(
put,
uri(["telemetry", "status"]),
#{<<"enable">> => true}
),
Config;
init_per_testcase(_TestCase, Config) ->
{ok, _, _} =
request(
put,
uri(["telemetry", "status"]),
#{<<"enable">> => true}
),
Config.
end_per_testcase(t_status_non_official, _Config) ->
@ -169,4 +181,16 @@ t_data(_) ->
<<"vm_specs">> := _
},
jsx:decode(Result)
).
),
{ok, 200, _} =
request(
put,
uri(["telemetry", "status"]),
#{<<"enable">> => false}
),
{ok, 404, _} =
request(get, uri(["telemetry", "data"])),
ok.

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -65,10 +65,7 @@ fields("rule_info") ->
] ++ fields("rule_creation");
%% TODO: we can delete this API if the Dashboard not depends on it
fields("rule_events") ->
ETopics = [
binary_to_atom(emqx_rule_events:event_topic(E))
|| E <- emqx_rule_events:event_names()
],
ETopics = emqx_rule_events:event_topics_enum(),
[
{"event", sc(hoconsc:enum(ETopics), #{desc => ?DESC("rs_event"), required => true})},
{"title", sc(binary(), #{desc => ?DESC("rs_title"), example => "some title"})},
@ -150,77 +147,43 @@ fields("node_metrics") ->
fields("metrics");
fields("ctx_pub") ->
[
{"event_type", sc(message_publish, #{desc => ?DESC("event_event_type"), required => true})},
{"id", sc(binary(), #{desc => ?DESC("event_id")})},
{"clientid", sc(binary(), #{desc => ?DESC("event_clientid")})},
{"username", sc(binary(), #{desc => ?DESC("event_username")})},
{"payload", sc(binary(), #{desc => ?DESC("event_payload")})},
{"peerhost", sc(binary(), #{desc => ?DESC("event_peerhost")})},
{"topic", sc(binary(), #{desc => ?DESC("event_topic")})},
{"publish_received_at",
sc(integer(), #{
desc => ?DESC("event_publish_received_at")
})}
] ++ [qos()];
{"event_type", event_type_sc(message_publish)},
{"id", sc(binary(), #{desc => ?DESC("event_id")})}
| msg_event_common_fields()
];
fields("ctx_sub") ->
[
{"event_type",
sc(session_subscribed, #{desc => ?DESC("event_event_type"), required => true})},
{"clientid", sc(binary(), #{desc => ?DESC("event_clientid")})},
{"username", sc(binary(), #{desc => ?DESC("event_username")})},
{"payload", sc(binary(), #{desc => ?DESC("event_payload")})},
{"peerhost", sc(binary(), #{desc => ?DESC("event_peerhost")})},
{"topic", sc(binary(), #{desc => ?DESC("event_topic")})},
{"publish_received_at",
sc(integer(), #{
desc => ?DESC("event_publish_received_at")
})}
] ++ [qos()];
{"event_type", event_type_sc(session_subscribed)}
| msg_event_common_fields()
];
fields("ctx_unsub") ->
[
{"event_type",
sc(session_unsubscribed, #{desc => ?DESC("event_event_type"), required => true})}
] ++
proplists:delete("event_type", fields("ctx_sub"));
{"event_type", event_type_sc(session_unsubscribed)}
| proplists:delete("event_type", fields("ctx_sub"))
];
fields("ctx_delivered") ->
[
{"event_type",
sc(message_delivered, #{desc => ?DESC("event_event_type"), required => true})},
{"event_type", event_type_sc(message_delivered)},
{"id", sc(binary(), #{desc => ?DESC("event_id")})},
{"from_clientid", sc(binary(), #{desc => ?DESC("event_from_clientid")})},
{"from_username", sc(binary(), #{desc => ?DESC("event_from_username")})},
{"clientid", sc(binary(), #{desc => ?DESC("event_clientid")})},
{"username", sc(binary(), #{desc => ?DESC("event_username")})},
{"payload", sc(binary(), #{desc => ?DESC("event_payload")})},
{"peerhost", sc(binary(), #{desc => ?DESC("event_peerhost")})},
{"topic", sc(binary(), #{desc => ?DESC("event_topic")})},
{"publish_received_at",
sc(integer(), #{
desc => ?DESC("event_publish_received_at")
})}
] ++ [qos()];
{"from_username", sc(binary(), #{desc => ?DESC("event_from_username")})}
| msg_event_common_fields()
];
fields("ctx_acked") ->
[{"event_type", sc(message_acked, #{desc => ?DESC("event_event_type"), required => true})}] ++
proplists:delete("event_type", fields("ctx_delivered"));
[
{"event_type", event_type_sc(message_acked)}
| proplists:delete("event_type", fields("ctx_delivered"))
];
fields("ctx_dropped") ->
[
{"event_type", sc(message_dropped, #{desc => ?DESC("event_event_type"), required => true})},
{"event_type", event_type_sc(message_dropped)},
{"id", sc(binary(), #{desc => ?DESC("event_id")})},
{"reason", sc(binary(), #{desc => ?DESC("event_ctx_dropped")})},
{"clientid", sc(binary(), #{desc => ?DESC("event_clientid")})},
{"username", sc(binary(), #{desc => ?DESC("event_username")})},
{"payload", sc(binary(), #{desc => ?DESC("event_payload")})},
{"peerhost", sc(binary(), #{desc => ?DESC("event_peerhost")})},
{"topic", sc(binary(), #{desc => ?DESC("event_topic")})},
{"publish_received_at",
sc(integer(), #{
desc => ?DESC("event_publish_received_at")
})}
] ++ [qos()];
{"reason", sc(binary(), #{desc => ?DESC("event_ctx_dropped")})}
| msg_event_common_fields()
];
fields("ctx_connected") ->
[
{"event_type",
sc(client_connected, #{desc => ?DESC("event_event_type"), required => true})},
{"event_type", event_type_sc(client_connected)},
{"clientid", sc(binary(), #{desc => ?DESC("event_clientid")})},
{"username", sc(binary(), #{desc => ?DESC("event_username")})},
{"mountpoint", sc(binary(), #{desc => ?DESC("event_mountpoint")})},
@ -239,8 +202,7 @@ fields("ctx_connected") ->
];
fields("ctx_disconnected") ->
[
{"event_type",
sc(client_disconnected, #{desc => ?DESC("event_event_type"), required => true})},
{"event_type", event_type_sc(client_disconnected)},
{"clientid", sc(binary(), #{desc => ?DESC("event_clientid")})},
{"username", sc(binary(), #{desc => ?DESC("event_username")})},
{"reason", sc(binary(), #{desc => ?DESC("event_ctx_disconnected_reason")})},
@ -253,7 +215,7 @@ fields("ctx_disconnected") ->
];
fields("ctx_connack") ->
[
{"event_type", sc(client_connack, #{desc => ?DESC("event_event_type"), required => true})},
{"event_type", event_type_sc(client_connack)},
{"reason_code", sc(binary(), #{desc => ?DESC("event_ctx_connack_reason_code")})},
{"clientid", sc(binary(), #{desc => ?DESC("event_clientid")})},
{"clean_start", sc(boolean(), #{desc => ?DESC("event_clean_start"), default => true})},
@ -271,8 +233,7 @@ fields("ctx_connack") ->
];
fields("ctx_check_authz_complete") ->
[
{"event_type",
sc(client_check_authz_complete, #{desc => ?DESC("event_event_type"), required => true})},
{"event_type", event_type_sc(client_check_authz_complete)},
{"clientid", sc(binary(), #{desc => ?DESC("event_clientid")})},
{"username", sc(binary(), #{desc => ?DESC("event_username")})},
{"peerhost", sc(binary(), #{desc => ?DESC("event_peerhost")})},
@ -283,19 +244,16 @@ fields("ctx_check_authz_complete") ->
];
fields("ctx_bridge_mqtt") ->
[
{"event_type",
sc('$bridges/mqtt:*', #{desc => ?DESC("event_event_type"), required => true})},
{"event_type", event_type_sc('$bridges/mqtt:*')},
{"id", sc(binary(), #{desc => ?DESC("event_id")})},
{"payload", sc(binary(), #{desc => ?DESC("event_payload")})},
{"topic", sc(binary(), #{desc => ?DESC("event_topic")})},
{"server", sc(binary(), #{desc => ?DESC("event_server")})},
{"dup", sc(binary(), #{desc => ?DESC("event_dup")})},
{"retain", sc(binary(), #{desc => ?DESC("event_retain")})},
{"message_received_at",
sc(integer(), #{
desc => ?DESC("event_publish_received_at")
})}
] ++ [qos()].
{"message_received_at", publish_received_at_sc()},
qos()
].
qos() ->
{"qos", sc(emqx_schema:qos(), #{desc => ?DESC("event_qos")})}.
@ -312,4 +270,22 @@ rule_id() ->
)}.
sc(Type, Meta) -> hoconsc:mk(Type, Meta).
ref(Field) -> hoconsc:ref(?MODULE, Field).
event_type_sc(Event) ->
sc(Event, #{desc => ?DESC("event_event_type"), required => true}).
publish_received_at_sc() ->
sc(integer(), #{desc => ?DESC("event_publish_received_at")}).
msg_event_common_fields() ->
[
{"clientid", sc(binary(), #{desc => ?DESC("event_clientid")})},
{"username", sc(binary(), #{desc => ?DESC("event_username")})},
{"payload", sc(binary(), #{desc => ?DESC("event_payload")})},
{"peerhost", sc(binary(), #{desc => ?DESC("event_peerhost")})},
{"topic", sc(binary(), #{desc => ?DESC("event_topic")})},
{"publish_received_at", publish_received_at_sc()},
qos()
].

View File

@ -28,6 +28,7 @@
unload/1,
event_names/0,
event_name/1,
event_topics_enum/0,
event_topic/1,
eventmsg_publish/1
]).
@ -78,6 +79,22 @@ event_names() ->
'delivery.dropped'
].
%% for documentation purposes
event_topics_enum() ->
[
'$events/client_connected',
'$events/client_disconnected',
'$events/client_connack',
'$events/client_check_authz_complete',
'$events/session_subscribed',
'$events/session_unsubscribed',
'$events/message_delivered',
'$events/message_acked',
'$events/message_dropped',
'$events/delivery_dropped'
% '$events/message_publish' % not possible to use in SELECT FROM
].
reload() ->
lists:foreach(
fun(Rule) ->
@ -88,21 +105,22 @@ reload() ->
load(Topic) ->
HookPoint = event_name(Topic),
HookFun = hook_fun_name(HookPoint),
emqx_hooks:put(
HookPoint, {?MODULE, hook_fun(HookPoint), [#{event_topic => Topic}]}, ?HP_RULE_ENGINE
HookPoint, {?MODULE, HookFun, [#{event_topic => Topic}]}, ?HP_RULE_ENGINE
).
unload() ->
lists:foreach(
fun(HookPoint) ->
emqx_hooks:del(HookPoint, {?MODULE, hook_fun(HookPoint)})
emqx_hooks:del(HookPoint, {?MODULE, hook_fun_name(HookPoint)})
end,
event_names()
).
unload(Topic) ->
HookPoint = event_name(Topic),
emqx_hooks:del(HookPoint, {?MODULE, hook_fun(HookPoint)}).
emqx_hooks:del(HookPoint, {?MODULE, hook_fun_name(HookPoint)}).
%%--------------------------------------------------------------------
%% Callbacks
@ -987,15 +1005,25 @@ columns_example_props_specific(unsub_props) ->
%% Helper functions
%%--------------------------------------------------------------------
hook_fun(<<"$bridges/", _/binary>>) ->
on_bridge_message_received;
hook_fun(Event) ->
case string:split(atom_to_list(Event), ".") of
[Prefix, Name] ->
list_to_atom(lists:append(["on_", Prefix, "_", Name]));
[_] ->
error(invalid_event, Event)
end.
hook_fun_name(HookPoint) ->
HookFun = hook_fun(HookPoint),
{name, HookFunName} = erlang:fun_info(HookFun, name),
HookFunName.
%% return static function references to help static code checks
hook_fun(<<"$bridges/", _/binary>>) -> fun ?MODULE:on_bridge_message_received/2;
hook_fun('client.connected') -> fun ?MODULE:on_client_connected/3;
hook_fun('client.disconnected') -> fun ?MODULE:on_client_disconnected/4;
hook_fun('client.connack') -> fun ?MODULE:on_client_connack/4;
hook_fun('client.check_authz_complete') -> fun ?MODULE:on_client_check_authz_complete/6;
hook_fun('session.subscribed') -> fun ?MODULE:on_session_subscribed/4;
hook_fun('session.unsubscribed') -> fun ?MODULE:on_session_unsubscribed/4;
hook_fun('message.delivered') -> fun ?MODULE:on_message_delivered/3;
hook_fun('message.acked') -> fun ?MODULE:on_message_acked/3;
hook_fun('message.dropped') -> fun ?MODULE:on_message_dropped/4;
hook_fun('delivery.dropped') -> fun ?MODULE:on_delivery_dropped/4;
hook_fun('message.publish') -> fun ?MODULE:on_message_publish/2;
hook_fun(Event) -> error({invalid_event, Event}).
reason(Reason) when is_atom(Reason) -> Reason;
reason({shutdown, Reason}) when is_atom(Reason) -> Reason;
@ -1006,19 +1034,20 @@ ntoa(undefined) -> undefined;
ntoa({IpAddr, Port}) -> iolist_to_binary([inet:ntoa(IpAddr), ":", integer_to_list(Port)]);
ntoa(IpAddr) -> iolist_to_binary(inet:ntoa(IpAddr)).
event_name(<<"$events/client_connected", _/binary>>) -> 'client.connected';
event_name(<<"$events/client_disconnected", _/binary>>) -> 'client.disconnected';
event_name(<<"$events/client_connack", _/binary>>) -> 'client.connack';
event_name(<<"$events/client_check_authz_complete", _/binary>>) -> 'client.check_authz_complete';
event_name(<<"$events/session_subscribed", _/binary>>) -> 'session.subscribed';
event_name(<<"$events/session_unsubscribed", _/binary>>) -> 'session.unsubscribed';
event_name(<<"$events/message_delivered", _/binary>>) -> 'message.delivered';
event_name(<<"$events/message_acked", _/binary>>) -> 'message.acked';
event_name(<<"$events/message_dropped", _/binary>>) -> 'message.dropped';
event_name(<<"$events/delivery_dropped", _/binary>>) -> 'delivery.dropped';
event_name(<<"$bridges/", _/binary>> = Topic) -> Topic;
event_name(<<"$bridges/", _/binary>> = Bridge) -> Bridge;
event_name(<<"$events/client_connected">>) -> 'client.connected';
event_name(<<"$events/client_disconnected">>) -> 'client.disconnected';
event_name(<<"$events/client_connack">>) -> 'client.connack';
event_name(<<"$events/client_check_authz_complete">>) -> 'client.check_authz_complete';
event_name(<<"$events/session_subscribed">>) -> 'session.subscribed';
event_name(<<"$events/session_unsubscribed">>) -> 'session.unsubscribed';
event_name(<<"$events/message_delivered">>) -> 'message.delivered';
event_name(<<"$events/message_acked">>) -> 'message.acked';
event_name(<<"$events/message_dropped">>) -> 'message.dropped';
event_name(<<"$events/delivery_dropped">>) -> 'delivery.dropped';
event_name(_) -> 'message.publish'.
event_topic(<<"$bridges/", _/binary>> = Bridge) -> Bridge;
event_topic('client.connected') -> <<"$events/client_connected">>;
event_topic('client.disconnected') -> <<"$events/client_disconnected">>;
event_topic('client.connack') -> <<"$events/client_connack">>;
@ -1029,8 +1058,7 @@ event_topic('message.delivered') -> <<"$events/message_delivered">>;
event_topic('message.acked') -> <<"$events/message_acked">>;
event_topic('message.dropped') -> <<"$events/message_dropped">>;
event_topic('delivery.dropped') -> <<"$events/delivery_dropped">>;
event_topic('message.publish') -> <<"$events/message_publish">>;
event_topic(<<"$bridges/", _/binary>> = Topic) -> Topic.
event_topic('message.publish') -> <<"$events/message_publish">>.
printable_maps(undefined) ->
#{};

View File

@ -8,19 +8,18 @@
all() -> emqx_common_test_helpers:all(?MODULE).
t_mod_hook_fun(_) ->
Funcs = emqx_rule_events:module_info(exports),
[
?assert(lists:keymember(emqx_rule_events:hook_fun(Event), 1, Funcs))
|| Event <- [
'client.connected',
'client.disconnected',
'session.subscribed',
'session.unsubscribed',
'message.acked',
'message.dropped',
'message.delivered'
]
].
Events = emqx_rule_events:event_names(),
lists:foreach(
fun(E) ->
?assert(is_function(emqx_rule_events:hook_fun(E)))
end,
Events
),
?assertEqual(
fun emqx_rule_events:on_bridge_message_received/2,
emqx_rule_events:hook_fun(<<"$bridges/foo">>)
),
?assertError({invalid_event, foo}, emqx_rule_events:hook_fun(foo)).
t_printable_maps(_) ->
Headers = #{
@ -42,3 +41,24 @@ t_printable_maps(_) ->
?assertNot(maps:is_key(redispatch_to, Converted)),
?assertNot(maps:is_key(shared_dispatch_ack, Converted)),
ok.
t_event_name_topic_conversion(_) ->
Events = emqx_rule_events:event_names() -- ['message.publish'],
Topics = [atom_to_binary(A) || A <- emqx_rule_events:event_topics_enum()],
Zip = lists:zip(Events, Topics),
lists:foreach(
fun({Event, Topic}) ->
?assertEqual(Event, emqx_rule_events:event_name(Topic)),
?assertEqual(Topic, emqx_rule_events:event_topic(Event))
end,
Zip
).
t_special_events_name_topic_conversion(_) ->
Bridge = <<"$bridges/foo:bar">>,
AdHoc = <<"foo/bar">>,
NonExisting = <<"$events/message_publish">>,
?assertEqual(Bridge, emqx_rule_events:event_name(Bridge)),
?assertEqual('message.publish', emqx_rule_events:event_name(AdHoc)),
?assertEqual('message.publish', emqx_rule_events:event_name(NonExisting)),
?assertEqual(NonExisting, emqx_rule_events:event_topic('message.publish')).

25
changes/v5.0.12-en.md Normal file
View File

@ -0,0 +1,25 @@
# v5.0.12
## Enhancements
- Disable global garbage collection by `node.global_gc_interval = disabled` [#9418](https://github.com/emqx/emqx/pull/9418)。
- Improve the CLI to avoid waste atom table when typing erros [#9416](https://github.com/emqx/emqx/pull/9416).
- Start building MacOS packages for Apple Silicon hadrdware [#9423](https://github.com/emqx/emqx/pull/9423).
- Remove support for setting shared subscriptions using the non-standard `$queue` feature [#9412](https://github.com/emqx/emqx/pull/9412).
Shared subscriptions are now part of the MQTT spec. Use `$share` instead.
- Refactor authn API by replacing `POST /authentication/{id}/move` with `PUT /authentication/{id}/position/{position}`. [#9419](https://github.com/emqx/emqx/pull/9419).
Same is done for `/listeners/{listener_id}/authentication/id/...`.
## Bug fixes
- Fix that the obsolete SSL files aren't deleted after the ExHook config update [#9432](https://github.com/emqx/emqx/pull/9432).
- Fix doc and schema for `/trace` API [#9468](https://github.com/emqx/emqx/pull/9468).
- Return `404` for `/telemetry/data` in case it's disabled [#9464](https://github.com/emqx/emqx/pull/9464).
- Fix some potential MQTT packet parse errors [#9477](https://github.com/emqx/emqx/pull/9477).

View File

@ -0,0 +1,29 @@
# v5.0.12
## Enhancements
- Disable global garbage collection by `node.global_gc_interval = disabled` [#9418](https://github.com/emqx/emqx/pull/9418)。
- Improve the CLI to avoid waste atom table when typing erros [#9416](https://github.com/emqx/emqx/pull/9416).
- Start building MacOS packages for Apple Silicon hadrdware [#9423](https://github.com/emqx/emqx/pull/9423).
- Remove support for setting shared subscriptions using the non-standard `$queue` feature [#9412](https://github.com/emqx/emqx/pull/9412).
Shared subscriptions are now part of the MQTT spec. Use `$share` instead.
- Refactor authn API by replacing `POST /authentication/{id}/move` with `PUT /authentication/{id}/position/{position}`. [#9419](https://github.com/emqx/emqx/pull/9419).
Same is done for `/listeners/{listener_id}/authentication/id/...`.
- Return `204` instead of `200` for `PUT /authenticator/:id` [#9434](https://github.com/emqx/emqx/pull/9434/).
## Bug fixes
- Fix that the obsolete SSL files aren't deleted after the ExHook config update [#9432](https://github.com/emqx/emqx/pull/9432).
- Fix doc and schema for `/trace` API [#9468](https://github.com/emqx/emqx/pull/9468).
<<<<<<< HEAD
- Return `404` for `/telemetry/data` in case it's disabled [#9464](https://github.com/emqx/emqx/pull/9464).
=======
- Fix some potential MQTT packet parse errors [#9477](https://github.com/emqx/emqx/pull/9477).
>>>>>>> 030a07d8e (fix(frame): fix potential parse errors found by fuzzing test)

24
changes/v5.0.12-zh.md Normal file
View File

@ -0,0 +1,24 @@
# v5.0.12
## 增强
- 通过 `node.global_gc_interval = disabled` 来禁用全局垃圾回收 [#9418](https://github.com/emqx/emqx/pull/9418)。
- 优化命令行实现, 避免输入错误指令时, 产生不必要的原子表消耗 [#9416](https://github.com/emqx/emqx/pull/9416)。
- 支持在 Apple Silicon 架构下编译苹果系统的发行版本 [#9423](https://github.com/emqx/emqx/pull/9423)。
- 删除了老的共享订阅支持方式, 不再使用 `$queue` 前缀 [#9412](https://github.com/emqx/emqx/pull/9412)。
共享订阅自 MQTT v5.0 开始已成为协议标准,可以使用 `$share` 前缀代替 `$queue`
- 重构认证 API使用 `PUT /authentication/{id}/position/{position}` 代替了 `POST /authentication/{id}/move` [#9419](https://github.com/emqx/emqx/pull/9419)。
## 修复
- 修复 ExHook 更新 SSL 相关配置后,过时的 SSL 文件没有被删除的问题 [#9432](https://github.com/emqx/emqx/pull/9432)。
- 修复 /trace API 的返回值格式和相关文档 [#9468](https://github.com/emqx/emqx/pull/9468)。
- 在遥测功能未开启时,通过 /telemetry/data 请求其数据,将会返回 404 [#9464](https://github.com/emqx/emqx/pull/9464)。
- 修复了一些 MQTT 协议包的潜在解析错误 [#9477](https://github.com/emqx/emqx/pull/9477)。

View File

@ -0,0 +1,28 @@
# v5.0.12
## 增强
- 通过 `node.global_gc_interval = disabled` 来禁用全局垃圾回收 [#9418](https://github.com/emqx/emqx/pull/9418)。
- 现在,`PUT /authenticator/:id` 将会返回 204 而不再是 200 [#9434](https://github.com/emqx/emqx/pull/9434/)。
- 优化命令行实现, 避免输入错误指令时, 产生不必要的原子表消耗 [#9416](https://github.com/emqx/emqx/pull/9416)。
- 支持在 Apple Silicon 架构下编译苹果系统的发行版本 [#9423](https://github.com/emqx/emqx/pull/9423)。
- 删除了老的共享订阅支持方式, 不再使用 `$queue` 前缀 [#9412](https://github.com/emqx/emqx/pull/9412)。
共享订阅自 MQTT v5.0 开始已成为协议标准,可以使用 `$share` 前缀代替 `$queue`
- 重构认证 API使用 `PUT /authentication/{id}/position/{position}` 代替了 `POST /authentication/{id}/move` [#9419](https://github.com/emqx/emqx/pull/9419)。
## 修复
- 修复 ExHook 更新 SSL 相关配置后,过时的 SSL 文件没有被删除的问题 [#9432](https://github.com/emqx/emqx/pull/9432)。
- 修复 /trace API 的返回值格式和相关文档 [#9468](https://github.com/emqx/emqx/pull/9468)。
<<<<<<< HEAD
- 在遥测功能未开启时,通过 /telemetry/data 请求其数据,将会返回 404 [#9464](https://github.com/emqx/emqx/pull/9464)。
=======
- 修复了一些 MQTT 协议包的潜在解析错误 [#9477](https://github.com/emqx/emqx/pull/9477)。
>>>>>>> 030a07d8e (fix(frame): fix potential parse errors found by fuzzing test)

View File

@ -30,11 +30,15 @@ Starting from 3.0 release, *EMQX* broker fully supports MQTT V5.0 protocol speci
Execute some command under this docker image
``docker run -d --name emqx emqx/emqx:$(tag)``
```console
$ docker run -d --name emqx emqx/emqx:${tag}
```
For example
``docker run -d --name emqx -p 18083:18083 -p 1883:1883 emqx/emqx:latest``
```console
$ docker run -d --name emqx -p 18083:18083 -p 1883:1883 emqx/emqx:latest
```
The EMQX broker runs as Linux user `emqx` in the docker container.
@ -42,7 +46,7 @@ The EMQX broker runs as Linux user `emqx` in the docker container.
Use the environment variable to configure the EMQX docker container.
By default, the environment variables with ``EMQX_`` prefix are mapped to key-value pairs in configuration files.
By default, the environment variables with `EMQX_` prefix are mapped to key-value pairs in configuration files.
You can change the prefix by overriding `HOCON_ENV_OVERRIDE_PREFIX`.
@ -53,9 +57,9 @@ EMQX_LISTENERS__SSL__DEFAULT__ACCEPTORS <--> listeners.ssl.default.acceptors
EMQX_ZONES__DEFAULT__MQTT__MAX_PACKET_SIZE <--> zones.default.mqtt.max_packet_size
```
+ Prefix ``EMQX_`` is removed
+ Prefix `EMQX_` is removed
+ All upper case letters is replaced with lower case letters
+ ``__`` is replaced with ``.``
+ `__` is replaced with `.`
If `HOCON_ENV_OVERRIDE_PREFIX=DEV_` is set:
@ -75,41 +79,43 @@ These environment variables will ignore for configuration file.
#### EMQX Configuration
> NOTE: All EMQX Configuration in [`etc/emqx.conf`](https://github.com/emqx/emqx/blob/master/etc/emqx.conf) can be configured via environment variables. The following list is just an example, not a complete configuration.
> NOTE: All EMQX Configuration in [`etc/emqx.conf`](https://github.com/emqx/emqx/blob/master/apps/emqx/etc/emqx.conf) can be configured via environment variables. The following list is just an example, not a complete configuration.
| Options | Default | Mapped | Description |
| ---------------------------| ------------------ | ------------------------- | ------------------------------------- |
| `EMQX_NAME` | container name | none | EMQX node short name |
| `EMQX_HOST` | container IP | none | EMQX node host, IP or FQDN |
The list is incomplete and may changed with [`etc/emqx.conf`](https://github.com/emqx/emqx/blob/master/etc/emqx.conf) and plugin configuration files. But the mapping rule is similar.
The list is incomplete and may be changed with [`etc/emqx.conf`](https://github.com/emqx/emqx/blob/master/apps/emqx/etc/emqx.conf) and plugin configuration files. But the mapping rule is similar.
If set ``EMQX_NAME`` and ``EMQX_HOST``, and unset ``EMQX_NODE_NAME``, ``EMQX_NODE_NAME=$EMQX_NAME@$EMQX_HOST``.
If set `EMQX_NAME` and `EMQX_HOST`, and unset `EMQX_NODE_NAME`, `EMQX_NODE_NAME=$EMQX_NAME@$EMQX_HOST`.
For example, set MQTT TCP port to 1883
``docker run -d --name emqx -e EMQX__LISTENERS__TCP__DEFAULT__BIND=1883 -p 18083:18083 -p 1883:1883 emqx/emqx:latest``
```console
$ docker run -d --name emqx -e EMQX__LISTENERS__TCP__DEFAULT__BIND=1883 -p 18083:18083 -p 1883:1883 emqx/emqx:latest
```
#### EMQX Loaded Modules Configuration
| Oprtions | Default | Description |
| ------------------------ | ------------------ | ------------------------------------- |
| `EMQX_LOADED_MODULES` | see content below | default EMQX loaded modules |
| Options | Default | Description |
| ----------------------- | ------------------ | ------------------------------------- |
| `EMQX_LOADED_MODULES` | see content below | default EMQX loaded modules |
Default environment variable ``EMQX_LOADED_MODULES``, including
Default environment variable `EMQX_LOADED_MODULES`, including
+ ``emqx_mod_presence``
+ `emqx_mod_presence`
```bash
# The default EMQX_LOADED_MODULES env
EMQX_LOADED_MODULES="emqx_mod_presence"
```
For example, set ``EMQX_LOADED_MODULES=emqx_mod_delayed,emqx_mod_rewrite`` to load these two modules.
For example, set `EMQX_LOADED_MODULES=emqx_mod_delayed,emqx_mod_rewrite` to load these two modules.
You can use comma, space or other separator that you want.
All the modules defined in env ``EMQX_LOADED_MODULES`` will be loaded.
All the modules defined in env `EMQX_LOADED_MODULES` will be loaded.
```bash
EMQX_LOADED_MODULES="emqx_mod_delayed,emqx_mod_rewrite"
@ -119,28 +125,28 @@ EMQX_LOADED_MODULES="emqx_mod_delayed | emqx_mod_rewrite"
#### EMQX Loaded Plugins Configuration
| Oprtions | Default | Description |
| ------------------------ | ------------------ | ------------------------------------- |
| `EMQX_LOADED_PLUGINS` | see content below | default EMQX loaded plugins |
| Options | Default | Description |
| ----------------------- | ------------------ | ------------------------------------- |
| `EMQX_LOADED_PLUGINS` | see content below | default EMQX loaded plugins |
Default environment variable ``EMQX_LOADED_PLUGINS``, including
Default environment variable `EMQX_LOADED_PLUGINS`, including
+ ``emqx_recon``
+ ``emqx_retainer``
+ ``emqx_rule_engine``
+ ``emqx_management``
+ ``emqx_dashboard``
+ `emqx_recon`
+ `emqx_retainer`
+ `emqx_rule_engine`
+ `emqx_management`
+ `emqx_dashboard`
```bash
# The default EMQX_LOADED_PLUGINS env
EMQX_LOADED_PLUGINS="emqx_recon,emqx_retainer,emqx_management,emqx_dashboard"
```
For example, set ``EMQX_LOADED_PLUGINS= emqx_retainer,emqx_rule_engine`` to load these two plugins.
For example, set `EMQX_LOADED_PLUGINS= emqx_retainer,emqx_rule_engine` to load these two plugins.
You can use comma, space or other separator that you want.
All the plugins defined in ``EMQX_LOADED_PLUGINS`` will be loaded.
All the plugins defined in `EMQX_LOADED_PLUGINS` will be loaded.
```bash
EMQX_LOADED_PLUGINS="emqx_retainer,emqx_rule_engine"
@ -150,7 +156,7 @@ EMQX_LOADED_PLUGINS="emqx_retainer | emqx_rule_engine"
#### EMQX Plugins Configuration
The environment variables which with ``EMQX_`` prefix are mapped to all EMQX plugins' configuration file, ``.`` get replaced by ``__``.
The environment variables which with `EMQX_` prefix are mapped to all EMQX plugins' configuration file, `.` get replaced by `__`.
Example:
@ -177,7 +183,7 @@ docker run -d --name emqx -p 18083:18083 -p 1883:1883 \
emqx/emqx:latest
```
For numbered configuration options where the number is next to a ``.`` such as:
For numbered configuration options where the number is next to a `.` such as:
+ backend.redis.pool1.server
+ backend.redis.hook.message.publish.1
@ -186,20 +192,20 @@ You can configure an arbitrary number of them as long as each has a unique numbe
```bash
docker run -d --name emqx -p 18083:18083 -p 1883:1883 \
-e EMQX_BACKEND_REDIS_POOL1__SERVER=127.0.0.1:6379
-e EMQX_BACKEND_REDIS_POOL1__SERVER=127.0.0.1:6379 \
[...]
-e EMQX_BACKEND__REDIS__POOL5__SERVER=127.0.0.5:6379
-e EMQX_BACKEND__REDIS__HOOK_MESSAGE__PUBLISH__1='{"topic": "persistent/topic1", "action": {"function": "on_message_publish"}, "pool": "pool1"}'
-e EMQX_BACKEND__REDIS__HOOK_MESSAGE__PUBLISH__2='{"topic": "persistent/topic2", "action": {"function": "on_message_publish"}, "pool": "pool1"}'
-e EMQX_BACKEND__REDIS__HOOK_MESSAGE__PUBLISH__3='{"topic": "persistent/topic3", "action": {"function": "on_message_publish"}, "pool": "pool1"}'
-e EMQX_BACKEND__REDIS__POOL5__SERVER=127.0.0.5:6379 \
-e EMQX_BACKEND__REDIS__HOOK_MESSAGE__PUBLISH__1='{"topic": "persistent/topic1", "action": {"function": "on_message_publish"}, "pool": "pool1"}' \
-e EMQX_BACKEND__REDIS__HOOK_MESSAGE__PUBLISH__2='{"topic": "persistent/topic2", "action": {"function": "on_message_publish"}, "pool": "pool1"}' \
-e EMQX_BACKEND__REDIS__HOOK_MESSAGE__PUBLISH__3='{"topic": "persistent/topic3", "action": {"function": "on_message_publish"}, "pool": "pool1"}' \
[...]
-e EMQX_BACKEND__REDIS__HOOK_MESSAGE__PUBLISH__13='{"topic": "persistent/topic13", "action": {"function": "on_message_publish"}, "pool": "pool1"}'
-e EMQX_BACKEND__REDIS__HOOK_MESSAGE__PUBLISH__13='{"topic": "persistent/topic13", "action": {"function": "on_message_publish"}, "pool": "pool1"}' \
emqx/emqx:latest
```
### Cluster
EMQX supports a variety of clustering methods, see our [documentation](https://www.emqx.io/docs/en/latest/advanced/cluster.html) for details.
EMQX supports a variety of clustering methods, see our [documentation](https://www.emqx.io/docs/en/latest/deploy/cluster/intro.html) for details.
Let's create a static node list cluster from docker-compose.
@ -236,7 +242,6 @@ Let's create a static node list cluster from docker-compose.
networks:
emqx-bridge:
driver: bridge
```
+ Start the docker-compose cluster
@ -276,7 +281,7 @@ volumes:
services:
emqx:
image: emqx/emqx:v4.0.0
image: emqx/emqx:latest
restart: always
environment:
EMQX_NAME: foo_emqx
@ -289,12 +294,11 @@ services:
### Kernel Tuning
Under Linux host machine, the easiest way is [Tuning guide](https://www.emqx.io/docs/en/latest/tutorial/tune.html).
Under Linux host machine, the easiest way is [Tuning guide](https://www.emqx.io/docs/en/latest/deploy/tune.html).
If you want tune Linux kernel by docker, you must ensure your docker is latest version (>=1.12).
```bash
docker run -d --name emqx -p 18083:18083 -p 1883:1883 \
--sysctl fs.file-max=2097152 \
--sysctl fs.nr_open=2097152 \
@ -312,7 +316,6 @@ docker run -d --name emqx -p 18083:18083 -p 1883:1883 \
--sysctl net.ipv4.tcp_max_tw_buckets=1048576 \
--sysctl net.ipv4.tcp_fin_timeout=15 \
emqx/emqx:latest
```
> REMEMBER: DO NOT RUN EMQX DOCKER PRIVILEGED OR MOUNT SYSTEM PROC IN CONTAINER TO TUNE LINUX KERNEL, IT IS UNSAFE.

View File

@ -24,6 +24,7 @@ base64 --decode > "${PKSC12_FILE}" <<<"${APPLE_DEVELOPER_ID_BUNDLE}"
KEYCHAIN='emqx.keychain-db'
KEYCHAIN_PASSWORD="$(openssl rand -base64 32)"
security delete-keychain "${KEYCHAIN}" 2>/dev/null || true
security create-keychain -p "${KEYCHAIN_PASSWORD}" "${KEYCHAIN}"
security set-keychain-settings -lut 21600 "${KEYCHAIN}"
security unlock-keychain -p "${KEYCHAIN_PASSWORD}" "${KEYCHAIN}"

35
scripts/rel/check-chart-vsn.sh Executable file
View File

@ -0,0 +1,35 @@
#!/usr/bin/env bash
set -euo pipefail
if [ "${DEBUG:-}" = 1 ]; then
set -x
fi
# ensure dir
cd -P -- "$(dirname -- "$0")/../.."
PROFILE="$1"
CHART_FILE="deploy/charts/${PROFILE}/Chart.yaml"
if [ ! -f "$CHART_FILE" ]; then
echo "Chart file $CHART_FILE is not found"
echo "Current working dir: $(pwd)"
exit 1
fi
CHART_VSN="$(grep -oE '^version:.*' "$CHART_FILE" | cut -d ':' -f 2 | tr -d ' ')"
APP_VSN="$(grep -oE '^appVersion:.*' "$CHART_FILE" | cut -d ':' -f 2 | tr -d ' ')"
if [ "$CHART_VSN" != "$APP_VSN" ]; then
echo "Chart version and app version mismatch in $CHART_FILE"
exit 2
fi
PKG_VSN="$(./pkg-vsn.sh "$PROFILE" | cut -d '-' -f 1)"
if [ "$CHART_VSN" != "$PKG_VSN" ]; then
echo "Chart version in $CHART_FILE is not in sync with release version."
echo "Chart version: $CHART_VSN"
echo "Release version: $PKG_VSN"
exit 3
fi

214
scripts/rel/cut.sh Executable file
View File

@ -0,0 +1,214 @@
#!/usr/bin/env bash
## cut a new 5.x release for EMQX (opensource or enterprise).
set -euo pipefail
if [ "${DEBUG:-}" = 1 ]; then
set -x
fi
# ensure dir
cd -P -- "$(dirname -- "${BASH_SOURCE[0]}")/../.."
usage() {
cat <<EOF
$0 RELEASE_GIT_TAG [option]
RELEASE_GIT_TAG is a 'v*' or 'e*' tag for example:
v5.0.12
e5.0.0-beta.6
options:
-h|--help: Print this usage.
-b|--base: Specify the current release base branch, can be one of
release-50
NOTE: this option should be used when --dryrun.
--dryrun: Do not actually create the git tag.
--skip-appup: Skip checking appup
Useful when you are sure that appup is already updated'
NOTE: For 5.0 series the current working branch must be 'release-50' for opensource edition
and 'release-e50' for enterprise edition.
--.--[ master ]---------------------------.-----------.---
\\ /
\`---[release-50]----(v5.0.12 | e5.0.0)
EOF
}
logerr() {
echo "$(tput setaf 1)ERROR: $1$(tput sgr0)"
}
logmsg() {
echo "INFO: $1"
}
TAG="${1:-}"
case "$TAG" in
v*)
TAG_PREFIX='v'
PROFILE='emqx'
SKIP_APPUP='yes'
;;
e*)
TAG_PREFIX='e'
PROFILE='emqx-enterprise'
SKIP_APPUP='no'
;;
-h|--help)
usage
exit 0
;;
*)
logerr "Unknown version tag $TAG"
usage
exit 1
;;
esac
shift 1
DRYRUN='no'
while [ "$#" -gt 0 ]; do
case $1 in
-h|--help)
usage
exit 0
;;
--skip-appup)
shift
SKIP_APPUP='yes'
;;
--dryrun)
shift
DRYRUN='yes'
;;
-b|--base)
BASE_BR="${2:-}"
if [ -z "${BASE_BR}" ]; then
logerr "Must specify which base branch"
exit 1
fi
shift 2
;;
*)
logerr "Unknown option $1"
exit 1
;;
esac
done
rel_branch() {
local tag="$1"
case "$tag" in
v5.0.*)
echo 'release-50'
;;
e5.0.*)
echo 'release-50'
;;
*)
logerr "Unsupported version tag $TAG"
exit 1
;;
esac
}
## Ensure the current work branch
assert_work_branch() {
local tag="$1"
local release_branch
release_branch="$(rel_branch "$tag")"
local base_branch
base_branch="${BASE_BR:-$(git branch --show-current)}"
if [ "$base_branch" != "$release_branch" ]; then
logerr "Base branch: $base_branch"
logerr "Relase tag must be on the release branch: $release_branch"
logerr "or must use -b|--base option to specify which release branch is current branch based on"
exit 1
fi
}
assert_work_branch "$TAG"
## Ensure no dirty changes
assert_not_dirty() {
local diff
diff="$(git diff --name-only)"
if [ -n "$diff" ]; then
logerr "Git status is not clean? Changed files:"
logerr "$diff"
exit 1
fi
}
assert_not_dirty
## Assert that the tag is not already created
assert_tag_absent() {
local tag="$1"
## Fail if the tag already exists
EXISTING="$(git tag --list "$tag")"
if [ -n "$EXISTING" ]; then
logerr "$tag already released?"
logerr 'This script refuse to force re-tag.'
logerr 'If re-tag is intended, you must first delete the tag from both local and remote'
exit 1
fi
}
assert_tag_absent "$TAG"
PKG_VSN=$(./pkg-vsn.sh "$PROFILE")
## Assert package version is updated to the tag which is being created
assert_release_version() {
local tag="$1"
# shellcheck disable=SC2001
pkg_vsn="$(echo "$PKG_VSN" | sed 's/-g[0-9a-f]\{8\}$//g')"
if [ "${TAG_PREFIX}${pkg_vsn}" != "${tag}" ]; then
logerr "The release version ($pkg_vsn) is different from the desired git tag."
logerr "Update the release version in emqx_release.hrl"
exit 1
fi
}
assert_release_version "$TAG"
## Check if all upstream branches are merged
if [ -z "${BASE_BR:-}" ]; then
./scripts/rel/sync-remotes.sh
else
./scripts/rel/sync-remotes.sh --base "$BASE_BR"
fi
## Check if the Chart versions are in sync
./scripts/rel/check-chart-vsn.sh "$PROFILE"
## Check if app versions are bumped
./scripts/apps-version-check.sh
## Ensure appup files are updated
if [ "$SKIP_APPUP" = 'no' ]; then
logmsg "Checking appups"
./scripts/update-appup.sh "$PROFILE" --check
else
logmsg "Skipped checking appup updates"
fi
## Ensure relup paths are updated
## TODO: add relup path db
#./scripts/relup-base-vsns.escript check-vsn-db "$PKG_VSN" "$RELUP_PATHS"
## Run some additional checks (e.g. some for enterprise edition only)
CHECKS_DIR="./scripts/rel/checks"
if [ -d "${CHECKS_DIR}" ]; then
CHECKS="$(find "${CHECKS_DIR}" -name "*.sh" -print0 2>/dev/null | xargs -0)"
for c in $CHECKS; do
logmsg "Executing $c"
$c
done
fi
if [ "$DRYRUN" = 'yes' ]; then
logmsg "Release tag is ready to be created with command: git tag $TAG"
else
git tag "$TAG"
logmsg "$TAG is created OK."
fi

156
scripts/rel/sync-remotes.sh Executable file
View File

@ -0,0 +1,156 @@
#!/usr/bin/env bash
set -euo pipefail
# ensure dir
cd -P -- "$(dirname -- "${BASH_SOURCE[0]}")/../.."
BASE_BRANCHES=( 'release-50' 'master' )
usage() {
cat <<EOF
$0 [option]
options:
-h|--help:
This script works on one of the branches listed in the -b|--base option below.
It tries to merge (by default with --ff-only option)
upstreams branches for the current working branch.
The uppstream branch of the current branch are as below:
* release-50: [] # no upstream for 5.0 opensource edition
* master: [release-50] # sync release-50 to master
-b|--base:
The base branch of current working branch if currently is not
on one of the following branches.
${BASE_BRANCHES[@]}
-i|--interactive:
With this option, the script will try to merge upstream
branches to local working branch interactively.
That is, there will be git prompts to edit commit messages etc.
Without this option, the script executes 'git merge' command
with '--ff-only' option which conveniently pulls remote
updates if there is any, and fails when fast-forward is not possible
EOF
}
logerr() {
echo "$(tput setaf 1)ERROR: $1$(tput sgr0)"
}
logwarn() {
echo "$(tput setaf 3)WARNING: $1$(tput sgr0)"
}
logmsg() {
echo "INFO: $1"
}
INTERACTIVE='no'
while [ "$#" -gt 0 ]; do
case $1 in
-h|--help)
usage
exit 0
;;
-i|--interactive)
shift
INTERACTIVE='yes'
;;
-b|--base)
shift
BASE_BRANCH="$1"
shift
;;
*)
logerr "Unknown option $1"
exit 1
;;
esac
done
CURRENT_BRANCH="$(git branch --show-current)"
BASE_BRANCH="${BASE_BRANCH:-${CURRENT_BRANCH}}"
## check if arg1 is one of the elements in arg2-N
is_element() {
local e match="$1"
shift
for e in "${@}"; do
if [ "$e" = "$match" ]; then
return 0
fi
done
return 1
}
if ! is_element "$BASE_BRANCH" "${BASE_BRANCHES[@]}"; then
logerr "Cannot work with branch $BASE_BRANCH"
logerr "The base branch must be one of: ${BASE_BRANCHES[*]}"
logerr "Change work branch to one of the above."
logerr "OR: use -b|--base to specify from which base branch is current working branch created"
exit 1
fi
## Find git remotes to fetch from.
##
## NOTE: For enterprise, the opensource repo must be added as a remote.
## Because not all changes in opensource repo are synced to enterprise repo immediately.
##
## NOTE: grep -v enterprise here, but why not to match on full repo name 'emqx/emqx.git'?
## It's because the git remote does not always end with .git
GIT_REMOTE_CE="$(git remote -v | grep 'emqx/emqx' | grep -v enterprise | grep fetch | head -1 | awk '{print $1}' || true)"
if [ -z "$GIT_REMOTE_CE" ]; then
logerr "Cannot find git remote for emqx/emqx"
exit 1
fi
REMOTES=( "${GIT_REMOTE_CE}" )
## Fetch the remotes
for remote in "${REMOTES[@]}"; do
logwarn "Fetching from remote=${remote} (force tag sync)."
git fetch "$remote" --tags --force
done
logmsg 'Fetched all remotes'
if [ "$INTERACTIVE" = 'yes' ]; then
MERGE_OPTS=''
else
## Using --ff-only to *check* if the remote is already merged
## Also conveniently merged it in case it's *not* merged but can be fast-forwarded
## Alternative is to check with 'git merge-base'
MERGE_OPTS='--ff-only'
fi
## Get the git remote reference of the given 'release-' or 'main-' branch
remote_ref() {
local branch="$1"
echo -n "${GIT_REMOTE_CE}/${branch} "
}
remote_refs() {
local br
for br in "${@}"; do
remote_ref "$br"
done
}
## Get upstream branches of the given branch
upstream_branches() {
local base="$1"
case "$base" in
release-50)
remote_ref "$base"
;;
master)
remote_refs "$base" 'release-50'
;;
esac
}
for remote_ref in $(upstream_branches "$BASE_BRANCH"); do
logmsg "Merging $remote_ref"
git merge $MERGE_OPTS "$remote_ref"
done