Merge pull request #6694 from HJianBo/main-v4.4-merged-main-v4.3

Sync main-v4.3 into main-v4.4
This commit is contained in:
Zaiming (Stone) Shi 2022-01-12 11:44:56 +01:00 committed by GitHub
commit 1733dd9d13
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
36 changed files with 413 additions and 214 deletions

View File

@ -39,6 +39,7 @@ jobs:
echo "::set-output name=imgname::emqx-ee"
echo "::set-output name=version::$(./pkg-vsn.sh)"
else
make emqx-docker
echo "::set-output name=imgname::emqx"
echo "::set-output name=version::$(./pkg-vsn.sh)"
fi

View File

@ -24,10 +24,12 @@ jobs:
echo "TARGET=emqx/emqx-ee" >> $GITHUB_ENV
echo "PROFILE=emqx-ee" >> $GITHUB_ENV
echo "EMQX_TAG=$(./pkg-vsn.sh)" >> $GITHUB_ENV
make emqx-ee-docker
else
echo "TARGET=emqx/emqx" >> $GITHUB_ENV
echo "PROFILE=emqx" >> $GITHUB_ENV
echo "EMQX_TAG=$(./pkg-vsn.sh)" >> $GITHUB_ENV
make emqx-docker
fi
- name: make emqx image
env:

1
.gitignore vendored
View File

@ -59,3 +59,4 @@ erlang_ls.config
*#
# For direnv
.envrc
mix.lock

98
CHANGES.md Normal file
View File

