Merge branch 'main-v4.3' into bridge_config_topic_node_template_43

This commit is contained in:
gsychev 2022-02-25 10:05:36 +00:00 committed by GitHub
commit 19fcd6a4f6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
115 changed files with 1173 additions and 722 deletions

View File

@ -18,8 +18,9 @@
?>
[shell emqx]
!OLD_VSN=$(echo $OLD_VSN | sed -r 's/[v|e]//g')
!cd $PACKAGE_PATH
!unzip -q -o $PROFILE-ubuntu20.04-$(echo $OLD_VSN | sed -r 's/[v|e]//g')-amd64.zip
!unzip -q -o $PROFILE-ubuntu20.04-$${OLD_VSN}-amd64.zip
?SH-PROMPT
!cd emqx
@ -30,6 +31,7 @@
?SH-PROMPT
[shell emqx2]
!OLD_VSN=$(echo $OLD_VSN | sed -r 's/[v|e]//g')
!cd $PACKAGE_PATH
!cp -f $ONE_MORE_EMQX_PATH/one_more_$(echo $PROFILE | sed 's/-/_/g').sh .
!./one_more_$(echo $PROFILE | sed 's/-/_/g').sh emqx2
@ -82,6 +84,27 @@
!cp -f ../$PROFILE-ubuntu20.04-$VSN-amd64.zip releases/
## upgrade to the new version
!./bin/emqx install $VSN
?Made release permanent: "$VSN"
?SH-PROMPT
!./bin/emqx versions |grep permanent
?(.*)$VSN
?SH-PROMPT
## downgrade to the old version
!./bin/emqx install $${OLD_VSN}
?Made release permanent:.*
?SH-PROMPT
!./bin/emqx versions |grep permanent | grep -qs "$${OLD_VSN}"
?SH-PROMPT:
!echo ==$$?==
?^==0==
?SH-PROMPT:
## again, upgrade to the new version
!./bin/emqx install $VSN
?Made release permanent: "$VSN"
?SH-PROMPT
@ -107,6 +130,27 @@
!cp -f ../$PROFILE-ubuntu20.04-$VSN-amd64.zip releases/
## upgrade to the new version
!./bin/emqx install $VSN
?Made release permanent: "$VSN"
?SH-PROMPT
!./bin/emqx versions |grep permanent
?(.*)$VSN
?SH-PROMPT
## downgrade to the old version
!./bin/emqx install $${OLD_VSN}
?Made release permanent:.*
?SH-PROMPT
!./bin/emqx versions |grep permanent | grep -qs "$${OLD_VSN}"
?SH-PROMPT:
!echo ==$$?==
?^==0==
?SH-PROMPT:
## again, upgrade to the new version
!./bin/emqx install $VSN
?Made release permanent: "$VSN"
?SH-PROMPT
@ -136,17 +180,20 @@
!./bin/emqx_ctl broker metrics | grep "messages.publish"
???SH-PROMPT
## We don't guarantee not to lose a single message!
## So even if we received 290~300 messages, we consider it as success
[shell bench]
!curl --user admin:public --silent --show-error http://localhost:8081/api/v4/rules | jq -M --raw-output ".data[0].metrics[] | select(.node==\"emqx@127.0.0.1\").matched"
?300
?(29[0-9])|(300)
?SH-PROMPT
!curl --user admin:public --silent --show-error http://localhost:8081/api/v4/rules | jq -M --raw-output ".data[0].actions[0].metrics[] | select(.node==\"emqx@127.0.0.1\").success"
?300
?(29[0-9])|(300)
?SH-PROMPT
## The /counter API is provided by .ci/fvt_test/http_server
!curl http://127.0.0.1:8080/counter
???{"data":300,"code":0}
?\{"data":(29[0-9])|(300),"code":0\}
?SH-PROMPT
[shell emqx2]

View File

@ -1,6 +1,13 @@
name: Cross build packages
concurrency:
group: build-${{ github.event_name }}-${{ github.ref }}
cancel-in-progress: true
on:
push:
branches:
- 'main-v4.**'
schedule:
- cron: '0 */6 * * *'
release:
@ -15,7 +22,6 @@ jobs:
outputs:
profiles: ${{ steps.set_profile.outputs.profiles}}
old_vsns: ${{ steps.set_profile.outputs.old_vsns}}
steps:
- uses: actions/checkout@v2
@ -28,12 +34,8 @@ jobs:
run: |
cd source
if make emqx-ee --dry-run > /dev/null 2>&1; then
old_vsns="$(./scripts/relup-base-vsns.sh enterprise | xargs)"
echo "::set-output name=old_vsns::$old_vsns"
echo "::set-output name=profiles::[\"emqx-ee\"]"
else
old_vsns="$(./scripts/relup-base-vsns.sh community | xargs)"
echo "::set-output name=old_vsns::$old_vsns"
echo "::set-output name=profiles::[\"emqx\", \"emqx-edge\"]"
fi
- name: get_all_deps
@ -216,6 +218,7 @@ jobs:
needs: prepare
strategy:
fail-fast: false
matrix:
profile: ${{fromJSON(needs.prepare.outputs.profiles)}}
arch:
@ -269,32 +272,6 @@ jobs:
path: .
- name: unzip source code
run: unzip -q source.zip
- name: downloads old emqx zip packages
env:
PROFILE: ${{ matrix.profile }}
ARCH: ${{ matrix.arch }}
SYSTEM: ${{ matrix.os }}
OLD_VSNS: ${{ needs.prepare.outputs.old_vsns }}
run: |
set -e -x -u
broker=$PROFILE
if [ $PROFILE = "emqx" ];then
broker="emqx-ce"
fi
if [ ! -z "$(echo $SYSTEM | grep -oE 'raspbian')" ]; then
export ARCH="arm"
fi
mkdir -p source/_upgrade_base
cd source/_upgrade_base
old_vsns=($(echo $OLD_VSNS | tr ' ' ' '))
for tag in ${old_vsns[@]}; do
if [ ! -z "$(echo $(curl -I -m 10 -o /dev/null -s -w %{http_code} https://s3-us-west-2.amazonaws.com/packages.emqx/$broker/$tag/$PROFILE-$SYSTEM-${tag#[e|v]}-$ARCH.zip) | grep -oE "^[23]+")" ];then
wget --no-verbose https://s3-us-west-2.amazonaws.com/packages.emqx/$broker/$tag/$PROFILE-$SYSTEM-${tag#[e|v]}-$ARCH.zip
wget --no-verbose https://s3-us-west-2.amazonaws.com/packages.emqx/$broker/$tag/$PROFILE-$SYSTEM-${tag#[e|v]}-$ARCH.zip.sha256
echo "$(cat $PROFILE-$SYSTEM-${tag#[e|v]}-$ARCH.zip.sha256) $PROFILE-$SYSTEM-${tag#[e|v]}-$ARCH.zip" | sha256sum -c || exit 1
fi
done
- name: build emqx packages
env:
ERL_OTP: erl23.2.7.2-emqx-3
@ -342,6 +319,7 @@ jobs:
needs: prepare
strategy:
fail-fast: false
matrix:
profile: ${{fromJSON(needs.prepare.outputs.profiles)}}
registry:
@ -439,6 +417,7 @@ jobs:
needs: [prepare, mac, linux, docker]
strategy:
fail-fast: false
matrix:
profile: ${{fromJSON(needs.prepare.outputs.profiles)}}

View File

@ -1,24 +0,0 @@
name: Code style check
on: [pull_request]
jobs:
build:
runs-on: ubuntu-20.04
steps:
- uses: actions/checkout@v2
with:
fetch-depth: 1000
- name: Set git token
if: endsWith(github.repository, 'enterprise')
run: |
echo "https://ci%40emqx.io:${{ secrets.CI_GIT_TOKEN }}@github.com" > $HOME/.git-credentials
git config --global credential.helper store
- name: Run elvis check
run: |
set -e
if [ -f EMQX_ENTERPRISE ]; then
./scripts/elvis-check.sh $GITHUB_BASE_REF emqx-enterprise
else
./scripts/elvis-check.sh $GITHUB_BASE_REF emqx
fi

View File

@ -239,6 +239,7 @@ jobs:
name: Checkout
with:
path: emqx
fetch-depth: 0
- name: Prepare credentials
run: |
if [ "$PROFILE" = "emqx-ee" ]; then
@ -246,15 +247,6 @@ jobs:
git config --global credential.helper store
echo "${{ secrets.CI_GIT_TOKEN }}" >> emqx/scripts/git-token
fi
- name: Download bases
run: |
set -e -x -u
mkdir -p emqx/_upgrade_base
cd emqx/_upgrade_base
old_vsns=($(echo $OLD_VSNS | tr ' ' ' '))
for old_vsn in ${old_vsns[@]}; do
wget --no-verbose https://s3-us-west-2.amazonaws.com/packages.emqx/$BROKER/$old_vsn/$PROFILE-ubuntu20.04-${old_vsn#[e|v]}-amd64.zip
done
- name: Build emqx
run: make -C emqx ${PROFILE}-zip
- uses: actions/upload-artifact@v2

View File

