Merge remote-tracking branch 'origin/release-50' into 0308-merge-release-50-back-to-master

This commit is contained in:
Zaiming (Stone) Shi 2023-03-08 16:46:45 +01:00
commit fe27604010
63 changed files with 866 additions and 141 deletions

View File

@ -43,9 +43,10 @@ jobs:
- name: Get profiles to build
id: get_profile
env:
INPUTS_PROFILE: ${{ github.event.inputs.profile }}
run: |
cd source
tag=${{ github.ref }}
# tag docker-latest-ce or docker-latest-ee
if git describe --tags --exact --match 'docker-latest-*' 2>/dev/null; then
echo 'is_latest=true due to docker-latest-* tag'
@ -57,38 +58,33 @@ jobs:
echo 'is_latest=false'
is_latest=false
fi
if git describe --tags --match "[v|e]*" --exact; then
# resolve profile
if git describe --tags --match "v*" --exact; then
echo "This is an exact git tag, will publish images"
is_exact='true'
PROFILE=emqx
elif git describe --tags --match "e*" --exact; then
echo "This is an exact git tag, will publish images"
is_exact='true'
PROFILE=emqx-enterprise
else
echo "This is NOT an exact git tag, will not publish images"
is_exact='false'
fi
case $tag in
refs/tags/v*)
PROFILE='emqx'
case "${PROFILE:-$INPUTS_PROFILE}" in
emqx)
EDITION='Opensource'
;;
refs/tags/e*)
PROFILE=emqx-enterprise
emqx-enterprise)
EDITION='Enterprise'
;;
*)
PROFILE=${{ github.event.inputs.profile }}
case "$PROFILE" in
emqx)
EDITION='Opensource'
;;
emqx-enterprise)
EDITION='Enterprise'
;;
*)
echo "ERROR: Failed to resolve build profile"
exit 1
;;
esac
echo "ERROR: Failed to resolve build profile"
exit 1
;;
esac
VSN="$(./pkg-vsn.sh "$PROFILE")"
echo "Building emqx/$PROFILE:$VSN image (latest=$is_latest)"
echo "Push = $is_exact"

View File

@ -8,6 +8,7 @@ on:
push:
branches:
- master
- release-50
pull_request:
# GitHub pull_request action is by default triggered when
# opened reopened or synchronize,

View File

@ -7,7 +7,7 @@ export EMQX_DEFAULT_RUNNER = debian:11-slim
export OTP_VSN ?= $(shell $(CURDIR)/scripts/get-otp-vsn.sh)
export ELIXIR_VSN ?= $(shell $(CURDIR)/scripts/get-elixir-vsn.sh)
export EMQX_DASHBOARD_VERSION ?= v1.1.8
export EMQX_EE_DASHBOARD_VERSION ?= e1.0.4-beta.3
export EMQX_EE_DASHBOARD_VERSION ?= e1.0.4
export EMQX_REL_FORM ?= tgz
export QUICER_DOWNLOAD_FROM_RELEASE = 1
ifeq ($(OS),Windows_NT)

View File

