Merge pull request #10046 from zmstone/0301-merge-release-50-to-master

0301 merge release 50 to master
This commit is contained in:
Zaiming (Stone) Shi 2023-03-01 10:58:15 +01:00 committed by GitHub
commit d0f43bead3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
30 changed files with 272 additions and 117 deletions

View File

@ -222,6 +222,10 @@ run: $(PROFILE) quickrun
quickrun:
./_build/$(PROFILE)/rel/emqx/bin/emqx console
## Take the currently set PROFILE
docker:
@$(BUILD) $(PROFILE) docker
## docker target is to create docker instructions
.PHONY: $(REL_PROFILES:%=%-docker) $(REL_PROFILES:%=%-elixir-docker)
define gen-docker-target

View File

@ -27,9 +27,9 @@
{jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}},
{cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.0"}}},
{esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.4"}}},
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.14.1"}}},
{ekka, {git, "https://github.com/emqx/ekka", {tag, "0.14.2"}}},
{gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}},
{hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.35.3"}}},
{hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.36.0"}}},
{pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}},
{recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}},
{snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.0"}}}

View File

@ -872,12 +872,6 @@ fields("mqtt_quic_listener") ->
?MAX_UINT(64),
?DESC(fields_mqtt_quic_listener_max_bytes_per_key)
)},
{"handshake_idle_timeout_ms",
quic_lowlevel_settings_uint(
1,
?MAX_UINT(64),
?DESC(fields_mqtt_quic_listener_handshake_idle_timeout)
)},
{"tls_server_max_send_buffer",
quic_lowlevel_settings_uint(
1,

View File

@ -55,11 +55,12 @@ t_register_sub(_) ->
?assertEqual(self(), emqx_broker_helper:lookup_subpid(<<"clientid">>)).
t_shard_seq(_) ->
?assertEqual([], ets:lookup(emqx_subseq, <<"topic">>)),
emqx_broker_helper:create_seq(<<"topic">>),
?assertEqual([{<<"topic">>, 1}], ets:lookup(emqx_subseq, <<"topic">>)),
emqx_broker_helper:reclaim_seq(<<"topic">>),
?assertEqual([], ets:lookup(emqx_subseq, <<"topic">>)).
TestTopic = atom_to_list(?FUNCTION_NAME),
?assertEqual([], ets:lookup(emqx_subseq, TestTopic)),
emqx_broker_helper:create_seq(TestTopic),
?assertEqual([{TestTopic, 1}], ets:lookup(emqx_subseq, TestTopic)),
emqx_broker_helper:reclaim_seq(TestTopic),
?assertEqual([], ets:lookup(emqx_subseq, TestTopic)).
t_shards_num(_) ->
?assertEqual(emqx_vm:schedulers() * 32, emqx_broker_helper:shards_num()).

View File

@ -22,18 +22,23 @@
-include_lib("emqx/include/emqx.hrl").
-include_lib("emqx/include/emqx_mqtt.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("lc/include/lc.hrl").
all() -> emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
emqx_common_test_helpers:start_apps([]),
Config.
OldSch = erlang:system_flag(schedulers_online, 1),
[{old_sch, OldSch} | Config].
end_per_suite(_Config) ->
end_per_suite(Config) ->
erlang:system_flag(schedulers_online, ?config(old_sch, Config)),
emqx_common_test_helpers:stop_apps([]).
init_per_testcase(_, Config) ->
emqx_common_test_helpers:boot_modules(all),
emqx_common_test_helpers:start_apps([]),
emqx_olp:enable(),
case wait_for(fun() -> lc_sup:whereis_runq_flagman() end, 10) of
true -> ok;
@ -86,6 +91,7 @@ t_overload_cooldown_conn(Config) ->
t_overloaded_conn(Config),
timer:sleep(1000),
?assert(not emqx_olp:is_overloaded()),
true = emqx:is_running(node()),
{ok, C} = emqtt:start_link([{host, "localhost"}, {clientid, "myclient"}]),
?assertMatch({ok, _Pid}, emqtt:connect(C)),
emqtt:stop(C).
@ -93,7 +99,7 @@ t_overload_cooldown_conn(Config) ->
-spec burst_runq() -> ParentToKill :: pid().
burst_runq() ->
NProc = erlang:system_info(schedulers_online),
spawn(?MODULE, worker_parent, [NProc * 10, {?MODULE, busy_loop, []}]).
spawn(?MODULE, worker_parent, [NProc * 1000, {?MODULE, busy_loop, []}]).
%% internal helpers
worker_parent(N, {M, F, A}) ->

View File

@ -541,7 +541,6 @@ t_multi_streams_packet_boundary(Config) ->
[{qos, PubQos}],
undefined
),
timer:sleep(300),
PubRecvs = recv_pub(3, [], 1000),
?assertMatch(
[
@ -668,12 +667,10 @@ t_multi_streams_packet_malform(Config) ->
{error, stm_send_error, aborted} -> ok
end,
timer:sleep(200),
?assert(is_list(emqtt:info(C))),
{error, stm_send_error, aborted} = quicer:send(MalformStream, <<1, 2, 3, 4, 5, 6, 7, 8, 9, 0>>),
timer:sleep(200),
?assert(is_list(emqtt:info(C))),
ok = emqtt:disconnect(C).
@ -743,7 +740,6 @@ t_multi_streams_packet_too_large(Config) ->
[{qos, PubQos}],
undefined
),
timer:sleep(200),
?assert(is_list(emqtt:info(C))),
timeout = recv_pub(1),
@ -757,7 +753,6 @@ t_multi_streams_packet_too_large(Config) ->
[{qos, PubQos}],
undefined
),
timer:sleep(200),
timeout = recv_pub(1),
?assert(is_list(emqtt:info(C))),
@ -783,7 +778,6 @@ t_multi_streams_packet_too_large(Config) ->
topic := Topic
}}
] = recv_pub(1),
timer:sleep(200),
?assert(is_list(emqtt:info(C))),
@ -1409,7 +1403,7 @@ t_multi_streams_shutdown_ctrl_stream_then_reconnect(Config) ->
{quic, _Conn, Ctrlstream} = proplists:get_value(socket, emqtt:info(C)),
quicer:shutdown_stream(Ctrlstream, ?config(stream_shutdown_flag, Config), 500, 100),
timer:sleep(200),
%% Client should be closed
%% Client should not be closed
?assert(is_list(emqtt:info(C))).
t_multi_streams_emqx_ctrl_kill(Config) ->
@ -1462,9 +1456,8 @@ t_multi_streams_emqx_ctrl_kill(Config) ->
[{ClientId, TransPid}] = ets:lookup(emqx_channel, ClientId),
exit(TransPid, kill),
timer:sleep(200),
%% Client should be closed
?assertMatch({'EXIT', {noproc, {gen_statem, call, [_, info, infinity]}}}, catch emqtt:info(C)).
assert_client_die(C).
t_multi_streams_emqx_ctrl_exit_normal(Config) ->
erlang:process_flag(trap_exit, true),
@ -1516,9 +1509,8 @@ t_multi_streams_emqx_ctrl_exit_normal(Config) ->
[{ClientId, TransPid}] = ets:lookup(emqx_channel, ClientId),
emqx_connection:stop(TransPid),
timer:sleep(200),
%% Client exit normal.
?assertMatch({'EXIT', {normal, {gen_statem, call, [_, info, infinity]}}}, catch emqtt:info(C)).
assert_client_die(C).
t_multi_streams_remote_shutdown(Config) ->
erlang:process_flag(trap_exit, true),
@ -1570,9 +1562,8 @@ t_multi_streams_remote_shutdown(Config) ->
ok = stop_emqx(),
start_emqx_quic(?config(port, Config)),
timer:sleep(200),
%% Client should be closed
?assertMatch({'EXIT', {noproc, {gen_statem, call, [_, info, infinity]}}}, catch emqtt:info(C)).
assert_client_die(C).
t_multi_streams_remote_shutdown_with_reconnect(Config) ->
erlang:process_flag(trap_exit, true),
@ -1973,6 +1964,9 @@ test_dir(Config) ->
recv_pub(Count) ->
recv_pub(Count, [], 100).
recv_pub(Count, Tout) ->
recv_pub(Count, [], Tout).
recv_pub(0, Acc, _Tout) ->
lists:reverse(Acc);
recv_pub(Count, Acc, Tout) ->
@ -2036,6 +2030,19 @@ select_port() ->
via_stream({quic, _Conn, Stream}) ->
Stream.
assert_client_die(C) ->
assert_client_die(C, 100, 10).
assert_client_die(C, _, 0) ->
ct:fail("Client ~p did not die", [C]);
assert_client_die(C, Delay, Retries) ->
case catch emqtt:info(C) of
{'EXIT', {noproc, {gen_statem, call, [_, info, infinity]}}} ->
ok;
_Other ->
timer:sleep(Delay),
assert_client_die(C, Delay, Retries - 1)
end.
%% BUILD_WITHOUT_QUIC
-else.
-endif.