@ -12,17 +12,30 @@ File format:
## v4.3.13
### Important changes
* For docker image, /opt/emqx/etc has been removed from the VOLUME list,
this made it easier for the users to rebuild image on top with changed configs.
### Enhancements
* CLI `emqx_ctl pem_cache clean` to force purge x509 certificate cache,
to force an immediate reload of all certificates after the files are updated on disk.
* Refactor the ExProto so that anonymous clients can also be displayed on the dashboard [#6983]
* Force shutdown of processe that cannot answer takeover event [#7026]
* `topic` parameter in bridge configuration can have `${node}` substitution (just like in `clientid` parameter)
### Bug fixes
* Fix the `{error,eexist}` error when do release upgrade again if last run failed. [#7121]
* Fix case where publishing to a non-existent topic alias would crash the connection [#6979]
* Fix HTTP-API 500 error on querying the lwm2m client list on the another node [#7009]
* Fix the ExProto connection registry is not released after the client process abnormally exits [#6983]
* Fix Server-KeepAlive wrongly applied on MQTT v3.0/v3.1 [#7085]
* Fix Stomp client can not trigger `$event/client_connection` message [#7096]
* Fix system memory false alarm at boot
* Fix the MQTT-SN message replay when the topic is not registered to the client [#6970]
## v4.3.12
### Important changes

View File

@ -3,14 +3,17 @@ REBAR_VERSION = 3.14.3-emqx-8
REBAR = $(CURDIR)/rebar3
BUILD = $(CURDIR)/build
SCRIPTS = $(CURDIR)/scripts
export EMQX_RELUP ?= true
export EMQX_DEFAULT_BUILDER = emqx/build-env:erl23.2.7.2-emqx-3-alpine
export EMQX_DEFAULT_RUNNER = alpine:3.12
export PKG_VSN ?= $(shell $(CURDIR)/pkg-vsn.sh)
export EMQX_DESC ?= EMQ X
export EMQX_CE_DASHBOARD_VERSION ?= v4.3.5
export DOCKERFILE := deploy/docker/Dockerfile
ifeq ($(OS),Windows_NT)
export REBAR_COLOR=none
FIND=/usr/bin/find
else
FIND=find
endif
PROFILE ?= emqx
@ -90,8 +93,8 @@ $(PROFILES:%=clean-%):
@if [ -d _build/$(@:clean-%=%) ]; then \
rm rebar.lock \
rm -rf _build/$(@:clean-%=%)/rel; \
find _build/$(@:clean-%=%) -name '*.beam' -o -name '*.so' -o -name '*.app' -o -name '*.appup' -o -name '*.o' -o -name '*.d' -type f | xargs rm -f; \
find _build/$(@:clean-%=%) -type l -delete; \
$(FIND) _build/$(@:clean-%=%) -name '*.beam' -o -name '*.so' -o -name '*.app' -o -name '*.appup' -o -name '*.o' -o -name '*.d' -type f | xargs rm -f; \
$(FIND) _build/$(@:clean-%=%) -type l -delete; \
fi
.PHONY: clean-all
@ -125,10 +128,19 @@ COMMON_DEPS := $(REBAR) get-dashboard $(CONF_SEGS)
$(REL_PROFILES:%=%-rel) $(PKG_PROFILES:%=%-rel): $(COMMON_DEPS)
@$(BUILD) $(subst -rel,,$(@)) rel
## download relup base packages
.PHONY: $(REL_PROFILES:%=%-relup-downloads)
define download-relup-packages
$1-relup-downloads:
@if [ "$${EMQX_RELUP}" = "true" ]; then $(CURDIR)/scripts/relup-base-packages.sh $1; fi
endef
ALL_ZIPS = $(REL_PROFILES)
$(foreach zt,$(ALL_ZIPS),$(eval $(call download-relup-packages,$(zt))))
## relup target is to create relup instructions
.PHONY: $(REL_PROFILES:%=%-relup)
define gen-relup-target
$1-relup: $(COMMON_DEPS)
$1-relup: $1-relup-downloads $(COMMON_DEPS)
@$(BUILD) $1 relup
endef
ALL_ZIPS = $(REL_PROFILES)

View File

@ -1,20 +1,34 @@
%% -*- mode: erlang -*-
{VSN,
[{"4.3.3",[{load_module,emqx_bridge_mqtt,brutal_purge,soft_purge,[]}]},
{<<"4\\.3\\.[1-2]">>,
[{load_module,emqx_bridge_mqtt,brutal_purge,soft_purge,[]},
{load_module,emqx_bridge_mqtt_actions,brutal_purge,soft_purge,[]}]},
{"4.3.0",
[{load_module,emqx_bridge_mqtt,brutal_purge,soft_purge,[]},
%% -*-: erlang -*-
{"4.3.4",
[
{<<"4.3.3">>, [
{load_module, emqx_bridge_mqtt, brutal_purge, soft_purge, []}
]},
{<<"4.3.[1-2]">>, [
{load_module, emqx_bridge_mqtt, brutal_purge, soft_purge, []},
{load_module, emqx_bridge_mqtt_actions, brutal_purge, soft_purge, []}
]},
{"4.3.0", [
{load_module, emqx_bridge_mqtt, brutal_purge, soft_purge, []},
{load_module, emqx_bridge_worker, brutal_purge, soft_purge, []},
{load_module,emqx_bridge_mqtt_actions,brutal_purge,soft_purge,[]}]},
{<<".*">>,[]}],
[{"4.3.3",[{load_module,emqx_bridge_mqtt,brutal_purge,soft_purge,[]}]},
{<<"4\\.3\\.[1-2]">>,
[{load_module,emqx_bridge_mqtt,brutal_purge,soft_purge,[]},
{load_module,emqx_bridge_mqtt_actions,brutal_purge,soft_purge,[]}]},
{"4.3.0",
[{load_module,emqx_bridge_mqtt,brutal_purge,soft_purge,[]},
{load_module, emqx_bridge_mqtt_actions, brutal_purge, soft_purge, []}
]},
{<<".*">>, []}
],
[
{<<"4.3.3">>, [
{load_module, emqx_bridge_mqtt, brutal_purge, soft_purge, []}
]},
{<<"4.3.[1-2]">>, [
{load_module, emqx_bridge_mqtt, brutal_purge, soft_purge, []},
{load_module, emqx_bridge_mqtt_actions, brutal_purge, soft_purge, []}
]},
{"4.3.0", [
{load_module, emqx_bridge_mqtt, brutal_purge, soft_purge, []},
{load_module, emqx_bridge_worker, brutal_purge, soft_purge, []},
{load_module,emqx_bridge_mqtt_actions,brutal_purge,soft_purge,[]}]},
{<<".*">>,[]}]}.
{load_module, emqx_bridge_mqtt_actions, brutal_purge, soft_purge, []}
]},
{<<".*">>, []}
]
}.

View File

@ -149,7 +149,7 @@ send(#{client_pid := ClientPid} = Conn, [Msg | Rest], PktIds) ->
{ok, PktId} ->
send(Conn, Rest, [PktId | PktIds]);
{error, Reason} ->
%% NOTE: There is no partial sucess of a batch and recover from the middle
%% NOTE: There is no partial success of a batch and recover from the middle
%% only to retry all messages in one batch
{error, Reason}
end.
@ -159,7 +159,7 @@ handle_puback(#{packet_id := PktId, reason_code := RC}, Parent)
RC =:= ?RC_NO_MATCHING_SUBSCRIBERS ->
Parent ! {batch_ack, PktId}, ok;
handle_puback(#{packet_id := PktId, reason_code := RC}, _Parent) ->
?LOG(warning, "Publish ~p to remote node falied, reason_code: ~p", [PktId, RC]).
?LOG(warning, "Publish ~p to remote node failed, reason_code: ~p", [PktId, RC]).
handle_publish(Msg, Mountpoint) ->
emqx_broker:publish(emqx_bridge_msg:to_broker_msg(Msg, Mountpoint)).

View File

@ -1,6 +1,6 @@
{application, emqx_exproto,
[{description, "EMQ X Extension for Protocol"},
{vsn, "4.3.5"}, %% 4.3.3 is used by ee
{vsn, "4.3.6"}, %% 4.3.3 is used by ee
{modules, []},
{registered, []},
{mod, {emqx_exproto_app, []}},

View File

@ -1,28 +1,24 @@
%% -*- mode: erlang -*-
{VSN,
[{"4.3.4",
[{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
{"4.3.3",
[{<<"4\\.3\\.[4-5]">>,
[{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]},
{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
{"4.3.2",
{<<"4\\.3\\.[2-3]">>,
[{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]},
{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
{<<"4.3.[0-1]">>,
{<<"4\\.3\\.[0-1]">>,
[{load_module,emqx_exproto_gsvr,brutal_purge,soft_purge,[]},
{load_module,emqx_exproto_gcli,brutal_purge,soft_purge,[]},
{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]},
{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
{<<".*">>,[]}],
[{"4.3.4",
[{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
{"4.3.3",
[{<<"4\\.3\\.[4-5]">>,
[{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]},
{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
{"4.3.2",
{<<"4\\.3\\.[2-3]">>,
[{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]},
{load_module,emqx_exproto_channel,brutal_purge,soft_purge,[]}]},
{<<"4.3.[0-1]">>,
{<<"4\\.3\\.[0-1]">>,
[{load_module,emqx_exproto_gsvr,brutal_purge,soft_purge,[]},
{load_module,emqx_exproto_gcli,brutal_purge,soft_purge,[]},
{load_module,emqx_exproto_conn,brutal_purge,soft_purge,[]},

View File

@ -94,9 +94,6 @@
awaiting_rel_max
]).
-define(CHANMOCK(P), {exproto_anonymous_client, P}).
-define(CHAN_CONN_TAB, emqx_channel_conn).
%%--------------------------------------------------------------------
%% Info, Attrs and Caps
%%--------------------------------------------------------------------
@ -155,15 +152,14 @@ init(ConnInfo = #{socktype := Socktype,
Channel = #channel{gcli = #{channel => GRpcChann},
conninfo = NConnInfo,
clientinfo = ClientInfo,
conn_state = connecting,
conn_state = accepted,
timers = #{}
},
case emqx_hooks:run_fold('client.connect', [NConnInfo], #{}) of
{error, _Reason} ->
throw(nopermission);
_ ->
ConnMod = maps:get(conn_mod, NConnInfo),
true = ets:insert(?CHAN_CONN_TAB, {?CHANMOCK(self()), ConnMod}),
ok = register_the_anonymous_client(ClientInfo, NConnInfo),
Req = #{conninfo =>
peercert(Peercert,
#{socktype => socktype(Socktype),
@ -172,6 +168,22 @@ init(ConnInfo = #{socktype := Socktype,
try_dispatch(on_socket_created, wrap(Req), Channel)
end.
register_the_anonymous_client(ClientInfo, ConnInfo) ->
ClientId = maps:get(clientid, ClientInfo),
case emqx_cm:open_session(true, ClientInfo, ConnInfo) of
{ok, _} ->
?LOG(debug, "Registered an anonymous connection, "
"temporary clientid: ~s", [ClientId]),
emqx_logger:set_metadata_clientid(ClientId),
_ = self() ! {event, accepted},
ok;
{error, Reason} ->
throw({register_anonymous_error, Reason})
end.
unregister_the_anonymous_client(ClientId) ->
emqx_cm:unregister_channel(ClientId).
%% @private
peercert(NoSsl, ConnInfo) when NoSsl == nossl;
NoSsl == undefined ->
@ -274,15 +286,14 @@ handle_call(close, Channel) ->
handle_call({auth, ClientInfo, _Password}, Channel = #channel{conn_state = connected}) ->
?LOG(warning, "Duplicated authorized command, dropped ~p", [ClientInfo]),
{reply, {error, ?RESP_PERMISSION_DENY, <<"Duplicated authenticate command">>}, Channel};
handle_call({auth, ClientInfo0, Password},
handle_call({auth, RequestedClientInfo, Password},
Channel = #channel{conninfo = ConnInfo,
clientinfo = ClientInfo}) ->
ClientInfo1 = enrich_clientinfo(ClientInfo0, ClientInfo),
NConnInfo = enrich_conninfo(ClientInfo0, ConnInfo),
clientinfo = ClientInfo0}) ->
ClientInfo1 = enrich_clientinfo(RequestedClientInfo, ClientInfo0),
NConnInfo = enrich_conninfo(RequestedClientInfo, ConnInfo),
Channel1 = Channel#channel{conninfo = NConnInfo,
clientinfo = ClientInfo1},
#{clientid := ClientId, username := Username} = ClientInfo1,
case emqx_access_control:authenticate(ClientInfo1#{password => Password}) of
@ -292,9 +303,10 @@ handle_call({auth, ClientInfo0, Password},
emqx_metrics:inc('client.auth.anonymous'),
NClientInfo = maps:merge(ClientInfo1, AuthResult),
NChannel = Channel1#channel{clientinfo = NClientInfo},
clean_anonymous_clients(),
case emqx_cm:open_session(true, NClientInfo, NConnInfo) of
{ok, _Session} ->
AnonymousClientId = maps:get(clientid, ClientInfo0),
unregister_the_anonymous_client(AnonymousClientId),
?LOG(debug, "Client ~s (Username: '~s') authorized successfully!",
[ClientId, Username]),
{reply, ok, [{event, connected}], ensure_connected(NChannel)};
@ -354,6 +366,9 @@ handle_call({publish, Topic, Qos, Payload},
handle_call(kick, Channel) ->
{shutdown, kicked, ok, Channel};
handle_call(discard, Channel) ->
{shutdown, discarded, ok, Channel};
handle_call(Req, Channel) ->
?LOG(warning, "Unexpected call: ~p", [Req]),
{reply, {error, unexpected_call}, Channel}.
@ -406,16 +421,12 @@ handle_info(Info, Channel) ->
-spec(terminate(any(), channel()) -> channel()).
terminate(Reason, Channel) ->
clean_anonymous_clients(),
Req = #{reason => stringfy(Reason)},
try_dispatch(on_socket_closed, wrap(Req), Channel).
is_anonymous(#{anonymous := true}) -> true;
is_anonymous(_AuthResult) -> false.
clean_anonymous_clients() ->
ets:delete(?CHAN_CONN_TAB, ?CHANMOCK(self())).
packet_to_message(Topic, Qos, Payload,
#channel{
conninfo = #{proto_ver := ProtoVer},
@ -608,23 +619,32 @@ default_conninfo(ConnInfo) ->
username => undefined,
conn_props => #{},
connected => true,
proto_name => <<"exproto">>,
proto_ver => <<"1.0">>,
connected_at => erlang:system_time(millisecond),
keepalive => 0,
receive_maximum => 0,
expiry_interval => 0}.
default_clientinfo(#{peername := {PeerHost, _},
default_clientinfo(#{peername := {PeerHost, PeerPort},
sockname := {_, SockPort}}) ->
#{zone => external,
protocol => undefined,
protocol => exproto,
peerhost => PeerHost,
sockport => SockPort,
clientid => undefined,
clientid => anonymous_clientid(PeerHost, PeerPort),
username => undefined,
is_bridge => false,
is_superuser => false,
mountpoint => undefined}.
anonymous_clientid(PeerHost, PeerPort) ->
iolist_to_binary(
["exproto-anonymous-",
inet:ntoa(PeerHost), "-", integer_to_list(PeerPort),
"-", emqx_rule_id:gen()
]).
stringfy(Reason) ->
unicode:characters_to_binary((io_lib:format("~0p", [Reason]))).

View File

@ -439,7 +439,8 @@ handle_msg({close, Reason}, State) ->
?LOG(debug, "Force to close the socket due to ~p", [Reason]),
handle_info({sock_closed, Reason}, close_socket(State));
handle_msg({event, connected}, State = #state{channel = Channel}) ->
handle_msg({event, Event}, State = #state{channel = Channel})
when Event == connected; Event == accepted ->
ClientId = emqx_exproto_channel:info(clientid, Channel),
emqx_cm:insert_channel_info(ClientId, info(State), stats(State));

View File

@ -1,6 +1,6 @@
{application, emqx_management,
[{description, "EMQ X Management API and CLI"},
{vsn, "4.3.11"}, % strict semver, bump manually!
{vsn, "4.3.12"}, % strict semver, bump manually!
{modules, []},
{registered, [emqx_management_sup]},
{applications, [kernel,stdlib,minirest]},

View File

@ -1,13 +1,13 @@
%% -*- mode: erlang -*-
{VSN,
[ {<<"4\\.3\\.[0-9]+">>,
[ {<<"4\\.3\\.([0-9]|1[0])">>,
[ {apply,{minirest,stop_http,['http:management']}},
{apply,{minirest,stop_http,['https:management']}},
{restart_application, emqx_management}
]},
{<<".*">>, []}
],
[ {<<"4\\.3\\.[0-9]+">>,
[ {<<"4\\.3\\.([0-9]|1[0])">>,
[ {apply,{minirest,stop_http,['http:management']}},
{apply,{minirest,stop_http,['https:management']}},
{restart_application, emqx_management}

View File

@ -123,8 +123,6 @@
-define(MAX_ROW_LIMIT, 10000).
-define(APP, emqx_management).
-elvis([{elvis_style, god_modules, disable}]).
%%--------------------------------------------------------------------
%% Node Info
%%--------------------------------------------------------------------

View File

@ -271,6 +271,7 @@ format_channel_info({_Key, Info, Stats0}) ->
SessCreated = maps:get(created_at, Session, maps:get(connected_at, ConnInfo)),
Connected = case maps:get(conn_state, Info, connected) of
connected -> true;
accepted -> true; %% for exproto anonymous clients
_ -> false
end,
NStats = Stats#{max_subscriptions => maps:get(subscriptions_max, Stats, 0),

View File

@ -1,5 +1,5 @@
##====================================================================
## Rule Engine for EMQ X R4.0
## EMQX Rule Engine
##====================================================================
rule_engine.ignore_sys_message = on

View File

@ -1,7 +1,8 @@
%% -*- mode: erlang -*-
{VSN,
[{"4.3.6",
[{update,emqx_rule_metrics,{advanced,["4.3.6"]}},
[{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
{update,emqx_rule_metrics,{advanced,["4.3.6"]}},
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
@ -58,7 +59,8 @@
{load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]},
{<<".*">>,[]}],
[{"4.3.6",
[{update,emqx_rule_metrics,{advanced,["4.3.6"]}},
[{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
{update,emqx_rule_metrics,{advanced,["4.3.6"]}},
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},

View File

@ -1,6 +1,6 @@
{application, emqx_sn,
[{description, "EMQ X MQTT-SN Plugin"},
{vsn, "4.3.5"}, % strict semver, bump manually!
{vsn, "4.3.6"}, % strict semver, bump manually!
{modules, []},
{registered, []},
{applications, [kernel,stdlib,esockd]},

View File

@ -1,29 +1,25 @@
%% -*- mode: erlang -*-
{VSN,
[
{"4.3.4",[
{<<"4\\.3\\.[4-5]">>,[
{load_module,emqx_sn_frame,brutal_purge,soft_purge,[]},
{load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]}
]},
{"4.3.3",[
{load_module,emqx_sn_registry,brutal_purge,soft_purge,[]},
{load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]}
]},
{"4.3.2", [
{<<"4.3.[2-3]">>,[
{load_module,emqx_sn_registry,brutal_purge,soft_purge,[]},
{load_module,emqx_sn_frame,brutal_purge,soft_purge,[]},
{load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]}
]},
{<<"4\\.3\\.[0-1]">>, [{restart_application,emqx_sn}]}
],
[
{"4.3.4",[
{<<"4\\.3\\.[4-5]">>,[
{load_module,emqx_sn_frame,brutal_purge,soft_purge,[]},
{load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]}
]},
{"4.3.3",[
{load_module,emqx_sn_registry,brutal_purge,soft_purge,[]},
{load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]}
]},
{"4.3.2", [
{<<"4.3.[2-3]">>,[
{load_module,emqx_sn_registry,brutal_purge,soft_purge,[]},
{load_module,emqx_sn_frame,brutal_purge,soft_purge,[]},
{load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]}
]},
{<<"4\\.3\\.[0-1]">>, [{restart_application,emqx_sn}]}

View File

@ -268,40 +268,81 @@ message_type(16#1d) ->
message_type(Type) ->
io_lib:format("Unknown Type ~p", [Type]).
format(?SN_CONNECT_MSG(Flags, ProtocolId, Duration, ClientId)) ->
#mqtt_sn_flags{
will = Will,
clean_start = CleanStart} = Flags,
io_lib:format("SN_CONNECT(W~w, C~w, ProtocolId=~w, Duration=~w, "
"ClientId=~s)",
[bool(Will), bool(CleanStart),
ProtocolId, Duration, ClientId]);
format(?SN_CONNACK_MSG(ReturnCode)) ->
io_lib:format("SN_CONNACK(ReturnCode=~w)", [ReturnCode]);
format(?SN_WILLTOPICREQ_MSG()) ->
"SN_WILLTOPICREQ()";
format(?SN_WILLTOPIC_MSG(Flags, Topic)) ->
#mqtt_sn_flags{
qos = QoS,
retain = Retain} = Flags,
io_lib:format("SN_WILLTOPIC(Q~w, R~w, Topic=~s)",
[QoS, bool(Retain), Topic]);
format(?SN_WILLTOPIC_EMPTY_MSG) ->
"SN_WILLTOPIC(_)";
format(?SN_WILLMSGREQ_MSG()) ->
"SN_WILLMSGREQ()";
format(?SN_WILLMSG_MSG(Msg)) ->
io_lib:format("SN_WILLMSG_MSG(Msg=~p)", [Msg]);
format(?SN_PUBLISH_MSG(Flags, TopicId, MsgId, Data)) ->
io_lib:format("mqtt_sn_message SN_PUBLISH, ~s, TopicId=~w, MsgId=~w, Payload=~w",
[format_flag(Flags), TopicId, MsgId, Data]);
format(?SN_PUBACK_MSG(Flags, MsgId, ReturnCode)) ->
io_lib:format("mqtt_sn_message SN_PUBACK, ~s, MsgId=~w, ReturnCode=~w",
[format_flag(Flags), MsgId, ReturnCode]);
#mqtt_sn_flags{
dup = Dup,
qos = QoS,
retain = Retain,
topic_id_type = TopicIdType} = Flags,
io_lib:format("SN_PUBLISH(D~w, Q~w, R~w, TopicIdType=~w, TopicId=~w, "
"MsgId=~w, Payload=~p)",
[bool(Dup), QoS, bool(Retain),
TopicIdType, TopicId, MsgId, Data]);
format(?SN_PUBACK_MSG(TopicId, MsgId, ReturnCode)) ->
io_lib:format("SN_PUBACK(TopicId=~w, MsgId=~w, ReturnCode=~w)",
[TopicId, MsgId, ReturnCode]);
format(?SN_PUBREC_MSG(?SN_PUBCOMP, MsgId)) ->
io_lib:format("mqtt_sn_message SN_PUBCOMP, MsgId=~w", [MsgId]);
io_lib:format("SN_PUBCOMP(MsgId=~w)", [MsgId]);
format(?SN_PUBREC_MSG(?SN_PUBREC, MsgId)) ->
io_lib:format("mqtt_sn_message SN_PUBREC, MsgId=~w", [MsgId]);
io_lib:format("SN_PUBREC(MsgId=~w)", [MsgId]);
format(?SN_PUBREC_MSG(?SN_PUBREL, MsgId)) ->
io_lib:format("mqtt_sn_message SN_PUBREL, MsgId=~w", [MsgId]);
io_lib:format("SN_PUBREL(MsgId=~w)", [MsgId]);
format(?SN_SUBSCRIBE_MSG(Flags, Msgid, Topic)) ->
io_lib:format("mqtt_sn_message SN_SUBSCRIBE, ~s, MsgId=~w, TopicId=~w",
[format_flag(Flags), Msgid, Topic]);
#mqtt_sn_flags{
dup = Dup,
qos = QoS,
topic_id_type = TopicIdType} = Flags,
io_lib:format("SN_SUBSCRIBE(D~w, Q~w, TopicIdType=~w, MsgId=~w, "
"TopicId=~w)",
[bool(Dup), QoS, TopicIdType, Msgid, Topic]);
format(?SN_SUBACK_MSG(Flags, TopicId, MsgId, ReturnCode)) ->
io_lib:format("mqtt_sn_message SN_SUBACK, ~s, MsgId=~w, TopicId=~w, ReturnCode=~w",
[format_flag(Flags), MsgId, TopicId, ReturnCode]);
#mqtt_sn_flags{qos = QoS} = Flags,
io_lib:format("SN_SUBACK(GrantedQoS=~w, MsgId=~w, TopicId=~w, "
"ReturnCode=~w)",
[QoS, MsgId, TopicId, ReturnCode]);
format(?SN_UNSUBSCRIBE_MSG(Flags, Msgid, Topic)) ->
io_lib:format("mqtt_sn_message SN_UNSUBSCRIBE, ~s, MsgId=~w, TopicId=~w",
[format_flag(Flags), Msgid, Topic]);
#mqtt_sn_flags{topic_id_type = TopicIdType} = Flags,
io_lib:format("SN_UNSUBSCRIBE(TopicIdType=~s, MsgId=~w, TopicId=~w)",
[TopicIdType, Msgid, Topic]);
format(?SN_UNSUBACK_MSG(MsgId)) ->
io_lib:format("mqtt_sn_message SN_UNSUBACK, MsgId=~w", [MsgId]);
io_lib:format("SN_UNSUBACK(MsgId=~w)", [MsgId]);
format(?SN_REGISTER_MSG(TopicId, MsgId, TopicName)) ->
io_lib:format("mqtt_sn_message SN_REGISTER, TopicId=~w, MsgId=~w, TopicName=~w",
io_lib:format("SN_REGISTER(TopicId=~w, MsgId=~w, TopicName=~s)",
[TopicId, MsgId, TopicName]);
format(?SN_REGACK_MSG(TopicId, MsgId, ReturnCode)) ->
io_lib:format("mqtt_sn_message SN_REGACK, TopicId=~w, MsgId=~w, ReturnCode=~w",
io_lib:format("SN_REGACK(TopicId=~w, MsgId=~w, ReturnCode=~w)",
[TopicId, MsgId, ReturnCode]);
format(?SN_PINGREQ_MSG(ClientId)) ->
io_lib:format("SN_PINGREQ(ClientId=~s)", [ClientId]);
format(?SN_PINGRESP_MSG()) ->
"SN_PINGREQ()";
format(?SN_DISCONNECT_MSG(Duration)) ->
io_lib:format("SN_DISCONNECT(Duration=~s)", [Duration]);
format(#mqtt_sn_message{type = Type, variable = Var}) ->
io_lib:format("mqtt_sn_message type=~s, Var=~w", [emqx_sn_frame:message_type(Type), Var]).
format_flag(#mqtt_sn_flags{dup = Dup, qos = QoS, retain = Retain, will = Will, clean_start = CleanStart, topic_id_type = TopicType}) ->
io_lib:format("mqtt_sn_flags{dup=~p, qos=~p, retain=~p, will=~p, clean_session=~p, topic_id_type=~p}",
[Dup, QoS, Retain, Will, CleanStart, TopicType]);
format_flag(_Flag) -> "invalid flag".
io_lib:format("mqtt_sn_message(type=~s, Var=~w)",
[emqx_sn_frame:message_type(Type), Var]).

View File

@ -94,6 +94,8 @@
idle_timeout :: integer(),
enable_qos3 = false :: boolean(),
has_pending_pingresp = false :: boolean(),
%% Store all qos0 messages for waiting REGACK
%% Note: QoS1/QoS2 messages will kept inflight queue
pending_topic_ids = #{} :: pending_msgs()
}).
@ -490,7 +492,7 @@ handle_event({call, From}, Req, _StateName, State) ->
{reply, Reply, NState} ->
gen_server:reply(From, Reply),
{keep_state, NState};
{stop, Reason, Reply, NState} ->
{shutdown, Reason, Reply, NState} ->
State0 = case NState#state.sockstate of
running ->
send_message(?SN_DISCONNECT_MSG(undefined), NState);
@ -518,10 +520,9 @@ handle_event(info, {datagram, SockPid, Data}, StateName,
end;
handle_event(info, {deliver, _Topic, Msg}, asleep,
State = #state{channel = Channel, pending_topic_ids = Pendings}) ->
State = #state{channel = Channel}) ->
% section 6.14, Support of sleeping clients
?LOG(debug, "enqueue downlink message in asleep state, msg: ~0p, pending_topic_ids: ~0p",
[Msg, Pendings]),
?LOG(debug, "enqueue downlink message in asleep state, msg: ~0p", [Msg]),
Session = emqx_session:enqueue(emqx_channel:info(clientinfo, Channel),
Msg, emqx_channel:get_session(Channel)),
{keep_state, State#state{channel = emqx_channel:set_session(Session, Channel)}};
@ -610,7 +611,7 @@ handle_call(_From, Req, State = #state{channel = Channel}) ->
{reply, Reply, NChannel} ->
{reply, Reply, State#state{channel = NChannel}};
{shutdown, Reason, Reply, NChannel} ->
stop(Reason, Reply, State#state{channel = NChannel})
{shutdown, Reason, Reply, State#state{channel = NChannel}}
end.
handle_info({sock_closed, Reason} = Info, State = #state{channel = Channel}) ->
@ -723,11 +724,19 @@ mqtt2sn(?PUBCOMP_PACKET(MsgId), _State) ->
mqtt2sn(?UNSUBACK_PACKET(MsgId), _State)->
?SN_UNSUBACK_MSG(MsgId);
mqtt2sn(?PUBLISH_PACKET(QoS, Topic, PacketId, Payload), #state{channel = Channel}) ->
NewPacketId = if QoS =:= ?QOS_0 -> 0;
mqtt2sn(
#mqtt_packet{header = #mqtt_packet_header{
type = ?PUBLISH,
qos = QoS,
%dup = Dup,
retain = Retain},
variable = #mqtt_packet_publish{
topic_name = Topic,
packet_id = PacketId},
payload = Payload}, #state{clientid = ClientId}) ->
NPacketId = if QoS =:= ?QOS_0 -> 0;
true -> PacketId
end,
ClientId = emqx_channel:info(clientid, Channel),
{TopicIdType, TopicContent} = case emqx_sn_registry:lookup_topic_id(ClientId, Topic) of
{predef, PredefTopicId} ->
{?SN_PREDEFINED_TOPIC, PredefTopicId};
@ -737,8 +746,12 @@ mqtt2sn(?PUBLISH_PACKET(QoS, Topic, PacketId, Payload), #state{channel = Channe
{?SN_SHORT_TOPIC, Topic}
end,
Flags = #mqtt_sn_flags{qos = QoS, topic_id_type = TopicIdType},
?SN_PUBLISH_MSG(Flags, TopicContent, NewPacketId, Payload);
Flags = #mqtt_sn_flags{
%dup = Dup,
qos = QoS,
retain = Retain,
topic_id_type = TopicIdType},
?SN_PUBLISH_MSG(Flags, TopicContent, NPacketId, Payload);
mqtt2sn(?SUBACK_PACKET(MsgId, ReturnCodes), _State)->
% if success, suback is sent by handle_info({suback, MsgId, [GrantedQoS]}, ...)
@ -766,9 +779,10 @@ send_connack(State) ->
send_message(Msg = #mqtt_sn_message{type = Type},
State = #state{sockpid = SockPid, peername = Peername}) ->
?LOG(debug, "SEND ~s~n", [emqx_sn_frame:format(Msg)]),
?LOG(info, "SEND ~s~n", [emqx_sn_frame:format(Msg)]),
inc_outgoing_stats(Type),
Data = emqx_sn_frame:serialize(Msg),
?LOG(debug, "SEND ~0p", [Data]),
ok = emqx_metrics:inc('bytes.sent', iolist_size(Data)),
SockPid ! {datagram, Peername, Data},
State.
@ -793,13 +807,6 @@ stop(Reason, State) ->
maybe_send_will_msg(Reason, State),
{stop, {shutdown, Reason}, State}.
stop({shutdown, Reason}, Reply, State) ->
stop(Reason, Reply, State);
stop(Reason, Reply, State) ->
?LOG(stop_log_level(Reason), "stop due to ~p", [Reason]),
maybe_send_will_msg(Reason, State),
{stop, {shutdown, Reason}, Reply, State}.
maybe_send_will_msg(normal, _State) ->
ok;
maybe_send_will_msg(_Reason, State) ->
@ -825,6 +832,9 @@ do_connect(ClientId, CleanStart, WillFlag, Duration, State) ->
%% At any point in time a client may have only one QoS level 1 or 2 PUBLISH message
%% outstanding, i.e. it has to wait for the termination of this PUBLISH message exchange
%% before it could start a new level 1 or 2 transaction.
%%
%% FIXME: But we should have a re-try timer to re-send the inflight
%% qos1/qos2 message
OnlyOneInflight = #{'Receive-Maximum' => 1},
ConnPkt = #mqtt_packet_connect{clientid = ClientId,
clean_start = CleanStart,
@ -974,10 +984,15 @@ do_puback(TopicId, MsgId, ReturnCode, StateName,
undefined -> {keep_state, State};
TopicName ->
%% notice that this TopicName maybe normal or predefined,
%% involving the predefined topic name in register to enhance the gateway's robustness even inconsistent with MQTT-SN channels
{keep_state, send_register(TopicName, TopicId, MsgId, State)}
%% involving the predefined topic name in register to
%% enhance the gateway's robustness even inconsistent
%% with MQTT-SN channel
{keep_state,
send_register(TopicName, TopicId, MsgId, State)}
end;
_ ->
%% XXX: We need to handle others error code
%% 'Rejection: congestion'
?LOG(error, "CAN NOT handle PUBACK ReturnCode=~p", [ReturnCode]),
{keep_state, State}
end.
@ -1050,7 +1065,7 @@ handle_incoming(Packet, _StName, State) ->
channel_handle_in(Packet = ?PACKET(Type), #state{channel = Channel}) ->
_ = inc_incoming_stats(Type),
ok = emqx_metrics:inc_recv(Packet),
?LOG(debug, "RECV ~s", [emqx_packet:format(Packet)]),
?LOG(debug, "Transed-RECV ~s", [emqx_packet:format(Packet)]),
emqx_channel:handle_in(Packet, Channel).
handle_outgoing(Packets, State) when is_list(Packets) ->
@ -1064,7 +1079,9 @@ handle_outgoing(PubPkt = ?PUBLISH_PACKET(_, TopicName, _, _),
ClientId = emqx_channel:info(clientid, Channel),
TopicId = emqx_sn_registry:lookup_topic_id(ClientId, TopicName),
case (TopicId == undefined) andalso (byte_size(TopicName) =/= 2) of
true -> register_and_notify_client(PubPkt, State);
true ->
%% TODO: only one REGISTER inflight if qos=0??
register_and_notify_client(PubPkt, State);
false -> send_message(mqtt2sn(PubPkt, State), State)
end;
@ -1077,13 +1094,40 @@ cache_no_reg_publish_message(Pendings, TopicId, PubPkt, State) ->
Msgs = maps:get(pending_topic_ids, Pendings, []),
Pendings#{TopicId => Msgs ++ [mqtt2sn(PubPkt, State)]}.
replay_no_reg_pending_publishes(TopicId, #state{pending_topic_ids = Pendings} = State0) ->
?LOG(debug, "replay non-registered publish message for topic-id: ~p, pendings: ~0p",
[TopicId, Pendings]),
replay_no_reg_pending_publishes(TopicId,
State0 = #state{
pending_topic_ids = Pendings}) ->
?LOG(debug, "replay non-registered qos0 publish message for "
"topic-id: ~p, pendings: ~0p", [TopicId, Pendings]),
State = lists:foldl(fun(Msg, State1) ->
send_message(Msg, State1)
end, State0, maps:get(TopicId, Pendings, [])),
State#state{pending_topic_ids = maps:remove(TopicId, Pendings)}.
NState = State#state{pending_topic_ids = maps:remove(TopicId, Pendings)},
case replay_inflight_messages(TopicId, State#state.channel) of
[] -> ok;
Outgoings ->
?LOG(debug, "replay non-registered qos1/qos2 publish message "
"for topic-id: ~0p, messages: ~0p",
[TopicId, Outgoings]),
handle_outgoing(Outgoings, NState)
end.
replay_inflight_messages(TopicId, Channel) ->
Inflight = emqx_session:info(inflight, emqx_channel:get_session(Channel)),
case emqx_inflight:to_list(Inflight) of
[] -> [];
[{PktId, {Msg, _Ts}}] -> %% Fixed inflight size 1
ClientId = emqx_channel:info(clientid, Channel),
ReplayTopic = emqx_sn_registry:lookup_topic(ClientId, TopicId),
case ReplayTopic =:= emqx_message:topic(Msg) of
false -> [];
true ->
NMsg = emqx_message:set_flag(dup, true, Msg),
[emqx_message:to_packet(PktId, NMsg)]
end
end.
register_and_notify_client(?PUBLISH_PACKET(QoS, TopicName, PacketId, Payload) = PubPkt,
State = #state{pending_topic_ids = Pendings, channel = Channel}) ->
@ -1091,10 +1135,17 @@ register_and_notify_client(?PUBLISH_PACKET(QoS, TopicName, PacketId, Payload) =
#mqtt_packet{header = #mqtt_packet_header{dup = Dup, retain = Retain}} = PubPkt,
ClientId = emqx_channel:info(clientid, Channel),
TopicId = emqx_sn_registry:register_topic(ClientId, TopicName),
?LOG(debug, "Register TopicId=~p, TopicName=~p, Payload=~p, Dup=~p, QoS=~p, "
"Retain=~p, MsgId=~p", [TopicId, TopicName, Payload, Dup, QoS, Retain, MsgId]),
NewPendings = cache_no_reg_publish_message(Pendings, TopicId, PubPkt, State),
send_register(TopicName, TopicId, MsgId, State#state{pending_topic_ids = NewPendings}).
?LOG(debug, "Register TopicId=~p, TopicName=~p, Payload=~p, Dup=~p, "
"QoS=~p,Retain=~p, MsgId=~p",
[TopicId, TopicName, Payload, Dup, QoS, Retain, MsgId]),
NPendings = case QoS == ?QOS_0 of
true ->
cache_no_reg_publish_message(
Pendings, TopicId, PubPkt, State);
_ -> Pendings
end,
send_register(TopicName, TopicId, MsgId,
State#state{pending_topic_ids = NPendings}).
message_id(undefined) ->
rand:uniform(16#FFFF);

View File

@ -819,6 +819,151 @@ t_publish_qos2_case03(_) ->
?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
gen_udp:close(Socket).
t_delivery_qos1_register_invalid_topic_id(_) ->
Dup = 0,
QoS = 1,
Retain = 0,
Will = 0,
CleanSession = 0,
MsgId = 1,
TopicId = ?MAX_PRED_TOPIC_ID + 1,
{ok, Socket} = gen_udp:open(0, [binary]),
send_connect_msg(Socket, <<"test">>),
?assertEqual(<<3, ?SN_CONNACK, 0>>, receive_response(Socket)),
send_subscribe_msg_normal_topic(Socket, QoS, <<"ab">>, MsgId),
?assertEqual(<<8, ?SN_SUBACK, Dup:1, QoS:2, Retain:1, Will:1, CleanSession:1,
?SN_NORMAL_TOPIC:2, TopicId:16, MsgId:16, ?SN_RC_ACCEPTED>>,
receive_response(Socket)),
Payload = <<"test-registration-inconsistent">>,
_ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"ab">>, Payload)),
?assertEqual(
<<(7 + byte_size(Payload)), ?SN_PUBLISH,
Dup:1, QoS:2, Retain:1,
Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2,
TopicId:16, MsgId:16, Payload/binary>>, receive_response(Socket)),
%% acked with ?SN_RC_INVALID_TOPIC_ID to
send_puback_msg(Socket, TopicId, MsgId, ?SN_RC_INVALID_TOPIC_ID),
?assertEqual(
{TopicId, MsgId},
check_register_msg_on_udp(<<"ab">>, receive_response(Socket))),
send_regack_msg(Socket, TopicId, MsgId + 1),
%% receive the replay message
?assertEqual(
<<(7 + byte_size(Payload)), ?SN_PUBLISH,
Dup:1, QoS:2, Retain:1,
Will:1, CleanSession:1, ?SN_NORMAL_TOPIC:2,
TopicId:16, (MsgId):16, Payload/binary>>, receive_response(Socket)),
send_disconnect_msg(Socket, undefined),
?assertEqual(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
gen_udp:close(Socket).
t_delivery_takeover_and_re_register(_) ->
MsgId = 1,
{ok, Socket} = gen_udp:open(0, [binary]),
send_connect_msg(Socket, <<"test">>, 0),
?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>,
receive_response(Socket)),
send_subscribe_msg_normal_topic(Socket, ?QOS_1, <<"topic-a">>, MsgId+1),
<<_, ?SN_SUBACK, 2#00100000,
TopicIdA:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket),
send_subscribe_msg_normal_topic(Socket, ?QOS_2, <<"topic-b">>, MsgId+2),
<<_, ?SN_SUBACK, 2#01000000,
TopicIdB:16, _:16, ?SN_RC_ACCEPTED>> = receive_response(Socket),
_ = emqx:publish(
emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"test-a">>)),
_ = emqx:publish(
emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"test-b">>)),
<<_, ?SN_PUBLISH, 2#00100000,
TopicIdA:16, MsgId1:16, "test-a">> = receive_response(Socket),
send_puback_msg(Socket, TopicIdA, MsgId1, ?SN_RC_ACCEPTED),
<<_, ?SN_PUBLISH, 2#01000000,
TopicIdB:16, MsgId2:16, "test-b">> = receive_response(Socket),
send_puback_msg(Socket, TopicIdB, MsgId2, ?SN_RC_ACCEPTED),
send_disconnect_msg(Socket, undefined),
?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(Socket)),
gen_udp:close(Socket),
%% offline messages will be queued into the MQTT-SN session
_ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m1">>)),
_ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m2">>)),
_ = emqx:publish(emqx_message:make(test, ?QOS_1, <<"topic-a">>, <<"m3">>)),
_ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"m1">>)),
_ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"m2">>)),
_ = emqx:publish(emqx_message:make(test, ?QOS_2, <<"topic-b">>, <<"m3">>)),
{ok, NSocket} = gen_udp:open(0, [binary]),
send_connect_msg(NSocket, <<"test">>, 0),
?assertMatch(<<_, ?SN_CONNACK, ?SN_RC_ACCEPTED>>,
receive_response(NSocket)),
%% qos1
%% received the resume messages
<<_, ?SN_PUBLISH, 2#00100000,
TopicIdA:16, MsgIdA0:16, "m1">> = receive_response(NSocket),
%% only one qos1/qos2 inflight
?assertEqual(udp_receive_timeout, receive_response(NSocket)),
send_puback_msg(NSocket, TopicIdA, MsgIdA0, ?SN_RC_INVALID_TOPIC_ID),
%% recv register
<<_, ?SN_REGISTER,
TopicIdA:16, RegMsgIdA:16, "topic-a">> = receive_response(NSocket),
send_regack_msg(NSocket, TopicIdA, RegMsgIdA),
%% received the replay messages
<<_, ?SN_PUBLISH, 2#00100000,
TopicIdA:16, MsgIdA1:16, "m1">> = receive_response(NSocket),
send_puback_msg(NSocket, TopicIdA, MsgIdA1, ?SN_RC_ACCEPTED),
<<_, ?SN_PUBLISH, 2#00100000,
TopicIdA:16, MsgIdA2:16, "m2">> = receive_response(NSocket),
send_puback_msg(NSocket, TopicIdA, MsgIdA2, ?SN_RC_ACCEPTED),
<<_, ?SN_PUBLISH, 2#00100000,
TopicIdA:16, MsgIdA3:16, "m3">> = receive_response(NSocket),
send_puback_msg(NSocket, TopicIdA, MsgIdA3, ?SN_RC_ACCEPTED),
%% qos2
<<_, ?SN_PUBLISH, 2#01000000,
TopicIdB:16, MsgIdB0:16, "m1">> = receive_response(NSocket),
%% only one qos1/qos2 inflight
?assertEqual(udp_receive_timeout, receive_response(NSocket)),
send_puback_msg(NSocket, TopicIdB, MsgIdB0, ?SN_RC_INVALID_TOPIC_ID),
%% recv register
<<_, ?SN_REGISTER,
TopicIdB:16, RegMsgIdB:16, "topic-b">> = receive_response(NSocket),
send_regack_msg(NSocket, TopicIdB, RegMsgIdB),
%% received the replay messages
<<_, ?SN_PUBLISH, 2#01000000,
TopicIdB:16, MsgIdB1:16, "m1">> = receive_response(NSocket),
send_pubrec_msg(NSocket, MsgIdB1),
<<_, ?SN_PUBREL, MsgIdB1:16>> = receive_response(NSocket),
send_pubcomp_msg(NSocket, MsgIdB1),
<<_, ?SN_PUBLISH, 2#01000000,
TopicIdB:16, MsgIdB2:16, "m2">> = receive_response(NSocket),
send_puback_msg(NSocket, TopicIdB, MsgIdB2, ?SN_RC_ACCEPTED),
<<_, ?SN_PUBLISH, 2#01000000,
TopicIdB:16, MsgIdB3:16, "m3">> = receive_response(NSocket),
send_puback_msg(NSocket, TopicIdB, MsgIdB3, ?SN_RC_ACCEPTED),
%% no more messages
?assertEqual(udp_receive_timeout, receive_response(NSocket)),
send_disconnect_msg(NSocket, undefined),
?assertMatch(<<2, ?SN_DISCONNECT>>, receive_response(NSocket)),
gen_udp:close(NSocket).
t_will_case01(_) ->
QoS = 1,
Duration = 1,
@ -1591,13 +1736,16 @@ send_searchgw_msg(Socket) ->
ok = gen_udp:send(Socket, ?HOST, ?PORT, <<Length:8, MsgType:8, Radius:8>>).
send_connect_msg(Socket, ClientId) ->
send_connect_msg(Socket, ClientId, 1).
send_connect_msg(Socket, ClientId, CleanSession) when CleanSession == 0;
CleanSession == 1 ->
Length = 6 + byte_size(ClientId),
MsgType = ?SN_CONNECT,
Dup = 0,
QoS = 0,
Retain = 0,
Will = 0,
CleanSession = 1,
TopicIdType = 0,
ProtocolId = 1,
Duration = 10,
@ -1713,9 +1861,12 @@ send_publish_msg_short_topic(Socket, QoS, MsgId, TopicName, Data) ->
ok = gen_udp:send(Socket, ?HOST, ?PORT, PublishPacket).
send_puback_msg(Socket, TopicId, MsgId) ->
send_puback_msg(Socket, TopicId, MsgId, ?SN_RC_ACCEPTED).
send_puback_msg(Socket, TopicId, MsgId, Rc) ->
Length = 7,
MsgType = ?SN_PUBACK,
PubAckPacket = <<Length:8, MsgType:8, TopicId:16, MsgId:16, ?SN_RC_ACCEPTED:8>>,
PubAckPacket = <<Length:8, MsgType:8, TopicId:16, MsgId:16, Rc:8>>,
?LOG("send_puback_msg TopicId=~p, MsgId=~p", [TopicId, MsgId]),
ok = gen_udp:send(Socket, ?HOST, ?PORT, PubAckPacket).

View File

@ -1,6 +1,6 @@
{application, emqx_stomp,
[{description, "EMQ X Stomp Protocol Plugin"},
{vsn, "4.3.4"}, % strict semver, bump manually!
{vsn, "4.3.5"}, % strict semver, bump manually!
{modules, []},
{registered, [emqx_stomp_sup]},
{applications, [kernel,stdlib]},

View File

@ -1,8 +1,15 @@
%% -*- mode: erlang -*-
{VSN,
[{"4.3.3",[{load_module,emqx_stomp_frame,brutal_purge,soft_purge,[]}]},
{"4.3.2",
[{"4.3.4",
[{load_module,emqx_stomp_protocol,brutal_purge,soft_purge,[]},
{load_module,emqx_stomp_connection,brutal_purge,soft_purge,[]}]},
{"4.3.3",
[{load_module,emqx_stomp_protocol,brutal_purge,soft_purge,[]},
{load_module,emqx_stomp_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_stomp_frame,brutal_purge,soft_purge,[]}]},
{"4.3.2",
[{load_module,emqx_stomp_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_stomp_protocol,brutal_purge,soft_purge,[]},
{load_module,emqx_stomp_frame,brutal_purge,soft_purge,[]}]},
{"4.3.1",
[{load_module,emqx_stomp_protocol,brutal_purge,soft_purge,[]},
@ -12,14 +19,20 @@
[{restart_application,emqx_stomp},
{apply,{emqx_stomp,force_clear_after_app_stoped,[]}}]},
{<<".*">>,[]}],
[{"4.3.3",[{load_module,emqx_stomp_frame,brutal_purge,soft_purge,[]}]},
{"4.3.2",
[{"4.3.4",
[{load_module,emqx_stomp_protocol,brutal_purge,soft_purge,[]},
{load_module,emqx_stomp_connection,brutal_purge,soft_purge,[]}]},
{"4.3.3",
[{load_module,emqx_stomp_protocol,brutal_purge,soft_purge,[]},
{load_module,emqx_stomp_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_stomp_frame,brutal_purge,soft_purge,[]}]},
{"4.3.2",
[{load_module,emqx_stomp_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_stomp_protocol,brutal_purge,soft_purge,[]},
{load_module,emqx_stomp_frame,brutal_purge,soft_purge,[]}]},
{"4.3.1",
[{load_module,emqx_stomp_protocol,brutal_purge,soft_purge,[]},
{load_module,emqx_stomp_frame,brutal_purge,soft_purge,[]},
{load_module,emqx_stomp_connection,brutal_purge,soft_purge,[]}]},
{"4.3.0",
[{restart_application,emqx_stomp}]},
{"4.3.0",[{restart_application,emqx_stomp}]},
{<<".*">>,[]}]}.

View File

@ -91,8 +91,6 @@
-define(ENABLED(X), (X =/= undefined)).
-elvis([{elvis_style, invalid_dynamic_call, #{ignore => [emqx_stomp_connection]}}]).
-dialyzer({nowarn_function, [ ensure_stats_timer/2
]}).

View File

@ -108,8 +108,6 @@
, init/2
]}).
-elvis([{elvis_style, dont_repeat_yourself, disable}]).
-type(pstate() :: #pstate{}).
%% @doc Init protocol
@ -154,7 +152,7 @@ default_conninfo(ConnInfo) ->
clean_start => true,
clientid => undefined,
username => undefined,
conn_props => [],
conn_props => #{},
connected => false,
connected_at => undefined,
keepalive => undefined,
@ -816,4 +814,3 @@ interval(outgoing_timer, #pstate{heart_beats = HrtBt}) ->
emqx_stomp_heartbeat:interval(outgoing, HrtBt);
interval(clean_trans_timer, _) ->
?TRANS_TIMEOUT.

View File

@ -1,5 +1,5 @@
%%-*- mode: erlang -*-
%% EMQ X R3.0 config mapping
%% EMQX config mapping
{mapping, "web.hook.url", "emqx_web_hook.url", [
{datatype, string}

View File

@ -1,6 +1,6 @@
{application, emqx_web_hook,
[{description, "EMQ X WebHook Plugin"},
{vsn, "4.3.9"}, % strict semver, bump manually!
{vsn, "4.3.10"}, % strict semver, bump manually!
{modules, []},
{registered, [emqx_web_hook_sup]},
{applications, [kernel,stdlib,ehttpc]},

View File

@ -1,24 +1,34 @@
%% -*- mode: erlang -*-
{VSN,
[{<<"4\\.3\\.[0-2]">>,
[{<<"4\\.3\\.[0-2]$">>,
[{apply,{application,stop,[emqx_web_hook]}},
{load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]},
{load_module,emqx_web_hook,brutal_purge,soft_purge,[]},
{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]},
{<<"4\\.3\\.[3-8]">>,
{<<"4\\.3\\.[3-7]$">>,
[{apply,{application,stop,[emqx_web_hook]}},
{load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]},
{load_module,emqx_web_hook,brutal_purge,soft_purge,[]},
{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]},
{"4.3.8",
[{load_module,emqx_web_hook,brutal_purge,soft_purge,[]}]},
{"4.3.9",
[ %% nothing so far
]},
{<<".*">>,[]}],
[{<<"4\\.3\\.[0-2]">>,
[{<<"4\\.3\\.[0-2]$">>,
[{apply,{application,stop,[emqx_web_hook]}},
{load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]},
{load_module,emqx_web_hook,brutal_purge,soft_purge,[]},
{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]},
{<<"4\\.3\\.[3-8]">>,
{<<"4\\.3\\.[3-7]$">>,
[{apply,{application,stop,[emqx_web_hook]}},
{load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]},
{load_module,emqx_web_hook,brutal_purge,soft_purge,[]},
{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]},
{"4.3.8",
[{load_module,emqx_web_hook,brutal_purge,soft_purge,[]}]},
{"4.3.9",
[ %% nothing so far
]},
{<<".*">>,[]}]}.

View File

@ -6,6 +6,8 @@
-define(TIMEOUT, 300000).
-define(INFO(Fmt,Args), io:format(Fmt++"~n",Args)).
-mode(compile).
main([Command0, DistInfoStr | CommandArgs]) ->
%% convert the distribution info arguments string to an erlang term
{ok, Tokens, _} = erl_scan:string(DistInfoStr ++ "."),
@ -210,15 +212,24 @@ find_and_link_release_package(Version, RelName) ->
ok = filelib:ensure_dir(filename:join([filename:dirname(ReleaseLink), "dummy"])),
%% create the symlink pointing to the full path name of the
%% release package we found
case file:make_symlink(filename:absname(Filename), ReleaseLink) of
ok ->
ok;
{error, eperm} -> % windows!
{ok,_} = file:copy(filename:absname(Filename), ReleaseLink)
end,
make_symlink_or_copy(filename:absname(Filename), ReleaseLink),
{Filename, ReleaseHandlerPackageLink}
end.
make_symlink_or_copy(Filename, ReleaseLink) ->
case file:make_symlink(Filename, ReleaseLink) of
ok -> ok;
{error, eexist} ->
?INFO("symlink ~p already exists, recreate it", [ReleaseLink]),
ok = file:delete(ReleaseLink),
make_symlink_or_copy(Filename, ReleaseLink);
{error, Reason} when Reason =:= eperm; Reason =:= enotsup ->
{ok, _} = file:copy(Filename, ReleaseLink);
{error, Reason} ->
?INFO("create symlink ~p failed", [ReleaseLink]),
error({Reason, ReleaseLink})
end.
unpack_zipballs(RelNameStr, Version) ->
{ok, Cwd} = file:get_cwd(),
GzFile = filename:absname(filename:join(["releases", RelNameStr ++ "-" ++ Version ++ ".tar.gz"])),
@ -365,7 +376,7 @@ start_distribution(TargetNode, NameTypeArg, Cookie) ->
make_script_node(Node) ->
[Name, Host] = string:tokens(atom_to_list(Node), "@"),
list_to_atom(lists:concat([Name, "_upgrader_", os:getpid(), "@", Host])).
list_to_atom(lists:concat(["remsh_", Name, "_upgrader_", os:getpid(), "@", Host])).
%% get name type from arg
get_name_type(NameTypeArg) ->

18
build
View File

@ -4,6 +4,9 @@
# arg1: profile, e.g. emqx | emqx-edge | emqx-pkg | emqx-edge-pkg
# arg2: artifact, e.g. rel | relup | zip | pkg
if [[ -n "$DEBUG" ]]; then
set -x
fi
set -euo pipefail
PROFILE="$1"
@ -45,6 +48,13 @@ if [ "$(uname -s)" = 'Linux' ]; then
esac
fi
if [ "${SYSTEM}" = 'windows' ]; then
# windows does not like the find
FIND="/usr/bin/find"
else
FIND='find'
fi
log() {
local msg="$1"
# rebar3 prints ===>, so we print ===<
@ -71,12 +81,12 @@ make_relup() {
tmp_dir="$(mktemp -d -t emqx.XXXXXXX)"
unzip -q "$zip" "emqx/releases/*" -d "$tmp_dir"
unzip -q "$zip" "emqx/lib/*" -d "$tmp_dir"
cp -r -n "$tmp_dir/emqx/releases"/* "$releases_dir"
cp -r -n "$tmp_dir/emqx/lib"/* "$lib_dir"
cp -r -n "$tmp_dir/emqx/releases"/* "$releases_dir" || true
cp -r -n "$tmp_dir/emqx/lib"/* "$lib_dir" || true
rm -rf "$tmp_dir"
fi
releases+=( "$base_vsn" )
done < <(find _upgrade_base -maxdepth 1 -name "*$PROFILE-$SYSTEM*-$ARCH.zip" -type f)
done < <("$FIND" _upgrade_base -maxdepth 1 -name "*$PROFILE-$SYSTEM*-$ARCH.zip" -type f)
fi
if [ ${#releases[@]} -eq 0 ]; then
log "No upgrade base found, relup ignored"
@ -96,7 +106,7 @@ cp_dyn_libs() {
mkdir -p "$target_dir"
while read -r so_file; do
cp -L "$so_file" "$target_dir/"
done < <(find "$rel_dir" -type f \( -name "*.so*" -o -name "beam.smp" \) -print0 \
done < <("$FIND" "$rel_dir" -type f \( -name "*.so*" -o -name "beam.smp" \) -print0 \
| xargs -0 ldd \
| grep -E '(libcrypto)|(libtinfo)' \
| awk '{print $3}' \

View File

@ -23,6 +23,8 @@ COPY . /emqx
ARG PKG_VSN
ARG EMQX_NAME=emqx
ENV EMQX_RELUP=false
RUN cd /emqx \
&& rm -rf _build/$EMQX_NAME/lib \
&& make $EMQX_NAME
@ -58,7 +60,8 @@ RUN chgrp -Rf emqx /opt/emqx && chmod -Rf g+w /opt/emqx \
USER emqx
VOLUME ["/opt/emqx/log", "/opt/emqx/data", "/opt/emqx/etc"]
## NOTE: /opt/emqx/etc is removed from the VOLUME list since 4.3.13
VOLUME ["/opt/emqx/log", "/opt/emqx/data"]
# emqx will occupy these port:
# - 1883 port for MQTT

View File

@ -1,47 +0,0 @@
%% -*-: erlang -*-
[
{
elvis,
[
{config,
[
#{dirs => ["src", "apps/**/src", "lib-ce/**/src", "lib-ee/**/src"],
filter => "*.erl",
ruleset => erl_files,
rules => [
{elvis_style, state_record_and_type, disable},
{elvis_style, no_common_caveats_call, #{}},
{elvis_style, no_debug_call, #{ debug_functions => [ {ct, pal}
, {ct, print}
]}},
{elvis_style, operator_spaces, #{rules => [{right, "|"},
{left, "|"},
{right, "||"},
{left, "||"}]}}
]
},
#{dirs => ["test", "apps/**/test", "lib-ce/**/src"],
filter => "*.erl",
rules => [
{elvis_text_style, line_length, #{ limit => 100
, skip_comments => false }},
{elvis_style, dont_repeat_yourself, #{ min_complexity => 100 }}
]
},
#{dirs => ["."],
filter => "Makefile",
ruleset => makefiles
},
#{dirs => ["."],
filter => "rebar.config",
ruleset => rebar_config
},
#{dirs => ["."],
filter => "elvis.config",
ruleset => elvis_config
}
]
}
]
}
].

View File

@ -1,5 +1,5 @@
######################################################################
## Erlang VM Args for EMQ X Broker
## Erlang VM Args
######################################################################
## NOTE:

View File

@ -1,5 +1,5 @@
######################################################################
## Erlang VM Args for EMQ X Edge
## Erlang VM Args
######################################################################
## NOTE:

View File

@ -18,7 +18,7 @@ RELEASE="$(grep -E "define.+EMQX_RELEASE.+${EDITION}" include/emqx_release.hrl |
git_exact_vsn() {
local tag
tag="$(git describe --tags --match "[e|v]*" --exact 2>/dev/null)"
echo "$tag" | sed 's/^[v|e]//g'
echo "${tag#[e|v]}"
}
GIT_EXACT_VSN="$(git_exact_vsn)"

View File

@ -1,5 +1,5 @@
%%-*- mode: erlang -*-
%% EMQ X R4.0 config mapping
%% EMQX Config Mapping
%%--------------------------------------------------------------------
%% Cluster

View File

@ -1,84 +0,0 @@
#!/usr/bin/env bash
## This script checks style of changed files.
## Expect argument 1 to be the git compare base.
set -euo pipefail
elvis_version='1.0.0-emqx-2'
base="${1:-}"
repo="${2:-emqx/emqx}"
REPO="${GITHUB_REPOSITORY:-${repo}}"
if [ "${base}" = "" ]; then
echo "Usage $0 <git-compare-base-ref>"
exit 1
fi
echo "elvis -v: $elvis_version"
echo "git diff base: $base"
if [ ! -f ./elvis ] || [ "$(./elvis -v | grep -oE '[1-9]+\.[0-9]+\.[0-9]+\-emqx-[0-9]+')" != "$elvis_version" ]; then
curl --silent --show-error -fLO "https://github.com/emqx/elvis/releases/download/$elvis_version/elvis"
chmod +x ./elvis
fi
if [[ "$base" =~ [0-9a-f]{8,40} ]]; then
# base is a commit sha1
compare_base="$base"
else
remote="$(git remote -v | grep -E "github\.com(:|/)$REPO((\.git)|(\s))" | grep fetch | awk '{print $1}')"
git fetch "$remote" "$base"
compare_base="$remote/$base"
fi
git_diff() {
git diff --name-only --diff-filter=ACMRTUXB "$compare_base"...HEAD
}
bad_file_count=0
for file in $(git_diff); do
if [ ! -f "$file" ]; then
# file is deleted, skip
continue
fi
if [[ $file != *.erl ]]; then
# not .erl file
continue
fi
if ! ./elvis rock "$file" -c elvis.config; then
bad_file_count=$(( bad_file_count + 1))
fi
done
if [ $bad_file_count -gt 0 ]; then
echo "elvis: $bad_file_count errors"
exit 1
fi
### now check new-line at EOF for changed files
nl_at_eof() {
local file="$1"
if ! [ -f "$file" ]; then
return
fi
case "$file" in
*.png|*rebar3)
return
;;
esac
local lastbyte
lastbyte="$(tail -c 1 "$file" 2>&1)"
if [ "$lastbyte" != '' ]; then
echo "$file"
return 1
fi
}
for file in $(git_diff); do
if ! nl_at_eof "$file"; then
bad_file_count=$(( bad_file_count + 1 ))
fi
done
exit $bad_file_count

View File

@ -5,9 +5,16 @@ set -euo pipefail
# ensure dir
cd -P -- "$(dirname -- "$0")/.."
if [ "$(./scripts/get-distro.sh)" = 'windows' ]; then
# Otherwise windows may resolve to find.exe
FIND="/usr/bin/find"
else
FIND='find'
fi
find_app() {
local appdir="$1"
find "${appdir}" -mindepth 1 -maxdepth 1 -type d
"$FIND" "${appdir}" -mindepth 1 -maxdepth 1 -type d
}
# append emqx application first
@ -23,4 +30,4 @@ fi
## find directories in lib-extra
find_app 'lib-extra'
## find symlinks in lib-extra
find 'lib-extra' -mindepth 1 -maxdepth 1 -type l -exec test -e {} \; -print
"$FIND" 'lib-extra' -mindepth 1 -maxdepth 1 -type l -exec test -e {} \; -print

View File

@ -13,4 +13,5 @@ if [ "$1" != "emqx" ]; then
BASEDIR="$1"
fi
# shellcheck disable=SC2038
find "${BASEDIR}/test/props" -name "prop_*.erl" 2>/dev/null | xargs -I{} basename {} .erl | xargs | tr ' ' ','

View File

@ -12,4 +12,5 @@ TESTDIR="test"
if [ "$1" != "emqx" ]; then
TESTDIR="$1/test"
fi
# shellcheck disable=SC2038
find "${TESTDIR}" -name "*_SUITE.erl" 2>/dev/null | xargs | tr ' ' ','

72
scripts/relup-base-packages.sh Executable file
View File

@ -0,0 +1,72 @@
#!/usr/bin/env bash
## This script helps to download relup base version packages
if [[ -n "$DEBUG" ]]; then
set -x
fi
set -euo pipefail
PROFILE="${1}"
if [ "$PROFILE" = "" ]; then
PROFILE="emqx"
fi
case $PROFILE in
"emqx")
DIR='broker'
EDITION='community'
;;
"emqx-ee")
DIR='enterprise'
EDITION='enterprise'
;;
"emqx-edge")
DIR='edge'
EDITION='edge'
;;
esac
SYSTEM="${SYSTEM:-$(./scripts/get-distro.sh)}"
ARCH="${ARCH:-$(uname -m)}"
case "$ARCH" in
x86_64)
ARCH='amd64'
;;
aarch64)
ARCH='arm64'
;;
arm*)
ARCH=arm
;;
esac
SHASUM="sha256sum"
if [ "$SYSTEM" = "macos" ]; then
SHASUM="shasum -a 256"
fi
# ensure dir
cd -P -- "$(dirname -- "${BASH_SOURCE[0]}")/.."
mkdir -p _upgrade_base
pushd _upgrade_base
for tag in $(../scripts/relup-base-vsns.sh $EDITION | xargs echo -n); do
filename="$PROFILE-$SYSTEM-${tag#[e|v]}-$ARCH.zip"
url="https://www.emqx.com/downloads/$DIR/${tag#[e|v]}/$filename"
echo "downloading base package from ${url} ..."
if [ ! -f "$filename" ] && curl -I -m 10 -o /dev/null -s -w "%{http_code}" "${url}" | grep -q -oE "^[23]+" ; then
curl -L -o "${filename}" "${url}"
if [ "$SYSTEM" != "centos6" ]; then
curl -L -o "${filename}.sha256" "${url}.sha256"
SUMSTR=$(cat "${filename}.sha256")
echo "got sha265sum: ${SUMSTR}"
## https://askubuntu.com/questions/1202208/checking-sha256-checksum
echo "${SUMSTR} ${filename}" | $SHASUM -c || exit 1
fi
fi
done
popd

View File

@ -1,6 +1,9 @@
#!/usr/bin/env bash
set -euo pipefail
# ensure dir
cd -P -- "$(dirname -- "${BASH_SOURCE[0]}")/.."
## This script prints the relup upgrade base versions
## for the given EMQX edition (specified as first arg)
##
@ -17,8 +20,8 @@ parse_semver() {
echo "$1" | tr '.|-' ' '
}
PROFILE="${1:-}"
[ -z "${PROFILE}" ] && usage
EDITION="${1:-}"
[ -z "${EDITION}" ] && usage
## Get the current release version
## e.g.
@ -46,7 +49,7 @@ else
IS_RELEASE=false
fi
case "${PROFILE}" in
case "${EDITION}" in
*enterprise*)
GIT_TAG_PREFIX="e"
;;

View File

@ -3,7 +3,11 @@
set -euo pipefail
target_files=()
while IFS='' read -r line; do target_files+=("$line"); done < <(grep -r -l --exclude-dir=.git --exclude-dir=_build "#!/bin/" .)
while IFS='' read -r line;
do
target_files+=("$line");
done < <(git grep -r -l '^#!/\(bin/\|usr/bin/env bash\)' .)
return_code=0
for i in "${target_files[@]}"; do
echo checking "$i" ...

View File

@ -1,13 +1,26 @@
%% -*- mode: erlang -*-
{VSN,
[{"4.3.13",[{load_module,emqx_channel,brutal_purge,soft_purge,[]}]},
[{"4.3.13",
[{load_module,emqx_sys_mon,brutal_purge,soft_purge,[]},
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_banned,brutal_purge,soft_purge,[]},
{load_module,emqx_ctl,brutal_purge,soft_purge,[]},
{load_module,emqx_pmon,brutal_purge,soft_purge,[]},
{load_module,emqx_os_mon,brutal_purge,soft_purge,[]},
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
{load_module,emqx_channel,brutal_purge,soft_purge,[]}]},
{"4.3.12",
[{load_module,emqx_vm_mon,brutal_purge,soft_purge,[]},
[{load_module,emqx_sys_mon,brutal_purge,soft_purge,[]},
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_banned,brutal_purge,soft_purge,[]},
{load_module,emqx_pmon,brutal_purge,soft_purge,[]},
{load_module,emqx_vm_mon,brutal_purge,soft_purge,[]},
{load_module,emqx_ctl,brutal_purge,soft_purge,[]},
{load_module,emqx_metrics,brutal_purge,soft_purge,[]},
{apply,{emqx_metrics,assign_acl_stats_from_ets_to_counter,[]}},
{load_module,emqx_access_control,brutal_purge,soft_purge,[]},
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
{load_module,emqx_session,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm,brutal_purge,soft_purge,[]},
{load_module,emqx_os_mon,brutal_purge,soft_purge,[]},
@ -16,13 +29,16 @@
{load_module,emqx_message,brutal_purge,soft_purge,[]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]},
{"4.3.11",
[{load_module,emqx_vm_mon,brutal_purge,soft_purge,[]},
[{load_module,emqx_banned,brutal_purge,soft_purge,[]},
{load_module,emqx_pmon,brutal_purge,soft_purge,[]},
{load_module,emqx_vm_mon,brutal_purge,soft_purge,[]},
{load_module,emqx_ctl,brutal_purge,soft_purge,[]},
{load_module,emqx_metrics,brutal_purge,soft_purge,[]},
{apply,{emqx_metrics,assign_acl_stats_from_ets_to_counter,[]}},
{load_module,emqx_access_control,brutal_purge,soft_purge,[]},
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
{load_module,emqx_session,brutal_purge,soft_purge,[]},
{load_module,emqx_vm,brutal_purge,soft_purge,[]},
{load_module,emqx_sys_mon,brutal_purge,soft_purge,[]},
@ -35,12 +51,15 @@
{load_module,emqx_message,brutal_purge,soft_purge,[]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]},
{"4.3.10",
[{load_module,emqx_vm_mon,brutal_purge,soft_purge,[]},
[{load_module,emqx_banned,brutal_purge,soft_purge,[]},
{load_module,emqx_pmon,brutal_purge,soft_purge,[]},
{load_module,emqx_vm_mon,brutal_purge,soft_purge,[]},
{load_module,emqx_ctl,brutal_purge,soft_purge,[]},
{load_module,emqx_metrics,brutal_purge,soft_purge,[]},
{apply,{emqx_metrics,assign_acl_stats_from_ets_to_counter,[]}},
{load_module,emqx_access_control,brutal_purge,soft_purge,[]},
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
{load_module,emqx_session,brutal_purge,soft_purge,[]},
{load_module,emqx_vm,brutal_purge,soft_purge,[]},
{load_module,emqx_sys_mon,brutal_purge,soft_purge,[]},
@ -54,7 +73,9 @@
{load_module,emqx_message,brutal_purge,soft_purge,[]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]},
{"4.3.9",
[{load_module,emqx_vm_mon,brutal_purge,soft_purge,[]},
[{load_module,emqx_banned,brutal_purge,soft_purge,[]},
{load_module,emqx_pmon,brutal_purge,soft_purge,[]},
{load_module,emqx_vm_mon,brutal_purge,soft_purge,[]},
{load_module,emqx_ctl,brutal_purge,soft_purge,[]},
{load_module,emqx_metrics,brutal_purge,soft_purge,[]},
{apply,{emqx_metrics,assign_acl_stats_from_ets_to_counter,[]}},
@ -78,7 +99,9 @@
{load_module,emqx_message,brutal_purge,soft_purge,[]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]},
{"4.3.8",
[{load_module,emqx_vm_mon,brutal_purge,soft_purge,[]},
[{load_module,emqx_banned,brutal_purge,soft_purge,[]},
{load_module,emqx_pmon,brutal_purge,soft_purge,[]},
{load_module,emqx_vm_mon,brutal_purge,soft_purge,[]},
{load_module,emqx_ctl,brutal_purge,soft_purge,[]},
{load_module,emqx_metrics,brutal_purge,soft_purge,[]},
{apply,{emqx_metrics,assign_acl_stats_from_ets_to_counter,[]}},
@ -102,7 +125,9 @@
{load_module,emqx_message,brutal_purge,soft_purge,[]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]},
{"4.3.7",
[{load_module,emqx_vm_mon,brutal_purge,soft_purge,[]},
[{load_module,emqx_banned,brutal_purge,soft_purge,[]},
{load_module,emqx_pmon,brutal_purge,soft_purge,[]},
{load_module,emqx_vm_mon,brutal_purge,soft_purge,[]},
{load_module,emqx_ctl,brutal_purge,soft_purge,[]},
{load_module,emqx_metrics,brutal_purge,soft_purge,[]},
{apply,{emqx_metrics,assign_acl_stats_from_ets_to_counter,[]}},
@ -128,7 +153,9 @@
{load_module,emqx_message,brutal_purge,soft_purge,[]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]},
{"4.3.6",
[{load_module,emqx_vm_mon,brutal_purge,soft_purge,[]},
[{load_module,emqx_pmon,brutal_purge,soft_purge,[]},
{load_module,emqx_banned,brutal_purge,soft_purge,[]},
{load_module,emqx_vm_mon,brutal_purge,soft_purge,[]},
{load_module,emqx_metrics,brutal_purge,soft_purge,[]},
{apply,{emqx_metrics,assign_acl_stats_from_ets_to_counter,[]}},
{load_module,emqx_access_control,brutal_purge,soft_purge,[]},
@ -154,7 +181,9 @@
{load_module,emqx_message,brutal_purge,soft_purge,[]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]},
{"4.3.5",
[{load_module,emqx_vm_mon,brutal_purge,soft_purge,[]},
[{load_module,emqx_banned,brutal_purge,soft_purge,[]},
{load_module,emqx_pmon,brutal_purge,soft_purge,[]},
{load_module,emqx_vm_mon,brutal_purge,soft_purge,[]},
{load_module,emqx_metrics,brutal_purge,soft_purge,[]},
{apply,{emqx_metrics,assign_acl_stats_from_ets_to_counter,[]}},
{load_module,emqx_access_control,brutal_purge,soft_purge,[]},
@ -181,7 +210,9 @@
{load_module,emqx_message,brutal_purge,soft_purge,[]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]},
{"4.3.4",
[{load_module,emqx_vm_mon,brutal_purge,soft_purge,[]},
[{load_module,emqx_banned,brutal_purge,soft_purge,[]},
{load_module,emqx_pmon,brutal_purge,soft_purge,[]},
{load_module,emqx_vm_mon,brutal_purge,soft_purge,[]},
{load_module,emqx_metrics,brutal_purge,soft_purge,[]},
{apply,{emqx_metrics,assign_acl_stats_from_ets_to_counter,[]}},
{load_module,emqx_access_control,brutal_purge,soft_purge,[]},
@ -209,7 +240,9 @@
{load_module,emqx_message,brutal_purge,soft_purge,[]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]},
{"4.3.3",
[{load_module,emqx_vm_mon,brutal_purge,soft_purge,[]},
[{load_module,emqx_banned,brutal_purge,soft_purge,[]},
{load_module,emqx_pmon,brutal_purge,soft_purge,[]},
{load_module,emqx_vm_mon,brutal_purge,soft_purge,[]},
{load_module,emqx_metrics,brutal_purge,soft_purge,[]},
{apply,{emqx_metrics,assign_acl_stats_from_ets_to_counter,[]}},
{load_module,emqx_access_control,brutal_purge,soft_purge,[]},
@ -238,7 +271,9 @@
{load_module,emqx_message,brutal_purge,soft_purge,[]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]},
{"4.3.2",
[{load_module,emqx_vm_mon,brutal_purge,soft_purge,[]},
[{load_module,emqx_banned,brutal_purge,soft_purge,[]},
{load_module,emqx_pmon,brutal_purge,soft_purge,[]},
{load_module,emqx_vm_mon,brutal_purge,soft_purge,[]},
{load_module,emqx_metrics,brutal_purge,soft_purge,[]},
{apply,{emqx_metrics,assign_acl_stats_from_ets_to_counter,[]}},
{load_module,emqx_access_control,brutal_purge,soft_purge,[]},
@ -267,7 +302,9 @@
{load_module,emqx_message,brutal_purge,soft_purge,[]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]},
{"4.3.1",
[{load_module,emqx_vm_mon,brutal_purge,soft_purge,[]},
[{load_module,emqx_banned,brutal_purge,soft_purge,[]},
{load_module,emqx_pmon,brutal_purge,soft_purge,[]},
{load_module,emqx_vm_mon,brutal_purge,soft_purge,[]},
{load_module,emqx_metrics,brutal_purge,soft_purge,[]},
{apply,{emqx_metrics,assign_acl_stats_from_ets_to_counter,[]}},
{load_module,emqx_access_control,brutal_purge,soft_purge,[]},
@ -300,7 +337,9 @@
{load_module,emqx_message,brutal_purge,soft_purge,[]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]},
{"4.3.0",
[{load_module,emqx_vm_mon,brutal_purge,soft_purge,[]},
[{load_module,emqx_banned,brutal_purge,soft_purge,[]},
{load_module,emqx_pmon,brutal_purge,soft_purge,[]},
{load_module,emqx_vm_mon,brutal_purge,soft_purge,[]},
{load_module,emqx_metrics,brutal_purge,soft_purge,[]},
{apply,{emqx_metrics,assign_acl_stats_from_ets_to_counter,[]}},
{apply,{emqx_metrics,upgrade_retained_delayed_counter_type,[]}},
@ -336,11 +375,24 @@
{load_module,emqx_message,brutal_purge,soft_purge,[]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]},
{<<".*">>,[]}],
[{"4.3.13",[{load_module,emqx_channel,brutal_purge,soft_purge,[]}]},
[{"4.3.13",
[{load_module,emqx_sys_mon,brutal_purge,soft_purge,[]},
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_banned,brutal_purge,soft_purge,[]},
{load_module,emqx_pmon,brutal_purge,soft_purge,[]},
{load_module,emqx_ctl,brutal_purge,soft_purge,[]},
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
{load_module,emqx_os_mon,brutal_purge,soft_purge,[]},
{load_module,emqx_channel,brutal_purge,soft_purge,[]}]},
{"4.3.12",
[{load_module,emqx_vm_mon,brutal_purge,soft_purge,[]},
[{load_module,emqx_sys_mon,brutal_purge,soft_purge,[]},
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_banned,brutal_purge,soft_purge,[]},
{load_module,emqx_pmon,brutal_purge,soft_purge,[]},
{load_module,emqx_vm_mon,brutal_purge,soft_purge,[]},
{load_module,emqx_ctl,brutal_purge,soft_purge,[]},
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
{load_module,emqx_access_control,brutal_purge,soft_purge,[]},
{load_module,emqx_metrics,brutal_purge,soft_purge,[]},
{load_module,emqx_session,brutal_purge,soft_purge,[]},
@ -351,10 +403,13 @@
{load_module,emqx_message,brutal_purge,soft_purge,[]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]},
{"4.3.11",
[{load_module,emqx_vm_mon,brutal_purge,soft_purge,[]},
[{load_module,emqx_banned,brutal_purge,soft_purge,[]},
{load_module,emqx_pmon,brutal_purge,soft_purge,[]},
{load_module,emqx_vm_mon,brutal_purge,soft_purge,[]},
{load_module,emqx_ctl,brutal_purge,soft_purge,[]},
{load_module,emqx_connection,brutal_purge,soft_purge,[]},
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
{load_module,emqx_metrics,brutal_purge,soft_purge,[]},
{load_module,emqx_access_control,brutal_purge,soft_purge,[]},
{load_module,emqx_session,brutal_purge,soft_purge,[]},
@ -369,9 +424,12 @@
{load_module,emqx_message,brutal_purge,soft_purge,[]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]},
{"4.3.10",
[{load_module,emqx_vm_mon,brutal_purge,soft_purge,[]},
[{load_module,emqx_banned,brutal_purge,soft_purge,[]},
{load_module,emqx_pmon,brutal_purge,soft_purge,[]},
{load_module,emqx_vm_mon,brutal_purge,soft_purge,[]},
{load_module,emqx_ctl,brutal_purge,soft_purge,[]},
{load_module,emqx_channel,brutal_purge,soft_purge,[]},
{load_module,emqx_cm,brutal_purge,soft_purge,[]},
{load_module,emqx_metrics,brutal_purge,soft_purge,[]},
{load_module,emqx_access_control,brutal_purge,soft_purge,[]},
{load_module,emqx_session,brutal_purge,soft_purge,[]},
@ -387,7 +445,9 @@
{load_module,emqx_message,brutal_purge,soft_purge,[]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]},
{"4.3.9",
[{load_module,emqx_vm_mon,brutal_purge,soft_purge,[]},
[{load_module,emqx_banned,brutal_purge,soft_purge,[]},
{load_module,emqx_pmon,brutal_purge,soft_purge,[]},
{load_module,emqx_vm_mon,brutal_purge,soft_purge,[]},
{load_module,emqx_ctl,brutal_purge,soft_purge,[]},
{load_module,emqx_vm,brutal_purge,soft_purge,[]},
{load_module,emqx_sys_mon,brutal_purge,soft_purge,[]},
@ -410,7 +470,9 @@
{load_module,emqx_message,brutal_purge,soft_purge,[]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]},
{"4.3.8",
[{load_module,emqx_vm_mon,brutal_purge,soft_purge,[]},
[{load_module,emqx_banned,brutal_purge,soft_purge,[]},
{load_module,emqx_pmon,brutal_purge,soft_purge,[]},
{load_module,emqx_vm_mon,brutal_purge,soft_purge,[]},
{load_module,emqx_ctl,brutal_purge,soft_purge,[]},
{load_module,emqx_vm,brutal_purge,soft_purge,[]},
{load_module,emqx_sys_mon,brutal_purge,soft_purge,[]},
@ -433,7 +495,9 @@
{load_module,emqx_message,brutal_purge,soft_purge,[]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]},
{"4.3.7",
[{load_module,emqx_vm_mon,brutal_purge,soft_purge,[]},
[{load_module,emqx_banned,brutal_purge,soft_purge,[]},
{load_module,emqx_pmon,brutal_purge,soft_purge,[]},
{load_module,emqx_vm_mon,brutal_purge,soft_purge,[]},
{load_module,emqx_ctl,brutal_purge,soft_purge,[]},
{load_module,emqx_vm,brutal_purge,soft_purge,[]},
{load_module,emqx_sys_mon,brutal_purge,soft_purge,[]},
@ -458,7 +522,9 @@
{load_module,emqx_message,brutal_purge,soft_purge,[]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]},
{"4.3.6",
[{load_module,emqx_vm_mon,brutal_purge,soft_purge,[]},
[{load_module,emqx_banned,brutal_purge,soft_purge,[]},
{load_module,emqx_pmon,brutal_purge,soft_purge,[]},
{load_module,emqx_vm_mon,brutal_purge,soft_purge,[]},
{load_module,emqx_vm,brutal_purge,soft_purge,[]},
{load_module,emqx_sys_mon,brutal_purge,soft_purge,[]},
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
@ -483,7 +549,9 @@
{load_module,emqx_message,brutal_purge,soft_purge,[]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]},
{"4.3.5",
[{load_module,emqx_vm_mon,brutal_purge,soft_purge,[]},
[{load_module,emqx_banned,brutal_purge,soft_purge,[]},
{load_module,emqx_pmon,brutal_purge,soft_purge,[]},
{load_module,emqx_vm_mon,brutal_purge,soft_purge,[]},
{load_module,emqx_vm,brutal_purge,soft_purge,[]},
{load_module,emqx_sys_mon,brutal_purge,soft_purge,[]},
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
@ -509,7 +577,9 @@
{load_module,emqx_message,brutal_purge,soft_purge,[]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]},
{"4.3.4",
[{load_module,emqx_vm_mon,brutal_purge,soft_purge,[]},
[{load_module,emqx_banned,brutal_purge,soft_purge,[]},
{load_module,emqx_pmon,brutal_purge,soft_purge,[]},
{load_module,emqx_vm_mon,brutal_purge,soft_purge,[]},
{load_module,emqx_vm,brutal_purge,soft_purge,[]},
{load_module,emqx_sys_mon,brutal_purge,soft_purge,[]},
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
@ -536,7 +606,9 @@
{load_module,emqx_message,brutal_purge,soft_purge,[]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]},
{"4.3.3",
[{load_module,emqx_vm_mon,brutal_purge,soft_purge,[]},
[{load_module,emqx_banned,brutal_purge,soft_purge,[]},
{load_module,emqx_pmon,brutal_purge,soft_purge,[]},
{load_module,emqx_vm_mon,brutal_purge,soft_purge,[]},
{load_module,emqx_vm,brutal_purge,soft_purge,[]},
{load_module,emqx_sys_mon,brutal_purge,soft_purge,[]},
{load_module,emqx_http_lib,brutal_purge,soft_purge,[]},
@ -564,7 +636,9 @@
{load_module,emqx_message,brutal_purge,soft_purge,[]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]},
{"4.3.2",
[{load_module,emqx_vm_mon,brutal_purge,soft_purge,[]},
[{load_module,emqx_banned,brutal_purge,soft_purge,[]},
{load_module,emqx_pmon,brutal_purge,soft_purge,[]},
{load_module,emqx_vm_mon,brutal_purge,soft_purge,[]},
{load_module,emqx_vm,brutal_purge,soft_purge,[]},
{load_module,emqx_sys_mon,brutal_purge,soft_purge,[]},
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
@ -592,7 +666,9 @@
{load_module,emqx_message,brutal_purge,soft_purge,[]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]},
{"4.3.1",
[{load_module,emqx_vm_mon,brutal_purge,soft_purge,[]},
[{load_module,emqx_banned,brutal_purge,soft_purge,[]},
{load_module,emqx_pmon,brutal_purge,soft_purge,[]},
{load_module,emqx_vm_mon,brutal_purge,soft_purge,[]},
{load_module,emqx_vm,brutal_purge,soft_purge,[]},
{load_module,emqx_sys_mon,brutal_purge,soft_purge,[]},
{load_module,emqx_pqueue,brutal_purge,soft_purge,[]},
@ -624,7 +700,9 @@
{load_module,emqx_message,brutal_purge,soft_purge,[]},
{load_module,emqx_limiter,brutal_purge,soft_purge,[]}]},
{"4.3.0",
[{load_module,emqx_vm_mon,brutal_purge,soft_purge,[]},
[{load_module,emqx_banned,brutal_purge,soft_purge,[]},
{load_module,emqx_pmon,brutal_purge,soft_purge,[]},
{load_module,emqx_vm_mon,brutal_purge,soft_purge,[]},
{load_module,emqx_vm,brutal_purge,soft_purge,[]},
{load_module,emqx_sys_mon,brutal_purge,soft_purge,[]},
{load_module,emqx_alarm_handler,brutal_purge,soft_purge,[]},

View File

@ -47,8 +47,6 @@
, code_change/3
]).
-elvis([{elvis_style, state_record_and_type, disable}]).
-define(BANNED_TAB, ?MODULE).
%%--------------------------------------------------------------------

View File

@ -1515,11 +1515,13 @@ enrich_connack_caps(AckProps, _Channel) -> AckProps.
%%--------------------------------------------------------------------
%% Enrich server keepalive
enrich_server_keepalive(AckProps, #channel{clientinfo = #{zone := Zone}}) ->
enrich_server_keepalive(AckProps, ?IS_MQTT_V5 = #channel{clientinfo = #{zone := Zone}}) ->
case emqx_zone:server_keepalive(Zone) of
undefined -> AckProps;
Keepalive -> AckProps#{'Server-Keep-Alive' => Keepalive}
end.
end;
enrich_server_keepalive(AckProps, _Channel) -> AckProps.
%%--------------------------------------------------------------------
%% Enrich response information
@ -1565,7 +1567,7 @@ init_alias_maximum(#mqtt_packet_connect{proto_ver = ?MQTT_PROTO_V5,
init_alias_maximum(_ConnPkt, _ClientInfo) -> undefined.
%%--------------------------------------------------------------------
%% Enrich Keepalive
%% Ensure Keepalive
%% MQTT 5
ensure_keepalive(#{'Server-Keep-Alive' := Interval}, Channel = #channel{conninfo = ConnInfo}) ->

View File

@ -226,18 +226,25 @@ open_session(true, ClientInfo = #{clientid := ClientId}, ConnInfo) ->
open_session(false, ClientInfo = #{clientid := ClientId}, ConnInfo) ->
Self = self(),
ResumeStart = fun(_) ->
CreateSess =
fun() ->
Session = create_session(ClientInfo, ConnInfo),
register_channel(ClientId, Self, ConnInfo),
{ok, #{session => Session, present => false}}
end,
case takeover_session(ClientId) of
{ok, ConnMod, ChanPid, Session} ->
ok = emqx_session:resume(ClientInfo, Session),
Pendings = ConnMod:call(ChanPid, {takeover, 'end'}, ?T_TAKEOVER),
case request_stepdown({takeover, 'end'}, ConnMod, ChanPid) of
{ok, Pendings} ->
register_channel(ClientId, Self, ConnInfo),
{ok, #{session => Session,
present => true,
pendings => Pendings}};
{error, not_found} ->
Session = create_session(ClientInfo, ConnInfo),
register_channel(ClientId, Self, ConnInfo),
{ok, #{session => Session, present => false}}
{error, _} ->
CreateSess()
end;
{error, _Reason} -> CreateSess()
end
end,
emqx_cm_locker:trans(ClientId, ResumeStart).
@ -271,9 +278,12 @@ takeover_session(ClientId, ChanPid) when node(ChanPid) == node() ->
undefined ->
{error, not_found};
ConnMod when is_atom(ConnMod) ->
%% TODO: if takeover times out, maybe kill the old?
Session = ConnMod:call(ChanPid, {takeover, 'begin'}, ?T_TAKEOVER),
{ok, ConnMod, ChanPid, Session}
case request_stepdown({takeover, 'begin'}, ConnMod, ChanPid) of
{ok, Session} ->
{ok, ConnMod, ChanPid, Session};
{error, Reason} ->
{error, Reason}
end
end;
takeover_session(ClientId, ChanPid) ->
rpc_call(node(ChanPid), takeover_session, [ClientId, ChanPid], ?T_TAKEOVER).
@ -286,42 +296,63 @@ discard_session(ClientId) when is_binary(ClientId) ->
ChanPids -> lists:foreach(fun(Pid) -> discard_session(ClientId, Pid) end, ChanPids)
end.
%% @private Kick a local stale session to force it step down.
%% If failed to kick (e.g. timeout) force a kill.
%% @private call a local stale session to execute an Action.
%% If failed to response (e.g. timeout) force a kill.
%% Keeping the stale pid around, or returning error or raise an exception
%% benefits nobody.
-spec kick_or_kill(kick | discard, module(), pid()) -> ok.
kick_or_kill(Action, ConnMod, Pid) ->
try
-spec request_stepdown(Action, module(), pid())
-> ok
| {ok, emqx_session:session() | list(emqx_type:deliver())}
| {error, term()}
when Action :: kick | discard | {takeover, 'begin'} | {takeover, 'end'}.
request_stepdown(Action, ConnMod, Pid) ->
Timeout =
case Action == kick orelse Action == discard of
true -> ?T_KICK;
_ -> ?T_TAKEOVER
end,
Return =
%% this is essentailly a gen_server:call implemented in emqx_connection
%% and emqx_ws_connection.
%% the handle_call is implemented in emqx_channel
ok = apply(ConnMod, call, [Pid, Action, ?T_KICK])
try apply(ConnMod, call, [Pid, Action, Timeout]) of
ok -> ok;
Reply -> {ok, Reply}
catch
_ : noproc -> % emqx_ws_connection: call
ok = ?tp(debug, "session_already_gone", #{pid => Pid, action => Action});
ok = ?tp(debug, "session_already_gone", #{pid => Pid, action => Action}),
{error, noproc};
_ : {noproc, _} -> % emqx_connection: gen_server:call
ok = ?tp(debug, "session_already_gone", #{pid => Pid, action => Action});
_ : {shutdown, _} ->
ok = ?tp(debug, "session_already_shutdown", #{pid => Pid, action => Action});
_ : {{shutdown, _}, _} ->
ok = ?tp(debug, "session_already_shutdown", #{pid => Pid, action => Action});
ok = ?tp(debug, "session_already_gone", #{pid => Pid, action => Action}),
{error, noproc};
_ : Reason = {shutdown, _} ->
ok = ?tp(debug, "session_already_shutdown", #{pid => Pid, action => Action}),
{error, Reason};
_ : Reason = {{shutdown, _}, _} ->
ok = ?tp(debug, "session_already_shutdown", #{pid => Pid, action => Action}),
{error, Reason};
_ : {timeout, {gen_server, call, _}} ->
?tp(warning, "session_kick_timeout",
?tp(warning, "session_stepdown_request_timeout",
#{pid => Pid,
action => Action,
stale_channel => stale_channel_info(Pid)
}),
ok = force_kill(Pid);
ok = force_kill(Pid),
{error, timeout};
_ : Error : St ->
?tp(error, "session_kick_exception",
?tp(error, "session_stepdown_request_exception",
#{pid => Pid,
action => Action,
reason => Error,
stacktrace => St,
stale_channel => stale_channel_info(Pid)
}),
ok = force_kill(Pid)
ok = force_kill(Pid),
{error, Error}
end,
case Action == kick orelse Action == discard of
true -> ok;
_ -> Return
end.
force_kill(Pid) ->
@ -344,7 +375,7 @@ kick_session(Action, ClientId, ChanPid) when node(ChanPid) == node() ->
%% already deregistered
ok;
ConnMod when is_atom(ConnMod) ->
ok = kick_or_kill(Action, ConnMod, ChanPid)
ok = request_stepdown(Action, ConnMod, ChanPid)
end;
kick_session(Action, ClientId, ChanPid) ->
%% call remote node on the old APIs because we do not know if they have upgraded

View File

@ -30,8 +30,6 @@
-compile(nowarn_export_all).
-endif.
-elvis([{elvis_style, invalid_dynamic_call, #{ignore => [emqx_connection]}}]).
%% API
-export([ start_link/3
, stop/1

View File

@ -57,8 +57,6 @@
, code_change/3
]).
-elvis([{elvis_style, invalid_dynamic_call, #{ignore => [emqx_ctl]}}]).
-record(state, {seq = 0}).
-type(cmd() :: atom()).

View File

@ -37,8 +37,6 @@
-export_type([config/0]).
-elvis([{elvis_style, no_nested_try_catch, #{ ignore => [emqx_logger_jsonfmt]}}]).
-type config() :: #{depth => pos_integer() | unlimited,
report_cb => logger:report_cb(),
single_line => boolean()}.

View File

@ -84,8 +84,6 @@
-export([format/1]).
-elvis([{elvis_style, god_modules, disable}]).
-spec(make(emqx_topic:topic(), emqx_types:payload()) -> emqx_types:message()).
make(Topic, Payload) ->
make(undefined, Topic, Payload).

View File

@ -190,7 +190,7 @@ ensure_system_memory_alarm(HW) ->
case erlang:whereis(memsup) of
undefined -> ok;
_Pid ->
{Allocated, Total, _Worst} = memsup:get_memory_data(),
{Total, Allocated, _Worst} = memsup:get_memory_data(),
case Total =/= 0 andalso Allocated/Total * 100 > HW of
true -> emqx_alarm:activate(high_system_memory_usage, #{high_watermark => HW});
false -> ok

View File

@ -50,9 +50,11 @@ monitor(Pid, PMon) ->
?MODULE:monitor(Pid, undefined, PMon).
-spec(monitor(pid(), term(), pmon()) -> pmon()).
monitor(Pid, Val, PMon = ?PMON(Map)) ->
monitor(Pid, Val, ?PMON(Map)) ->
case maps:is_key(Pid, Map) of
true -> PMon;
true ->
{Ref, _Val} = maps:get(Pid, Map),
?PMON(maps:put(Pid, {Ref, Val}, Map));
false ->
Ref = erlang:monitor(process, Pid),
?PMON(maps:put(Pid, {Ref, Val}, Map))

View File

@ -119,7 +119,7 @@ handle_info({monitor, Port, long_schedule, Info}, State) when is_port(Port) ->
suppress({long_schedule, Port},
fun() ->
WarnMsg = io_lib:format("long_schedule warning: port = ~p, info: ~p", [Port, Info]),
?LOG(warning, "~s~n~p", [WarnMsg, erlang:port_info(Port)]),
?LOG(warning, "~s~n~p", [WarnMsg, portinfo(Port)]),
safe_publish(long_schedule, WarnMsg)
end, State);
@ -135,7 +135,7 @@ handle_info({monitor, SusPid, busy_port, Port}, State) ->
suppress({busy_port, Port},
fun() ->
WarnMsg = io_lib:format("busy_port warning: suspid = ~p, port = ~p", [SusPid, Port]),
?LOG(warning, "~s~n~p~n~p", [WarnMsg, procinfo(SusPid), erlang:port_info(Port)]),
?LOG(warning, "~s~n~p~n~p", [WarnMsg, procinfo(SusPid), portinfo(Port)]),
safe_publish(busy_port, WarnMsg)
end, State);
@ -143,7 +143,7 @@ handle_info({monitor, SusPid, busy_dist_port, Port}, State) ->
suppress({busy_dist_port, Port},
fun() ->
WarnMsg = io_lib:format("busy_dist_port warning: suspid = ~p, port = ~p", [SusPid, Port]),
?LOG(warning, "~s~n~p~n~p", [WarnMsg, procinfo(SusPid), erlang:port_info(Port)]),
?LOG(warning, "~s~n~p~n~p", [WarnMsg, procinfo(SusPid), portinfo(Port)]),
safe_publish(busy_dist_port, WarnMsg)
end, State);
@ -200,3 +200,9 @@ safe_publish(Event, WarnMsg) ->
sysmon_msg(Topic, Payload) ->
Msg = emqx_message:make(?SYSMON, Topic, Payload),
emqx_message:set_flag(sys, Msg).
portinfo(Port) ->
case is_port(Port) andalso erlang:port_info(Port) of
L when is_list(L) -> L;
_ -> []
end.

View File

@ -187,46 +187,78 @@ t_open_session_race_condition(_) ->
ok = flush_emqx_pool(),
?assertEqual([], emqx_cm:lookup_channels(ClientId)).
t_kick_session_discard_normal(_) ->
test_kick_session(discard, normal).
t_stepdown_sessiondiscard_normal(_) ->
test_stepdown_session(discard, normal).
t_kick_session_discard_shutdown(_) ->
test_kick_session(discard, shutdown).
t_stepdown_sessiondiscard_shutdown(_) ->
test_stepdown_session(discard, shutdown).
t_kick_session_discard_shutdown_with_reason(_) ->
test_kick_session(discard, {shutdown, discard}).
t_stepdown_sessiondiscard_shutdown_with_reason(_) ->
test_stepdown_session(discard, {shutdown, discard}).
t_kick_session_discard_timeout(_) ->
test_kick_session(discard, timeout).
t_stepdown_sessiondiscard_timeout(_) ->
test_stepdown_session(discard, timeout).
t_kick_session_discard_noproc(_) ->
test_kick_session(discard, noproc).
t_stepdown_sessiondiscard_noproc(_) ->
test_stepdown_session(discard, noproc).
t_kick_session_kick_normal(_) ->
test_kick_session(discard, normal).
t_stepdown_sessionkick_normal(_) ->
test_stepdown_session(kick, normal).
t_kick_session_kick_shutdown(_) ->
test_kick_session(discard, shutdown).
t_stepdown_sessionkick_shutdown(_) ->
test_stepdown_session(kick, shutdown).
t_kick_session_kick_shutdown_with_reason(_) ->
test_kick_session(discard, {shutdown, discard}).
t_stepdown_sessionkick_shutdown_with_reason(_) ->
test_stepdown_session(kick, {shutdown, discard}).
t_kick_session_kick_timeout(_) ->
test_kick_session(discard, timeout).
t_stepdown_sessionkick_timeout(_) ->
test_stepdown_session(kick, timeout).
t_kick_session_kick_noproc(_) ->
test_kick_session(discard, noproc).
t_stepdown_sessionkick_noproc(_) ->
test_stepdown_session(discard, noproc).
test_kick_session(Action, Reason) ->
t_stepdown_sessiontakeover_begin_normal(_) ->
test_stepdown_session({takeover, 'begin'}, normal).
t_stepdown_sessiontakeover_begin_shutdown(_) ->
test_stepdown_session({takeover, 'begin'}, shutdown).
t_stepdown_sessiontakeover_begin_shutdown_with_reason(_) ->
test_stepdown_session({takeover, 'begin'}, {shutdown, discard}).
t_stepdown_sessiontakeover_begin_timeout(_) ->
test_stepdown_session({takeover, 'begin'}, timeout).
t_stepdown_sessiontakeover_begin_noproc(_) ->
test_stepdown_session({takeover, 'begin'}, noproc).
t_stepdown_sessiontakeover_end_normal(_) ->
test_stepdown_session({takeover, 'end'}, normal).
t_stepdown_sessiontakeover_end_shutdown(_) ->
test_stepdown_session({takeover, 'end'}, shutdown).
t_stepdown_sessiontakeover_end_shutdown_with_reason(_) ->
test_stepdown_session({takeover, 'end'}, {shutdown, discard}).
t_stepdown_sessiontakeover_end_timeout(_) ->
test_stepdown_session({takeover, 'end'}, timeout).
t_stepdown_sessiontakeover_end_noproc(_) ->
test_stepdown_session({takeover, 'end'}, noproc).
test_stepdown_session(Action, Reason) ->
ClientId = rand_client_id(),
#{conninfo := ConnInfo} = ?ChanInfo,
FakeSessionFun =
fun Loop() ->
receive
{'$gen_call', From, A} when A =:= kick orelse
A =:= discard ->
A =:= discard orelse
A =:= {takeover, 'begin'} orelse
A =:= {takeover, 'end'} ->
case Reason of
normal ->
normal when A =:= kick orelse A =:= discard ->
gen_server:reply(From, ok);
timeout ->
%% no response to the call
@ -249,9 +281,10 @@ test_kick_session(Action, Reason) ->
noproc -> exit(Pid1, kill), exit(Pid2, kill);
_ -> ok
end,
ok = case Action of
_ = case Action of
kick -> emqx_cm:kick_session(ClientId);
discard -> emqx_cm:discard_session(ClientId)
discard -> emqx_cm:discard_session(ClientId);
{takeover, _} -> emqx_cm:takeover_session(ClientId)
end,
case Reason =:= timeout orelse Reason =:= noproc of
true ->

View File

@ -33,6 +33,11 @@
{self(), busy_port,
concat_str("busy_port warning: suspid = ~p, port = ~p",
self(), list_to_port("#Port<0.4>")), list_to_port("#Port<0.4>")},
%% for the case when the port is missing, for some
%% reason.
{self(), busy_port,
concat_str("busy_port warning: suspid = ~p, port = ~p",
self(), []), []},
{self(), busy_dist_port,
concat_str("busy_dist_port warning: suspid = ~p, port = ~p",
self(), list_to_port("#Port<0.4>")),list_to_port("#Port<0.4>")},
@ -122,6 +127,16 @@ t_sys_mon(_Config) ->
validate_sys_mon_info(PidOrPort, SysMonName,ValidateInfo, InfoOrPort)
end, ?INPUTINFO).
%% Existing port, but closed.
t_sys_mon_dead_port(_Config) ->
process_flag(trap_exit, true),
Port = dead_port(),
{PidOrPort, SysMonName, ValidateInfo, InfoOrPort} =
{self(), busy_port,
concat_str("busy_port warning: suspid = ~p, port = ~p",
self(), Port), Port},
validate_sys_mon_info(PidOrPort, SysMonName, ValidateInfo, InfoOrPort).
t_sys_mon2(_Config) ->
?SYSMON ! {timeout, ignored, reset},
?SYSMON ! {ignored},
@ -155,3 +170,8 @@ some_function(Parent, _Arg2) ->
stop ->
ok
end.
dead_port() ->
Port = erlang:open_port({spawn, "ls"}, []),
exit(Port, kill),
Port.