@ -0,0 +1,98 @@
# EMQ X 4.3 Changes
Started tracking changes in CHANGE.md since EMQ X v4.3.11
NOTE: Keep prepending to the head of the file instead of the tail
File format:
- Use weight-2 heading for releases
- One list item per change topic
Change log ends with a list of github PRs
## v4.3.11
Important notes:
- For Debian/Ubuntu users
We changed the package installed service from init.d to systemd.
The upgrade from init.d to systemd is verified, however it is
recommended to verify it before rolling out to production.
At least to ensure systemd is available in your system.
- For Centos Users
RPM package now depends on `openssl11` which is NOT available
in certain centos distributions.
Please make sure the yum repo [epel-release](https://docs.fedoraproject.org/en-US/epel) is installed.
### Important changes
* Debian/Ubuntu package (deb) installed EMQ X now runs on systemd [#6389]<br>
This is to take advantage of systemd's supervision functionality to ensure
EMQ X service is restarted after crashes.
### Minor changes
* Clustering malfunction fixes [#6221, #6381]
Mostly changes made in [ekka](https://github.com/emqx/ekka/pull/134)<br>
From 0.8.1.4 to 0.8.1.6, fixes included intra-cluster RPC call timeouts,<br>
also fixed `ekka_locker` process crashe after killing a hanged lock owner.
* Improved log message when TCP proxy is in use but proxy_protocol configuration is not turned on [#6416]<br>
"please check proxy_protocol config for specific listeners and zones" to hint a misconfiguration
* Helm chart supports networking.k8s.io/v1 [#6368]
* Fix session takeover race condition which may lead to message loss [#6396]
* EMQ X docker images are pushed to aws public ecr in an automated CI job [#6271]<br>
`docker pull public.ecr.aws/emqx/emqx:4.3.10`
* Fix webhook URL path to allow rule-engine variable substitution [#6399]
* Corrected RAM usage display [#6379]
* Changed emqx_sn_registry table creation to runtime [#6357]<br>
This was a bug introduced in 4.3.3, in which the table is changed from ets to mnesia<br>
this will cause upgrade to fail when a later version node joins a 4.3.0-2 cluster<br>
* Log level for normal termination changed from info to debug [#6358]
* Added config `retainer.stop_publish_clear_msg` to enable/disable empty message retained message publish [#6343]<br>
In MQTT 3.1.1, it is unclear if a MQTT broker should publish the 'clear' (no payload) message<br>
to the subscribers, or just delete the retained message. So we have made it configurable
* Fix mqtt bridge malfunction when remote host is unreachable (hangs the connection) [#6286, #6323]
* System monitor now inspects `current_stacktrace` of suspicious process [#6290]<br>
`current_function` was not quite helpful
* Changed default `max_topc_levels` config value to 128 [#6294, #6420]<br>
previously it has no limit (config value = 0), which can be a potential DoS threat
* Collect only libcrypto and libtinfo so files for zip package [#6259]<br>
in 4.3.10 we tried to collect all so files, however glibc is not quite portable
* Added openssl-1.1 to RPM dependency [#6239]
* Http client duplicated header fix [#6195]
* Fix `node_dump` issues when working with deb or rpm installation [#6209]
* Pin Erlang/OTP 23.2.7.2-emqx-3 [#6246]<br>
4.3.10 is on 23.2.7.2-emqx-2, this bump is to fix an ECC signature name typo:
ecdsa_secp512r1_sha512 -> ecdsa_secp521r1_sha512
* HTTP client performance improvement [#6474, #6414]<br>
The changes are mostly done in the dependency [repo](https://github.com/emqx/ehttpc).
* For messages from gateways add message properties as MQTT message headers [#6142]<br>
e.g. messages from CoAP, LwM2M, Stomp, ExProto, when translated into MQTT message<br>
properties such as protocol name, protocol version, username (if any) peer-host<br>
etc. are filled as MQTT message headers.
## v4.3.0~10
Older version changes are not tracked here.

View File

@ -1,17 +1,5 @@
%% -*- mode: erlang -*-
{VSN,
[ {<<"4\\.3\\.[0-8]+">>,
[ {apply,{minirest,stop_http,['http:management']}},
{apply,{minirest,stop_http,['https:management']}},
{restart_application, emqx_management}
]},
{<<".*">>, []}
],
[ {<<"4\\.3\\.[0-8]+">>,
[ {apply,{minirest,stop_http,['http:management']}},
{apply,{minirest,stop_http,['https:management']}},
{restart_application, emqx_management}
]},
{<<".*">>, []}
]
[{<<".*">>,[]}],
[{<<".*">>,[]}]
}.

View File

@ -62,8 +62,8 @@
-export([ list_subscriptions/1
, list_subscriptions_via_topic/2
, list_subscriptions_via_topic/3
, lookup_subscriptions/1
, lookup_subscriptions/2
, lookup_subscriptions/3
]).
%% Routes
@ -331,18 +331,20 @@ list_subscriptions_via_topic(Node, Topic, {M,F}) when Node =:= node() ->
list_subscriptions_via_topic(Node, Topic, FormatFun) ->
rpc_call(Node, list_subscriptions_via_topic, [Node, Topic, FormatFun]).
lookup_subscriptions(ClientId) ->
lists:append([lookup_subscriptions(Node, ClientId) || Node <- ekka_mnesia:running_nodes()]).
lookup_subscriptions(ClientId, FormatFun) ->
lists:append([lookup_subscriptions(Node, ClientId, FormatFun) || Node <- ekka_mnesia:running_nodes()]).
lookup_subscriptions(Node, ClientId) when Node =:= node() ->
case ets:lookup(emqx_subid, ClientId) of
[] -> [];
[{_, Pid}] ->
ets:match_object(emqx_suboption, {{Pid, '_'}, '_'})
end;
lookup_subscriptions(Node, ClientId, {M, F}) when Node =:= node() ->
Result = case ets:lookup(emqx_subid, ClientId) of
[] -> [];
[{_, Pid}] ->
ets:match_object(emqx_suboption, {{Pid, '_'}, '_'})
end,
%% format at the called node
erlang:apply(M, F, [Result]);
lookup_subscriptions(Node, ClientId) ->
rpc_call(Node, lookup_subscriptions, [Node, ClientId]).
lookup_subscriptions(Node, ClientId, FormatFun) ->
rpc_call(Node, lookup_subscriptions, [Node, ClientId, FormatFun]).
%%--------------------------------------------------------------------
%% Routes

View File

@ -326,10 +326,12 @@ to_integer(I) when is_integer(I) ->
to_integer(B) when is_binary(B) ->
binary_to_integer(B).
%% @doc The input timestamp time is in seconds, which needs to be
%% converted to internal milliseconds here
to_timestamp(I) when is_integer(I) ->
I;
I * 1000;
to_timestamp(B) when is_binary(B) ->
binary_to_integer(B).
binary_to_integer(B) * 1000.
aton(B) when is_binary(B) ->
list_to_tuple([binary_to_integer(T) || T <- re:split(B, "[.]")]).
@ -361,7 +363,7 @@ params2qs_test() ->
ExpectedQs = [{str, '=:=', <<"abc">>},
{int, '=:=', 123},
{atom, '=:=', connected},
{ts, '=:=', 156000},
{ts, '=:=', 156000000},
{range, '>=', 1, '=<', 5}
],
FuzzyQs = [{fuzzy, like, <<"user">>},

View File

@ -361,31 +361,24 @@ query({Qs, Fuzzy}, Start, Limit) ->
match_fun(Ms, Fuzzy) ->
MsC = ets:match_spec_compile(Ms),
REFuzzy = lists:map(fun({K, like, S}) ->
{ok, RE} = re:compile(escape(S)),
{K, like, RE}
end, Fuzzy),
fun(Rows) ->
case ets:match_spec_run(Rows, MsC) of
[] -> [];
Ls ->
lists:filter(fun(E) ->
run_fuzzy_match(E, REFuzzy)
run_fuzzy_match(E, Fuzzy)
end, Ls)
end
end.
escape(B) when is_binary(B) ->
re:replace(B, <<"\\\\">>, <<"\\\\\\\\">>, [{return, binary}, global]).
run_fuzzy_match(_, []) ->
true;
run_fuzzy_match(E = {_, #{clientinfo := ClientInfo}, _}, [{Key, _, RE} | Fuzzy]) ->
Val = case maps:get(Key, ClientInfo, "") of
undefined -> "";
run_fuzzy_match(E = {_, #{clientinfo := ClientInfo}, _}, [{Key, like, SubStr}|Fuzzy]) ->
Val = case maps:get(Key, ClientInfo, undefined) of
undefined -> <<>>;
V -> V
end,
re:run(Val, RE, [{capture, none}]) == match andalso run_fuzzy_match(E, Fuzzy).
binary:match(Val, SubStr) /= nomatch andalso run_fuzzy_match(E, Fuzzy).
%%--------------------------------------------------------------------
%% QueryString to Match Spec
@ -473,10 +466,10 @@ params2qs_test() ->
proto_ver => 4,
connected_at => '$3'},
session => #{created_at => '$2'}},
ExpectedCondi = [{'>=','$2', 1},
{'=<','$2', 5},
{'>=','$3', 1},
{'=<','$3', 5}],
ExpectedCondi = [{'>=','$2', 1000},
{'=<','$2', 5000},
{'>=','$3', 1000},
{'=<','$3', 5000}],
{10, {Qs1, []}} = emqx_mgmt_api:params2qs(Params, QsSchema),
[{{'$1', MtchHead, _}, Condi, _}] = qs2ms(Qs1),
?assertEqual(ExpectedMtchHead, MtchHead),
@ -484,9 +477,24 @@ params2qs_test() ->
[{{'$1', #{}, '_'}, [], ['$_']}] = qs2ms([]).
escape_test() ->
Str = <<"\\n">>,
{ok, Re} = re:compile(escape(Str)),
{match, _} = re:run(<<"\\name">>, Re).
fuzzy_match_test() ->
Info = {emqx_channel_info,
#{clientinfo =>
#{ clientid => <<"abcde">>
, username => <<"abc\\name*[]()">>
}}, []
},
true = run_fuzzy_match(Info, [{clientid, like, <<"abcde">>}]),
true = run_fuzzy_match(Info, [{clientid, like, <<"bcd">>}]),
false = run_fuzzy_match(Info, [{clientid, like, <<"defh">>}]),
true = run_fuzzy_match(Info, [{username, like, <<"\\name">>}]),
true = run_fuzzy_match(Info, [{username, like, <<"*">>}]),
true = run_fuzzy_match(Info, [{username, like, <<"[]">>}]),
true = run_fuzzy_match(Info, [{username, like, <<"()">>}]),
false = run_fuzzy_match(Info, [{username, like, <<"))">>}]),
true = run_fuzzy_match(Info, [{clientid, like, <<"de">>},
{username, like, <<"[]">>}]).
-endif.

View File

@ -36,11 +36,10 @@ list(_Bindings, _Params) ->
minirest:return({ok, [format(Node, Info) || {Node, Info} <- emqx_mgmt:list_nodes()]}).
get(#{node := Node}, _Params) ->
minirest:return({ok, emqx_mgmt:lookup_node(Node)}).
minirest:return({ok, format(Node, emqx_mgmt:lookup_node(Node))}).
format(Node, {error, Reason}) -> #{node => Node, error => Reason};
format(_Node, Info = #{memory_total := Total, memory_used := Used}) ->
Info#{memory_total := emqx_mgmt_util:kmg(Total),
memory_used := emqx_mgmt_util:kmg(Used)}.

View File

@ -85,10 +85,10 @@ list(#{node := Node} = Bindings, Params) ->
end.
lookup(#{node := Node, clientid := ClientId}, _Params) ->
minirest:return({ok, format(emqx_mgmt:lookup_subscriptions(Node, emqx_mgmt_util:urldecode(ClientId)))});
minirest:return({ok, emqx_mgmt:lookup_subscriptions(Node, emqx_mgmt_util:urldecode(ClientId), ?format_fun)});
lookup(#{clientid := ClientId}, _Params) ->
minirest:return({ok, format(emqx_mgmt:lookup_subscriptions(emqx_mgmt_util:urldecode(ClientId)))}).
minirest:return({ok, emqx_mgmt:lookup_subscriptions(emqx_mgmt_util:urldecode(ClientId), ?format_fun)}).
format(Items) when is_list(Items) ->
[format(Item) || Item <- Items];

View File

@ -1,5 +1,7 @@
#!/bin/bash
#!/usr/bin/env bash
set -eux pipefail
# Helper script for creating data export files
container() {

View File

@ -1,15 +1,5 @@
%% -*-: erlang -*-
%% -*- mode: erlang -*-
{VSN,
[
{<<"4\\.3\\.[0-1]+">>, [
{load_module, emqx_retainer, brutal_purge, soft_purge, []}
]},
{<<".*">>, []}
],
[
{<<"4\\.3\\.[0-1]+">>, [
{load_module, emqx_retainer, brutal_purge, soft_purge, []}
]},
{<<".*">>, []}
]
[{<<".*">>,[]}],
[{<<".*">>,[]}]
}.

View File

@ -33,7 +33,9 @@ cmd(["info"]) ->
cmd(["topics"]) ->
case mnesia:dirty_all_keys(?TAB) of
[] -> ignore;
Topics -> lists:foreach(fun(Topic) -> emqx_ctl:print("~s~n", [Topic]) end, Topics)
Topics -> lists:foreach(fun(Topic) ->
emqx_ctl:print("~s~n", [emqx_topic:join(Topic)])
end, Topics)
end;
cmd(["clean"]) ->
@ -55,4 +57,3 @@ cmd(_) ->
unload() ->
emqx_ctl:unregister_command(retainer).

View File

@ -23,18 +23,32 @@
all() -> emqx_ct:all(?MODULE).
init_per_testcase(_TestCase, Config) ->
init_per_suite(Config) ->
emqx_ct_helpers:start_apps([emqx_retainer]),
Config.
end_per_suite(_Config) ->
emqx_ct_helpers:stop_apps([emqx_retainer]).
init_per_testcase(TestCase, Config) ->
Config.
end_per_testcase(_TestCase, Config) ->
Config.
emqx_retainer:clean(<<"#">>).
% t_cmd(_) ->
% error('TODO').
t_cmd(_) ->
{ok, C1} = emqtt:start_link([{clean_start, true}, {proto_ver, v5}]),
{ok, _} = emqtt:connect(C1),
emqtt:publish(C1, <<"/retained">>, <<"this is a retained message">>, [{qos, 0}, {retain, true}]),
emqtt:publish(C1, <<"/retained/2">>, <<"this is a retained message">>, [{qos, 0}, {retain, true}]),
timer:sleep(1000),
?assertMatch(ok, emqx_retainer_cli:cmd(["topics"])),
?assertMatch(ok, emqx_retainer_cli:cmd(["info"])),
?assertMatch(ok, emqx_retainer_cli:cmd(["clean", "retained"])),
?assertMatch(ok, emqx_retainer_cli:cmd(["clean"])).
% t_unload(_) ->
% error('TODO').
% t_load(_) ->
% error('TODO').

View File

@ -1,52 +1,5 @@
%% -*- mode: erlang -*-
{VSN,
[{"4.3.5",[{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
{"4.3.0",
[{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
{apply,{emqx_stats,cancel_update,[rule_registery_stats]}},
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}]},
{"4.3.1",
[{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
{apply,{emqx_stats,cancel_update,[rule_registery_stats]}},
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}]},
{"4.3.2",
[{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
{apply,{emqx_stats,cancel_update,[rule_registery_stats]}},
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}]},
{"4.3.3",
[{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
{"4.3.4",
[{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
{<<".*">>,[]}],
[{"4.3.5",[{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
{"4.3.0",
[{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
{apply,{emqx_stats,cancel_update,[rule_registery_stats]}},
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}]},
{"4.3.1",
[{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
{apply,{emqx_stats,cancel_update,[rule_registery_stats]}},
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}]},
{"4.3.2",
[{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
{apply,{emqx_stats,cancel_update,[rule_registery_stats]}},
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}]},
{"4.3.3",
[{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
{"4.3.4",
[{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]},
{<<".*">>,[]}]}.
[{<<".*">>,[]}],
[{<<".*">>,[]}]
}.

View File

@ -491,11 +491,24 @@ may_update_rule_params(Rule, Params = #{on_action_failed := OnFailed}) ->
may_update_rule_params(Rule = #rule{actions = OldActions}, Params = #{actions := Actions}) ->
%% prepare new actions before removing old ones
NewActions = prepare_actions(Actions, maps:get(enabled, Params, true)),
ok = restore_action_metrics(OldActions, NewActions),
_ = ?CLUSTER_CALL(clear_actions, [OldActions]),
may_update_rule_params(Rule#rule{actions = NewActions}, maps:remove(actions, Params));
may_update_rule_params(Rule, _Params) -> %% ignore all the unsupported params
Rule.
%% NOTE: if the user removed an action, but the action is not the last one in the list,
%% the `restore_action_metrics/2` will not work as expected!
restore_action_metrics([#action_instance{id = OldId} | OldActions],
[#action_instance{id = NewId} | NewActions]) ->
emqx_rule_metrics:inc_actions_taken(NewId, emqx_rule_metrics:get_actions_taken(OldId)),
emqx_rule_metrics:inc_actions_success(NewId, emqx_rule_metrics:get_actions_success(OldId)),
emqx_rule_metrics:inc_actions_error(NewId, emqx_rule_metrics:get_actions_error(OldId)),
emqx_rule_metrics:inc_actions_exception(NewId, emqx_rule_metrics:get_actions_exception(OldId)),
restore_action_metrics(OldActions, NewActions);
restore_action_metrics(_, _) ->
ok.
ignore_lib_apps(Apps) ->
LibApps = [kernel, stdlib, sasl, appmon, eldap, erts,
syntax_tools, ssl, crypto, mnesia, os_mon,

View File

@ -204,6 +204,10 @@ match_conditions({}, _Data) ->
true.
%% comparing numbers against strings
compare(Op, undefined, undefined) ->
do_compare(Op, undefined, undefined);
compare(_Op, L, R) when L == undefined; R == undefined ->
false;
compare(Op, L, R) when is_number(L), is_binary(R) ->
do_compare(Op, L, number(R));
compare(Op, L, R) when is_binary(L), is_number(R) ->

View File

@ -1,27 +1,14 @@
%% -*-: erlang -*-
%% -*- mode: erlang -*-
{VSN,
[
{"4.3.3", [
{load_module, emqx_sn_registry, brutal_purge, soft_purge, []}
]},
{"4.3.2", [
{load_module, emqx_sn_gateway, brutal_purge, soft_purge, []},
{load_module, emqx_sn_registry, brutal_purge, soft_purge, []}
]},
{<<"4\\.3\\.[0-1]">>, [
{restart_application, emqx_sn}
]}
],
[
{"4.3.3", [
{load_module, emqx_sn_registry, brutal_purge, soft_purge, []}
]},
{"4.3.2", [
{load_module, emqx_sn_gateway, brutal_purge, soft_purge, []},
{load_module, emqx_sn_registry, brutal_purge, soft_purge, []}
]},
{<<"4\\.3\\.[0-1]">>, [
{restart_application, emqx_sn}
]}
]
}.
[{"4.3.3",[{load_module,emqx_sn_registry,brutal_purge,soft_purge,[]}]},
{"4.3.2",
[{load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]},
{load_module,emqx_sn_registry,brutal_purge,soft_purge,[]}]},
{<<"4\\.3\\.[0-1]">>,
[{restart_application,emqx_sn}]}],
[{"4.3.3",[{load_module,emqx_sn_registry,brutal_purge,soft_purge,[]}]},
{"4.3.2",
[{load_module,emqx_sn_gateway,brutal_purge,soft_purge,[]},
{load_module,emqx_sn_registry,brutal_purge,soft_purge,[]}]},
{<<"4\\.3\\.[0-1]">>,
[{restart_application,emqx_sn}]}]}.

View File

@ -1,6 +1,6 @@
{application, emqx_stomp,
[{description, "EMQ X Stomp Protocol Plugin"},
{vsn, "4.3.3"}, % strict semver, bump manually!
{vsn, "4.3.4"}, % strict semver, bump manually!
{modules, []},
{registered, [emqx_stomp_sup]},
{applications, [kernel,stdlib]},

View File

@ -1,16 +1,24 @@
%% -*- mode: erlang -*-
{VSN,
[{"4.3.2",[{load_module,emqx_stomp_protocol,brutal_purge,soft_purge,[]}]},
[{"4.3.3",[{load_module,emqx_stomp_frame,brutal_purge,soft_purge,[]}]},
{"4.3.2",
[{load_module,emqx_stomp_protocol,brutal_purge,soft_purge,[]},
{load_module,emqx_stomp_frame,brutal_purge,soft_purge,[]}]},
{"4.3.1",
[{load_module,emqx_stomp_protocol,brutal_purge,soft_purge,[]},
{load_module,emqx_stomp_frame,brutal_purge,soft_purge,[]},
{load_module,emqx_stomp_connection,brutal_purge,soft_purge,[]}]},
{"4.3.0",
[{restart_application,emqx_stomp},
{apply,{emqx_stomp,force_clear_after_app_stoped,[]}}]},
{<<".*">>,[]}],
[{"4.3.2",[{load_module,emqx_stomp_protocol,brutal_purge,soft_purge,[]}]},
[{"4.3.3",[{load_module,emqx_stomp_frame,brutal_purge,soft_purge,[]}]},
{"4.3.2",
[{load_module,emqx_stomp_protocol,brutal_purge,soft_purge,[]},
{load_module,emqx_stomp_frame,brutal_purge,soft_purge,[]}]},
{"4.3.1",
[{load_module,emqx_stomp_protocol,brutal_purge,soft_purge,[]},
{load_module,emqx_stomp_frame,brutal_purge,soft_purge,[]},
{load_module,emqx_stomp_connection,brutal_purge,soft_purge,[]}]},
{"4.3.0",
[{restart_application,emqx_stomp}]},

View File

@ -121,7 +121,7 @@ g(Key, Opts, Val) ->
parse(<<>>, Parser) ->
{more, Parser};
parse(Bytes, #{phase := body, len := Len, state := State}) ->
parse(Bytes, #{phase := body, length := Len, state := State}) ->
parse(body, Bytes, State, Len);
parse(Bytes, Parser = #{pre := Pre}) ->

View File

@ -1,4 +1,4 @@
#!/bin/bash
#!/usr/bin/env bash
# -*- tab-width:4;indent-tabs-mode:nil -*-
# ex: ts=4 sw=4 et

View File

@ -1,4 +1,4 @@
#!/bin/bash
#!/usr/bin/env bash
# -*- tab-width:4;indent-tabs-mode:nil -*-
# ex: ts=4 sw=4 et

2
build
View File

@ -1,4 +1,4 @@
#!/bin/bash
#!/usr/bin/env bash
# This script helps to build release artifacts.
# arg1: profile, e.g. emqx | emqx-edge | emqx-pkg | emqx-edge-pkg

View File

@ -1,4 +1,4 @@
#!/bin/bash
#!/usr/bin/env bash
## EMQ docker image start script
# Huang Rui <vowstar@gmail.com>
# EMQ X Team <support@emqx.io>

View File

@ -1,6 +1,6 @@
{application, emqx_telemetry,
[{description, "EMQ X Telemetry"},
{vsn, "4.3.1"}, % strict semver, bump manually!
{vsn, "4.3.2"}, % strict semver, bump manually!
{modules, []},
{registered, [emqx_telemetry_sup]},
{applications, [kernel,stdlib]},

View File

@ -1,13 +1,13 @@
%% -*- mode: erlang -*-
{VSN,
[
{"4.3.0", [
{<<"4\\.3\\.[0-1]">>, [
{load_module, emqx_telemetry, brutal_purge, soft_purge, []}
]},
{<<".*">>, []}
],
[
{"4.3.0", [
{<<"4\\.3\\.[0-1]">>, [
{load_module, emqx_telemetry, brutal_purge, soft_purge, []}
]},
{<<".*">>, []}

View File

@ -82,7 +82,7 @@
timer = undefined :: undefined | reference()
}).
%% The count of 100-nanosecond intervals between the UUID epoch
%% The count of 100-nanosecond intervals between the UUID epoch
%% 1582-10-15 00:00:00 and the UNIX epoch 1970-01-01 00:00:00.
-define(GREGORIAN_EPOCH_OFFSET, 16#01b21dd213814000).
@ -253,29 +253,23 @@ os_info() ->
[{os_name, Name},
{os_version, Version}];
{unix, _} ->
case file:read_file_info("/etc/os-release") of
case file:read_file("/etc/os-release") of
{error, _} ->
[{os_name, "Unknown"},
{os_version, "Unknown"}];
{ok, FileInfo} ->
case FileInfo#file_info.access of
Access when Access =:= read orelse Access =:= read_write ->
OSInfo = lists:foldl(fun(Line, Acc) ->
[Var, Value] = string:tokens(Line, "="),
NValue = case Value of
_ when is_list(Value) ->
lists:nth(1, string:tokens(Value, "\""));
_ ->
Value
end,
[{Var, NValue} | Acc]
end, [], string:tokens(os:cmd("cat /etc/os-release"), "\n")),
[{os_name, get_value("NAME", OSInfo)},
{os_version, get_value("VERSION", OSInfo, get_value("VERSION_ID", OSInfo))}];
_ ->
[{os_name, "Unknown"},
{os_version, "Unknown"}]
end
{ok, FileContent} ->
OSInfo = lists:foldl(fun(Line, Acc) ->
[Var, Value] = string:tokens(Line, "="),
NValue = case Value of
_ when is_list(Value) ->
lists:nth(1, string:tokens(Value, "\""));
_ ->
Value
end,
[{Var, NValue} | Acc]
end, [], string:tokens(binary:bin_to_list(FileContent), "\n")),
[{os_name, get_value("NAME", OSInfo)},
{os_version, get_value("VERSION", OSInfo, get_value("VERSION_ID", OSInfo, get_value("PRETTY_NAME", OSInfo)))}]
end;
{win32, nt} ->
Ver = os:cmd("ver"),
@ -429,5 +423,7 @@ module_attributes(Module) ->
bin(L) when is_list(L) ->
list_to_binary(L);
bin(A) when is_atom(A) ->
atom_to_binary(A);
bin(B) when is_binary(B) ->
B.

View File

@ -37,7 +37,7 @@
{deps,
[ {gpb, "4.11.2"} %% gpb only used to build, but not for release, pin it here to avoid fetching a wrong version due to rebar plugins scattered in all the deps
, {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.1.13"}}}
, {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.1.14"}}}
, {eredis_cluster, {git, "https://github.com/emqx/eredis_cluster", {tag, "0.6.5"}}}
, {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}}
, {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}

View File

@ -1,4 +1,4 @@
#!/bin/bash
#!/usr/bin/env bash
set -euo pipefail
latest_release=$(git describe --abbrev=0 --tags)

View File

@ -1,4 +1,4 @@
#!/bin/bash
#!/usr/bin/env bash
## This script prints Linux distro name and its version number
## e.g. macos, centos8, ubuntu20.04

View File

@ -1,4 +1,4 @@
#!/bin/bash
#!/usr/bin/env bash
# shellcheck disable=2090
###############
## args and env validation

View File

@ -1,4 +1,4 @@
#!/bin/bash
#!/usr/bin/env bash
# shellcheck disable=2090
###############
## args and env validation

View File

@ -189,8 +189,11 @@ find_appup_actions(CurrApps, PrevApps) ->
maps:fold(
fun(App, CurrAppIdx, Acc) ->
case PrevApps of
#{App := PrevAppIdx} -> find_appup_actions(App, CurrAppIdx, PrevAppIdx) ++ Acc;
_ -> Acc %% New app, nothing to upgrade here.
#{App := PrevAppIdx} ->
find_appup_actions(App, CurrAppIdx, PrevAppIdx) ++ Acc;
_ ->
%% New app, nothing to upgrade here.
Acc
end
end,
[],
@ -199,8 +202,12 @@ find_appup_actions(CurrApps, PrevApps) ->
find_appup_actions(_App, AppIdx, AppIdx) ->
%% No changes to the app, ignore:
[];
find_appup_actions(App, CurrAppIdx, PrevAppIdx = #app{version = PrevVersion}) ->
{OldUpgrade, OldDowngrade} = find_old_appup_actions(App, PrevVersion),
find_appup_actions(App,
CurrAppIdx = #app{version = CurrVersion},
PrevAppIdx = #app{version = PrevVersion}) ->
{OldUpgrade0, OldDowngrade0} = find_old_appup_actions(App, PrevVersion),
OldUpgrade = ensure_all_patch_versions(App, CurrVersion, OldUpgrade0),
OldDowngrade = ensure_all_patch_versions(App, CurrVersion, OldDowngrade0),
Upgrade = merge_update_actions(App, diff_app(App, CurrAppIdx, PrevAppIdx), OldUpgrade),
Downgrade = merge_update_actions(App, diff_app(App, PrevAppIdx, CurrAppIdx), OldDowngrade),
if OldUpgrade =:= Upgrade andalso OldDowngrade =:= Downgrade ->
@ -210,14 +217,40 @@ find_appup_actions(App, CurrAppIdx, PrevAppIdx = #app{version = PrevVersion}) ->
[{App, {Upgrade, Downgrade, OldUpgrade, OldDowngrade}}]
end.
%% To avoid missing one patch version when upgrading, we try to
%% optimistically generate the list of expected versions that should
%% be covered by the upgrade.
ensure_all_patch_versions(App, CurrVsn, OldActions) ->
case is_app_external(App) of
true ->
%% we do not attempt to predict the version list for
%% external dependencies, as those may not follow our
%% conventions.
OldActions;
false ->
do_ensure_all_patch_versions(App, CurrVsn, OldActions)
end.
do_ensure_all_patch_versions(App, CurrVsn, OldActions) ->
case enumerate_past_versions(CurrVsn) of
{ok, ExpectedVsns} ->
CoveredVsns = [V || {V, _} <- OldActions, V =/= <<".*">>],
ExpectedVsnStrs = [vsn_number_to_string(V) || V <- ExpectedVsns],
MissingActions = [{V, []} || V <- ExpectedVsnStrs, not contains_version(V, CoveredVsns)],
MissingActions ++ OldActions;
{error, bad_version} ->
log("WARN: Could not infer expected versions to upgrade from for ~p~n", [App]),
OldActions
end.
%% For external dependencies, show only the changes that are missing
%% in their current appup.
diff_appup_instructions(ComputedChanges, PresentChanges) ->
lists:foldr(
fun({Vsn, ComputedActions}, Acc) ->
case find_matching_version(Vsn, PresentChanges) of
fun({VsnOrRegex, ComputedActions}, Acc) ->
case find_matching_version(VsnOrRegex, PresentChanges) of
undefined ->
[{Vsn, ComputedActions} | Acc];
[{VsnOrRegex, ComputedActions} | Acc];
PresentActions ->
DiffActions = ComputedActions -- PresentActions,
case DiffActions of
@ -225,7 +258,7 @@ diff_appup_instructions(ComputedChanges, PresentChanges) ->
%% no diff
Acc;
_ ->
[{Vsn, DiffActions} | Acc]
[{VsnOrRegex, DiffActions} | Acc]
end
end
end,
@ -250,8 +283,11 @@ parse_appup_diffs(Upgrade, OldUpgrade, Downgrade, OldDowngrade) ->
end.
%% TODO: handle regexes
find_matching_version(Vsn, PresentChanges) ->
proplists:get_value(Vsn, PresentChanges).
%% Since the first argument may be a regex itself, we would need to
%% check if it is "contained" within other regexes inside list of
%% versions in the second argument.
find_matching_version(VsnOrRegex, PresentChanges) ->
proplists:get_value(VsnOrRegex, PresentChanges).
find_old_appup_actions(App, PrevVersion) ->
{Upgrade0, Downgrade0} =
@ -289,11 +325,38 @@ do_merge_update_actions(App, {New0, Changed0, Deleted0}, OldActions) ->
New = New0 -- AlreadyHandled,
Changed = Changed0 -- AlreadyHandled,
Deleted = Deleted0 -- AlreadyHandled,
[{load_module, M, brutal_purge, soft_purge, []} || M <- Changed ++ New] ++
OldActions ++
Reloads = [{load_module, M, brutal_purge, soft_purge, []}
|| not contains_restart_application(App, OldActions),
M <- Changed ++ New],
{OldActionsWithStop, OldActionsAfterStop} =
find_application_stop_instruction(App, OldActions),
OldActionsWithStop ++
Reloads ++
OldActionsAfterStop ++
[{delete_module, M} || M <- Deleted] ++
AppSpecific.
%% If an entry restarts an application, there's no need to use
%% `load_module' instructions.
contains_restart_application(Application, Actions) ->
lists:member({restart_application, Application}, Actions).
%% If there is an `application:stop(Application)' call in the
%% instructions, we insert `load_module' instructions after it.
find_application_stop_instruction(Application, Actions) ->
{Before, After0} =
lists:splitwith(
fun({apply, {application, stop, [App]}}) when App =:= Application ->
false;
(_) ->
true
end, Actions),
case After0 of
[StopInst | After] ->
{Before ++ [StopInst], After};
[] ->
{[], Before}
end.
%% @doc Process the existing actions to exclude modules that are
%% already handled
@ -308,14 +371,57 @@ process_old_action(_) ->
[].
ensure_version(Version, OldInstructions) ->
OldVersions = [ensure_string(element(1, I)) || I <- OldInstructions],
case lists:member(Version, OldVersions) of
OldVersions = [element(1, I) || I <- OldInstructions],
case contains_version(Version, OldVersions) of
false ->
[{Version, []}|OldInstructions];
_ ->
[{Version, []} | OldInstructions];
true ->
OldInstructions
end.
contains_version(Needle, Haystack) when is_list(Needle) ->
lists:any(
fun(Regex) when is_binary(Regex) ->
case re:run(Needle, Regex) of
{match, _} ->
true;
nomatch ->
false
end;
(Vsn) ->
Vsn =:= Needle
end,
Haystack).
%% As a best effort approach, we assume that we only bump patch
%% version numbers between release upgrades for our dependencies and
%% that we deal only with 3-part version schemas
%% (`Major.Minor.Patch'). Using those assumptions, we enumerate the
%% past versions that should be covered by regexes in .appup file
%% instructions.
enumerate_past_versions(Vsn) when is_list(Vsn) ->
case parse_version_number(Vsn) of
{ok, ParsedVsn} ->
{ok, enumerate_past_versions(ParsedVsn)};
Error ->
Error
end;
enumerate_past_versions({Major, Minor, Patch}) ->
[{Major, Minor, P} || P <- lists:seq(Patch - 1, 0, -1)].
parse_version_number(Vsn) when is_list(Vsn) ->
Nums = string:split(Vsn, ".", all),
Results = lists:map(fun string:to_integer/1, Nums),
case Results of
[{Major, []}, {Minor, []}, {Patch, []}] ->
{ok, {Major, Minor, Patch}};
_ ->
{error, bad_version}
end.
vsn_number_to_string({Major, Minor, Patch}) ->
io_lib:format("~b.~b.~b", [Major, Minor, Patch]).
read_appup(File) ->
%% NOTE: appup file is a script, it may contain variables or functions.
case file:script(File, [{'VSN', "VSN"}]) of
@ -339,7 +445,12 @@ update_appups(Changes) ->
do_update_appup(App, Upgrade, Downgrade, OldUpgrade, OldDowngrade) ->
case locate(src, App, ".appup.src") of
{ok, AppupFile} ->
render_appfile(AppupFile, Upgrade, Downgrade);
case contains_contents(AppupFile, Upgrade, Downgrade) of
true ->
ok;
false ->
render_appfile(AppupFile, Upgrade, Downgrade)
end;
undefined ->
case create_stub(App) of
{ok, AppupFile} ->
@ -367,7 +478,8 @@ render_appfile(File, Upgrade, Downgrade) ->
ok = file:write_file(File, IOList).
create_stub(App) ->
case locate(src, App, Ext = ".app.src") of
Ext = ".app.src",
case locate(src, App, Ext) of
{ok, AppSrc} ->
DirName = filename:dirname(AppSrc),
AppupFile = filename:basename(AppSrc, Ext) ++ ".appup.src",
@ -379,6 +491,17 @@ create_stub(App) ->
false
end.
%% we check whether the destination file already has the contents we
%% want to write to avoid writing and losing indentation and comments.
contains_contents(File, Upgrade, Downgrade) ->
%% the file may contain the VSN variable, so it's a script
case file:script(File, [{'VSN', 'VSN'}]) of
{ok, {_, Upgrade, Downgrade}} ->
true;
_ ->
false
end.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% application and release indexing
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
@ -398,16 +521,18 @@ index_app(AppFile) ->
, modules = Modules
}}.
diff_app(App, #app{version = NewVersion, modules = NewModules}, #app{version = OldVersion, modules = OldModules}) ->
diff_app(App,
#app{version = NewVersion, modules = NewModules},
#app{version = OldVersion, modules = OldModules}) ->
{New, Changed} =
maps:fold( fun(Mod, MD5, {New, Changed}) ->
case OldModules of
#{Mod := OldMD5} when MD5 =:= OldMD5 ->
{New, Changed};
#{Mod := _} ->
{New, [Mod|Changed]};
{New, [Mod | Changed]};
_ ->
{[Mod|New], Changed}
{[Mod | New], Changed}
end
end
, {[], []}
@ -437,6 +562,15 @@ hashsums(EbinDir) ->
filelib:wildcard("*.beam", EbinDir)
)).
is_app_external(App) ->
Ext = ".app.src",
case locate(src, App, Ext) of
{ok, _} ->
false;
undefined ->
true
end.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Global state
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
@ -522,10 +656,5 @@ log(Msg) ->
log(Msg, Args) ->
io:format(standard_error, Msg, Args).
ensure_string(Str) when is_binary(Str) ->
binary_to_list(Str);
ensure_string(Str) when is_list(Str) ->
Str.
otp_standard_apps() ->
[ssl, mnesia, kernel, asn1, stdlib].

View File

@ -600,7 +600,7 @@ do_publish(PacketId, Msg = #message{qos = ?QOS_2},
?LOG(warning, "Dropped the qos2 packet ~w "
"due to awaiting_rel is full.", [PacketId]),
ok = emqx_metrics:inc('packets.publish.dropped'),
handle_out(pubrec, {PacketId, RC}, Channel)
handle_out(disconnect, RC, Channel)
end.
ensure_quota(_, Channel = #channel{quota = undefined}) ->

View File

@ -211,7 +211,8 @@ t_handle_in_qos2_publish_with_error_return(_) ->
{ok, ?PUBREC_PACKET(2, ?RC_NO_MATCHING_SUBSCRIBERS), Channel1} =
emqx_channel:handle_in(Publish2, Channel),
Publish3 = ?PUBLISH_PACKET(?QOS_2, <<"topic">>, 3, <<"payload">>),
{ok, ?PUBREC_PACKET(3, ?RC_RECEIVE_MAXIMUM_EXCEEDED), Channel1} =
{ok, [{outgoing, ?DISCONNECT_PACKET(?RC_RECEIVE_MAXIMUM_EXCEEDED)},
{close, receive_maximum_exceeded}], Channel1} =
emqx_channel:handle_in(Publish3, Channel1).
t_handle_in_puback_ok(_) ->