@ -32,10 +32,10 @@
%% `apps/emqx/src/bpapi/README.md'
%% Community edition
-define(EMQX_RELEASE_CE, "5.0.19").
-define(EMQX_RELEASE_CE, "5.0.20").
%% Enterprise edition
-define(EMQX_RELEASE_EE, "5.0.1-beta.1").
-define(EMQX_RELEASE_EE, "5.0.1-rc.1").
%% the HTTP API version
-define(EMQX_API_VERSION, "5.0").

View File

@ -27,7 +27,7 @@
{jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}},
{cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.0"}}},
{esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.4"}}},
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.14.2"}}},
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.14.3"}}},
{gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}},
{hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.36.0"}}},
{pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}},

View File

@ -24,7 +24,7 @@ IsQuicSupp = fun() ->
end,
Bcrypt = {bcrypt, {git, "https://github.com/emqx/erlang-bcrypt.git", {tag, "0.6.0"}}},
Quicer = {quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.0.111"}}}.
Quicer = {quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.0.113"}}}.
Dialyzer = fun(Config) ->
{dialyzer, OldDialyzerConfig} = lists:keyfind(dialyzer, 1, Config),

View File

@ -3,7 +3,7 @@
{id, "emqx"},
{description, "EMQX Core"},
% strict semver, bump manually!
{vsn, "5.0.18"},
{vsn, "5.0.19"},
{modules, []},
{registered, []},
{applications, [

View File

@ -1280,7 +1280,18 @@ fields("listener_wss_opts") ->
true
);
fields("listener_quic_ssl_opts") ->
server_ssl_opts_schema(#{}, false);
%% Mark unsupported TLS options deprecated.
lists:map(
fun({Name, Schema}) ->
case is_quic_ssl_opts(Name) of
true ->
{Name, Schema};
false ->
{Name, Schema#{deprecated => {since, "5.0.20"}}}
end
end,
server_ssl_opts_schema(#{}, false)
);
fields("ssl_client_opts") ->
client_ssl_opts_schema(#{});
fields("deflate_opts") ->
@ -2841,3 +2852,18 @@ quic_lowlevel_settings_uint(Low, High, Desc) ->
desc => Desc
}
).
-spec is_quic_ssl_opts(string()) -> boolean().
is_quic_ssl_opts(Name) ->
lists:member(Name, [
"cacertfile",
"certfile",
"keyfile",
"verify"
%% Followings are planned
%% , "password"
%% , "hibernate_after"
%% , "fail_if_no_peer_cert"
%% , "handshake_timeout"
%% , "gc_after_handshake"
]).

View File

@ -537,10 +537,12 @@ ensure_quic_listener(Name, UdpPort, ExtraSettings) ->
mountpoint => <<>>,
zone => default
},
emqx_config:put([listeners, quic, Name], maps:merge(Conf, ExtraSettings)),
case emqx_listeners:start_listener(quic, Name, Conf) of
Conf2 = maps:merge(Conf, ExtraSettings),
emqx_config:put([listeners, quic, Name], Conf2),
case emqx_listeners:start_listener(emqx_listeners:listener_id(quic, Name)) of
ok -> ok;
{error, {already_started, _Pid}} -> ok
{error, {already_started, _Pid}} -> ok;
Other -> throw(Other)
end.
%%

View File

@ -33,7 +33,8 @@ all() ->
{group, mstream},
{group, shutdown},
{group, misc},
t_listener_with_lowlevel_settings
t_listener_with_lowlevel_settings,
t_listener_inval_settings
].
groups() ->
@ -1885,8 +1886,17 @@ t_multi_streams_sub_0_rtt_stream_data_cont(Config) ->
ok = emqtt:disconnect(C),
ok = emqtt:disconnect(C0).
t_listener_inval_settings(_Config) ->
LPort = select_port(),
%% too small
LowLevelTunings = #{stream_recv_buffer_default => 1024},
?assertThrow(
{error, {failed_to_start, _}},
emqx_common_test_helpers:ensure_quic_listener(?FUNCTION_NAME, LPort, LowLevelTunings)
).
t_listener_with_lowlevel_settings(_Config) ->
LPort = 24567,
LPort = select_port(),
LowLevelTunings = #{
max_bytes_per_key => 274877906,
%% In conf schema we use handshake_idle_timeout
@ -1897,7 +1907,7 @@ t_listener_with_lowlevel_settings(_Config) ->
%% tls_client_max_send_buffer,
tls_server_max_send_buffer => 10240,
stream_recv_window_default => 1024,
stream_recv_buffer_default => 1024,
stream_recv_buffer_default => 10240,
conn_flow_control_window => 1024,
max_stateless_operations => 16,
initial_window_packets => 1300,
@ -1936,8 +1946,7 @@ t_listener_with_lowlevel_settings(_Config) ->
{ok, _, [_SubQos]} = emqtt:subscribe_via(C, {new_data_stream, []}, #{}, [
{<<"test/1/3">>, [{qos, 2}]}
]),
ok = emqtt:disconnect(C),
emqx_listeners:stop_listener(emqx_listeners:listener_id(quic, ?FUNCTION_NAME)).
ok = emqtt:disconnect(C).
%%--------------------------------------------------------------------
%% Helper functions

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_authz, [
{description, "An OTP application"},
{vsn, "0.1.14"},
{vsn, "0.1.15"},
{registered, []},
{mod, {emqx_authz_app, []}},
{applications, [

View File

@ -558,7 +558,8 @@ schema("/bridges_probe") ->
RequestMeta = #{module => ?MODULE, method => post, path => "/bridges_probe"},
case emqx_dashboard_swagger:filter_check_request_and_translate_body(Request, RequestMeta) of
{ok, #{body := #{<<"type">> := ConnType} = Params}} ->
case emqx_bridge_resource:create_dry_run(ConnType, maps:remove(<<"type">>, Params)) of
Params1 = maybe_deobfuscate_bridge_probe(Params),
case emqx_bridge_resource:create_dry_run(ConnType, maps:remove(<<"type">>, Params1)) of
ok ->
{204};
{error, Reason} when not is_tuple(Reason); element(1, Reason) =/= 'exit' ->
@ -568,6 +569,18 @@ schema("/bridges_probe") ->
BadRequest
end.
maybe_deobfuscate_bridge_probe(#{<<"type">> := BridgeType, <<"name">> := BridgeName} = Params) ->
case emqx_bridge:lookup(BridgeType, BridgeName) of
{ok, _} ->
RawConf = emqx:get_raw_config([bridges, BridgeType, BridgeName], #{}),
deobfuscate(Params, RawConf);
_ ->
%% A bridge may be probed before it's created, so not finding it here is fine
Params
end;
maybe_deobfuscate_bridge_probe(Params) ->
Params.
lookup_from_all_nodes(BridgeType, BridgeName, SuccCode) ->
FormatFun = fun format_bridge_info_without_metrics/1,
do_lookup_from_all_nodes(BridgeType, BridgeName, SuccCode, FormatFun).

View File

@ -17,6 +17,7 @@
-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-include_lib("emqx/include/logger.hrl").
-import(hoconsc, [mk/2, ref/2]).
@ -140,11 +141,7 @@ fields(bridges) ->
#{
desc => ?DESC("bridges_webhook"),
required => false,
converter => fun(X, _HoconOpts) ->
emqx_bridge_compatible_config:upgrade_pre_ee(
X, fun emqx_bridge_compatible_config:webhook_maybe_upgrade/1
)
end
converter => fun webhook_bridge_converter/2
}
)},
{mqtt,
@ -212,3 +209,48 @@ status() ->
node_name() ->
{"node", mk(binary(), #{desc => ?DESC("desc_node_name"), example => "emqx@127.0.0.1"})}.
webhook_bridge_converter(Conf0, _HoconOpts) ->
Conf1 = emqx_bridge_compatible_config:upgrade_pre_ee(
Conf0, fun emqx_bridge_compatible_config:webhook_maybe_upgrade/1
),
case Conf1 of
undefined ->
undefined;
_ ->
do_convert_webhook_config(Conf1)
end.
do_convert_webhook_config(
#{<<"request_timeout">> := ReqT, <<"resource_opts">> := #{<<"request_timeout">> := ReqT}} = Conf
) ->
%% ok: same values
Conf;
do_convert_webhook_config(
#{
<<"request_timeout">> := ReqTRootRaw,
<<"resource_opts">> := #{<<"request_timeout">> := ReqTResourceRaw}
} = Conf0
) ->
%% different values; we set them to the same, if they are valid
%% durations
MReqTRoot = emqx_schema:to_duration_ms(ReqTRootRaw),
MReqTResource = emqx_schema:to_duration_ms(ReqTResourceRaw),
case {MReqTRoot, MReqTResource} of
{{ok, ReqTRoot}, {ok, ReqTResource}} ->
{_Parsed, ReqTRaw} = max({ReqTRoot, ReqTRootRaw}, {ReqTResource, ReqTResourceRaw}),
Conf1 = emqx_map_lib:deep_merge(
Conf0,
#{
<<"request_timeout">> => ReqTRaw,
<<"resource_opts">> => #{<<"request_timeout">> => ReqTRaw}
}
),
Conf1;
_ ->
%% invalid values; let the type checker complain about
%% that.
Conf0
end;
do_convert_webhook_config(Conf) ->
Conf.

View File

@ -891,6 +891,35 @@ t_metrics(Config) ->
),
ok.
%% request_timeout in bridge root should match request_timeout in
%% resource_opts.
t_inconsistent_webhook_request_timeouts(Config) ->
Port = ?config(port, Config),
URL1 = ?URL(Port, "path1"),
Name = ?BRIDGE_NAME,
BadBridgeParams =
emqx_map_lib:deep_merge(
?HTTP_BRIDGE(URL1, ?BRIDGE_TYPE, Name),
#{
<<"request_timeout">> => <<"1s">>,
<<"resource_opts">> => #{<<"request_timeout">> => <<"2s">>}
}
),
{ok, 201, RawResponse} = request(
post,
uri(["bridges"]),
BadBridgeParams
),
%% note: same value on both fields
?assertMatch(
#{
<<"request_timeout">> := <<"2s">>,
<<"resource_opts">> := #{<<"request_timeout">> := <<"2s">>}
},
emqx_json:decode(RawResponse, [return_maps])
),
ok.
operation_path(node, Oper, BridgeID) ->
uri(["nodes", node(), "bridges", BridgeID, Oper]);
operation_path(cluster, Oper, BridgeID) ->

View File

@ -28,6 +28,7 @@ empty_config_test() ->
webhook_config_test() ->
Conf1 = parse(webhook_v5011_hocon()),
Conf2 = parse(full_webhook_v5011_hocon()),
Conf3 = parse(full_webhook_v5019_hocon()),
?assertMatch(
#{
@ -59,6 +60,26 @@ webhook_config_test() ->
check(Conf2)
),
%% the converter should pick the greater of the two
%% request_timeouts and place them in the root and inside
%% resource_opts.
?assertMatch(
#{
<<"bridges">> := #{
<<"webhook">> := #{
<<"the_name">> :=
#{
<<"method">> := get,
<<"request_timeout">> := 60_000,
<<"resource_opts">> := #{<<"request_timeout">> := 60_000},
<<"body">> := <<"${payload}">>
}
}
}
},
check(Conf3)
),
ok.
up(#{<<"bridges">> := Bridges0} = Conf0) ->
@ -124,7 +145,7 @@ bridges{
max_retries = 3
method = \"get\"
pool_size = 4
request_timeout = \"5s\"
request_timeout = \"15s\"
ssl {enable = false, verify = \"verify_peer\"}
url = \"http://localhost:8080\"
}
@ -164,6 +185,41 @@ full_webhook_v5011_hocon() ->
"}\n"
"".
%% does not contain direction
full_webhook_v5019_hocon() ->
""
"\n"
"bridges{\n"
" webhook {\n"
" the_name{\n"
" body = \"${payload}\"\n"
" connect_timeout = \"5s\"\n"
" enable_pipelining = 100\n"
" headers {\"content-type\" = \"application/json\"}\n"
" max_retries = 3\n"
" method = \"get\"\n"
" pool_size = 4\n"
" pool_type = \"random\"\n"
" request_timeout = \"1m\"\n"
" resource_opts = {\n"
" request_timeout = \"7s\"\n"
" }\n"
" ssl {\n"
" ciphers = \"\"\n"
" depth = 10\n"
" enable = false\n"
" reuse_sessions = true\n"
" secure_renegotiate = true\n"
" user_lookup_fun = \"emqx_tls_psk:lookup\"\n"
" verify = \"verify_peer\"\n"
" versions = [\"tlsv1.3\", \"tlsv1.2\", \"tlsv1.1\", \"tlsv1\"]\n"
" }\n"
" url = \"http://localhost:8080\"\n"
" }\n"
" }\n"
"}\n"
"".
%% erlfmt-ignore
%% this is a generated from v5.0.11
mqtt_v5011_hocon() ->

View File

@ -0,0 +1,267 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_bridge_webhook_SUITE).
%% This suite should contains testcases that are specific for the webhook
%% bridge. There are also some test cases that implicitly tests the webhook
%% bridge in emqx_bridge_api_SUITE
-compile(nowarn_export_all).
-compile(export_all).
-import(emqx_mgmt_api_test_util, [request/3, uri/1]).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
all() ->
emqx_common_test_helpers:all(?MODULE).
groups() ->
[].
init_per_suite(_Config) ->
emqx_common_test_helpers:render_and_load_app_config(emqx_conf),
ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]),
ok = emqx_connector_test_helpers:start_apps([emqx_resource]),
{ok, _} = application:ensure_all_started(emqx_connector),
snabbkaffe:fix_ct_logging(),
[].
end_per_suite(_Config) ->
ok = emqx_common_test_helpers:stop_apps([emqx_conf, emqx_bridge]),
ok = emqx_connector_test_helpers:stop_apps([emqx_resource]),
_ = application:stop(emqx_connector),
_ = application:stop(emqx_bridge),
ok.
suite() ->
[{timetrap, {seconds, 60}}].
%%------------------------------------------------------------------------------
%% HTTP server for testing
%% (Orginally copied from emqx_bridge_api_SUITE)
%%------------------------------------------------------------------------------
start_http_server(HTTPServerConfig) ->
ct:pal("Start server\n"),
process_flag(trap_exit, true),
Parent = self(),
{ok, {Port, Sock}} = listen_on_random_port(),
Acceptor = spawn(fun() ->
accept_loop(Sock, Parent, HTTPServerConfig)
end),
timer:sleep(100),
#{port => Port, sock => Sock, acceptor => Acceptor}.
stop_http_server(#{sock := Sock, acceptor := Acceptor}) ->
ct:pal("Stop server\n"),
exit(Acceptor, kill),
gen_tcp:close(Sock).
listen_on_random_port() ->
SockOpts = [binary, {active, false}, {packet, raw}, {reuseaddr, true}, {backlog, 1000}],
case gen_tcp:listen(0, SockOpts) of
{ok, Sock} ->
{ok, Port} = inet:port(Sock),
{ok, {Port, Sock}};
{error, Reason} when Reason =/= eaddrinuse ->
{error, Reason}
end.
accept_loop(Sock, Parent, HTTPServerConfig) ->
process_flag(trap_exit, true),
case gen_tcp:accept(Sock) of
{ok, Conn} ->
spawn(fun() -> handle_fun_200_ok(Conn, Parent, HTTPServerConfig, <<>>) end),
%%gen_tcp:controlling_process(Conn, Handler),
accept_loop(Sock, Parent, HTTPServerConfig);
{error, closed} ->
%% socket owner died
ok
end.
make_response(CodeStr, Str) ->
B = iolist_to_binary(Str),
iolist_to_binary(
io_lib:fwrite(
"HTTP/1.0 ~s\r\nContent-Type: text/html\r\nContent-Length: ~p\r\n\r\n~s",
[CodeStr, size(B), B]
)
).
handle_fun_200_ok(Conn, Parent, HTTPServerConfig, Acc) ->
ResponseDelayMS = maps:get(response_delay_ms, HTTPServerConfig, 0),
ct:pal("Waiting for request~n"),
case gen_tcp:recv(Conn, 0) of
{ok, ReqStr} ->
ct:pal("The http handler got request: ~p", [ReqStr]),
case parse_http_request(<<Acc/binary, ReqStr/binary>>) of
{ok, incomplete, NewAcc} ->
handle_fun_200_ok(Conn, Parent, HTTPServerConfig, NewAcc);
{ok, Req, NewAcc} ->
timer:sleep(ResponseDelayMS),
Parent ! {http_server, received, Req},
gen_tcp:send(Conn, make_response("200 OK", "Request OK")),
handle_fun_200_ok(Conn, Parent, HTTPServerConfig, NewAcc)
end;
{error, closed} ->
ct:pal("http connection closed");
{error, Reason} ->
ct:pal("the http handler recv error: ~p", [Reason]),
timer:sleep(100),
gen_tcp:close(Conn)
end.
parse_http_request(ReqStr) ->
try
parse_http_request_assertive(ReqStr)
catch
_:_ ->
{ok, incomplete, ReqStr}
end.
parse_http_request_assertive(ReqStr0) ->
%% find body length
[_, LengthStr0] = string:split(ReqStr0, "content-length:"),
[LengthStr, _] = string:split(LengthStr0, "\r\n"),
Length = binary_to_integer(string:trim(LengthStr, both)),
%% split between multiple requests
[Method, ReqStr1] = string:split(ReqStr0, " ", leading),
[Path, ReqStr2] = string:split(ReqStr1, " ", leading),
[_ProtoVsn, ReqStr3] = string:split(ReqStr2, "\r\n", leading),
[_HeaderStr, Rest] = string:split(ReqStr3, "\r\n\r\n", leading),
<<Body:Length/binary, Remain/binary>> = Rest,
{ok, #{method => Method, path => Path, body => Body}, Remain}.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Helper functions
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
bridge_async_config(#{port := Port} = Config) ->
Type = maps:get(type, Config, <<"webhook">>),
Name = maps:get(name, Config, atom_to_binary(?MODULE)),
PoolSize = maps:get(pool_size, Config, 1),
QueryMode = maps:get(query_mode, Config, "async"),
ConnectTimeout = maps:get(connect_timeout, Config, 1),
RequestTimeout = maps:get(request_timeout, Config, 10000),
ResourceRequestTimeout = maps:get(resouce_request_timeout, Config, "infinity"),
ConfigString = io_lib:format(
"bridges.~s.~s {\n"
" url = \"http://localhost:~p\"\n"
" connect_timeout = \"~ps\"\n"
" enable = true\n"
" enable_pipelining = 100\n"
" max_retries = 2\n"
" method = \"post\"\n"
" pool_size = ~p\n"
" pool_type = \"random\"\n"
" request_timeout = \"~ps\"\n"
" body = \"${id}\""
" resource_opts {\n"
" async_inflight_window = 100\n"
" auto_restart_interval = \"60s\"\n"
" health_check_interval = \"15s\"\n"
" max_queue_bytes = \"1GB\"\n"
" query_mode = \"~s\"\n"
" request_timeout = \"~s\"\n"
" start_after_created = \"true\"\n"
" start_timeout = \"5s\"\n"
" worker_pool_size = \"1\"\n"
" }\n"
" ssl {\n"
" enable = false\n"
" }\n"
"}\n",
[
Type,
Name,
Port,
ConnectTimeout,
PoolSize,
RequestTimeout,
QueryMode,
ResourceRequestTimeout
]
),
ct:pal(ConfigString),
parse_and_check(ConfigString, Type, Name).
parse_and_check(ConfigString, BridgeType, Name) ->
{ok, RawConf} = hocon:binary(ConfigString, #{format => map}),
hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}),
#{<<"bridges">> := #{BridgeType := #{Name := RetConfig}}} = RawConf,
RetConfig.
make_bridge(Config) ->
Type = <<"webhook">>,
Name = atom_to_binary(?MODULE),
BridgeConfig = bridge_async_config(Config#{
name => Name,
type => Type
}),
{ok, _} = emqx_bridge:create(
Type,
Name,
BridgeConfig
),
emqx_bridge_resource:bridge_id(Type, Name).
%%------------------------------------------------------------------------------
%% Testcases
%%------------------------------------------------------------------------------
%% This test ensures that https://emqx.atlassian.net/browse/CI-62 is fixed.
%% When the connection time out all the queued requests where dropped in
t_send_async_connection_timeout(_Config) ->
ResponseDelayMS = 90,
#{port := Port} = Server = start_http_server(#{response_delay_ms => 900}),
% Port = 9000,
BridgeID = make_bridge(#{
port => Port,
pool_size => 1,
query_mode => "async",
connect_timeout => ResponseDelayMS * 2,
request_timeout => 10000,
resouce_request_timeout => "infinity"
}),
NumberOfMessagesToSend = 10,
[
emqx_bridge:send_message(BridgeID, #{<<"id">> => Id})
|| Id <- lists:seq(1, NumberOfMessagesToSend)
],
%% Make sure server recive all messages
ct:pal("Sent messages\n"),
MessageIDs = maps:from_keys(lists:seq(1, NumberOfMessagesToSend), void),
receive_request_notifications(MessageIDs, ResponseDelayMS),
stop_http_server(Server),
ok.
receive_request_notifications(MessageIDs, _ResponseDelay) when map_size(MessageIDs) =:= 0 ->
ok;
receive_request_notifications(MessageIDs, ResponseDelay) ->
receive
{http_server, received, Req} ->
RemainingMessageIDs = remove_message_id(MessageIDs, Req),
receive_request_notifications(RemainingMessageIDs, ResponseDelay)
after (30 * 1000) ->
ct:pal("Waited to long time but did not get any message\n"),
ct:fail("All requests did not reach server at least once")
end.
remove_message_id(MessageIDs, #{body := IDBin}) ->
ID = erlang:binary_to_integer(IDBin),
%% It is acceptable to get the same message more than once
maps:without([ID], MessageIDs).

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_connector, [
{description, "EMQX Data Integration Connectors"},
{vsn, "0.1.15"},
{vsn, "0.1.16"},
{registered, []},
{mod, {emqx_connector_app, []}},
{applications, [

View File

@ -566,7 +566,9 @@ bin(Atom) when is_atom(Atom) ->
reply_delegator(ReplyFunAndArgs, Result) ->
case Result of
{error, Reason} when Reason =:= econnrefused; Reason =:= timeout ->
%% The normal reason happens when the HTTP connection times out before
%% the request has been fully processed
{error, Reason} when Reason =:= econnrefused; Reason =:= timeout; Reason =:= normal ->
Result1 = {error, {recoverable_error, Reason}},
emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result1);
_ ->

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_gateway, [
{description, "The Gateway management application"},
{vsn, "0.1.12"},
{vsn, "0.1.13"},
{registered, []},
{mod, {emqx_gateway_app, []}},
{applications, [kernel, stdlib, grpc, emqx, emqx_authn, emqx_ctl]},

View File

@ -3,7 +3,7 @@
{id, "emqx_machine"},
{description, "The EMQX Machine"},
% strict semver, bump manually!
{vsn, "0.2.0"},
{vsn, "0.2.1"},
{modules, []},
{registered, []},
{applications, [kernel, stdlib, emqx_ctl]},

View File

@ -102,12 +102,12 @@ For bridges only have ingress direction data flow, it can be set to 0 otherwise
request_timeout {
desc {
en: """Timeout for requests. If <code>query_mode</code> is <code>sync</code>, calls to the resource will be blocked for this amount of time before timing out."""
zh: """请求的超时。 如果<code>query_mode</code>是<code>sync</code>,对资源的调用将在超时前被阻断这一时间。"""
en: """Starting from the moment when the request enters the buffer, if the request remains in the buffer for the specified time or is sent but does not receive a response or acknowledgement in time, the request is considered expired."""
zh: """从请求进入缓冲区开始计时,如果请求在规定的时间内仍停留在缓冲区内或者已发送但未能及时收到响应或确认,该请求将被视为过期。"""
}
label {
en: """Request timeout"""
zh: """请求超"""
en: """Request Expiry"""
zh: """请求超"""
}
}
@ -159,12 +159,12 @@ When disabled the messages are buffered in RAM only."""
batch_time {
desc {
en: """Maximum batch waiting interval."""
zh: """最大批量请求等待间。"""
en: """Maximum waiting interval when accumulating a batch at a low message rates for more efficient resource usage."""
zh: """在较低消息率情况下尝试累积批量输出时的最大等待间隔,以提高资源的利用率。"""
}
label {
en: """Batch time"""
zh: """批量等待间隔"""
en: """Max Batch Wait Time"""
zh: """批量等待最大间隔"""
}
}

View File

@ -97,8 +97,8 @@
-define(DEFAULT_BATCH_SIZE, 1).
%% milliseconds
-define(DEFAULT_BATCH_TIME, 20).
-define(DEFAULT_BATCH_TIME_RAW, <<"20ms">>).
-define(DEFAULT_BATCH_TIME, 0).
-define(DEFAULT_BATCH_TIME_RAW, <<"0ms">>).
%% count
-define(DEFAULT_INFLIGHT, 100).

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_resource, [
{description, "Manager for all external resources"},
{vsn, "0.1.8"},
{vsn, "0.1.9"},
{registered, []},
{mod, {emqx_resource_app, []}},
{applications, [

View File

@ -196,13 +196,16 @@ init({Id, Index, Opts}) ->
InflightWinSize = maps:get(async_inflight_window, Opts, ?DEFAULT_INFLIGHT),
InflightTID = inflight_new(InflightWinSize, Id, Index),
HealthCheckInterval = maps:get(health_check_interval, Opts, ?HEALTHCHECK_INTERVAL),
RequestTimeout = maps:get(request_timeout, Opts, ?DEFAULT_REQUEST_TIMEOUT),
BatchTime0 = maps:get(batch_time, Opts, ?DEFAULT_BATCH_TIME),
BatchTime = adjust_batch_time(Id, RequestTimeout, BatchTime0),
Data = #{
id => Id,
index => Index,
inflight_tid => InflightTID,
async_workers => #{},
batch_size => BatchSize,
batch_time => maps:get(batch_time, Opts, ?DEFAULT_BATCH_TIME),
batch_time => BatchTime,
queue => Queue,
resume_interval => maps:get(resume_interval, Opts, HealthCheckInterval),
tref => undefined
@ -1546,6 +1549,12 @@ clear_disk_queue_dir(Id, Index) ->
ensure_flush_timer(Data = #{batch_time := T}) ->
ensure_flush_timer(Data, T).
ensure_flush_timer(Data = #{tref := undefined}, 0) ->
%% if the batch_time is 0, we don't need to start a timer, which
%% can be costly at high rates.
Ref = make_ref(),
self() ! {flush, Ref},
Data#{tref => {Ref, Ref}};
ensure_flush_timer(Data = #{tref := undefined}, T) ->
Ref = make_ref(),
TRef = erlang:send_after(T, self(), {flush, Ref}),
@ -1648,3 +1657,53 @@ do_minimize(?QUERY(_ReplyTo, _Req, _Sent, _ExpireAt) = Query) -> Query.
-else.
do_minimize(?QUERY(ReplyTo, _Req, Sent, ExpireAt)) -> ?QUERY(ReplyTo, [], Sent, ExpireAt).
-endif.
%% To avoid message loss due to misconfigurations, we adjust
%% `batch_time' based on `request_timeout'. If `batch_time' >
%% `request_timeout', all requests will timeout before being sent if
%% the message rate is low. Even worse if `pool_size' is high.
%% We cap `batch_time' at `request_timeout div 2' as a rule of thumb.
adjust_batch_time(_Id, _RequestTimeout = infinity, BatchTime0) ->
BatchTime0;
adjust_batch_time(Id, RequestTimeout, BatchTime0) ->
BatchTime = max(0, min(BatchTime0, RequestTimeout div 2)),
case BatchTime =:= BatchTime0 of
false ->
?SLOG(info, #{
id => Id,
msg => adjusting_buffer_worker_batch_time,
new_batch_time => BatchTime
});
true ->
ok
end,
BatchTime.
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
adjust_batch_time_test_() ->
%% just for logging
Id = some_id,
[
{"batch time smaller than request_time/2",
?_assertEqual(
100,
adjust_batch_time(Id, 500, 100)
)},
{"batch time equal to request_time/2",
?_assertEqual(
100,
adjust_batch_time(Id, 200, 100)
)},
{"batch time greater than request_time/2",
?_assertEqual(
50,
adjust_batch_time(Id, 100, 100)
)},
{"batch time smaller than request_time/2 (request_time = infinity)",
?_assertEqual(
100,
adjust_batch_time(Id, infinity, 100)
)}
].
-endif.