View File

@ -35,8 +35,18 @@ emqx_bridge_schema {
desc_status {
desc {
en: """The status of the bridge"""
zh: """Bridge 的状态"""
en: """The status of the bridge<br/>
- <code>connecting</code>: the initial state before any health probes were made.<br/>
- <code>connected</code>: when the bridge passes the health probes.<br/>
- <code>disconnected</code>: when the bridge can not pass health probes.<br/>
- <code>stopped</code>: when the bridge resource is requested to be stopped.<br/>
- <code>inconsistent</code>: When not all the nodes are at the same status."""
zh: """Bridge 的连接状态<br/>
- <code>connecting</code>: 启动时的初始状态。<br/>
- <code>connected</code>: 桥接驱动健康检查正常。<br/>
- <code>disconnected</code>: 当桥接无法通过健康检查。<br/>
- <code>stopped</code>: 桥接处于停用状态。<br/>
- <code>inconsistent</code>: 集群中有各节点汇报的状态不一致。"""
}
label: {
en: "Bridge Status"
@ -46,8 +56,16 @@ emqx_bridge_schema {
desc_node_status {
desc {
en: """The status of the bridge for each node"""
zh: """每个节点的 Bridge 状态"""
en: """The status of the bridge for each node.
- <code>connecting</code>: the initial state before any health probes were made.<br/>
- <code>connected</code>: when the bridge passes the health probes.<br/>
- <code>disconnected</code>: when the bridge can not pass health probes.<br/>
- <code>stopped</code>: when the bridge resource is requested to be stopped."""
zh: """每个节点的 Bridge 状态
- <code>connecting</code>: 启动时的初始状态。<br/>
- <code>connected</code>: 桥接驱动健康检查正常。<br/>
- <code>disconnected</code>: 当桥接无法通过健康检查。<br/>
- <code>stopped</code>: 桥接处于停用状态。"""
}
label: {
en: "Node Bridge Status"

View File

@ -743,8 +743,9 @@ collect_metrics(Bridges) ->
aggregate_metrics(AllMetrics) ->
InitMetrics = ?EMPTY_METRICS,
lists:foldl(
fun(
lists:foldl(fun aggregate_metrics/2, InitMetrics, AllMetrics).
aggregate_metrics(
#{
metrics := ?metrics(
M1, M2, M3, M4, M5, M6, M7, M8, M9, M10, M11, M12, M13, M14, M15, M16, M17
@ -753,7 +754,7 @@ aggregate_metrics(AllMetrics) ->
?metrics(
N1, N2, N3, N4, N5, N6, N7, N8, N9, N10, N11, N12, N13, N14, N15, N16, N17
)
) ->
) ->
?METRICS(
M1 + N1,
M2 + N2,
@ -772,11 +773,9 @@ aggregate_metrics(AllMetrics) ->
M15 + N15,
M16 + N16,
M17 + N17
)
end,
InitMetrics,
AllMetrics
).
);
aggregate_metrics(#{}, Metrics) ->
Metrics.
format_resp(Data) ->
format_resp(Data, node()).
@ -786,18 +785,26 @@ format_resp(
type := Type,
name := BridgeName,
raw_config := RawConf,
resource_data := #{status := Status, metrics := Metrics}
resource_data := ResourceData
},
Node
) ->
RawConfFull = fill_defaults(Type, RawConf),
redact(RawConfFull#{
redact(
maps:merge(
RawConfFull#{
type => Type,
name => maps:get(<<"name">>, RawConf, BridgeName),
node => Node,
status => Status,
metrics => format_metrics(Metrics)
}).
node => Node
},
format_resource_data(ResourceData)
)
).
format_resource_data(#{status := Status, metrics := Metrics}) ->
#{status => Status, metrics => format_metrics(Metrics)};
format_resource_data(#{status := Status}) ->
#{status => Status}.
format_metrics(#{
counters := #{

View File

@ -208,7 +208,7 @@ desc(_) ->
undefined.
status() ->
hoconsc:enum([connected, disconnected, connecting]).
hoconsc:enum([connected, disconnected, connecting, inconsistent]).
node_name() ->
{"node", mk(binary(), #{desc => ?DESC("desc_node_name"), example => "emqx@127.0.0.1"})}.

View File

@ -140,17 +140,13 @@ stop_http_server(Sock, Acceptor) ->
gen_tcp:close(Sock).
listen_on_random_port() ->
Min = 1024,
Max = 65000,
rand:seed(exsplus, erlang:timestamp()),
Port = rand:uniform(Max - Min) + Min,
case
gen_tcp:listen(Port, [
binary, {active, false}, {packet, raw}, {reuseaddr, true}, {backlog, 1000}
])
of
{ok, Sock} -> {Port, Sock};
{error, eaddrinuse} -> listen_on_random_port()
SockOpts = [binary, {active, false}, {packet, raw}, {reuseaddr, true}, {backlog, 1000}],
case gen_tcp:listen(0, SockOpts) of
{ok, Sock} ->
{ok, Port} = inet:port(Sock),
{Port, Sock};
{error, Reason} when Reason /= eaddrinuse ->
{error, Reason}
end.
accept_loop(Sock, HandleFun, Parent) ->
@ -543,7 +539,9 @@ do_start_stop_bridges(Type, Config) ->
{ok, 200, <<"[]">>} = request(get, uri(["bridges"]), []),
%% Create broken bridge
BadServer = <<"nohost">>,
{ListenPort, Sock} = listen_on_random_port(),
%% Connecting to this endpoint should always timeout
BadServer = iolist_to_binary(io_lib:format("localhost:~B", [ListenPort])),
BadName = <<"bad_", (atom_to_binary(Type))/binary>>,
{ok, 201, BadBridge1} = request(
post,
@ -555,11 +553,15 @@ do_start_stop_bridges(Type, Config) ->
<<"name">> := BadName,
<<"enable">> := true,
<<"server">> := BadServer,
<<"status">> := <<"disconnected">>,
<<"status">> := <<"connecting">>,
<<"node_status">> := [_ | _]
} = jsx:decode(BadBridge1),
BadBridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_MQTT, BadName),
{ok, 500, _} = request(post, operation_path(Type, start, BadBridgeID), <<"">>),
?assertMatch(
{ok, SC, _} when SC == 500 orelse SC == 503,
request(post, operation_path(Type, start, BadBridgeID), <<"">>)
),
ok = gen_tcp:close(Sock),
ok.
t_enable_disable_bridges(Config) ->

View File

@ -525,11 +525,11 @@ else
## only one emqx node is running, get running args from 'ps -ef' output
tmp_nodename=$(echo -e "$PS_LINE" | $GREP -oE "\s\-s?name.*" | awk '{print $2}' || true)
tmp_cookie=$(echo -e "$PS_LINE" | $GREP -oE "\s\-setcookie.*" | awk '{print $2}' || true)
tmp_dist="$(echo -e "$PS_LINE" | $GREP -oE '\-ssl_dist_optfile\s.+\s' | awk '{print $2}' || true)"
SSL_DIST_OPTFILE="$(echo -e "$PS_LINE" | $GREP -oE '\-ssl_dist_optfile\s.+\s' | awk '{print $2}' || true)"
tmp_ticktime="$(echo -e "$PS_LINE" | $GREP -oE '\s\-kernel\snet_ticktime\s.+\s' | awk '{print $3}' || true)"
# data_dir is actually not needed, but kept anyway
tmp_datadir="$(echo -e "$PS_LINE" | $GREP -oE "\-emqx_data_dir.*" | sed -E 's#.+emqx_data_dir[[:blank:]]##g' | sed -E 's#[[:blank:]]--$##g' || true)"
if [ -z "$tmp_dist" ]; then
if [ -z "$SSL_DIST_OPTFILE" ]; then
tmp_proto='inet_tcp'
else
tmp_proto='inet_tls'
@ -945,7 +945,7 @@ if [ -n "${EMQX_NODE_COOKIE:-}" ]; then
fi
COOKIE="${EMQX_NODE__COOKIE:-}"
COOKIE_IN_USE="$(get_boot_config 'node.cookie')"
if [ -n "$COOKIE_IN_USE" ] && [ -n "$COOKIE" ] && [ "$COOKIE" != "$COOKIE_IN_USE" ]; then
if [ "$IS_BOOT_COMMAND" != 'yes' ] && [ -n "$COOKIE_IN_USE" ] && [ -n "$COOKIE" ] && [ "$COOKIE" != "$COOKIE_IN_USE" ]; then
die "EMQX_NODE__COOKIE is different from the cookie used by $NAME"
fi
[ -z "$COOKIE" ] && COOKIE="$COOKIE_IN_USE"

View File

@ -1 +1 @@
为 QUIC 听器添加更多底层调优选项。
为 QUIC 听器添加更多底层调优选项。

View File

@ -0,0 +1 @@
When the resource manager is busy trying to establish a connection with the remote, the resource might yet lack any metrics information. Prior to this fix, the `bridges/` API handler crashed in such circumstances.

View File

@ -0,0 +1 @@
当资源管理器忙于尝试与远程建立连接时,资源可能还缺少任何度量信息。 在此修复之前,`bridges/' API 处理程序在这种情况下崩溃。

View File

@ -0,0 +1,2 @@
Fix Swagger API doc rendering crash.
In version 5.0.18, a bug was introduced that resulted in duplicated field names in the configuration schema. This, in turn, caused the Swagger schema generated to become invalid.

View File

@ -0,0 +1,2 @@
修复 Swagger API 文档渲染崩溃。
在版本 5.0.18 中,引入了一个错误,导致配置 schema 中出现了重复的配置名称,进而导致生成了无效的 Swagger spec。

View File

@ -0,0 +1,2 @@
For influxdb bridge, added integer value placeholder annotation hint to `write_syntax` documentation.
Also supported setting a constant value for the `timestamp` field.

View File

@ -0,0 +1,2 @@
为 influxdb 桥接的配置项 `write_syntax` 描述文档增加了类型标识符的提醒。
另外在配置中支持 `timestamp` 使用一个常量。

View File

@ -0,0 +1,5 @@
Improve behavior of the `replicant` nodes when the `core` cluster becomes partitioned (for example when a core node leaves the cluster).
Previously, the replicant nodes were unable to rebalance connections to the core nodes, until the core cluster became whole again.
This was indicated by the error messages: `[error] line: 182, mfa: mria_lb:list_core_nodes/1, msg: mria_lb_core_discovery divergent cluster`.
[Mria PR](https://github.com/emqx/mria/pull/123/files)

View File

@ -0,0 +1,6 @@
改进 `core` 集群被分割时 `replicant`节点的行为。
修复前,如果 `core` 集群分裂成两个小集群(例如一个节点离开集群)时,`replicant` 节点无法重新平衡与核心节点的连接,直到核心集群再次变得完整。
这种个问题会导致 replicant 节点出现如下日志:
`[error] line: 182, mfa: mria_lb:list_core_nodes/1, msg: mria_lb_core_discovery divergent cluster`
[Mria PR](https://github.com/emqx/mria/pull/123/files)

View File

@ -0,0 +1,3 @@
Fixed two bugs introduced in v5.0.18.
* The environment varialbe `SSL_DIST_OPTFILE` was not set correctly for non-boot commands.
* When cookie is overridden from environment variable, EMQX node is unable to start.

View File

@ -0,0 +1,3 @@
修复 v5.0.18 引入的 2 个bug。
* 环境变量 `SSL_DIST_OPTFILE` 的值设置错误导致节点无法为 Erlang distribution 启用 SSL。
* 当节点的 cookie 从环境变量重载 (而不是设置在配置文件中时),节点无法启动的问题。

View File

@ -22,14 +22,16 @@ See also [InfluxDB 2.3 Line Protocol](https://docs.influxdata.com/influxdb/v2.3/
TLDR:</br>
```
<measurement>[,<tag_key>=<tag_value>[,<tag_key>=<tag_value>]] <field_key>=<field_value>[,<field_key>=<field_value>] [<timestamp>]
```"""
```
Please note that a placeholder for an integer value must be annotated with a suffix `i`. For example `${payload.int_value}i`."""
zh: """使用 InfluxDB API Line Protocol 写入 InfluxDB 的数据,支持占位符</br>
参考 [InfluxDB 2.3 Line Protocol](https://docs.influxdata.com/influxdb/v2.3/reference/syntax/line-protocol/) 及
[InfluxDB 1.8 Line Protocol](https://docs.influxdata.com/influxdb/v1.8/write_protocols/line_protocol_tutorial/) </br>
TLDR: </br>
```
<measurement>[,<tag_key>=<tag_value>[,<tag_key>=<tag_value>]] <field_key>=<field_value>[,<field_key>=<field_value>] [<timestamp>]
```"""
```
注意,整形数值占位符后需要添加一个字符 `i` 类型标识。例如 `${payload.int_value}i`"""
}
label {
en: "Write Syntax"

View File

@ -663,6 +663,54 @@ t_start_ok_no_subject_tags_write_syntax(Config) ->
),
ok.
t_const_timestamp(Config) ->
QueryMode = ?config(query_mode, Config),
Const = erlang:system_time(nanosecond),
ConstBin = integer_to_binary(Const),
TsStr = iolist_to_binary(
calendar:system_time_to_rfc3339(Const, [{unit, nanosecond}, {offset, "Z"}])
),
?assertMatch(
{ok, _},
create_bridge(
Config,
#{
<<"write_syntax">> =>
<<"mqtt,clientid=${clientid} foo=${payload.foo}i,bar=5i ", ConstBin/binary>>
}
)
),
ClientId = emqx_guid:to_hexstr(emqx_guid:gen()),
Payload = #{<<"foo">> => 123},
SentData = #{
<<"clientid">> => ClientId,
<<"topic">> => atom_to_binary(?FUNCTION_NAME),
<<"payload">> => Payload
},
?assertEqual(ok, send_message(Config, SentData)),
case QueryMode of
async -> ct:sleep(500);
sync -> ok
end,
PersistedData = query_by_clientid(ClientId, Config),
Expected = #{foo => <<"123">>},
assert_persisted_data(ClientId, Expected, PersistedData),
TimeReturned0 = maps:get(<<"_time">>, maps:get(<<"foo">>, PersistedData)),
TimeReturned = pad_zero(TimeReturned0),
?assertEqual(TsStr, TimeReturned).
%% influxdb returns timestamps without trailing zeros such as
%% "2023-02-28T17:21:51.63678163Z"
%% while the standard should be
%% "2023-02-28T17:21:51.636781630Z"
pad_zero(BinTs) ->
StrTs = binary_to_list(BinTs),
[Nano | Rest] = lists:reverse(string:tokens(StrTs, ".")),
[$Z | NanoNum] = lists:reverse(Nano),
Padding = lists:duplicate(10 - length(Nano), $0),
NewNano = lists:reverse(NanoNum) ++ Padding ++ "Z",
iolist_to_binary(string:join(lists:reverse([NewNano | Rest]), ".")).
t_boolean_variants(Config) ->
QueryMode = ?config(query_mode, Config),
?assertMatch(
@ -783,7 +831,7 @@ t_bad_timestamp(Config) ->
[
#{
error := [
{error, {bad_timestamp, [<<"bad_timestamp">>]}}
{error, {bad_timestamp, <<"bad_timestamp">>}}
]
}
],
@ -793,7 +841,7 @@ t_bad_timestamp(Config) ->
?assertEqual(
{error,
{unrecoverable_error, [
{error, {bad_timestamp, [<<"bad_timestamp">>]}}
{error, {bad_timestamp, <<"bad_timestamp">>}}
]}},
Return
);

View File

@ -492,11 +492,11 @@ lines_to_points(Data, [#{timestamp := Ts} = Item | Rest], ResultPointsAcc, Error
is_list(Ts)
->
TransOptions = #{return => rawlist, var_trans => fun data_filter/1},
case emqx_plugin_libs_rule:proc_tmpl(Ts, Data, TransOptions) of
[TsInt] when is_integer(TsInt) ->
case parse_timestamp(emqx_plugin_libs_rule:proc_tmpl(Ts, Data, TransOptions)) of
{ok, TsInt} ->
Item1 = Item#{timestamp => TsInt},
continue_lines_to_points(Data, Item1, Rest, ResultPointsAcc, ErrorPointsAcc);
BadTs ->
{error, BadTs} ->
lines_to_points(Data, Rest, ResultPointsAcc, [
{error, {bad_timestamp, BadTs}} | ErrorPointsAcc
])
@ -506,6 +506,16 @@ lines_to_points(Data, [#{timestamp := Ts} = Item | Rest], ResultPointsAcc, Error
->
continue_lines_to_points(Data, Item, Rest, ResultPointsAcc, ErrorPointsAcc).
parse_timestamp([TsInt]) when is_integer(TsInt) ->
{ok, TsInt};
parse_timestamp([TsBin]) ->
try
{ok, binary_to_integer(TsBin)}
catch
_:_ ->
{error, TsBin}
end.
continue_lines_to_points(Data, Item, Rest, ResultPointsAcc, ErrorPointsAcc) ->
case line_to_point(Data, Item) of
#{fields := Fields} when map_size(Fields) =:= 0 ->

View File

@ -54,7 +54,7 @@ defmodule EMQXUmbrella.MixProject do
{:cowboy, github: "emqx/cowboy", tag: "2.9.0", override: true},
{:esockd, github: "emqx/esockd", tag: "5.9.4", override: true},
{:rocksdb, github: "emqx/erlang-rocksdb", tag: "1.7.2-emqx-9", override: true},
{:ekka, github: "emqx/ekka", tag: "0.14.1", override: true},
{:ekka, github: "emqx/ekka", tag: "0.14.2", override: true},
{:gen_rpc, github: "emqx/gen_rpc", tag: "2.8.1", override: true},
{:grpc, github: "emqx/grpc-erl", tag: "0.6.7", override: true},
{:minirest, github: "emqx/minirest", tag: "1.3.8", override: true},
@ -69,7 +69,7 @@ defmodule EMQXUmbrella.MixProject do
# in conflict by emqtt and hocon
{:getopt, "1.0.2", override: true},
{:snabbkaffe, github: "kafka4beam/snabbkaffe", tag: "1.0.0", override: true},
{:hocon, github: "emqx/hocon", tag: "0.35.3", override: true},
{:hocon, github: "emqx/hocon", tag: "0.36.0", override: true},
{:emqx_http_lib, github: "emqx/emqx_http_lib", tag: "0.5.2", override: true},
{:esasl, github: "emqx/esasl", tag: "0.2.0"},
{:jose, github: "potatosalad/erlang-jose", tag: "1.11.2"},

View File

@ -56,7 +56,7 @@
, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.0"}}}
, {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.9.4"}}}
, {rocksdb, {git, "https://github.com/emqx/erlang-rocksdb", {tag, "1.7.2-emqx-9"}}}
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.14.1"}}}
, {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.14.2"}}}
, {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.8.1"}}}
, {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.7"}}}
, {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.3.8"}}}
@ -69,7 +69,7 @@
, {system_monitor, {git, "https://github.com/ieQu1/system_monitor", {tag, "3.0.3"}}}
, {getopt, "1.0.2"}
, {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "1.0.0"}}}
, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.35.3"}}}
, {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.36.0"}}}
, {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.5.2"}}}
, {esasl, {git, "https://github.com/emqx/esasl", {tag, "0.2.0"}}}
, {jose, {git, "https://github.com/potatosalad/erlang-jose", {tag, "1.11.2"}}}

View File

@ -0,0 +1,25 @@
# Test influxdb integration
This script starts two EMQX nodes and a influxdb server in docker container.
The bootstraping rule engine and data bridge config is provided in influx-bridge.conf
which got included in the bootstraping config bundle emqx.conf.
## Start the cluster
./start.sh
## How to run tests
The rule and bridge are configured to pipe data from MQTT topic `t/#` to the 'myvalues' measurement in the 'mqtt' bucket.
### Manual verification steps
* Start the cluster
* Send mqtt messages to topic `/t/a` with a JSON object as MQTT paylaod like `{"value": 1}`
* Observe data in influxdb `curl -k -H 'Authorization: Token abcdefg' -G 'https://localhost:8086/query?pretty=true' --data-urlencode "db=mqtt" --data-urlencode "q=SELECT * from myvalues"`
Example output the curl query against influxdb:
```
{"results":[{"statement_id":0,"series":[{"name":"myvalues","columns":["time","clientid","value"],"values":[["2023-02-28T11:13:29.039Z","a1",123]]}]}]
```

View File

@ -30,7 +30,7 @@ bridges {
versions = ["tlsv1.3", "tlsv1.2", "tlsv1.1", "tlsv1"]
}
token = "abcdefg"
write_syntax = "mqtt,clientid=${clientid} value=${payload.value}"
write_syntax = "myvalues,clientid=${clientid} value=${payload.value}i"
}
}
}

View File

@ -8,9 +8,10 @@ set -euo pipefail
## this is why a docker network is created, and the containers's names have a dot.
# ensure dir
cd -P -- "$(dirname -- "$0")/.."
cd -P -- "$(dirname -- "$0")/../../"
IMAGE="${1}"
IMAGE1="${1}"
IMAGE2="${2:-${IMAGE1}}"
NET='emqx.io'
NODE1="node1.$NET"
@ -35,7 +36,7 @@ docker run -d -t --restart=always --name "$NODE1" \
-e EMQX_listeners__wss__default__enable=false \
-e EMQX_listeners__tcp__default__proxy_protocol=true \
-e EMQX_listeners__ws__default__proxy_protocol=true \
"$IMAGE"
"$IMAGE1"
docker run -d -t --restart=always --name "$NODE2" \
--net "$NET" \
@ -47,7 +48,7 @@ docker run -d -t --restart=always --name "$NODE2" \
-e EMQX_listeners__wss__default__enable=false \
-e EMQX_listeners__tcp__default__proxy_protocol=true \
-e EMQX_listeners__ws__default__proxy_protocol=true \
"$IMAGE"
"$IMAGE2"
mkdir -p tmp
cat <<EOF > tmp/haproxy.cfg
@ -90,8 +91,11 @@ frontend emqx_dashboard
default_backend emqx_dashboard_back
backend emqx_dashboard_back
# Must use a consistent dispatch when EMQX is running on different versions
# because the js files for the dashboard is chunked, having the backends sharing
# load randomly will cause the browser fail to GET some chunks (or get bad chunks if names clash)
balance source
mode http
# balance static-rr
server emqx-1 $NODE1:18083
server emqx-2 $NODE2:18083
@ -142,7 +146,7 @@ wait_for_emqx() {
container="$1"
wait_limit="$2"
wait_sec=0
while ! docker exec "$container" emqx_ctl status >/dev/null 2>&1; do
while ! docker exec "$container" emqx ctl status; do
wait_sec=$(( wait_sec + 1 ))
if [ $wait_sec -gt "$wait_limit" ]; then
echo "timeout wait for EMQX"
@ -178,4 +182,4 @@ wait_for_haproxy 10
echo
docker exec $NODE1 emqx_ctl cluster join "emqx@$NODE2"
docker exec $NODE1 emqx ctl cluster join "emqx@$NODE2"