Merge pull request #8274 from emqx/copy-of_main-v4.3

merge main-v4.3 -> main-v4.4
This commit is contained in:
Xinyu Liu 2022-06-20 14:04:28 +08:00 committed by GitHub
commit 5a746448e1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 194 additions and 45 deletions

View File

@ -34,5 +34,8 @@ jobs:
with:
name: expected_appup_files
path: |
{src,apps}/**/*.appup.src
apps/*/src/*.appup.src
src/*.appup.src
lib-ce/*/src/*.appup.src
lib-ee/*/src/*.appup.src
retention-days: 1

View File

@ -4,9 +4,7 @@
{grpc_plugin, {git, "https://github.com/HJianBo/grpc_plugin", {tag, "v0.10.2"}}}
]}.
{deps,
[{grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.4"}}}
]}.
{deps, []}.
{grpc,
[{protos, ["priv/protos"]},

View File

@ -12,9 +12,7 @@
{grpc_plugin, {git, "https://github.com/HJianBo/grpc_plugin", {tag, "v0.10.2"}}}
]}.
{deps,
[{grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.4"}}}
]}.
{deps, []}.
{grpc,
[{type, all},

View File

@ -5,11 +5,14 @@
[{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}]},
{"4.4.3",
[{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
{add_module,emqx_rule_date},
@ -67,11 +70,14 @@
[{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_engine,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}]},
{"4.4.3",
[{load_module,emqx_rule_validator,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_utils,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_events,brutal_purge,soft_purge,[]},
{load_module,emqx_rule_funcs,brutal_purge,soft_purge,[]},
{delete_module,emqx_rule_date},

View File

@ -509,7 +509,7 @@ refresh_resource_status() ->
fun(#resource{id = ResId, type = ResType}) ->
case emqx_rule_registry:find_resource_type(ResType) of
{ok, #resource_type{on_status = {Mod, OnStatus}}} ->
_ = fetch_resource_status(Mod, OnStatus, ResId);
fetch_resource_status(Mod, OnStatus, ResId);
_ -> ok
end
end, emqx_rule_registry:get_resources()).

View File

@ -51,13 +51,9 @@ apply_rules([], _Input) ->
apply_rules([#rule{enabled = false}|More], Input) ->
apply_rules(More, Input);
apply_rules([Rule|More], Input) ->
apply_rule_discard_result(Rule, Input),
apply_rule(Rule, Input),
apply_rules(More, Input).
apply_rule_discard_result(Rule, Input) ->
_ = apply_rule(Rule, Input),
ok.
apply_rule(Rule = #rule{id = RuleID}, Input) ->
clear_rule_payload(),
ok = emqx_rule_metrics:inc_rules_matched(RuleID),

View File

@ -1,6 +1,6 @@
{application, emqx_web_hook,
[{description, "EMQ X WebHook Plugin"},
{vsn, "4.3.11"}, % strict semver, bump manually!
{vsn, "4.3.12"}, % strict semver, bump manually!
{modules, []},
{registered, [emqx_web_hook_sup]},
{applications, [kernel,stdlib,ehttpc]},

View File

@ -12,10 +12,20 @@
{load_module,emqx_web_hook,brutal_purge,soft_purge,[]},
{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]},
{"4.3.8",
[{load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]},
[{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]},
{load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]},
{load_module,emqx_web_hook,brutal_purge,soft_purge,[]}]},
{"4.3.9",
[{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]},
{load_module,emqx_web_hook,brutal_purge,soft_purge,[]},
{load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]}]},
{"4.3.10",
[{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]},
{load_module,emqx_web_hook,brutal_purge,soft_purge,[]},
{load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]}]},
{"4.3.11",
[{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]},
{load_module,emqx_web_hook,brutal_purge,soft_purge,[]}]},
{"4.3.9",[{load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]}]},
{"4.3.10",[{load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]}]},
{<<".*">>,[]}],
[{<<"4\\.3\\.[0-2]">>,
[{apply,{application,stop,[emqx_web_hook]}},
@ -28,8 +38,18 @@
{load_module,emqx_web_hook,brutal_purge,soft_purge,[]},
{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]},
{"4.3.8",
[{load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]},
[{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]},
{load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]},
{load_module,emqx_web_hook,brutal_purge,soft_purge,[]}]},
{"4.3.9",
[{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]},
{load_module,emqx_web_hook,brutal_purge,soft_purge,[]},
{load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]}]},
{"4.3.10",
[{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]},
{load_module,emqx_web_hook,brutal_purge,soft_purge,[]},
{load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]}]},
{"4.3.11",
[{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]},
{load_module,emqx_web_hook,brutal_purge,soft_purge,[]}]},
{"4.3.9",[{load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]}]},
{"4.3.10",[{load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]}]},
{<<".*">>,[]}]}.

View File

@ -344,15 +344,13 @@ send_http_request(ClientID, Params) ->
ok
end.
parse_rule(Rules) ->
parse_rule(Rules, []).
parse_rule([], Acc) ->
lists:reverse(Acc);
parse_rule([{Rule, Conf} | Rules], Acc) ->
parse_rule([]) ->
[];
parse_rule([{Rule, Conf} | Rules]) ->
Params = emqx_json:decode(iolist_to_binary(Conf)),
Action = proplists:get_value(<<"action">>, Params),
Filter = proplists:get_value(<<"topic">>, Params),
parse_rule(Rules, [{list_to_atom(Rule), binary_to_existing_atom(Action, utf8), Filter} | Acc]).
[{list_to_atom(Rule), binary_to_existing_atom(Action, utf8), Filter} | parse_rule(Rules)].
with_filter(Fun, _, undefined) ->
Fun(), ok;
@ -389,4 +387,3 @@ stringfy(Term) ->
maybe(undefined) -> null;
maybe(Str) -> Str.

View File

@ -258,7 +258,7 @@ on_action_data_to_webserver(Selected, _Envs =
'Pool' := Pool,
'RequestTimeout' := RequestTimeout},
clientid := ClientID}) ->
NBody = format_msg(BodyTokens, Selected),
NBody = format_msg(BodyTokens, clear_user_property_header(Selected)),
NPath = emqx_rule_utils:proc_tmpl(PathTokens, Selected),
Req = create_req(Method, NPath, Headers, NBody),
case ehttpc:request({Pool, ClientID}, Method, Req, RequestTimeout) of
@ -280,7 +280,12 @@ on_action_data_to_webserver(Selected, _Envs =
format_msg([], Data) ->
emqx_json:encode(Data);
format_msg(Tokens, Data) ->
emqx_rule_utils:proc_tmpl(Tokens, Data).
emqx_rule_utils:proc_tmpl(Tokens, Data).
clear_user_property_header(#{headers := #{properties := #{'User-Property' := _} = P} = H} = S) ->
S#{headers => H#{properties => P#{'User-Property' => []}}};
clear_user_property_header(S) ->
S.
%%------------------------------------------------------------------------------
%% Internal functions
@ -365,9 +370,7 @@ get_ssl_opts(Opts, ResId) ->
test_http_connect(Conf) ->
Url = fun() -> maps:get(<<"url">>, Conf) end,
try
emqx_rule_utils:http_connectivity(Url())
of
try emqx_rule_utils:http_connectivity(Url()) of
ok -> true;
{error, _Reason} ->
?LOG(error, "check http_connectivity failed: ~p", [Url()]),

View File

@ -19,9 +19,8 @@
-compile(export_all).
-compile(nowarn_export_all).
-include_lib("emqx/include/emqx.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("emqx_rule_engine/include/rule_engine.hrl").
-define(HOOK_LOOKUP(H), emqx_hooks:lookup(list_to_atom(H))).
-define(ACTION(Name), #{<<"action">> := Name}).
@ -35,7 +34,7 @@ all() ->
, {group, https}
, {group, ipv6http}
, {group, ipv6https}
, {group, all}
, test_rule_webhook
].
groups() ->
@ -44,12 +43,18 @@ groups() ->
, {https, [sequence], Cases}
, {ipv6http, [sequence], Cases}
, {ipv6https, [sequence], Cases}
, {all, [sequence], emqx_ct:all(?MODULE)}
].
start_apps(F) -> emqx_ct_helpers:start_apps(apps(), F).
start_apps() ->
[application:load(App) || App <- apps()],
emqx_ct_helpers:start_apps(apps()).
start_apps(F) ->
[application:load(App) || App <- apps()],
emqx_ct_helpers:start_apps(apps(), F).
init_per_group(rules, Config) -> Config;
init_per_group(Name, Config) ->
net_kernel:start(['test@127.0.0.1', longnames]),
application:ensure_all_started(emqx_management),
set_special_cfgs(),
BasePort =
@ -74,6 +79,20 @@ init_per_group(Name, Config) ->
end,
[{base_port, BasePort}, {transport_opts, Opts} | Config].
init_per_testcase(test_rule_webhook, Config) ->
net_kernel:start(['test@127.0.0.1', longnames]),
ok = ekka_mnesia:start(),
ok = emqx_rule_registry:mnesia(boot),
Handler = fun(_) ->
application:set_env(emqx_web_hook, rules, []),
application:set_env(emqx_web_hook, url, "http://127.0.0.1:9999/"),
application:set_env(emqx_web_hook, ssl, false),
application:set_env(emqx_web_hook, ssloptions, [])
end,
ok = start_apps(Handler),
Config;
init_per_testcase(_, Config) -> Config.
end_per_group(_Name, Config) ->
emqx_ct_helpers:stop_apps(apps()),
Config.
@ -119,6 +138,51 @@ set_special_cfgs() ->
%% Test cases
%%--------------------------------------------------------------------
test_rule_webhook(_) ->
{ok, ServerPid} = http_server:start_link(self(), 9999, []),
receive {ServerPid, ready} -> ok
after 1000 -> error(timeout)
end,
ok = emqx_rule_engine:load_providers(),
{ok, #resource{id = ResId}} = emqx_rule_engine:create_resource(
#{type => web_hook,
config => #{<<"url">> => "http://127.0.0.1:9999/"},
description => <<"For testing">>}),
{ok, #rule{id = Id}} = emqx_rule_engine:create_rule(
#{rawsql => "select * from \"t1\"",
actions => [#{name => 'data_to_webserver',
args => #{<<"$resource">> => ResId}}],
type => web_hook,
description => <<"For testing">>
}),
Properties = #{'User-Property' => [{<<"user_property_key">>, <<"user_property_value">>}]},
ClientId = iolist_to_binary(["client-", integer_to_list(erlang:unique_integer([positive]))]),
{ok, Client} = emqtt:start_link([ {clientid, ClientId}
, {proto_ver, v5}
, {keepalive, 60}
]),
{ok, _} = emqtt:connect(Client),
{ok, _} = emqtt:publish(Client, <<"t1">>, Properties, <<"Payload...">>, [{qos, 2}]),
Res = receive {http_server, {Any, _Bool}, _Header} -> {ok, Any}
after 100 -> error
end,
?assertMatch({ok, _}, Res),
{ok, Body} = Res,
?assertNotEqual([], binary:matches(Body, <<"User-Property">>)),
?assertNotEqual([], binary:matches(Body, <<"user_property_key">>)),
?assertNotEqual([], binary:matches(Body, <<"user_property_value">>)),
emqtt:stop(Client),
http_server:stop(ServerPid),
emqx_rule_registry:remove_rule(Id),
emqx_rule_registry:remove_resource(ResId),
ok.
test_full_flow(Config) ->
[_|_] = Opts = proplists:get_value(transport_opts, Config),
BasePort = proplists:get_value(base_port, Config),
@ -128,7 +192,7 @@ test_full_flow(Config) ->
after 1000 -> error(timeout)
end,
application:set_env(emqx_web_hook, headers, [{"k1","K1"}, {"k2", "K2"}]),
ClientId = iolist_to_binary(["client-", integer_to_list(erlang:system_time())]),
ClientId = iolist_to_binary(["client-", integer_to_list(erlang:unique_integer([positive]))]),
{ok, C} = emqtt:start_link([ {clientid, ClientId}
, {proto_ver, v5}
, {keepalive, 60}
@ -174,10 +238,10 @@ validate_params_and_headers(ClientState, ClientId) ->
end
after
5000 ->
case ClientState =:= undefined of
true -> error("client_was_never_connected");
false -> error("terminate_action_is_not_received_in_time")
end
case ClientState =:= undefined of
true -> error("client_was_never_connected");
false -> error("terminate_action_is_not_received_in_time")
end
end.
t_check_hooked(_) ->

View File

@ -81,7 +81,7 @@ stop_https() ->
compile_router(Parent) ->
{ok, _} = application:ensure_all_started(cowboy),
cowboy_router:compile([
{'_', [{"/", ?MODULE, #{parent => Parent}}]}
{'_', [{'_', ?MODULE, #{parent => Parent}}]}
]).
init(Req, #{parent := Parent} = State) ->

View File

@ -40,6 +40,8 @@ Parameter | Description | Default Value
`image.pullPolicy` | The image pull policy | `IfNotPresent`
`image.pullSecrets ` | The image pull secrets (does not add image pull secrets to deployed pods) |``[]``
`recreatePods` | Forces the recreation of pods during upgrades, which can be useful to always apply the most recent configuration. | `false`
`podAnnotations ` | Annotations for pod | `{}`
`podManagementPolicy`| To redeploy a chart with existing PVC(s), the value must be set to Parallel to avoid deadlock | `Parallel`
`persistence.enabled` | Enable EMQX persistence using PVC | `false`
`persistence.storageClass` | Storage class of backing PVC (uses alpha storage class annotation) | `nil`
`persistence.existingClaim` | EMQX data Persistent Volume existing claim name, evaluated as a template | `""`

View File

@ -56,8 +56,11 @@ spec:
version: {{ .Chart.AppVersion }}
app.kubernetes.io/name: {{ include "emqx.name" . }}
app.kubernetes.io/instance: {{ .Release.Name }}
{{- if .Values.recreatePods }}
annotations:
{{- with .Values.podAnnotations }}
{{- toYaml . | nindent 8 }}
{{- end }}
{{- if .Values.recreatePods }}
checksum/config: {{ $configData | sha256sum | quote }}
{{- end }}
spec:

View File

@ -17,6 +17,8 @@ image:
## Forces the recreation of pods during helm upgrades. This can be useful to update configuration values even if the container image did not change.
recreatePods: false
podAnnotations: {}
# Pod deployment policy
# value: OrderedReady | Parallel
# To redeploy a chart with existing PVC(s), the value must be set to Parallel to avoid deadlock

View File

@ -306,6 +306,17 @@ node.crash_dump = {{ platform_log_dir }}/crash.dump
## vm.args: -kernel net_ticktime Number
## node.dist_net_ticktime = 120
## If the host of an Erlang node has many network interfaces,
## this parameter specifies which one to listen on.
## For the type definition of ip_address(), see inet(3).
##
## See: http://www.erlang.org/doc/man/kernel_app.html
##
## Value: IP Address, [0-255].[0-255].[0-255].[0-255]
## Default: 0.0.0.0
## 0.0.0.0 means all network interfaces.
node.dist_use_interface = 0.0.0.0
## Sets the port range for the listener socket of a distributed Erlang node.
## Note that if there are firewalls between clustered nodes, this port segment
## for nodes communication should be allowed.
@ -346,6 +357,17 @@ rpc.async_batch_size = 256
## Defaults to `stateless`.
rpc.port_discovery = stateless
## If the host of an Erlang node has many network interfaces,
## this parameter specifies which one for RPC server to listen on.
## For the type definition of ip_address(), see inet(3).
##
## See: http://www.erlang.org/doc/man/kernel_app.html
##
## Value: IP Address, [0-255].[0-255].[0-255].[0-255]
## Default: 0.0.0.0
## 0.0.0.0 means all network interfaces.
#rpc.tcp_server_ip = 0.0.0.0
## TCP port number for RPC server to listen on.
##
## Only takes effect when `rpc.port_discovery` = `manual`.

View File

@ -350,6 +350,23 @@ end}.
hidden
]}.
%% @doc http://www.erlang.org/doc/man/kernel_app.html#net_ticktime
{mapping, "node.dist_use_interface", "kernel.inet_dist_use_interface", [
{commented, "0.0.0.0"},
{datatype, string}
]}.
{translation, "kernel.inet_dist_use_interface",
fun(Conf) ->
Addr = cuttlefish:conf_get("node.dist_use_interface", Conf, "0.0.0.0"),
case inet:parse_address(Addr) of
{ok, IP} ->
IP;
_ ->
error({bad_node_dist_use_interface, Addr})
end
end}.
%% @doc http://www.erlang.org/doc/man/kernel_app.html
{mapping, "node.dist_listen_min", "kernel.inet_dist_listen_min", [
{commented, 6369},
@ -424,6 +441,23 @@ end}.
{datatype, string}
]}.
%% RPC server, network interface.
{mapping, "rpc.tcp_server_ip", "gen_rpc.socket_ip", [
{default, "0.0.0.0"},
{datatype, string}
]}.
{translation, "gen_rpc.socket_ip",
fun(Conf) ->
Addr = cuttlefish:conf_get("rpc.tcp_server_ip", Conf, "0.0.0.0"),
case inet:parse_address(Addr) of
{ok, IP} ->
IP;
_ ->
error({bad_gen_rpc_socket_ip, Addr})
end
end}.
%% Number of tcp connections when connecting to RPC server
{mapping, "rpc.tcp_client_num", "gen_rpc.tcp_client_num", [
{default, 0},

View File

@ -46,7 +46,7 @@
, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.0"}}}
, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.8.5"}}}
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.8.1.10"}}}
, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.7.0"}}}
, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.7.1"}}}
, {cuttlefish, {git, "https://github.com/emqx/cuttlefish", {tag, "v3.3.6"}}}
, {minirest, {git, "https://github.com/emqx/minirest", {tag, "0.3.7"}}}
, {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.2"}}}
@ -61,6 +61,7 @@
, {lc, {git, "https://github.com/emqx/lc.git", {tag, "0.3.1"}}}
, {mongodb, {git,"https://github.com/emqx/mongodb-erlang", {tag, "v3.0.13"}}}
, {epgsql, {git, "https://github.com/emqx/epgsql.git", {tag, "4.6.0"}}}
, {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.5"}}}
]}.
{xref_ignores,