Merge pull request #8947 from zmstone/0913-sync-master-to-ee50

0913 sync master to ee50
Zaiming (Stone) Shi 2022-09-13 15:46:52 +02:00 committed by GitHub
commit 477d4b0b03
33 changed files with 665 additions and 244 deletions

View File

@@ -185,9 +185,19 @@ jobs:
img_suffix="elixir-${{ matrix.arch }}"
img_labels="org.opencontainers.image.elixir.version=${{ matrix.elixir }}\n${img_labels}"
fi
+if [ ${{ matrix.profile }} = "emqx" ]; then
+img_labels="org.opencontainers.image.edition=Opensource\n${img_labels}"
+fi
+if [ ${{ matrix.profile }} = "emqx-enterprise" ]; then
+img_labels="org.opencontainers.image.edition=Enterprise\n${img_labels}"
+fi
if [[ ${{ matrix.os[0] }} =~ "alpine" ]]; then
img_suffix="${img_suffix}-alpine"
fi
echo "::set-output name=emqx_name::${emqx_name}"
echo "::set-output name=img_suffix::${img_suffix}"
echo "::set-output name=img_labels::${img_labels}"
@@ -303,10 +313,19 @@ jobs:
img_suffix="elixir-${{ matrix.arch }}"
img_labels="org.opencontainers.image.elixir.version=${{ matrix.elixir }}\n$img_labels"
fi
+if [ ${{ matrix.profile }} = "emqx" ]; then
+img_labels="org.opencontainers.image.edition=Opensource\n${img_labels}"
+fi
+if [ ${{ matrix.profile }} = "emqx-enterprise" ]; then
+img_labels="org.opencontainers.image.edition=Enterprise\n${img_labels}"
+fi
if [[ ${{ matrix.os[0] }} =~ "alpine" ]]; then
img_suffix="${img_suffix}-alpine"
fi
echo "::set-output name=img::${img}"
echo "::set-output name=emqx_name::${emqx_name}"
echo "::set-output name=img_suffix::${img_suffix}"
echo "::set-output name=img_labels::${img_labels}"

View File

@@ -80,7 +80,9 @@ jobs:
- uses: actions/upload-artifact@v2
with:
name: "${{ matrix.profile }}_schema_dump"
-path: _build/*/lib/emqx_dashboard/priv/www/static/schema.json
+path: |
+scripts/spellcheck
+_build/${{ matrix.profile }}/lib/emqx_dashboard/priv/www/static/schema.json
windows:
runs-on: windows-2019
@@ -205,7 +207,6 @@ jobs:
- emqx
- emqx-enterprise
runs-on: aws-amd64
-container: "ghcr.io/emqx/emqx-schema-validate:0.3.5"
steps:
- uses: actions/download-artifact@v2
name: Download schema dump
@@ -214,9 +215,7 @@ jobs:
path: /tmp/
- name: Run spellcheck
run: |
-cd /LanguageTool
-bash start.sh > /dev/null &
-./emqx_schema_validate /tmp/${{ matrix.profile }}/lib/emqx_dashboard/priv/www/static/schema.json
+bash /tmp/scripts/spellcheck/spellcheck.sh /tmp/_build/${{ matrix.profile }}/lib/emqx_dashboard/priv/www/static/schema.json
allgood_packaging:
runs-on: ubuntu-latest

View File

@@ -6,12 +6,18 @@
* Fix JWT plugin doesn't support non-integer timestamp claims. [#8867](https://github.com/emqx/emqx/pull/8867)
* Avoid publishing the will message when a client fails to authenticate. [#8887](https://github.com/emqx/emqx/pull/8887)
* Speed up dispatching of shared subscription messages in a cluster [#8893](https://github.com/emqx/emqx/pull/8893)
+* Fix the extra `/` prefix when the CoAP gateway parses client topics. [#8658](https://github.com/emqx/emqx/pull/8658)
+* Speed up updating the configuration when some nodes in the cluster are down. [#8857](https://github.com/emqx/emqx/pull/8857)
+* Fix inaccurate delayed publish caused by OS time changes. [#8926](https://github.com/emqx/emqx/pull/8926)
+* Fix that EMQX can't start when the retainer is disabled [#8911](https://github.com/emqx/emqx/pull/8911)

## Enhancements

+* Print a warning message when booting with the default (insecure) Erlang cookie. [#8905](https://github.com/emqx/emqx/pull/8905)
* Change the `/gateway` API path to plural form. [#8823](https://github.com/emqx/emqx/pull/8823)
* Remove `node.etc_dir` from emqx.conf, because it is never used.
  Also allow user to customize the logging directory [#8892](https://github.com/emqx/emqx/pull/8892)
+* Added a new API `POST /listeners` for creating listeners. [#8876](https://github.com/emqx/emqx/pull/8876)

# 5.0.7

View File

@@ -88,9 +88,9 @@ docker run -d --name emqx-ee -p 1883:1883 -p 8081:8081 -p 8083:8083 -p 8084:8084

## Сборка из исходного кода

-Начиная с релиза 3.0, для сборки требуется Erlang/OTP R21 или выше.
-Инструкция для сборки версии 4.3 и выше:
+Ветка `master` предназначена для последней версии 5, переключитесь на ветку `main-v4.3` для версии 4.3 и `main-v4.4` для версии 4.4.
+EMQX требует OTP 22 или 23 для версии 4.3 и OTP 24 для версий 4.4 и 5.0.

```bash
git clone https://github.com/emqx/emqx.git
@@ -99,7 +99,7 @@ make
_build/emqx/rel/emqx/bin/emqx console
```

-Более ранние релизы могут быть собраны с помощью другого репозитория:
+Версии до 4.2 (включительно) нужно собирать из другого репозитория:

```bash
git clone https://github.com/emqx/emqx-rel.git
@@ -108,79 +108,24 @@ make
_build/emqx/rel/emqx/bin/emqx console
```

-## Первый запуск
-Если emqx был собран из исходников: `cd _build/emqx/rel/emqx`.
-Или перейдите в директорию, куда emqx был установлен из бинарного пакета.
+### Сборка на Apple silicon (M1, M2)
+Пакетный менеджер Homebrew, когда установлен на Apple silicon, [стал использовать другую домашнюю папку по умолчанию](https://github.com/Homebrew/brew/issues/9177), `/opt/homebrew` вместо `/usr/local`. В результате некоторые библиотеки перестали собираться автоматически.
+Касательно EMQX, сборка Erlang из исходного кода не найдёт библиотеку `unixodbc`, установленную с homebrew, без дополнительных действий:

```bash
-# Запуск:
-./bin/emqx start
-# Проверка статуса:
-./bin/emqx_ctl status
-# Остановка:
-./bin/emqx stop
+brew install unixodbc kerl
+sudo ln -s $(realpath $(brew --prefix unixodbc)) /usr/local/odbc
+export CC="/usr/bin/gcc -I$(brew --prefix unixodbc)/include"
+export LDFLAGS="-L$(brew --prefix unixodbc)/lib"
+kerl build 24.3
+mkdir ~/.kerl/installations
+kerl install 24.3 ~/.kerl/installations/24.3
+. ~/.kerl/installations/24.3/activate
```

-Веб-интерфейс брокера будет доступен по ссылке: http://localhost:18083
+Дальше можно собирать emqx как обычно, с помощью `make`.

-## Тесты
-### Полное тестирование
-```
-make eunit ct
-```
-### Запуск части тестов
-Пример:
-```bash
-make apps/emqx_retainer-ct
-```
-### Dialyzer
-##### Статический анализ всех приложений
-```
-make dialyzer
-```
-##### Статический анализ части приложений (список через запятую)
-```
-DIALYZER_ANALYSE_APP=emqx_lwm2m,emqx_authz make dialyzer
-```
-## Сообщество
-### FAQ
-Наиболее частые проблемы разобраны в [EMQX FAQ](https://www.emqx.io/docs/en/latest/faq/faq.html).
-### Вопросы
-Задать вопрос или поделиться идеей можно в [GitHub Discussions](https://github.com/emqx/emqx/discussions).
-### Предложения
-Более масштабные предложения можно присылать в виде pull request в репозиторий [EIP](https://github.com/emqx/eip).
-### Разработка плагинов
-Инструкция по разработке собственных плагинов доступна по ссылке: [PLUGIN.md](./PLUGIN.md)
-## Спецификации стандарта MQTT
-Следующие ссылки содержат спецификации стандартов:
-[MQTT Version 3.1.1](https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html)
-[MQTT Version 5.0](https://docs.oasis-open.org/mqtt/mqtt/v5.0/cs02/mqtt-v5.0-cs02.html)
-[MQTT SN](https://www.oasis-open.org/committees/download.php/66091/MQTT-SN_spec_v1.2.pdf)

## Лицензия

View File

@@ -361,7 +361,12 @@ handle_response(Headers, Body) ->
        _ ->
            ignore
    end;
-{error, _Reason} ->
+{error, Reason} ->
+    ?TRACE_AUTHN_PROVIDER(
+        error,
+        "parse_http_response_failed",
+        #{content_type => ContentType, body => Body, reason => Reason}
+    ),
    ignore
end.

View File

@@ -399,7 +399,7 @@ do_verify(JWT, [JWK | More], VerifyClaims) ->
end.

verify_claims(Claims, VerifyClaims0) ->
-    Now = os:system_time(seconds),
+    Now = erlang:system_time(seconds),
    VerifyClaims =
        [
            {<<"exp">>, fun(ExpireTime) ->

View File

@@ -149,8 +149,8 @@ authenticate(
    of
        ok ->
            {ok, emqx_authn_utils:is_superuser(Selected)};
-        {error, Reason} ->
-            {error, Reason}
+        {error, _Reason} ->
+            ignore
    end;
{error, Reason} ->
    ?TRACE_AUTHN_PROVIDER(error, "redis_query_failed", #{
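Returning `ignore` here, rather than propagating `{error, Reason}`, lets the authentication chain fall through to the next authenticator instead of rejecting the client outright; when no authenticator succeeds, the overall result ends up as `not_authorized`, which lines up with the test expectation change from `{error, bad_username_or_password}` to `{error, not_authorized}` further down in this commit.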

View File

@@ -173,6 +173,9 @@ test_user_auth(#{
        {create_authenticator, ?GLOBAL, AuthConfig}
    ),

+    {ok, [#{provider := emqx_authn_redis, state := State}]} =
+        emqx_authentication:list_authenticators(?GLOBAL),
+
    Credentials = Credentials0#{
        listener => 'tcp:default',
        protocol => mqtt
@@ -180,6 +183,15 @@ test_user_auth(#{
    ?assertEqual(Result, emqx_access_control:authenticate(Credentials)),

+    AuthnResult =
+        case Result of
+            {error, _} ->
+                ignore;
+            Any ->
+                Any
+        end,
+    ?assertEqual(AuthnResult, emqx_authn_redis:authenticate(Credentials, State)),
+
    emqx_authn_test_lib:delete_authenticators(
        [authentication],
        ?GLOBAL
@@ -466,7 +478,7 @@ user_seeds() ->
        <<"cmd">> => <<"HMGET mqtt_user:${username} password_hash salt is_superuser">>,
        <<"password_hash_algorithm">> => #{<<"name">> => <<"bcrypt">>}
    },
-    result => {error, bad_username_or_password}
+    result => {error, not_authorized}
},
#{
View File

@@ -10,7 +10,7 @@
node {
    name = "emqx@127.0.0.1"
-    cookie = emqxsecretcookie
+    cookie = "{{ emqx_default_erlang_cookie }}"
    data_dir = "{{ platform_data_dir }}"
}

View File

@@ -72,6 +72,7 @@
-define(TIMEOUT, timer:minutes(1)).
-define(APPLY_KIND_REPLICATE, replicate).
-define(APPLY_KIND_INITIATE, initiate).
+-define(IS_STATUS(_A_), (_A_ =:= peers_lagging orelse _A_ =:= stopped_nodes)).

-type tnx_id() :: pos_integer().
@@ -123,13 +124,13 @@ start_link(Node, Name, RetryMs) ->
%% the result is expected to be `ok | {ok, _}' to indicate success,
%% and `{error, _}' to indicate failure.
%%
-%% The excpetion of the MFA evaluation is captured and translated
+%% The exception of the MFA evaluation is captured and translated
%% into an `{error, _}' tuple.
%% This call tries to wait for all peer nodes to be in-sync before
%% returning the result.
%%
%% In case of partial success, an `error' level log is emitted
-%% but the initial localy apply result is returned.
+%% but the initial local apply result is returned.
-spec multicall(module(), atom(), list()) -> term().
multicall(M, F, A) ->
    multicall(M, F, A, all, timer:minutes(2)).
@@ -141,11 +142,12 @@ multicall(M, F, A, RequiredSyncs, Timeout) when RequiredSyncs =:= all orelse Req
        Result;
    {init_failure, Error} ->
        Error;
-    {peers_lagging, TnxId, Res, Nodes} ->
+    {Status, TnxId, Res, Nodes} when ?IS_STATUS(Status) ->
        %% The init MFA return ok, but some other nodes failed.
        ?SLOG(error, #{
            msg => "cluster_rpc_peers_lagging",
-            lagging_nodes => Nodes,
+            status => Status,
+            nodes => Nodes,
            tnx_id => TnxId
        }),
        Res
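The comment block above documents the `multicall/3` contract: the local apply result (`ok` or `{ok, _}` on success, `{error, _}` on failure) is returned even when peers lag. A minimal, hypothetical caller sketch (the module/function `my_mod:apply_conf/1` is a placeholder, not part of this commit):

```erlang
%% Hypothetical usage sketch of emqx_cluster_rpc:multicall/3.
%% `my_mod:apply_conf/1` is a placeholder MFA standing in for a real one.
update_cluster_conf(Conf) ->
    case emqx_cluster_rpc:multicall(my_mod, apply_conf, [Conf]) of
        ok -> ok;
        {ok, Res} -> {ok, Res};
        {error, Reason} -> {error, Reason}
    end.
```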
@@ -193,9 +195,9 @@ do_multicall(M, F, A, RequiredSyncs, Timeout) ->
        InitRes;
    {init_failure, Error0} ->
        {init_failure, Error0};
-    {peers_lagging, Nodes} ->
+    {Status, Nodes} when ?IS_STATUS(Status) ->
        {ok, TnxId0, MFARes} = InitRes,
-        {peers_lagging, TnxId0, MFARes, Nodes}
+        {Status, TnxId0, MFARes, Nodes}
    end.

-spec query(pos_integer()) -> {'atomic', map()} | {'aborted', Reason :: term()}.
@@ -509,14 +511,18 @@ do_alarm(Fun, Res, #{tnx_id := Id} = Meta) ->
    emqx_alarm:Fun(cluster_rpc_apply_failed, Meta#{result => ?TO_BIN(Res)}, AlarmMsg).

wait_for_all_nodes_commit(TnxId, Delay, Remain) ->
-    case lagging_node(TnxId) of
+    Lagging = lagging_nodes(TnxId),
+    Stopped = stopped_nodes(),
+    case Lagging -- Stopped of
+        [] when Stopped =:= [] ->
+            ok;
+        [] ->
+            {stopped_nodes, Stopped};
        [_ | _] when Remain > 0 ->
            ok = timer:sleep(Delay),
            wait_for_all_nodes_commit(TnxId, Delay, Remain - Delay);
-        [] ->
-            ok;
-        Nodes ->
-            {peers_lagging, Nodes}
+        [_ | _] ->
+            {peers_lagging, Lagging}
    end.
wait_for_nodes_commit(RequiredSyncs, TnxId, Delay, Remain) ->
@@ -527,14 +533,18 @@ wait_for_nodes_commit(RequiredSyncs, TnxId, Delay, Remain) ->
        false when Remain > 0 ->
            wait_for_nodes_commit(RequiredSyncs, TnxId, Delay, Remain - Delay);
        false ->
-            case lagging_node(TnxId) of
-                %% All commit but The succeedNum > length(nodes()).
-                [] -> ok;
-                Nodes -> {peers_lagging, Nodes}
+            case lagging_nodes(TnxId) of
+                [] ->
+                    ok;
+                Lagging ->
+                    case stopped_nodes() of
+                        [] -> {peers_lagging, Lagging};
+                        Stopped -> {stopped_nodes, Stopped}
+                    end
            end
    end.

-lagging_node(TnxId) ->
+lagging_nodes(TnxId) ->
    {atomic, Nodes} = transaction(fun ?MODULE:commit_status_trans/2, ['<', TnxId]),
    Nodes.
@@ -548,6 +558,9 @@ commit_status_trans(Operator, TnxId) ->
    Result = '$2',
    mnesia:select(?CLUSTER_COMMIT, [{MatchHead, [Guard], [Result]}]).

+stopped_nodes() ->
+    ekka_cluster:info(stopped_nodes).
+
get_retry_ms() ->
    emqx_conf:get([node, cluster_call, retry_interval], timer:minutes(1)).
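For readers of the two `wait_for_*` hunks above, a compact restatement (not part of the diff) of how an incomplete sync is now classified, with hypothetical node atoms in the comments:

```erlang
%% Mirrors the `Lagging -- Stopped` decision above, once retries are exhausted.
classify(Lagging, Stopped) ->
    case Lagging -- Stopped of
        [] when Stopped =:= [] -> ok;        %% everyone committed
        [] -> {stopped_nodes, Stopped};      %% only stopped nodes are behind
        [_ | _] -> {peers_lagging, Lagging}  %% some running nodes are behind
    end.
%% classify([], [])         -> ok
%% classify([n2], [n2])     -> {stopped_nodes, [n2]}
%% classify([n1, n2], [n2]) -> {peers_lagging, [n1, n2]}
```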

View File

@@ -399,7 +399,7 @@ fields("node") ->
    string(),
    #{
        mapping => "vm_args.-setcookie",
-        default => "emqxsecretcookie",
+        required => true,
        'readOnly' => true,
        sensitive => true,
        desc => ?DESC(node_cookie)

View File

@@ -785,7 +785,7 @@ to_bin(List) when is_list(List) ->
to_bin(Boolean) when is_boolean(Boolean) -> Boolean;
to_bin(Atom) when is_atom(Atom) -> atom_to_binary(Atom, utf8);
to_bin({Type, Args}) ->
-    unicode:characters_to_binary(io_lib:format("~p(~p)", [Type, Args]));
+    unicode:characters_to_binary(io_lib:format("~ts(~p)", [Type, Args]));
to_bin(X) ->
    X.
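A small illustration (not from the diff) of why the type name is now formatted with `~ts`: `~p` pretty-prints the term with quoting, while `~ts` prints binaries and strings as plain text:

```erlang
%% Formatting difference behind the ~p -> ~ts change above.
"<<\"re\">>(\"^a\")" = lists:flatten(io_lib:format("~p(~p)", [<<"re">>, "^a"])),
"re(\"^a\")" = lists:flatten(io_lib:format("~ts(~p)", [<<"re">>, "^a"])).
```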

View File

@@ -153,7 +153,7 @@ init(
        mountpoint => Mountpoint
    }
),
%% FIXME: it should coap.hearbeat instead of idle_timeout?
Heartbeat = ?GET_IDLE_TIME(Config),
#channel{
    ctx = Ctx,
@@ -447,6 +447,7 @@ enrich_conninfo(
        conninfo = ConnInfo
    }
) ->
+    %% FIXME: generate a random clientid if absent
    case Queries of
        #{<<"clientid">> := ClientId} ->
            Interval = maps:get(interval, emqx_keepalive:info(KeepAlive)),
@@ -467,6 +468,9 @@ enrich_clientinfo(
    {Queries, Msg},
    Channel = #channel{clientinfo = ClientInfo0}
) ->
+    %% FIXME:
+    %% 1. generate a random clientid if absent;
+    %% 2. assgin username, password to `undefined` if absent
    case Queries of
        #{
            <<"username">> := UserName,
@@ -542,6 +546,7 @@ process_connect(
    )
of
    {ok, _Sess} ->
+        %% FIXME: Token in cluster wide?
        RandVal = rand:uniform(?TOKEN_MAXIMUM),
        Token = erlang:list_to_binary(erlang:integer_to_list(RandVal)),
        NResult = Result#{events => [{event, connected}]},

View File

@@ -69,17 +69,7 @@ handle_method(_, _, Msg, _, _) ->
check_topic([]) ->
    error;
check_topic(Path) ->
-    Sep = <<"/">>,
-    {ok,
-        emqx_http_lib:uri_decode(
-            lists:foldl(
-                fun(Part, Acc) ->
-                    <<Acc/binary, Sep/binary, Part/binary>>
-                end,
-                <<>>,
-                Path
-            )
-        )}.
+    {ok, emqx_http_lib:uri_decode(iolist_to_binary(lists:join(<<"/">>, Path)))}.

get_sub_opts(#coap_message{options = Opts} = Msg) ->
    SubOpts = maps:fold(fun parse_sub_opts/3, #{}, Opts),
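A hypothetical before/after illustration (not part of the diff) of the `check_topic/1` change: the old fold prefixed every path segment with `/`, while `lists:join/2` only inserts `/` between segments, which is the extra-leading-slash fix mentioned in the CHANGELOG entry for #8658:

```erlang
%% Old vs. new topic assembly for Path = [<<"coap">>, <<"observe">>].
Old = fun(Path) ->
    lists:foldl(fun(Part, Acc) -> <<Acc/binary, "/", Part/binary>> end, <<>>, Path)
end,
New = fun(Path) -> iolist_to_binary(lists:join(<<"/">>, Path)) end,
<<"/coap/observe">> = Old([<<"coap">>, <<"observe">>]),
<<"coap/observe">> = New([<<"coap">>, <<"observe">>]).
```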
@@ -124,25 +114,30 @@ get_publish_qos(Msg) ->
end.

apply_publish_opts(Msg, MQTTMsg) ->
-    maps:fold(
-        fun
-            (<<"retain">>, V, Acc) ->
-                Val = erlang:binary_to_atom(V),
-                emqx_message:set_flag(retain, Val, Acc);
-            (<<"expiry">>, V, Acc) ->
-                Val = erlang:binary_to_integer(V),
-                Props = emqx_message:get_header(properties, Acc),
-                emqx_message:set_header(
-                    properties,
-                    Props#{'Message-Expiry-Interval' => Val},
-                    Acc
-                );
-            (_, _, Acc) ->
-                Acc
-        end,
-        MQTTMsg,
-        emqx_coap_message:get_option(uri_query, Msg)
-    ).
+    case emqx_coap_message:get_option(uri_query, Msg) of
+        undefined ->
+            MQTTMsg;
+        Qs ->
+            maps:fold(
+                fun
+                    (<<"retain">>, V, Acc) ->
+                        Val = erlang:binary_to_atom(V),
+                        emqx_message:set_flag(retain, Val, Acc);
+                    (<<"expiry">>, V, Acc) ->
+                        Val = erlang:binary_to_integer(V),
+                        Props = emqx_message:get_header(properties, Acc),
+                        emqx_message:set_header(
+                            properties,
+                            Props#{'Message-Expiry-Interval' => Val},
+                            Acc
+                        );
+                    (_, _, Acc) ->
+                        Acc
+                end,
+                MQTTMsg,
+                Qs
+            )
+    end.

subscribe(#coap_message{token = <<>>} = Msg, _, _, _) ->
    reply({error, bad_request}, <<"observe without token">>, Msg);

View File

@@ -143,12 +143,15 @@ t_connection_with_authn_failed(_) ->
    ok.

t_publish(_) ->
-    Action = fun(Channel, Token) ->
-        Topic = <<"/abc">>,
+    %% can publish to a normal topic
+    Topics = [
+        <<"abc">>,
+        %% can publish to a `/` leading topic
+        <<"/abc">>
+    ],
+    Action = fun(Topic, Channel, Token) ->
        Payload = <<"123">>,
-        TopicStr = binary_to_list(Topic),
-        URI = ?PS_PREFIX ++ TopicStr ++ "?clientid=client1&token=" ++ Token,
+        URI = pubsub_uri(binary_to_list(Topic), Token),

        %% Sub topic first
        emqx:subscribe(Topic),
@@ -164,24 +167,28 @@ t_publish(_) ->
            ?assert(false)
        end
    end,
-    with_connection(Action).
+    with_connection(Topics, Action).

t_subscribe(_) ->
-    Topic = <<"/abc">>,
-    Fun = fun(Channel, Token) ->
-        TopicStr = binary_to_list(Topic),
+    %% can subscribe to a normal topic
+    Topics = [
+        <<"abc">>,
+        %% can subscribe to a `/` leading topic
+        <<"/abc">>
+    ],
+    Fun = fun(Topic, Channel, Token) ->
        Payload = <<"123">>,
-        URI = ?PS_PREFIX ++ TopicStr ++ "?clientid=client1&token=" ++ Token,
+        URI = pubsub_uri(binary_to_list(Topic), Token),
        Req = make_req(get, Payload, [{observe, 0}]),
        {ok, content, _} = do_request(Channel, URI, Req),
        ?LOGT("observer topic:~ts~n", [Topic]),

+        %% ensure subscribe succeed
        timer:sleep(100),
        [SubPid] = emqx:subscribers(Topic),
        ?assert(is_pid(SubPid)),

-        %% Publish a message
+        %% publish a message
        emqx:publish(emqx_message:make(Topic, Payload)),
        {ok, content, Notify} = with_response(Channel),
        ?LOGT("observer get Notif=~p", [Notify]),
@@ -191,18 +198,27 @@ t_subscribe(_) ->
        ?assertEqual(Payload, PayloadRecv)
    end,

-    with_connection(Fun),
-    timer:sleep(100),
-    ?assertEqual([], emqx:subscribers(Topic)).
+    with_connection(Topics, Fun),
+
+    %% subscription removed if coap client disconnected
+    timer:sleep(100),
+    lists:foreach(
+        fun(Topic) ->
+            ?assertEqual([], emqx:subscribers(Topic))
+        end,
+        Topics
+    ).

t_un_subscribe(_) ->
-    Topic = <<"/abc">>,
-    Fun = fun(Channel, Token) ->
-        TopicStr = binary_to_list(Topic),
+    %% can unsubscribe to a normal topic
+    Topics = [
+        <<"abc">>,
+        %% can unsubscribe to a `/` leading topic
+        <<"/abc">>
+    ],
+    Fun = fun(Topic, Channel, Token) ->
        Payload = <<"123">>,
-        URI = ?PS_PREFIX ++ TopicStr ++ "?clientid=client1&token=" ++ Token,
+        URI = pubsub_uri(binary_to_list(Topic), Token),
        Req = make_req(get, Payload, [{observe, 0}]),
        {ok, content, _} = do_request(Channel, URI, Req),
@@ -219,16 +235,15 @@ t_un_subscribe(_) ->
        ?assertEqual([], emqx:subscribers(Topic))
    end,

-    with_connection(Fun).
+    with_connection(Topics, Fun).

t_observe_wildcard(_) ->
    Fun = fun(Channel, Token) ->
        %% resolve_url can't process wildcard with #
-        Topic = <<"/abc/+">>,
-        TopicStr = binary_to_list(Topic),
+        Topic = <<"abc/+">>,
        Payload = <<"123">>,
-        URI = ?PS_PREFIX ++ TopicStr ++ "?clientid=client1&token=" ++ Token,
+        URI = pubsub_uri(binary_to_list(Topic), Token),
        Req = make_req(get, Payload, [{observe, 0}]),
        {ok, content, _} = do_request(Channel, URI, Req),
        ?LOGT("observer topic:~ts~n", [Topic]),
@@ -238,7 +253,7 @@ t_observe_wildcard(_) ->
        ?assert(is_pid(SubPid)),

        %% Publish a message
-        PubTopic = <<"/abc/def">>,
+        PubTopic = <<"abc/def">>,
        emqx:publish(emqx_message:make(PubTopic, Payload)),
        {ok, content, Notify} = with_response(Channel),
@@ -320,7 +335,7 @@ t_clients_get_subscription_api(_) ->
    {200, [Subs]} = request(get, Path),

-    ?assertEqual(<<"/coap/observe">>, maps:get(topic, Subs)),
+    ?assertEqual(<<"coap/observe">>, maps:get(topic, Subs)),

    observe(Channel, Token, false),
@@ -386,6 +401,9 @@ observe(Channel, Token, false) ->
    {ok, nocontent, _Data} = do_request(Channel, URI, Req),
    ok.

+pubsub_uri(Topic, Token) when is_list(Topic), is_list(Token) ->
+    ?PS_PREFIX ++ "/" ++ Topic ++ "?clientid=client1&token=" ++ Token.
+
make_req(Method) ->
    make_req(Method, <<>>).
@@ -442,6 +460,16 @@ with_connection(Action) ->
    end,
    do(Fun).

+with_connection(Checks, Action) ->
+    Fun = fun(Channel) ->
+        Token = connection(Channel),
+        timer:sleep(100),
+        lists:foreach(fun(E) -> Action(E, Channel, Token) end, Checks),
+        disconnection(Channel, Token),
+        timer:sleep(100)
+    end,
+    do(Fun).
+
receive_deliver(Wait) ->
    receive
        {deliver, _, Msg} ->

View File

@@ -98,7 +98,7 @@ t_case_coap_publish(_) ->
    Prefix = Mod:ps_prefix(),
    Fun = fun(Channel, Token, Topic, Checker) ->
        TopicStr = binary_to_list(Topic),
-        URI = Prefix ++ TopicStr ++ "?clientid=client1&token=" ++ Token,
+        URI = Prefix ++ "/" ++ TopicStr ++ "?clientid=client1&token=" ++ Token,

        Req = Mod:make_req(post, <<>>),
        Checker(Mod:do_request(Channel, URI, Req))
@@ -114,7 +114,7 @@ t_case_coap_subscribe(_) ->
    Prefix = Mod:ps_prefix(),
    Fun = fun(Channel, Token, Topic, Checker) ->
        TopicStr = binary_to_list(Topic),
-        URI = Prefix ++ TopicStr ++ "?clientid=client1&token=" ++ Token,
+        URI = Prefix ++ "/" ++ TopicStr ++ "?clientid=client1&token=" ++ Token,

        Req = Mod:make_req(get, <<>>, [{observe, 0}]),
        Checker(Mod:do_request(Channel, URI, Req))

View File

@@ -96,6 +96,16 @@ schema("/listeners") ->
                listener_id_status_example()
            )
        }
-    }
+    },
+    post => #{
+        tags => [<<"listeners">>],
+        desc => <<"Create the specified listener on all nodes.">>,
+        parameters => [],
+        'requestBody' => create_listener_schema(#{bind => true}),
+        responses => #{
+            200 => listener_schema(#{bind => true}),
+            400 => error_codes(['BAD_LISTENER_ID', 'BAD_REQUEST'])
+        }
+    }
};
schema("/listeners/:id") ->
@@ -129,7 +139,8 @@ schema("/listeners/:id") ->
        responses => #{
            200 => listener_schema(#{bind => true}),
            400 => error_codes(['BAD_LISTENER_ID', 'BAD_REQUEST'])
-        }
+        },
+        deprecated => true
    },
    delete => #{
        tags => [<<"listeners">>],
@@ -251,10 +262,10 @@ fields(node_status) ->
        })},
    {status, ?HOCON(?R_REF(status))}
];
+fields({Type, with_name}) ->
+    listener_struct_with_name(Type);
fields(Type) ->
-    Listeners = listeners_info(#{bind => true}) ++ listeners_info(#{bind => false}),
-    [Schema] = [S || #{ref := ?R_REF(_, T), schema := S} <- Listeners, T =:= Type],
-    Schema.
+    listener_struct(Type).

listener_schema(Opts) ->
    emqx_dashboard_swagger:schema_with_example(
@@ -262,6 +273,17 @@ listener_schema(Opts) ->
        tcp_schema_example()
    ).

+create_listener_schema(Opts) ->
+    Schemas = [
+        ?R_REF(Mod, {Type, with_name})
+     || #{ref := ?R_REF(Mod, Type)} <- listeners_info(Opts)
+    ],
+    Example = maps:remove(id, tcp_schema_example()),
+    emqx_dashboard_swagger:schema_with_example(
+        ?UNION(Schemas),
+        Example#{name => <<"demo">>}
+    ).
+
listeners_type() ->
    lists:map(
        fun({Type, _}) -> list_to_existing_atom(Type) end,
@@ -339,7 +361,9 @@ list_listeners(get, #{query_string := Query}) ->
        {ok, Type} -> listener_type_filter(atom_to_binary(Type), Listeners);
        error -> Listeners
    end,
-    {200, listener_status_by_id(NodeL)}.
+    {200, listener_status_by_id(NodeL)};
+list_listeners(post, #{body := Body}) ->
+    create_listener(Body).

crud_listeners_by_id(get, #{bindings := #{id := Id0}}) ->
    Listeners =
@@ -382,23 +406,8 @@ crud_listeners_by_id(put, #{bindings := #{id := Id}, body := Body0}) ->
        _ ->
            {400, #{code => 'BAD_LISTENER_ID', message => ?LISTENER_ID_INCONSISTENT}}
    end;
-crud_listeners_by_id(post, #{bindings := #{id := Id}, body := Body0}) ->
-    case parse_listener_conf(Body0) of
-        {Id, Type, Name, Conf} ->
-            Path = [listeners, Type, Name],
-            case create(Path, Conf) of
-                {ok, #{raw_config := _RawConf}} ->
-                    crud_listeners_by_id(get, #{bindings => #{id => Id}});
-                {error, already_exist} ->
-                    {400, #{code => 'BAD_LISTENER_ID', message => <<"Already Exist">>}};
-                {error, Reason} ->
-                    {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}
-            end;
-        {error, Reason} ->
-            {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}};
-        _ ->
-            {400, #{code => 'BAD_LISTENER_ID', message => ?LISTENER_ID_INCONSISTENT}}
-    end;
+crud_listeners_by_id(post, #{body := Body}) ->
+    create_listener(Body);
crud_listeners_by_id(delete, #{bindings := #{id := Id}}) ->
    {ok, #{type := Type, name := Name}} = emqx_listeners:parse_listener_id(Id),
    case ensure_remove([listeners, Type, Name]) of
@@ -408,13 +417,24 @@ crud_listeners_by_id(delete, #{bindings := #{id := Id}}) ->

parse_listener_conf(Conf0) ->
    Conf1 = maps:without([<<"running">>, <<"current_connections">>], Conf0),
-    {IdBin, Conf2} = maps:take(<<"id">>, Conf1),
-    {TypeBin, Conf3} = maps:take(<<"type">>, Conf2),
-    {ok, #{type := Type, name := Name}} = emqx_listeners:parse_listener_id(IdBin),
+    {TypeBin, Conf2} = maps:take(<<"type">>, Conf1),
    TypeAtom = binary_to_existing_atom(TypeBin),
-    case Type =:= TypeAtom of
-        true -> {binary_to_existing_atom(IdBin), TypeAtom, Name, Conf3};
-        false -> {error, listener_type_inconsistent}
+    case maps:take(<<"id">>, Conf2) of
+        {IdBin, Conf3} ->
+            {ok, #{type := Type, name := Name}} = emqx_listeners:parse_listener_id(IdBin),
+            case Type =:= TypeAtom of
+                true -> {binary_to_existing_atom(IdBin), TypeAtom, Name, Conf3};
+                false -> {error, listener_type_inconsistent}
+            end;
+        _ ->
+            case maps:take(<<"name">>, Conf2) of
+                {Name, Conf3} ->
+                    IdBin = <<TypeBin/binary, $:, Name/binary>>,
+                    {binary_to_atom(IdBin), TypeAtom, Name, Conf3};
+                _ ->
+                    {error, listener_config_invalid}
+            end
    end.

stop_listeners_by_id(Method, Body = #{bindings := Bindings}) ->
@@ -787,3 +807,37 @@ tcp_schema_example() ->
        type => tcp,
        zone => default
    }.
+
+create_listener(Body) ->
+    case parse_listener_conf(Body) of
+        {Id, Type, Name, Conf} ->
+            Path = [listeners, Type, Name],
+            case create(Path, Conf) of
+                {ok, #{raw_config := _RawConf}} ->
+                    crud_listeners_by_id(get, #{bindings => #{id => Id}});
+                {error, already_exist} ->
+                    {400, #{code => 'BAD_LISTENER_ID', message => <<"Already Exist">>}};
+                {error, Reason} ->
+                    {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}
+            end;
+        {error, Reason} ->
+            {400, #{code => 'BAD_REQUEST', message => err_msg(Reason)}}
+    end.
+
+listener_struct(Type) ->
+    Listeners = listeners_info(#{bind => true}) ++ listeners_info(#{bind => false}),
+    [Schema] = [S || #{ref := ?R_REF(_, T), schema := S} <- Listeners, T =:= Type],
+    Schema.
+
+listener_struct_with_name(Type) ->
+    BaseSchema = listener_struct(Type),
+    lists:keyreplace(
+        id,
+        1,
+        BaseSchema,
+        {name,
+            ?HOCON(binary(), #{
+                desc => "Listener name",
+                required => true
+            })}
+    ).

View File

@@ -37,6 +37,35 @@ t_list_listeners(_) ->
    Res = request(get, Path, [], []),
    #{<<"listeners">> := Expect} = emqx_mgmt_api_listeners:do_list_listeners(),
    ?assertEqual(length(Expect), length(Res)),
+
+    %% POST /listeners
+    ListenerId = <<"tcp:default">>,
+    NewListenerId = <<"tcp:new">>,
+
+    OriginPath = emqx_mgmt_api_test_util:api_path(["listeners", ListenerId]),
+    NewPath = emqx_mgmt_api_test_util:api_path(["listeners", NewListenerId]),
+
+    OriginListener = request(get, OriginPath, [], []),
+
+    %% create with full options
+    ?assertEqual({error, not_found}, is_running(NewListenerId)),
+    ?assertMatch({error, {"HTTP/1.1", 404, _}}, request(get, NewPath, [], [])),
+
+    OriginListener2 = maps:remove(<<"id">>, OriginListener),
+    NewConf = OriginListener2#{
+        <<"name">> => <<"new">>,
+        <<"bind">> => <<"0.0.0.0:2883">>
+    },
+    Create = request(post, Path, [], NewConf),
+    ?assertEqual(lists:sort(maps:keys(OriginListener)), lists:sort(maps:keys(Create))),
+    Get1 = request(get, NewPath, [], []),
+    ?assertMatch(Create, Get1),
+    ?assert(is_running(NewListenerId)),
+
+    %% delete
+    ?assertEqual([], delete(NewPath)),
+    ?assertEqual({error, not_found}, is_running(NewListenerId)),
+    ?assertMatch({error, {"HTTP/1.1", 404, _}}, request(get, NewPath, [], [])),
    ok.

t_tcp_crud_listeners_by_id(_) ->

View File

@@ -296,7 +296,7 @@ handle_cast(Msg, State) ->

%% Do Publish...
handle_info({timeout, TRef, do_publish}, State = #{publish_timer := TRef}) ->
-    DeletedKeys = do_publish(mnesia:dirty_first(?TAB), os:system_time(seconds)),
+    DeletedKeys = do_publish(mnesia:dirty_first(?TAB), erlang:system_time(seconds)),
    lists:foreach(fun(Key) -> mria:dirty_delete(?TAB, Key) end, DeletedKeys),
    {noreply, ensure_publish_timer(State#{publish_timer := undefined, publish_at := 0})};
handle_info(stats, State = #{stats_fun := StatsFun}) ->
@@ -347,12 +347,12 @@ ensure_publish_timer(State) ->
ensure_publish_timer('$end_of_table', State) ->
    State#{publish_timer := undefined, publish_at := 0};
ensure_publish_timer({Ts, _Id}, State = #{publish_timer := undefined}) ->
-    ensure_publish_timer(Ts, os:system_time(seconds), State);
+    ensure_publish_timer(Ts, erlang:system_time(seconds), State);
ensure_publish_timer({Ts, _Id}, State = #{publish_timer := TRef, publish_at := PubAt}) when
    Ts < PubAt
->
    ok = emqx_misc:cancel_timer(TRef),
-    ensure_publish_timer(Ts, os:system_time(seconds), State);
+    ensure_publish_timer(Ts, erlang:system_time(seconds), State);
ensure_publish_timer(_Key, State) ->
    State.

View File

@@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_modules, [
    {description, "EMQX Modules"},
-    {vsn, "5.0.3"},
+    {vsn, "5.0.4"},
    {modules, []},
    {applications, [kernel, stdlib, emqx]},
    {mod, {emqx_modules_app, []}},

View File

@@ -2,7 +2,7 @@
{application, emqx_retainer, [
    {description, "EMQX Retainer"},
    % strict semver, bump manually!
-    {vsn, "5.0.4"},
+    {vsn, "5.0.5"},
    {modules, []},
    {registered, [emqx_retainer_sup]},
    {applications, [kernel, stdlib, emqx]},

View File

@@ -348,16 +348,12 @@ enable_retainer(
    #{context_id := ContextId} = State,
    #{
        msg_clear_interval := ClearInterval,
-        backend := BackendCfg,
-        flow_control := FlowControl
+        backend := BackendCfg
    }
) ->
    NewContextId = ContextId + 1,
    Context = create_resource(new_context(NewContextId), BackendCfg),
    load(Context),
-    emqx_limiter_server:add_bucket(
-        ?APP, internal, maps:get(batch_deliver_limiter, FlowControl, undefined)
-    ),
    State#{
        enable := true,
        context_id := NewContextId,
@@ -373,7 +369,6 @@ disable_retainer(
    } = State
) ->
    unload(),
-    emqx_limiter_server:del_bucket(?APP, internal),
    ok = close_resource(Context),
    State#{
        enable := false,

View File

@@ -18,6 +18,8 @@

-behaviour(application).

+-include("emqx_retainer.hrl").
+
-export([
    start/2,
    stop/1
@@ -25,8 +27,19 @@

start(_Type, _Args) ->
    ok = emqx_retainer_mnesia_cli:load(),
+    init_bucket(),
    emqx_retainer_sup:start_link().

stop(_State) ->
    ok = emqx_retainer_mnesia_cli:unload(),
+    delete_bucket(),
    ok.
+
+init_bucket() ->
+    #{flow_control := FlowControl} = emqx:get_config([retainer]),
+    emqx_limiter_server:add_bucket(
+        ?APP, internal, maps:get(batch_deliver_limiter, FlowControl, undefined)
+    ).
+
+delete_bucket() ->
+    emqx_limiter_server:del_bucket(?APP, internal).
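Together with the previous hunk, this moves the limiter-bucket setup out of `enable_retainer`/`disable_retainer` and into the application start/stop callbacks, so the bucket exists regardless of whether the retainer is enabled; this appears to be the fix referenced by the `#8911` CHANGELOG entry (EMQX can't start when the retainer is disabled) and is exercised by the `test_disable_then_start` case added below.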

View File

@@ -31,14 +31,16 @@ all() ->
    [
        {group, mnesia_without_indices},
        {group, mnesia_with_indices},
-        {group, mnesia_reindex}
+        {group, mnesia_reindex},
+        {group, test_disable_then_start}
    ].

groups() ->
    [
        {mnesia_without_indices, [sequence], common_tests()},
        {mnesia_with_indices, [sequence], common_tests()},
-        {mnesia_reindex, [sequence], [t_reindex]}
+        {mnesia_reindex, [sequence], [t_reindex]},
+        {test_disable_then_start, [sequence], [test_disable_then_start]}
    ].

common_tests() ->
@@ -624,6 +626,19 @@ t_get_basic_usage_info(_Config) ->
    ?assertEqual(#{retained_messages => 5}, emqx_retainer:get_basic_usage_info()),
    ok.

+%% test whether the app can start normally after disabling emqx_retainer
+%% fix: https://github.com/emqx/emqx/pull/8911
+test_disable_then_start(_Config) ->
+    emqx_retainer:update_config(#{<<"enable">> => false}),
+    ?assertNotEqual([], gproc_pool:active_workers(emqx_retainer_dispatcher)),
+    ok = application:stop(emqx_retainer),
+    timer:sleep(100),
+    ?assertEqual([], gproc_pool:active_workers(emqx_retainer_dispatcher)),
+    ok = application:ensure_started(emqx_retainer),
+    timer:sleep(100),
+    ?assertNotEqual([], gproc_pool:active_workers(emqx_retainer_dispatcher)),
+    ok.
+
%%--------------------------------------------------------------------
%% Helper functions
%%--------------------------------------------------------------------

View File

@@ -7,7 +7,7 @@ set -euo pipefail
DEBUG="${DEBUG:-0}"
[ "$DEBUG" -eq 1 ] && set -x

-RUNNER_ROOT_DIR="$(cd "$(dirname "$(readlink "$0" || echo "$0")")"/..; pwd -P)"
+RUNNER_ROOT_DIR="$(cd "$(dirname "$(realpath "$0" || echo "$0")")"/..; pwd -P)"

# shellcheck disable=SC1090,SC1091
. "$RUNNER_ROOT_DIR"/releases/emqx_vars
@@ -600,7 +600,7 @@ is_down() {
    if ps -p "$PID" | grep -q 'defunct'; then
        # zombie state, print parent pid
        parent="$(ps -o ppid= -p "$PID" | tr -d ' ')"
-        echo "WARN: $PID is marked <defunct>, parent:"
+        echo "WARNING: $PID is marked <defunct>, parent:"
        ps -p "$parent"
        return 0
    fi
@@ -748,8 +748,9 @@ export ESCRIPT_NAME="$SHORT_NAME"
PIPE_DIR="${PIPE_DIR:-/$DATA_DIR/${WHOAMI}_erl_pipes/$NAME/}"

-## make EMQX_NODE_COOKIE right
+## Resolve Erlang cookie.
if [ -n "${EMQX_NODE_COOKIE:-}" ]; then
+    ## To be backward compatible, read EMQX_NODE_COOKIE
    export EMQX_NODE__COOKIE="${EMQX_NODE_COOKIE}"
    unset EMQX_NODE_COOKIE
fi
@@ -762,9 +763,13 @@ if [ -z "$COOKIE" ]; then
        COOKIE="$(grep -E '^-setcookie' "${vm_args_file}" | awk '{print $2}')"
    fi
fi
+[ -z "$COOKIE" ] && COOKIE="$EMQX_DEFAULT_ERLANG_COOKIE"

-if [ -z "$COOKIE" ]; then
-    die "Please set node.cookie in $EMQX_ETC_DIR/emqx.conf or override from environment variable EMQX_NODE__COOKIE"
+if [ $IS_BOOT_COMMAND = 'yes' ] && [ "$COOKIE" = "$EMQX_DEFAULT_ERLANG_COOKIE" ]; then
+    echoerr "!!!!!!"
+    echoerr "WARNING: Default (insecure) Erlang cookie is in use."
+    echoerr "WARNING: Configure node.cookie in $EMQX_ETC_DIR/emqx.conf or override from environment variable EMQX_NODE__COOKIE"
+    echoerr "NOTE: Use the same config value for all nodes in the cluster."
+    echoerr "!!!!!!"
fi

## check if OTP version has mnesia_hook feature; if not, fallback to

View File

@@ -163,6 +163,7 @@ make_relup() {
    local name_pattern
    name_pattern="${PROFILE}-$(./pkg-vsn.sh "$PROFILE" --vsn_matcher --long)"
    local releases=()
+    mkdir -p _upgrade_base
    while read -r tgzfile ; do
        local base_vsn
        base_vsn="$(echo "$tgzfile" | grep -oE "[0-9]+\.[0-9]+\.[0-9]+(-(alpha|beta|rc)\.[0-9])?(-[0-9a-f]{8})?" | head -1)"
View File

@@ -562,6 +562,7 @@ defmodule EMQXUmbrella.MixProject do
  defp template_vars(release, release_type, :bin = _package_type, edition_type) do
    [
+      emqx_default_erlang_cookie: default_cookie(),
      platform_data_dir: "data",
      platform_etc_dir: "etc",
      platform_log_dir: "log",
@@ -584,6 +585,7 @@ defmodule EMQXUmbrella.MixProject do
  defp template_vars(release, release_type, :pkg = _package_type, edition_type) do
    [
+      emqx_default_erlang_cookie: default_cookie(),
      platform_data_dir: "/var/lib/emqx",
      platform_etc_dir: "/etc/emqx",
      platform_log_dir: "/var/log/emqx",
@@ -604,6 +606,10 @@ defmodule EMQXUmbrella.MixProject do
    ] ++ build_info()
  end

+  defp default_cookie() do
+    "emqx50elixir"
+  end
+
  defp emqx_description(release_type, edition_type) do
    case {release_type, edition_type} do
      {:cloud, :enterprise} ->

View File

@@ -298,14 +298,13 @@ relform() ->
emqx_description(cloud, ee) -> "EMQX Enterprise";
emqx_description(cloud, ce) -> "EMQX".

-overlay_vars(RelType, PkgType, Edition) ->
-    overlay_vars_rel(RelType) ++
+overlay_vars(cloud, PkgType, Edition) ->
+    [
+        {emqx_default_erlang_cookie, "emqxsecretcookie"}
+    ] ++
        overlay_vars_pkg(PkgType) ++
        overlay_vars_edition(Edition).

-overlay_vars_rel(cloud) ->
-    [{vm_args_file, "vm.args"}].
overlay_vars_edition(ce) ->
    [
        {emqx_schema_mod, emqx_conf_schema},
@@ -485,11 +484,16 @@ emqx_etc_overlay_per_edition(ee) ->
    ].

get_vsn(Profile) ->
-    %% to make it compatible to Linux and Windows,
-    %% we must use bash to execute the bash file
-    %% because "./" will not be recognized as an internal or external command
-    os_cmd("pkg-vsn.sh " ++ atom_to_list(Profile)).
+    case os:getenv("PKG_VSN") of
+        false ->
+            os_cmd("pkg-vsn.sh " ++ atom_to_list(Profile));
+        Vsn ->
+            Vsn
+    end.

+%% to make it compatible to Linux and Windows,
+%% we must use bash to execute the bash file
+%% because "./" will not be recognized as an internal or external command
os_cmd(Cmd) ->
    Output = os:cmd("bash " ++ Cmd),
    re:replace(Output, "\n", "", [{return, list}]).

View File

@@ -9,19 +9,17 @@ ERL_OPTS="{{ erl_opts }}"
RUNNER_BIN_DIR="{{ runner_bin_dir }}"
RUNNER_LIB_DIR="{{ runner_lib_dir }}"
IS_ELIXIR="${IS_ELIXIR:-{{ is_elixir }}}"

## Allow users to pre-set `RUNNER_LOG_DIR` because it only affects boot commands like `start` and `console`,
## but not other commands such as `ping` and `ctl`.
RUNNER_LOG_DIR="${RUNNER_LOG_DIR:-{{ runner_log_dir }}}"

EMQX_ETC_DIR="{{ emqx_etc_dir }}"
RUNNER_USER="{{ runner_user }}"
SCHEMA_MOD="{{ emqx_schema_mod }}"
IS_ENTERPRISE="{{ is_enterprise }}"

+## Do not change EMQX_DEFAULT_ERLANG_COOKIE.
+## Configure EMQX_NODE_COOKIE instead
+EMQX_DEFAULT_ERLANG_COOKIE='{{ emqx_default_erlang_cookie }}'
+
+REL_NAME="emqx"
+
export EMQX_DESCRIPTION='{{ emqx_description }}'

-## computed vars
-REL_NAME="emqx"

## updated vars here

View File

@@ -42,7 +42,7 @@ curl -L --silent --show-error \
    --output "${RELEASE_ASSET_FILE}" \
    "$DIRECT_DOWNLOAD_URL"

-unzip -q "$RELEASE_ASSET_FILE" -d "$DASHBOARD_PATH"
+unzip -o -q "$RELEASE_ASSET_FILE" -d "$DASHBOARD_PATH"
rm -rf "$DASHBOARD_PATH/www"
mv "$DASHBOARD_PATH/dist" "$DASHBOARD_PATH/www"
rm -f "$RELEASE_ASSET_FILE"

View File

@ -1,21 +0,0 @@
#!/bin/bash
set -uo pipefail
if [ -z "${1:-}" ]; then
SCHEMA="_build/emqx/lib/emqx_dashboard/priv/www/static/schema.json"
else
SCHEMA="$1"
fi
docker run -d --name langtool "ghcr.io/emqx/emqx-schema-validate:0.3.5"
docker exec -i langtool ./emqx_schema_validate - < "${SCHEMA}"
success="$?"
docker kill langtool || true
docker rm langtool || true
echo "If this script finds a false positive (e.g. when it things that a protocol name is a typo),
make a PR here: https://github.com/emqx/emqx-schema-validate/blob/master/dict/en_spelling_additions.txt"
exit "$success"

View File

@ -0,0 +1,265 @@
ACL
AES
APIs
BPAPI
BSON
Backplane
CA
CAs
CHACHA
CLI
CMD
CN
CONNACK
CoAP
Cygwin
DES
DN
DNS
DTLS
DevOps
Dialyzer
Diffie
EIP
EMQX
EPMD
ERL
ETS
FIXME
GCM
HMAC
HOCON
HTTPS
JSON
JWK
JWKs
JWT
Kubernetes
LwM
MQTT
Makefile
MitM
Multicast
NIF
OTP
PEM
PINGREQ
PSK
PSK
PSKs
PUBREL
QoS
RESTful
ROADMAP
RSA
Req
Riak
SHA
SMS
Struct
TCP
TLS
TTL
UDP
URI
XMLs
acceptors
ack
acked
addr
api
apiserver
arg
args
async
attr
auth
authenticator
authenticators
authn
authz
autoclean
autoheal
backend
backends
backoff
backplane
backtrace
badarg
badkey
bcrypt
behaviour
bhvr
boolean
bytesize
cacert
cacertfile
certfile
ci
clientid
clientinfo
cmake
coap
conf
config
configs
confirmable
conn
connectionless
cors
cpu
ctx
customizable
datagram
datagrams
desc
dir
dns
downlink
downlink
dtls
ekka
emqx
enablement
enqueue
enqueued
env
eof
epmd
erl
erts
escript
etcd
eval
exe
executables
exhook
exproto
extensibility
formatter
gRPC
github
goto
grpcbox
hocon
hoconsc
hostname
hrl
http
https
iface
img
impl
inet
inflight
ini
init
ip
ipv
jenkins
jq
kb
keepalive
libcoap
lifecycle
localhost
lwm
mnesia
mountpoint
mqueue
mria
msg
multicalls
multicasts
namespace
natively
nodelay
nodetool
nullable
num
os
params
peerhost
peername
perf
powershell
procmem
procs
progname
prometheus
proto
ps
psk
pubsub
pushgateway
qlen
qmode
qos
quic
ratelimit
rebar
recbuf
relup
replayq
replicant
repo
reuseaddr
rh
rlog
rootdir
rpc
runtime
sc
scalable
seg
setcookie
sharded
shareload
sn
sndbuf
sockname
sql
src
ssl
statsd
structs
subprotocol
subprotocols
superset
sys
sysmem
sysmon
tcp
ticktime
tlog
tls
tlsv
travis
trie
ttl
typerefl
udp
uid
un-acked
unsub
uplink
url
utc
util
ver
vm
vsn
wakaama
websocket
ws
wss
xml
HStream
HStreamDB
hstream
hstreamDB
hstream
hstreamdb
SASL
GSSAPI
keytab

View File

@ -0,0 +1,30 @@
#!/usr/bin/env bash
set -euo pipefail
# ensure dir
cd -P -- "$(dirname -- "${BASH_SOURCE[0]}")/../.."
PROJ_ROOT="$(pwd)"
if [ -z "${1:-}" ]; then
SCHEMA="${PROJ_ROOT}/_build/emqx/lib/emqx_dashboard/priv/www/static/schema.json"
else
SCHEMA="$(realpath "$1")"
fi
set +e
docker run --rm -i --name spellcheck \
-v "${PROJ_ROOT}"/scripts/spellcheck/dicts:/dicts \
-v "$SCHEMA":/schema.json \
ghcr.io/emqx/emqx-schema-validate:0.4.0 /schema.json
result="$?"
if [ "$result" -eq 0 ]; then
echo "Spellcheck OK"
exit 0
fi
echo "If this script finds a false positive (e.g. when it thinks that a protocol name is a typo),"
echo "Add the word to dictionary in scripts/spellcheck/dicts"
exit $result