View File

@ -2,7 +2,7 @@
{application, emqx_rule_engine, [
{description, "EMQX Rule Engine"},
% strict semver, bump manually!
{vsn, "5.0.9"},
{vsn, "5.0.10"},
{modules, []},
{registered, [emqx_rule_engine_sup, emqx_rule_engine]},
{applications, [kernel, stdlib, rulesql, getopt, emqx_ctl]},

View File

@ -423,7 +423,22 @@ param_path_id() ->
%% Internal functions
%%------------------------------------------------------------------------------
err_msg(Msg) -> emqx_misc:readable_error_msg(Msg).
err_msg({RuleError, {_E, Reason, _S}}) ->
emqx_misc:readable_error_msg(encode_nested_error(RuleError, Reason));
err_msg({Reason, _Details}) ->
emqx_misc:readable_error_msg(Reason);
err_msg(Msg) ->
emqx_misc:readable_error_msg(Msg).
encode_nested_error(RuleError, Reason) when is_tuple(Reason) ->
encode_nested_error(RuleError, element(1, Reason));
encode_nested_error(RuleError, Reason) ->
case emqx_json:safe_encode([{RuleError, Reason}]) of
{ok, Json} ->
Json;
_ ->
{RuleError, Reason}
end.
format_rule_resp(Rules) when is_list(Rules) ->
[format_rule_resp(R) || R <- Rules];

View File

@ -40,6 +40,9 @@ end_per_suite(_Config) ->
init_per_testcase(_, Config) ->
Config.
end_per_testcase(t_crud_rule_api, Config) ->
meck:unload(emqx_json),
end_per_testcase(common, Config);
end_per_testcase(_, _Config) ->
{200, #{data := Rules}} =
emqx_rule_engine_api:'/rules'(get, #{query_string => #{}}),
@ -119,12 +122,54 @@ t_crud_rule_api(_Config) ->
emqx_rule_engine_api:'/rules/:id'(get, #{bindings => #{id => RuleID}})
),
{400, #{
code := 'BAD_REQUEST',
message := SelectAndTransformJsonError
}} =
emqx_rule_engine_api:'/rule_test'(
post,
test_rule_params(<<"SELECT\n payload.msg\nFROM\n \"t/#\"">>, <<"{\"msg\": \"hel">>)
),
?assertMatch(
#{<<"select_and_transform_error">> := <<"decode_json_failed">>},
emqx_json:decode(SelectAndTransformJsonError, [return_maps])
),
{400, #{
code := 'BAD_REQUEST',
message := SelectAndTransformBadArgError
}} =
emqx_rule_engine_api:'/rule_test'(
post,
test_rule_params(
<<"SELECT\n payload.msg > 1\nFROM\n \"t/#\"">>, <<"{\"msg\": \"hello\"}">>
)
),
?assertMatch(
#{<<"select_and_transform_error">> := <<"badarg">>},
emqx_json:decode(SelectAndTransformBadArgError, [return_maps])
),
{400, #{
code := 'BAD_REQUEST',
message := BadSqlMessage
}} = emqx_rule_engine_api:'/rule_test'(
post,
test_rule_params(
<<"BAD_SQL">>, <<"{\"msg\": \"hello\"}">>
)
),
?assertMatch({match, _}, re:run(BadSqlMessage, "syntax error")),
meck:expect(emqx_json, safe_encode, 1, {error, foo}),
?assertMatch(
{400, #{
code := 'BAD_REQUEST',
message := <<"{select_and_transform_error,{error,{decode_json_failed,", _/binary>>
message := <<"{select_and_transform_error,badarg}">>
}},
emqx_rule_engine_api:'/rule_test'(post, test_rule_params())
emqx_rule_engine_api:'/rule_test'(
post,
test_rule_params(
<<"SELECT\n payload.msg > 1\nFROM\n \"t/#\"">>, <<"{\"msg\": \"hello\"}">>
)
)
),
ok.
@ -221,19 +266,18 @@ t_reset_metrics_on_disable(_Config) ->
?assertMatch(#{passed := 0, matched := 0}, Metrics1),
ok.
test_rule_params() ->
test_rule_params(Sql, Payload) ->
#{
body => #{
<<"context">> =>
#{
<<"clientid">> => <<"c_emqx">>,
<<"event_type">> => <<"message_publish">>,
<<"payload">> => <<"{\"msg\": \"hel">>,
<<"payload">> => Payload,
<<"qos">> => 1,
<<"topic">> => <<"t/a">>,
<<"username">> => <<"u_emqx">>
},
<<"sql">> =>
<<"SELECT\n payload.msg\nFROM\n \"t/#\"">>
<<"sql">> => Sql
}
}.

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_statsd, [
{description, "EMQX Statsd"},
{vsn, "5.0.5"},
{vsn, "5.0.6"},
{registered, []},
{mod, {emqx_statsd_app, []}},
{applications, [

View File

@ -38,7 +38,7 @@
]).
%% Interface
-export([start_link/0]).
-export([start_link/1]).
%% Internal Exports
-export([
@ -68,17 +68,18 @@ do_restart() ->
ok = do_start(),
ok.
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
start_link(Conf) ->
gen_server:start_link({local, ?MODULE}, ?MODULE, Conf, []).
init([]) ->
init(Conf) ->
process_flag(trap_exit, true),
#{
tags := TagsRaw,
server := Server,
sample_time_interval := SampleTimeInterval,
flush_time_interval := FlushTimeInterval
} = emqx_conf:get([statsd]),
} = Conf,
FlushTimeInterval1 = flush_interval(FlushTimeInterval, SampleTimeInterval),
{Host, Port} = emqx_schema:parse_server(Server, ?SERVER_PARSE_OPTS),
Tags = maps:fold(fun(K, V, Acc) -> [{to_bin(K), to_bin(V)} | Acc] end, [], TagsRaw),
Opts = [{tags, Tags}, {host, Host}, {port, Port}, {prefix, <<"emqx">>}],
@ -86,7 +87,7 @@ init([]) ->
{ok,
ensure_timer(#{
sample_time_interval => SampleTimeInterval,
flush_time_interval => FlushTimeInterval,
flush_time_interval => FlushTimeInterval1,
estatsd_pid => Pid
})}.
@ -129,6 +130,19 @@ terminate(_Reason, #{estatsd_pid := Pid}) ->
%% Internal function
%%------------------------------------------------------------------------------
flush_interval(FlushInterval, SampleInterval) when FlushInterval >= SampleInterval ->
FlushInterval;
flush_interval(_FlushInterval, SampleInterval) ->
?SLOG(
warning,
#{
msg =>
"Configured flush_time_interval is lower than sample_time_interval, "
"setting: flush_time_interval = sample_time_interval."
}
),
SampleInterval.
ensure_timer(State = #{sample_time_interval := SampleTimeInterval}) ->
State#{timer => emqx_misc:start_timer(SampleTimeInterval, ?SAMPLE_TIMEOUT)}.

View File

@ -45,9 +45,9 @@ remove_handler() ->
ok = emqx_config_handler:remove_handler(?STATSD),
ok.
post_config_update(?STATSD, _Req, #{enable := true}, _Old, _AppEnvs) ->
post_config_update(?STATSD, _Req, #{enable := true} = New, _Old, _AppEnvs) ->
emqx_statsd_sup:ensure_child_stopped(?APP),
emqx_statsd_sup:ensure_child_started(?APP);
emqx_statsd_sup:ensure_child_started(?APP, New);
post_config_update(?STATSD, _Req, #{enable := false}, _Old, _AppEnvs) ->
emqx_statsd_sup:ensure_child_stopped(?APP);
post_config_update(_ConfPath, _Req, _NewConf, _OldConf, _AppEnvs) ->

View File

@ -25,6 +25,7 @@
-export([
start_link/0,
ensure_child_started/1,
ensure_child_started/2,
ensure_child_stopped/1
]).
@ -45,7 +46,11 @@ start_link() ->
-spec ensure_child_started(atom()) -> ok.
ensure_child_started(Mod) when is_atom(Mod) ->
assert_started(supervisor:start_child(?MODULE, ?CHILD(Mod, []))).
ensure_child_started(Mod, emqx_conf:get([statsd], #{})).
-spec ensure_child_started(atom(), map()) -> ok.
ensure_child_started(Mod, Conf) when is_atom(Mod) ->
assert_started(supervisor:start_child(?MODULE, ?CHILD(Mod, [Conf]))).
%% @doc Stop the child worker process.
-spec ensure_child_stopped(any()) -> ok.
@ -61,9 +66,9 @@ ensure_child_stopped(ChildId) ->
init([]) ->
Children =
case emqx_conf:get([statsd, enable], false) of
true -> [?CHILD(emqx_statsd, [])];
false -> []
case emqx_conf:get([statsd], #{}) of
#{enable := true} = Conf -> [?CHILD(emqx_statsd, [Conf])];
_ -> []
end,
{ok, {{one_for_one, 100, 3600}, Children}}.

View File

@ -113,6 +113,32 @@ t_kill_exit(_) ->
?assertNotEqual(Estatsd, Estatsd1),
ok.
t_config_update(_) ->
OldRawConf = emqx_conf:get_raw([statsd]),
{ok, _} = emqx_statsd_config:update(OldRawConf#{<<"enable">> => true}),
CommonKeys = [flush_time_interval, sample_time_interval],
OldConf = emqx_conf:get([statsd]),
OldStatsDState = sys:get_state(emqx_statsd),
OldPid = erlang:whereis(emqx_statsd),
?assertEqual(maps:with(CommonKeys, OldConf), maps:with(CommonKeys, OldStatsDState)),
NewRawConfExpect = OldRawConf#{
<<"flush_time_interval">> := <<"42s">>,
<<"sample_time_interval">> := <<"42s">>
},
try
{ok, _} = emqx_statsd_config:update(NewRawConfExpect),
NewRawConf = emqx_conf:get_raw([statsd]),
NewConf = emqx_conf:get([statsd]),
NewStatsDState = sys:get_state(emqx_statsd),
NewPid = erlang:whereis(emqx_statsd),
?assertNotEqual(OldRawConf, NewRawConf),
?assertEqual(NewRawConfExpect, NewRawConf),
?assertEqual(maps:with(CommonKeys, NewConf), maps:with(CommonKeys, NewStatsDState)),
?assertNotEqual(OldPid, NewPid)
after
{ok, _} = emqx_statsd_config:update(OldRawConf)
end.
request(Method) -> request(Method, []).
request(Method, Body) ->

View File

@ -0,0 +1 @@
Errors returned by rule engine API are formatted in a more human readable way rather than dumping the raw error including the stacktrace.

View File

@ -0,0 +1 @@
规则引擎 API 返回用户可读的错误信息而不是原始的栈追踪信息。

View File

@ -0,0 +1,7 @@
Deprecate unused QUIC TLS options.
Only following TLS options are kept for the QUIC listeners:
- cacertfile
- certfile
- keyfile
- verify

View File

@ -0,0 +1,8 @@
废弃未使用的 QUIC TLS 选项。
QUIC 监听器只保留以下 TLS 选项:
- cacertfile
- certfile
- keyfile
- verify

View File

@ -0,0 +1,2 @@
Fix webhook bridge error handling: connection timeout should be a retriable error.
Prior to this fix, connection timeout was classified as unrecoverable error and led to request being dropped.

View File

@ -0,0 +1,2 @@
修复 HTTP 桥接的一个异常处理:连接超时错误发生后,发生错误的请求可以被重试。
在此修复前,连接超时后,被当作不可重试类型的错误处理,导致请求被丢弃。

View File

@ -0,0 +1,2 @@
Fix an issue that invalid QUIC listener setting could casue segfault.

View File

@ -0,0 +1,2 @@
修复了无效的 QUIC 监听器设置可能导致 segfault 的问题。

View File

@ -0,0 +1,3 @@
Fix problem when joining core nodes running different EMQX versions into a cluster.
[Mria PR](https://github.com/emqx/mria/pull/127)

View File

@ -0,0 +1,3 @@
修正将运行不同 EMQX 版本的核心节点加入集群的问题。
[Mria PR](https://github.com/emqx/mria/pull/127)

View File

@ -0,0 +1,4 @@
Upgrade HTTP client ehttpc to `0.4.7`.
Prior to this upgrade, HTTP clients for authentication, authorization and webhook may crash
if `body` is empty but content-type HTTP header is set.
For more details see [ehttpc PR#44](https://github.com/emqx/ehttpc/pull/44).

View File

@ -0,0 +1,3 @@
HTTP 客户端库 `ehttpc` 升级到 0.4.7。
在升级前,如果 HTTP 客户端,例如 认证授权webhook 等配置中使用了content-type HTTP 头,但是没有配置 body则可能会发生异常。
详情见 [ehttpc PR#44](https://github.com/emqx/ehttpc/pull/44)。

View File

@ -0,0 +1,2 @@
Use default template `${timestamp}` if the `timestamp` config is empty (undefined) when inserting data in InfluxDB.
Prior to this change, InfluxDB bridge inserted a wrong timestamp when template is not provided.

View File

@ -0,0 +1,2 @@
在 InfluxDB 中插入数据时,如果时间戳为空(未定义),则使用默认的占位符 `${timestamp}`
在此修复前如果时间戳字段没有设置InfluxDB 桥接使用了一个错误的时间戳。

View File

@ -14,8 +14,8 @@ type: application
# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
version: 5.0.19
version: 5.0.20
# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application.
appVersion: 5.0.19
appVersion: 5.0.20

View File

@ -77,8 +77,8 @@ emqx_ee_bridge_gcp_pubsub {
request_timeout {
desc {
en: "HTTP request timeout."
zh: "HTTP 请求超时。"
en: "Deprecated: Configure the request timeout in the buffer settings."
zh: "废弃的。在缓冲区设置中配置请求超时。"
}
label: {
en: "Request Timeout"

View File

@ -1,6 +1,6 @@
{application, emqx_ee_bridge, [
{description, "EMQX Enterprise data bridges"},
{vsn, "0.1.5"},
{vsn, "0.1.6"},
{registered, []},
{applications, [
kernel,

View File

@ -84,6 +84,7 @@ fields(bridge_config) ->
emqx_schema:duration_ms(),
#{
required => false,
deprecated => {since, "e5.0.1"},
default => <<"15s">>,
desc => ?DESC("request_timeout")
}

View File

@ -282,7 +282,6 @@ gcp_pubsub_config(Config) ->
"bridges.gcp_pubsub.~s {\n"
" enable = true\n"
" connect_timeout = 1s\n"
" request_timeout = 1s\n"
" service_account_json = ~s\n"
" payload_template = ~p\n"
" pubsub_topic = ~s\n"

View File

@ -527,7 +527,8 @@ t_start_ok(Config) ->
SentData = #{
<<"clientid">> => ClientId,
<<"topic">> => atom_to_binary(?FUNCTION_NAME),
<<"payload">> => Payload
<<"payload">> => Payload,
<<"timestamp">> => erlang:system_time(millisecond)
},
?check_trace(
begin
@ -685,7 +686,8 @@ t_const_timestamp(Config) ->
SentData = #{
<<"clientid">> => ClientId,
<<"topic">> => atom_to_binary(?FUNCTION_NAME),
<<"payload">> => Payload
<<"payload">> => Payload,
<<"timestamp">> => erlang:system_time(millisecond)
},
?assertEqual(ok, send_message(Config, SentData)),
case QueryMode of
@ -740,7 +742,7 @@ t_boolean_variants(Config) ->
SentData = #{
<<"clientid">> => ClientId,
<<"topic">> => atom_to_binary(?FUNCTION_NAME),
<<"timestamp">> => erlang:system_time(nanosecond),
<<"timestamp">> => erlang:system_time(millisecond),
<<"payload">> => Payload
},
?assertEqual(ok, send_message(Config, SentData)),
@ -805,7 +807,7 @@ t_bad_timestamp(Config) ->
SentData = #{
<<"clientid">> => ClientId,
<<"topic">> => atom_to_binary(?FUNCTION_NAME),
<<"timestamp">> => erlang:system_time(nanosecond),
<<"timestamp">> => erlang:system_time(millisecond),
<<"payload">> => Payload
},
?check_trace(
@ -949,7 +951,7 @@ t_write_failure(Config) ->
SentData = #{
<<"clientid">> => ClientId,
<<"topic">> => atom_to_binary(?FUNCTION_NAME),
<<"timestamp">> => erlang:system_time(nanosecond),
<<"timestamp">> => erlang:system_time(millisecond),
<<"payload">> => Payload
},
?check_trace(

View File

@ -540,7 +540,9 @@ resource_configs() ->
<<"query_mode">> => <<"sync">>,
<<"worker_pool_size">> => <<"1">>,
<<"batch_size">> => integer_to_binary(?BATCH_SIZE),
<<"start_timeout">> => <<"15s">>
<<"start_timeout">> => <<"15s">>,
<<"batch_time">> => <<"4s">>,
<<"request_timeout">> => <<"30s">>
}
}.

View File

@ -1,6 +1,6 @@
{application, emqx_ee_connector, [
{description, "EMQX Enterprise connectors"},
{vsn, "0.1.5"},
{vsn, "0.1.6"},
{registered, []},
{applications, [
kernel,

View File

@ -33,7 +33,7 @@
connect_timeout := emqx_schema:duration_ms(),
max_retries := non_neg_integer(),
pubsub_topic := binary(),
request_timeout := emqx_schema:duration_ms(),
resource_opts := #{request_timeout := emqx_schema:duration_ms(), any() => term()},
service_account_json := service_account_json(),
any() => term()
}.
@ -71,7 +71,7 @@ on_start(
payload_template := PayloadTemplate,
pool_size := PoolSize,
pubsub_topic := PubSubTopic,
request_timeout := RequestTimeout
resource_opts := #{request_timeout := RequestTimeout}
} = Config
) ->
?SLOG(info, #{

View File

@ -35,11 +35,15 @@
desc/1
]).
-type ts_precision() :: ns | us | ms | s.
%% influxdb servers don't need parse
-define(INFLUXDB_HOST_OPTIONS, #{
default_port => ?INFLUXDB_DEFAULT_PORT
}).
-define(DEFAULT_TIMESTAMP_TMPL, "${timestamp}").
%% -------------------------------------------------------------------------------------------------
%% resource callback
callback_mode() -> async_if_possible.
@ -232,15 +236,14 @@ do_start_client(
ClientConfig,
Config = #{write_syntax := Lines}
) ->
Precision = maps:get(precision, Config, ms),
case influxdb:start_client(ClientConfig) of
{ok, Client} ->
case influxdb:is_alive(Client, true) of
true ->
State = #{
client => Client,
write_syntax => to_config(
Lines, proplists:get_value(precision, ClientConfig)
)
write_syntax => to_config(Lines, Precision)
},
?SLOG(info, #{
msg => "starting influxdb connector success",
@ -409,27 +412,36 @@ to_config(Lines, Precision) ->
to_config([], Acc, _Precision) ->
lists:reverse(Acc);
to_config([Item0 | Rest], Acc, Precision) ->
Ts = maps:get(timestamp, Item0, undefined),
Ts0 = maps:get(timestamp, Item0, undefined),
{Ts, FromPrecision, ToPrecision} = preproc_tmpl_timestamp(Ts0, Precision),
Item = #{
measurement => emqx_plugin_libs_rule:preproc_tmpl(maps:get(measurement, Item0)),
timestamp => preproc_tmpl_timestamp(Ts, Precision),
timestamp => Ts,
precision => {FromPrecision, ToPrecision},
tags => to_kv_config(maps:get(tags, Item0)),
fields => to_kv_config(maps:get(fields, Item0))
},
to_config(Rest, [Item | Acc], Precision).
preproc_tmpl_timestamp(undefined, <<"ns">>) ->
erlang:system_time(nanosecond);
preproc_tmpl_timestamp(undefined, <<"us">>) ->
erlang:system_time(microsecond);
preproc_tmpl_timestamp(undefined, <<"ms">>) ->
erlang:system_time(millisecond);
preproc_tmpl_timestamp(undefined, <<"s">>) ->
erlang:system_time(second);
preproc_tmpl_timestamp(Ts, _) when is_integer(Ts) ->
Ts;
preproc_tmpl_timestamp(Ts, _) when is_binary(Ts); is_list(Ts) ->
emqx_plugin_libs_rule:preproc_tmpl(Ts).
%% pre-process the timestamp template
%% returns a tuple of three elements:
%% 1. The timestamp template itself.
%% 2. The source timestamp precision (ms if the template ${timestamp} is used).
%% 3. The target timestamp precision (configured for the client).
preproc_tmpl_timestamp(undefined, Precision) ->
%% not configured, we default it to the message timestamp
preproc_tmpl_timestamp(?DEFAULT_TIMESTAMP_TMPL, Precision);
preproc_tmpl_timestamp(Ts, Precision) when is_integer(Ts) ->
%% a const value is used which is very much unusual, but we have to add a special handling
{Ts, Precision, Precision};
preproc_tmpl_timestamp(Ts, Precision) when is_list(Ts) ->
preproc_tmpl_timestamp(iolist_to_binary(Ts), Precision);
preproc_tmpl_timestamp(<<?DEFAULT_TIMESTAMP_TMPL>> = Ts, Precision) ->
{emqx_plugin_libs_rule:preproc_tmpl(Ts), ms, Precision};
preproc_tmpl_timestamp(Ts, Precision) when is_binary(Ts) ->
%% a placehold is in use. e.g. ${payload.my_timestamp}
%% we can only hope it the value will be of the same precision in the configs
{emqx_plugin_libs_rule:preproc_tmpl(Ts), Precision, Precision}.
to_kv_config(KVfields) ->
maps:fold(fun to_maps_config/3, #{}, proplists:to_map(KVfields)).
@ -472,7 +484,8 @@ parse_batch_data(InstId, BatchData, SyntaxLines) ->
fields := [{binary(), binary()}],
measurement := binary(),
tags := [{binary(), binary()}],
timestamp := emqx_plugin_libs_rule:tmpl_token() | integer()
timestamp := emqx_plugin_libs_rule:tmpl_token() | integer(),
precision := {From :: ts_precision(), To :: ts_precision()}
}
]) -> {ok, [map()]} | {error, term()}.
data_to_points(Data, SyntaxLines) ->
@ -531,16 +544,27 @@ line_to_point(
#{
measurement := Measurement,
tags := Tags,
fields := Fields
fields := Fields,
timestamp := Ts,
precision := Precision
} = Item
) ->
{_, EncodedTags} = maps:fold(fun maps_config_to_data/3, {Data, #{}}, Tags),
{_, EncodedFields} = maps:fold(fun maps_config_to_data/3, {Data, #{}}, Fields),
Item#{
maps:without([precision], Item#{
measurement => emqx_plugin_libs_rule:proc_tmpl(Measurement, Data),
tags => EncodedTags,
fields => EncodedFields
}.
fields => EncodedFields,
timestamp => maybe_convert_time_unit(Ts, Precision)
}).
maybe_convert_time_unit(Ts, {FromPrecision, ToPrecision}) ->
erlang:convert_time_unit(Ts, time_unit(FromPrecision), time_unit(ToPrecision)).
time_unit(s) -> second;
time_unit(ms) -> millisecond;
time_unit(us) -> microsecond;
time_unit(ns) -> nanosecond.
maps_config_to_data(K, V, {Data, Res}) ->
KTransOptions = #{return => rawlist, var_trans => fun key_filter/1},

View File

@ -227,5 +227,6 @@ test_query() ->
{send_message, #{
<<"clientid">> => <<"something">>,
<<"payload">> => #{bool => true},
<<"topic">> => <<"connector_test">>
<<"topic">> => <<"connector_test">>,
<<"timestamp">> => 1678220316257
}}.

View File

@ -48,13 +48,13 @@ defmodule EMQXUmbrella.MixProject do
{:redbug, "2.0.8"},
{:covertool, github: "zmstone/covertool", tag: "2.0.4.1", override: true},
{:typerefl, github: "ieQu1/typerefl", tag: "0.9.1", override: true},
{:ehttpc, github: "emqx/ehttpc", tag: "0.4.6", override: true},
{:ehttpc, github: "emqx/ehttpc", tag: "0.4.7", override: true},
{:gproc, github: "uwiger/gproc", tag: "0.8.0", override: true},
{:jiffy, github: "emqx/jiffy", tag: "1.0.5", override: true},
{:cowboy, github: "emqx/cowboy", tag: "2.9.0", override: true},
{:esockd, github: "emqx/esockd", tag: "5.9.4", override: true},
{:rocksdb, github: "emqx/erlang-rocksdb", tag: "1.7.2-emqx-9", override: true},
{:ekka, github: "emqx/ekka", tag: "0.14.2", override: true},
{:ekka, github: "emqx/ekka", tag: "0.14.3", override: true},
{:gen_rpc, github: "emqx/gen_rpc", tag: "2.8.1", override: true},
{:grpc, github: "emqx/grpc-erl", tag: "0.6.7", override: true},
{:minirest, github: "emqx/minirest", tag: "1.3.8", override: true},
@ -649,7 +649,7 @@ defmodule EMQXUmbrella.MixProject do
defp quicer_dep() do
if enable_quicer?(),
# in conflict with emqx and emqtt
do: [{:quicer, github: "emqx/quic", tag: "0.0.111", override: true}],
do: [{:quicer, github: "emqx/quic", tag: "0.0.113", override: true}],
else: []
end

View File

@ -50,13 +50,13 @@
, {gpb, "4.19.5"} %% gpb only used to build, but not for release, pin it here to avoid fetching a wrong version due to rebar plugins scattered in all the deps
, {typerefl, {git, "https://github.com/ieQu1/typerefl", {tag, "0.9.1"}}}
, {gun, {git, "https://github.com/emqx/gun", {tag, "1.3.9"}}}
, {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.4.6"}}}
, {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.4.7"}}}
, {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}}
, {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}
, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.0"}}}
, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.4"}}}
, {rocksdb, {git, "https://github.com/emqx/erlang-rocksdb", {tag, "1.7.2-emqx-9"}}}
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.14.2"}}}
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.14.3"}}}
, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}}
, {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.7"}}}
, {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.3.8"}}}

View File

@ -39,7 +39,7 @@ bcrypt() ->
{bcrypt, {git, "https://github.com/emqx/erlang-bcrypt.git", {tag, "0.6.0"}}}.
quicer() ->
{quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.0.111"}}}.
{quicer, {git, "https://github.com/emqx/quic.git", {tag, "0.0.113"}}}.
jq() ->
{jq, {git, "https://github.com/emqx/jq", {tag, "v0.3.9"}}}.

View File

@ -1,6 +1,6 @@
#!/usr/bin/env bash
set -euo pipefail
exit 0
latest_release=$(git describe --abbrev=0 --tags --exclude '*rc*' --exclude '*alpha*' --exclude '*beta*' --exclude '*docker*')
echo "Compare base: $latest_release"

View File

@ -19,15 +19,23 @@ RELEASE_GIT_TAG is a 'v*' or 'e*' tag for example:
e5.0.0-beta.6
options:
-h|--help: Print this usage.
-b|--base: Specify the current release base branch, can be one of
release-50
NOTE: this option should be used when --dryrun.
--dryrun: Do not actually create the git tag.
--skip-appup: Skip checking appup
Useful when you are sure that appup is already updated'
--prev-tag: Provide the prev tag to automatically generate changelogs
If this option is absent, the tag found by git describe will be used
-h|--help: Print this usage.
-b|--base: Specify the current release base branch, can be one of
release-50
NOTE: this option should be used when --dryrun.
--dryrun: Do not actually create the git tag.
--skip-appup: Skip checking appup
Useful when you are sure that appup is already updated'
--prev-tag <tag>: Provide the prev tag to automatically generate changelogs
If this option is absent, the tag found by git describe will be used
--docker-latest: Set this option to assign :latest tag on the corresponding docker image
in addition to regular :<version> one
NOTE: For 5.0 series the current working branch must be 'release-50' for opensource edition
and 'release-e50' for enterprise edition.
@ -40,23 +48,31 @@ EOF
logerr() {
echo "$(tput setaf 1)ERROR: $1$(tput sgr0)"
}
logwarn() {
echo "$(tput setaf 3)WARNING: $1$(tput sgr0)"
}
logmsg() {
echo "INFO: $1"
}
TAG="${1:-}"
DOCKER_LATEST_TAG=
case "$TAG" in
v*)
TAG_PREFIX='v'
PROFILE='emqx'
SKIP_APPUP='yes'
DOCKER_LATEST_TAG='docker-latest-ce'
;;
e*)
TAG_PREFIX='e'
PROFILE='emqx-enterprise'
#TODO change to no when we are ready to support hot-upgrade
SKIP_APPUP='yes'
DOCKER_LATEST_TAG='docker-latest-ee'
;;
-h|--help)
usage
@ -72,6 +88,7 @@ esac
shift 1
DRYRUN='no'
DOCKER_LATEST='no'
while [ "$#" -gt 0 ]; do
case $1 in
-h|--help)
@ -99,6 +116,10 @@ while [ "$#" -gt 0 ]; do
PREV_TAG="$1"
shift
;;
--docker-latest)
DOCKER_LATEST='yes'
shift
;;
*)
logerr "Unknown option $1"
exit 1
@ -180,11 +201,11 @@ assert_release_version() {
assert_release_version "$TAG"
## Check if all upstream branches are merged
if [ -z "${BASE_BR:-}" ]; then
./scripts/rel/sync-remotes.sh
else
./scripts/rel/sync-remotes.sh --base "$BASE_BR"
fi
SYNC_REMOTES_ARGS=
[ -n "${BASE_BR:-}" ] && SYNC_REMOTES_ARGS="--base $BASE_BR $SYNC_REMOTES_ARGS"
[ "$DRYRUN" = 'yes' ] && SYNC_REMOTES_ARGS="--dryrun $SYNC_REMOTES_ARGS"
# shellcheck disable=SC2086
./scripts/rel/sync-remotes.sh $SYNC_REMOTES_ARGS
## Check if the Chart versions are in sync
./scripts/rel/check-chart-vsn.sh "$PROFILE"
@ -231,6 +252,9 @@ generate_changelog () {
if [ "$DRYRUN" = 'yes' ]; then
logmsg "Release tag is ready to be created with command: git tag $TAG"
if [ "$DOCKER_LATEST" = 'yes' ]; then
logmsg "Docker latest tag is ready to be created with command: git tag --force $DOCKER_LATEST_TAG"
fi
else
case "$TAG" in
*rc*)
@ -252,4 +276,14 @@ else
esac
git tag "$TAG"
logmsg "$TAG is created OK."
if [ "$DOCKER_LATEST" = 'yes' ]; then
git tag --force "$DOCKER_LATEST_TAG"
logmsg "$DOCKER_LATEST_TAG is created OK."
fi
logwarn "Don't forget to push the tags!"
if [ "$DOCKER_LATEST" = 'yes' ]; then
echo "git push --atomic --force origin $TAG $DOCKER_LATEST_TAG"
else
echo "git push origin $TAG"
fi
fi

View File

@ -33,12 +33,17 @@ options:
Without this option, the script executes 'git merge' command
with '--ff-only' option which conveniently pulls remote
updates if there is any, and fails when fast-forward is not possible
--dryrun:
Do not perform merge. Run the checks, fetch from remote,
and show what's going to happen.
EOF
}
logerr() {
echo "$(tput setaf 1)ERROR: $1$(tput sgr0)"
}
logwarn() {
echo "$(tput setaf 3)WARNING: $1$(tput sgr0)"
}
@ -48,6 +53,7 @@ logmsg() {
}
INTERACTIVE='no'
DRYRUN='no'
while [ "$#" -gt 0 ]; do
case $1 in
-h|--help)
@ -63,6 +69,10 @@ while [ "$#" -gt 0 ]; do
BASE_BRANCH="$1"
shift
;;
--dryrun)
shift
DRYRUN='yes'
;;
*)
logerr "Unknown option $1"
exit 1
@ -151,6 +161,10 @@ upstream_branches() {
}
for remote_ref in $(upstream_branches "$BASE_BRANCH"); do
logmsg "Merging $remote_ref"
git merge $MERGE_OPTS "$remote_ref"
if [ "$DRYRUN" = 'yes' ]; then
logmsg "Merge with this command: git merge $MERGE_OPTS $remote_ref"
else
logmsg "Merging $remote_ref"
git merge $MERGE_OPTS "$remote_ref"
fi
done