Merge pull request #11301 from id/0719-sync-release-51-to-master

Ivan Dyachkov 2023-07-19 14:21:26 +02:00 committed by GitHub
commit f29a9ed9d5
31 changed files with 550 additions and 152 deletions

View File

@@ -17,8 +17,15 @@ jobs:
      - uses: erlef/setup-beam@v1.15.4
        with:
          otp-version: 25.3.2
+      - name: Cache Jmeter
+        id: cache-jmeter
+        uses: actions/cache@v3
+        with:
+          path: /tmp/apache-jmeter.tgz
+          key: apache-jmeter-5.4.3.tgz
      - name: download jmeter
-        timeout-minutes: 3
+        if: steps.cache-jmeter.outputs.cache-hit != 'true'
+        timeout-minutes: 15
        env:
          JMETER_VERSION: 5.4.3
        run: |

View File

@@ -16,7 +16,7 @@ endif
 # Dashboard version
 # from https://github.com/emqx/emqx-dashboard5
 export EMQX_DASHBOARD_VERSION ?= v1.3.2
-export EMQX_EE_DASHBOARD_VERSION ?= e1.1.1-beta.3
+export EMQX_EE_DASHBOARD_VERSION ?= e1.1.1-beta.4
 # `:=` should be used here, otherwise the `$(shell ...)` will be executed every time when the variable is used
 # In make 4.4+, for backward-compatibility the value from the original environment is used.

View File

@@ -33,12 +33,15 @@
 -define(ERTS_MINIMUM_REQUIRED, "10.0").

 %%--------------------------------------------------------------------
-%% Topics' prefix: $SYS | $share
+%% Topics' prefix: $SYS | $queue | $share
 %%--------------------------------------------------------------------

 %% System topic
 -define(SYSTOP, <<"$SYS/">>).

+%% Queue topic
+-define(QUEUE, <<"$queue/">>).
+
 %%--------------------------------------------------------------------
 %% alarms
 %%--------------------------------------------------------------------

View File

@@ -35,7 +35,7 @@
 -define(EMQX_RELEASE_CE, "5.1.1").

 %% Enterprise edition
--define(EMQX_RELEASE_EE, "5.1.1-alpha.1").
+-define(EMQX_RELEASE_EE, "5.1.1-alpha.2").

 %% The HTTP API version
 -define(EMQX_API_VERSION, "5.0").

View File

@@ -455,7 +455,7 @@ handle_in(
             NChannel = Channel#channel{session = NSession},
             handle_out(pubcomp, {PacketId, ?RC_SUCCESS}, NChannel);
         {error, RC = ?RC_PACKET_IDENTIFIER_NOT_FOUND} ->
-            ?SLOG(warning, #{msg => "pubrec_packetId_not_found", packetId => PacketId}),
+            ?SLOG(warning, #{msg => "pubrel_packetId_not_found", packetId => PacketId}),
             ok = emqx_metrics:inc('packets.pubrel.missed'),
             handle_out(pubcomp, {PacketId, RC}, Channel)
     end;

View File

@@ -2504,7 +2504,11 @@ to_integer(Str) ->
     end.

 to_percent(Str) ->
-    {ok, hocon_postprocess:percent(Str)}.
+    Percent = hocon_postprocess:percent(Str),
+    case is_number(Percent) andalso Percent >= 0.0 andalso Percent =< 1.0 of
+        true -> {ok, Percent};
+        false -> {error, Str}
+    end.

 to_comma_separated_list(Str) ->
     {ok, string:tokens(Str, ", ")}.
@@ -2732,15 +2736,17 @@ check_cpu_watermark(Conf) ->
     check_watermark("sysmon.os.cpu_low_watermark", "sysmon.os.cpu_high_watermark", Conf).

 check_watermark(LowKey, HighKey, Conf) ->
-    case hocon_maps:get(LowKey, Conf) of
-        undefined ->
+    case to_percent(hocon_maps:get(LowKey, Conf)) of
+        {error, undefined} ->
             true;
-        Low ->
-            High = hocon_maps:get(HighKey, Conf),
-            case Low < High of
-                true -> true;
-                false -> {bad_watermark, #{LowKey => Low, HighKey => High}}
-            end
+        {ok, Low} ->
+            case to_percent(hocon_maps:get(HighKey, Conf)) of
+                {ok, High} when High > Low -> true;
+                {ok, High} -> {bad_watermark, #{LowKey => Low, HighKey => High}};
+                {error, HighVal} -> {bad_watermark, #{HighKey => HighVal}}
+            end;
+        {error, Low} ->
+            {bad_watermark, #{LowKey => Low}}
     end.

 str(A) when is_atom(A) ->
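
To illustrate the stricter check above (a sketch with hypothetical config values, not part of the diff): both watermarks are now normalized through `to_percent/1` before comparison, so a malformed or out-of-range value surfaces as an error instead of being compared raw:

    %% assuming Conf maps sysmon.os.cpu_high_watermark to "120%":
    {bad_watermark, _} =
        check_watermark("sysmon.os.cpu_low_watermark", "sysmon.os.cpu_high_watermark", Conf).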

View File

@@ -244,8 +244,12 @@ parse({TopicFilter, Options}) when is_binary(TopicFilter) ->
     parse(TopicFilter, Options).

 -spec parse(topic(), map()) -> {topic(), map()}.
+parse(TopicFilter = <<"$queue/", _/binary>>, #{share := _Group}) ->
+    error({invalid_topic_filter, TopicFilter});
 parse(TopicFilter = <<"$share/", _/binary>>, #{share := _Group}) ->
     error({invalid_topic_filter, TopicFilter});
+parse(<<"$queue/", TopicFilter/binary>>, Options) ->
+    parse(TopicFilter, Options#{share => <<"$queue">>});
 parse(TopicFilter = <<"$share/", Rest/binary>>, Options) ->
     case binary:split(Rest, <<"/">>) of
         [_Any] ->

View File

@@ -444,7 +444,7 @@ systopic_mon() ->
 sharetopic() ->
     ?LET(
         {Type, Grp, T},
-        {oneof([<<"$share">>]), list(latin_char()), normal_topic()},
+        {oneof([<<"$queue">>, <<"$share">>]), list(latin_char()), normal_topic()},
         <<Type/binary, "/", (iolist_to_binary(Grp))/binary, "/", T/binary>>
     ).

View File

@@ -20,8 +20,10 @@
 -compile(nowarn_export_all).

 -include_lib("emqx/include/emqx.hrl").
+-include_lib("emqx/include/emqx_mqtt.hrl").
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").

 -define(SUITE, ?MODULE).
@@ -986,6 +988,112 @@ t_session_kicked(Config) when is_list(Config) ->
     ?assertEqual([], collect_msgs(0)),
     ok.

+%% FIXME: currently doesn't work
+%% t_different_groups_same_topic({init, Config}) ->
+%%     TestName = atom_to_binary(?FUNCTION_NAME),
+%%     ClientId = <<TestName/binary, (integer_to_binary(erlang:unique_integer()))/binary>>,
+%%     {ok, C} = emqtt:start_link([{clientid, ClientId}, {proto_ver, v5}]),
+%%     {ok, _} = emqtt:connect(C),
+%%     [{client, C}, {clientid, ClientId} | Config];
+%% t_different_groups_same_topic({'end', Config}) ->
+%%     C = ?config(client, Config),
+%%     emqtt:stop(C),
+%%     ok;
+%% t_different_groups_same_topic(Config) when is_list(Config) ->
+%%     C = ?config(client, Config),
+%%     ClientId = ?config(clientid, Config),
+%%     %% Subscribe and unsubscribe to both $queue and $shared topics
+%%     Topic = <<"t/1">>,
+%%     SharedTopic0 = <<"$share/aa/", Topic/binary>>,
+%%     SharedTopic1 = <<"$share/bb/", Topic/binary>>,
+%%     {ok, _, [2]} = emqtt:subscribe(C, {SharedTopic0, 2}),
+%%     {ok, _, [2]} = emqtt:subscribe(C, {SharedTopic1, 2}),
+%%     Message0 = emqx_message:make(ClientId, _QoS = 2, Topic, <<"hi">>),
+%%     emqx:publish(Message0),
+%%     ?assertMatch([ {publish, #{payload := <<"hi">>}}
+%%                  , {publish, #{payload := <<"hi">>}}
+%%                  ], collect_msgs(5_000), #{routes => ets:tab2list(emqx_route)}),
+%%     {ok, _, [0]} = emqtt:unsubscribe(C, SharedTopic0),
+%%     {ok, _, [0]} = emqtt:unsubscribe(C, SharedTopic1),
+%%     ok.
+
+t_queue_subscription({init, Config}) ->
+    TestName = atom_to_binary(?FUNCTION_NAME),
+    ClientId = <<TestName/binary, (integer_to_binary(erlang:unique_integer()))/binary>>,
+    {ok, C} = emqtt:start_link([{clientid, ClientId}, {proto_ver, v5}]),
+    {ok, _} = emqtt:connect(C),
+    [{client, C}, {clientid, ClientId} | Config];
+t_queue_subscription({'end', Config}) ->
+    C = ?config(client, Config),
+    emqtt:stop(C),
+    ok;
+t_queue_subscription(Config) when is_list(Config) ->
+    C = ?config(client, Config),
+    ClientId = ?config(clientid, Config),
+    %% Subscribe and unsubscribe to both $queue and $shared topics
+    Topic = <<"t/1">>,
+    QueueTopic = <<"$queue/", Topic/binary>>,
+    SharedTopic = <<"$share/aa/", Topic/binary>>,
+    {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(C, {QueueTopic, 2}),
+    {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(C, {SharedTopic, 2}),
+    %% FIXME: we should actually see 2 routes, one for each group
+    %% ($queue and aa), but currently the latest subscription
+    %% overwrites the existing one.
+    ?retry(
+        _Sleep0 = 100,
+        _Attempts0 = 50,
+        begin
+            ct:pal("routes: ~p", [ets:tab2list(emqx_route)]),
+            %% FIXME: should ensure we have 2 subscriptions
+            true = emqx_router:has_routes(Topic)
+        end
+    ),
+    %% now publish to the underlying topic
+    Message0 = emqx_message:make(ClientId, _QoS = 2, Topic, <<"hi">>),
+    emqx:publish(Message0),
+    ?assertMatch(
+        [
+            {publish, #{payload := <<"hi">>}}
+            %% FIXME: should receive one message from each group
+            %% , {publish, #{payload := <<"hi">>}}
+        ],
+        collect_msgs(5_000)
+    ),
+    {ok, _, [?RC_SUCCESS]} = emqtt:unsubscribe(C, QueueTopic),
+    %% FIXME: return code should be success instead of 17 ("no_subscription_existed")
+    {ok, _, [?RC_NO_SUBSCRIPTION_EXISTED]} = emqtt:unsubscribe(C, SharedTopic),
+    %% FIXME: this should eventually be true, but currently we leak
+    %% the previous group subscription...
+    %% ?retry(
+    %%     _Sleep0 = 100,
+    %%     _Attempts0 = 50,
+    %%     begin
+    %%         ct:pal("routes: ~p", [ets:tab2list(emqx_route)]),
+    %%         false = emqx_router:has_routes(Topic)
+    %%     end
+    %% ),
+    ct:sleep(500),
+    Message1 = emqx_message:make(ClientId, _QoS = 2, Topic, <<"hello">>),
+    emqx:publish(Message1),
+    %% FIXME: we should *not* receive any messages...
+    %% ?assertEqual([], collect_msgs(1_000), #{routes => ets:tab2list(emqx_route)}),
+    %% This is from the leaked group...
+    ?assertMatch([{publish, #{topic := Topic}}], collect_msgs(1_000), #{
+        routes => ets:tab2list(emqx_route)
+    }),
+    ok.
+
 %%--------------------------------------------------------------------
 %% help functions
 %%--------------------------------------------------------------------

View File

@@ -211,6 +211,10 @@ t_systop(_) ->
     ?assertEqual(SysTop2, systop(<<"abc">>)).

 t_feed_var(_) ->
+    ?assertEqual(
+        <<"$queue/client/clientId">>,
+        feed_var(<<"$c">>, <<"clientId">>, <<"$queue/client/$c">>)
+    ),
     ?assertEqual(
         <<"username/test/client/x">>,
         feed_var(
@@ -232,6 +236,10 @@ long_topic() ->
     iolist_to_binary([[integer_to_list(I), "/"] || I <- lists:seq(0, 66666)]).

 t_parse(_) ->
+    ?assertError(
+        {invalid_topic_filter, <<"$queue/t">>},
+        parse(<<"$queue/t">>, #{share => <<"g">>})
+    ),
     ?assertError(
         {invalid_topic_filter, <<"$share/g/t">>},
         parse(<<"$share/g/t">>, #{share => <<"g">>})
@@ -246,9 +254,11 @@ t_parse(_) ->
     ),
     ?assertEqual({<<"a/b/+/#">>, #{}}, parse(<<"a/b/+/#">>)),
     ?assertEqual({<<"a/b/+/#">>, #{qos => 1}}, parse({<<"a/b/+/#">>, #{qos => 1}})),
+    ?assertEqual({<<"topic">>, #{share => <<"$queue">>}}, parse(<<"$queue/topic">>)),
     ?assertEqual({<<"topic">>, #{share => <<"group">>}}, parse(<<"$share/group/topic">>)),
     %% The '$local' and '$fastlane' topics have been deprecated.
     ?assertEqual({<<"$local/topic">>, #{}}, parse(<<"$local/topic">>)),
+    ?assertEqual({<<"$local/$queue/topic">>, #{}}, parse(<<"$local/$queue/topic">>)),
     ?assertEqual({<<"$local/$share/group/a/b/c">>, #{}}, parse(<<"$local/$share/group/a/b/c">>)),
     ?assertEqual({<<"$fastlane/topic">>, #{}}, parse(<<"$fastlane/topic">>)).

View File

@@ -988,15 +988,10 @@ call_operation(NodeOrAll, OperFunc, Args = [_Nodes, BridgeType, BridgeName]) ->
             %% still on an older bpapi version that doesn't support it.
             maybe_try_restart(NodeOrAll, OperFunc, Args);
         {error, timeout} ->
-            ?SERVICE_UNAVAILABLE(<<"Request timeout">>);
+            ?BAD_REQUEST(<<"Request timeout">>);
         {error, {start_pool_failed, Name, Reason}} ->
             Msg = bin(io_lib:format("Failed to start ~p pool for reason ~p", [Name, Reason])),
-            case Reason of
-                nxdomain ->
-                    ?BAD_REQUEST(Msg);
-                _ ->
-                    ?SERVICE_UNAVAILABLE(Msg)
-            end;
+            ?BAD_REQUEST(Msg);
         {error, not_found} ->
             BridgeId = emqx_bridge_resource:bridge_id(BridgeType, BridgeName),
             ?SLOG(warning, #{

View File

@@ -834,7 +834,8 @@ do_start_stop_bridges(Type, Config) ->
     ),
     BadBridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_MQTT, BadName),
     ?assertMatch(
-        {ok, SC, _} when SC == 500 orelse SC == 503,
+        %% request from product: return 400 on such errors
+        {ok, SC, _} when SC == 500 orelse SC == 400,
         request(post, {operation, Type, start, BadBridgeID}, Config)
     ),
     ok = gen_tcp:close(Sock),

View File

@@ -10,6 +10,8 @@
 -include_lib("common_test/include/ct.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").

+-import(emqx_common_test_helpers, [on_exit/1]).
+
 %% ct setup helpers

 init_per_suite(Config, Apps) ->
@@ -211,19 +213,27 @@ probe_bridge_api(BridgeType, BridgeName, BridgeConfig) ->
     Res.

 create_rule_and_action_http(BridgeType, RuleTopic, Config) ->
+    create_rule_and_action_http(BridgeType, RuleTopic, Config, _Opts = #{}).
+
+create_rule_and_action_http(BridgeType, RuleTopic, Config, Opts) ->
     BridgeName = ?config(bridge_name, Config),
     BridgeId = emqx_bridge_resource:bridge_id(BridgeType, BridgeName),
+    SQL = maps:get(sql, Opts, <<"SELECT * FROM \"", RuleTopic/binary, "\"">>),
     Params = #{
         enable => true,
-        sql => <<"SELECT * FROM \"", RuleTopic/binary, "\"">>,
+        sql => SQL,
         actions => [BridgeId]
     },
     Path = emqx_mgmt_api_test_util:api_path(["rules"]),
     AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
     ct:pal("rule action params: ~p", [Params]),
     case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params) of
-        {ok, Res} -> {ok, emqx_utils_json:decode(Res, [return_maps])};
-        Error -> Error
+        {ok, Res0} ->
+            Res = #{<<"id">> := RuleId} = emqx_utils_json:decode(Res0, [return_maps]),
+            on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end),
+            {ok, Res};
+        Error ->
+            Error
     end.
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------

View File

@@ -119,10 +119,13 @@ roots() ->

 fields(config) ->
     [
-        {url, mk(binary(), #{required => true, desc => ?DESC("url")})},
+        {url,
+            mk(binary(), #{
+                required => true, desc => ?DESC("url"), default => <<"http://127.0.0.1:6570">>
+            })},
         {stream, mk(binary(), #{required => true, desc => ?DESC("stream_name")})},
         {partition_key, mk(binary(), #{required => false, desc => ?DESC("partition_key")})},
-        {pool_size, mk(pos_integer(), #{required => true, desc => ?DESC("pool_size")})},
+        {pool_size, fun emqx_connector_schema_lib:pool_size/1},
         {grpc_timeout, fun grpc_timeout/1}
     ] ++ emqx_connector_schema_lib:ssl_fields().

View File

@@ -379,6 +379,41 @@ t_sync_device_id_missing(Config) ->
         iotdb_bridge_on_query
     ).

+t_extract_device_id_from_rule_engine_message(Config) ->
+    BridgeType = ?config(bridge_type, Config),
+    RuleTopic = <<"t/iotdb">>,
+    DeviceId = iotdb_device(Config),
+    Payload = make_iotdb_payload(DeviceId, "temp", "INT32", "12"),
+    Message = emqx_message:make(RuleTopic, emqx_utils_json:encode(Payload)),
+    ?check_trace(
+        begin
+            {ok, _} = emqx_bridge_testlib:create_bridge(Config),
+            SQL = <<
+                "SELECT\n"
+                " payload.measurement, payload.data_type, payload.value, payload.device_id\n"
+                "FROM\n"
+                " \"",
+                RuleTopic/binary,
+                "\""
+            >>,
+            Opts = #{sql => SQL},
+            {ok, _} = emqx_bridge_testlib:create_rule_and_action_http(
+                BridgeType, RuleTopic, Config, Opts
+            ),
+            emqx:publish(Message),
+            ?block_until(handle_async_reply, 5_000),
+            ok
+        end,
+        fun(Trace) ->
+            ?assertMatch(
+                [#{action := ack, result := {ok, 200, _, _}}],
+                ?of_kind(handle_async_reply, Trace)
+            ),
+            ok
+        end
+    ),
+    ok.
+
 t_sync_invalid_data(Config) ->
     emqx_bridge_testlib:t_sync_query(
         Config,

View File

@@ -23,7 +23,7 @@
 -define(DEFAULT_SQL, <<
     "insert into t_mqtt_msg(ts, msgid, mqtt_topic, qos, payload, arrived) "
-    "values (${ts}, ${id}, ${topic}, ${qos}, ${payload}, ${timestamp})"
+    "values (${ts}, '${id}', '${topic}', ${qos}, '${payload}', ${timestamp})"
 >>).

 %% -------------------------------------------------------------------------------------------------

View File

@@ -124,7 +124,7 @@ on_query(InstanceId, {query, SQL}, State) ->
 on_query(InstanceId, {Key, Data}, #{insert_tokens := InsertTksMap} = State) ->
     case maps:find(Key, InsertTksMap) of
         {ok, Tokens} when is_map(Data) ->
-            SQL = emqx_placeholder:proc_sql_param_str(Tokens, Data),
+            SQL = emqx_placeholder:proc_tmpl(Tokens, Data),
             do_query(InstanceId, SQL, State);
         _ ->
             {error, {unrecoverable_error, invalid_request}}
@@ -209,31 +209,16 @@ execute(Conn, Query, Opts) ->
     tdengine:insert(Conn, Query, Opts).

 do_batch_insert(Conn, Tokens, BatchReqs, Opts) ->
-    Queries = aggregate_query(Tokens, BatchReqs),
-    SQL = maps:fold(
-        fun(InsertPart, Values, Acc) ->
-            lists:foldl(
-                fun(ValuePart, IAcc) ->
-                    <<IAcc/binary, " ", ValuePart/binary>>
-                end,
-                <<Acc/binary, " ", InsertPart/binary, " VALUES">>,
-                Values
-            )
-        end,
-        <<"INSERT INTO">>,
-        Queries
-    ),
+    SQL = aggregate_query(Tokens, BatchReqs, <<"INSERT INTO">>),
     execute(Conn, SQL, Opts).

-aggregate_query({InsertPartTks, ParamsPartTks}, BatchReqs) ->
+aggregate_query(BatchTks, BatchReqs, Acc) ->
     lists:foldl(
-        fun({_, Data}, Acc) ->
-            InsertPart = emqx_placeholder:proc_sql_param_str(InsertPartTks, Data),
-            ParamsPart = emqx_placeholder:proc_sql_param_str(ParamsPartTks, Data),
-            Values = maps:get(InsertPart, Acc, []),
-            maps:put(InsertPart, [ParamsPart | Values], Acc)
+        fun({_, Data}, InAcc) ->
+            InsertPart = emqx_placeholder:proc_tmpl(BatchTks, Data),
+            <<InAcc/binary, " ", InsertPart/binary>>
         end,
-        #{},
+        Acc,
         BatchReqs
     ).
@@ -260,13 +245,12 @@ parse_batch_prepare_sql([{Key, H} | T], InsertTksMap, BatchTksMap) ->
     InsertTks = emqx_placeholder:preproc_tmpl(H),
     H1 = string:trim(H, trailing, ";"),
     case split_insert_sql(H1) of
-        [_InsertStr, InsertPart, _ValuesStr, ParamsPart] ->
-            InsertPartTks = emqx_placeholder:preproc_tmpl(InsertPart),
-            ParamsPartTks = emqx_placeholder:preproc_tmpl(ParamsPart),
+        [_InsertPart, BatchDesc] ->
+            BatchTks = emqx_placeholder:preproc_tmpl(BatchDesc),
             parse_batch_prepare_sql(
                 T,
                 InsertTksMap#{Key => InsertTks},
-                BatchTksMap#{Key => {InsertPartTks, ParamsPartTks}}
+                BatchTksMap#{Key => BatchTks}
             );
         Result ->
             ?SLOG(error, #{msg => "split sql failed", sql => H, result => Result}),
@@ -299,7 +283,7 @@ split_insert_sql(SQL0) ->
                     {true, E1}
             end
         end,
-        re:split(SQL, "(?i)(insert into)|(?i)(values)")
+        re:split(SQL, "(?i)(insert into)")
     ).

 formalize_sql(Input) ->

View File

@@ -13,7 +13,8 @@
 % SQL definitions
 -define(SQL_BRIDGE,
-    "insert into mqtt.t_mqtt_msg(ts, payload) values (${timestamp}, ${payload})"
+    "insert into t_mqtt_msg(ts, payload) values (${timestamp}, '${payload}')"
+    "t_mqtt_msg(ts, payload) values (${second_ts}, '${payload}')"
 ).

 -define(SQL_CREATE_DATABASE, "CREATE DATABASE IF NOT EXISTS mqtt; USE mqtt;").
@@ -29,7 +30,8 @@
 -define(SQL_SELECT, "SELECT payload FROM t_mqtt_msg").

 -define(AUTO_CREATE_BRIDGE,
-    "insert into ${clientid} USING s_tab TAGS (${clientid}) values (${timestamp}, ${payload})"
+    "insert into ${clientid} USING s_tab TAGS ('${clientid}') values (${timestamp}, '${payload}')"
+    "test_${clientid} USING s_tab TAGS ('${clientid}') values (${second_ts}, '${payload}')"
 ).

 -define(SQL_CREATE_STABLE,
-define(SQL_CREATE_STABLE, -define(SQL_CREATE_STABLE,
@@ -301,7 +303,7 @@ connect_and_clear_table(Config) ->

 connect_and_get_payload(Config) ->
     ?WITH_CON(
-        {ok, #{<<"code">> := 0, <<"data">> := [[Result]]}} = directly_query(Con, ?SQL_SELECT)
+        {ok, #{<<"code">> := 0, <<"data">> := Result}} = directly_query(Con, ?SQL_SELECT)
     ),
     Result.
@@ -329,7 +331,7 @@ t_setup_via_config_and_publish(Config) ->
         {ok, _},
         create_bridge(Config)
     ),
-    SentData = #{payload => ?PAYLOAD, timestamp => 1668602148000},
+    SentData = #{payload => ?PAYLOAD, timestamp => 1668602148000, second_ts => 1668602148010},
     ?check_trace(
         begin
             {_, {ok, #{result := Result}}} =
@@ -342,7 +344,7 @@ t_setup_via_config_and_publish(Config) ->
                 {ok, #{<<"code">> := 0, <<"rows">> := 1}}, Result
             ),
             ?assertMatch(
-                ?PAYLOAD,
+                [[?PAYLOAD], [?PAYLOAD]],
                 connect_and_get_payload(Config)
             ),
             ok
@@ -368,7 +370,8 @@ t_setup_via_http_api_and_publish(Config) ->
         {ok, _},
         create_bridge_http(TDengineConfig)
     ),
-    SentData = #{payload => ?PAYLOAD, timestamp => 1668602148000},
+    SentData = #{payload => ?PAYLOAD, timestamp => 1668602148000, second_ts => 1668602148010},
+
     ?check_trace(
         begin
             Request = {send_message, SentData},
@@ -386,7 +389,7 @@ t_setup_via_http_api_and_publish(Config) ->
                 {ok, #{<<"code">> := 0, <<"rows">> := 1}}, Res0
             ),
             ?assertMatch(
-                ?PAYLOAD,
+                [[?PAYLOAD], [?PAYLOAD]],
                 connect_and_get_payload(Config)
             ),
             ok
@@ -426,7 +429,7 @@ t_write_failure(Config) ->
     ProxyPort = ?config(proxy_port, Config),
     ProxyHost = ?config(proxy_host, Config),
     {ok, _} = create_bridge(Config),
-    SentData = #{payload => ?PAYLOAD, timestamp => 1668602148000},
+    SentData = #{payload => ?PAYLOAD, timestamp => 1668602148000, second_ts => 1668602148010},
     emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
         {_, {ok, #{result := Result}}} =
             ?wait_async_action(
@@ -461,7 +464,7 @@ t_write_timeout(Config) ->
             }
         }
     ),
-    SentData = #{payload => ?PAYLOAD, timestamp => 1668602148000},
+    SentData = #{payload => ?PAYLOAD, timestamp => 1668602148000, second_ts => 1668602148010},
     %% FIXME: TDengine connector hangs indefinitely during
     %% `call_query' while the connection is unresponsive. Should add
     %% a timeout to `APPLY_RESOURCE' in buffer worker??
@@ -486,7 +489,7 @@ t_simple_sql_query(Config) ->
         {ok, _},
         create_bridge(Config)
     ),
-    Request = {query, <<"SELECT count(1) AS T">>},
+    Request = {query, <<"SELECT 1 AS T">>},
     {_, {ok, #{result := Result}}} =
         ?wait_async_action(
             query_resource(Config, Request),
@@ -537,37 +540,41 @@ t_bad_sql_parameter(Config) ->
     ?assertMatch({error, {unrecoverable_error, invalid_request}}, Result),
     ok.

-t_nasty_sql_string(Config) ->
-    ?assertMatch(
-        {ok, _},
-        create_bridge(Config)
-    ),
-    % NOTE
-    % Column `payload` has BINARY type, so we would certainly like to test it
-    % with `lists:seq(1, 127)`, but:
-    % 1. There's no way to insert zero byte in an SQL string, seems that TDengine's
-    %    parser[1] has no escaping sequence for it so a zero byte probably confuses
-    %    interpreter somewhere down the line.
-    % 2. Bytes > 127 come back as U+FFFDs (i.e. replacement characters) in UTF-8 for
-    %    some reason.
-    %
-    % [1]: https://github.com/taosdata/TDengine/blob/066cb34a/source/libs/parser/src/parUtil.c#L279-L301
-    Payload = list_to_binary(lists:seq(1, 127)),
-    Message = #{payload => Payload, timestamp => erlang:system_time(millisecond)},
-    {_, {ok, #{result := Result}}} =
-        ?wait_async_action(
-            send_message(Config, Message),
-            #{?snk_kind := buffer_worker_flush_ack},
-            2_000
-        ),
-    ?assertMatch(
-        {ok, #{<<"code">> := 0, <<"rows">> := 1}},
-        Result
-    ),
-    ?assertEqual(
-        Payload,
-        connect_and_get_payload(Config)
-    ).
+%% TODO
+%% For supporting to generate a subtable name by mixing prefixes/suffixes with placeholders,
+%% the SQL quote(escape) is removed now,
+%% we should introduce a new syntax for placeholders to allow some vars to keep unquote.
+%% t_nasty_sql_string(Config) ->
+%%     ?assertMatch(
+%%         {ok, _},
+%%         create_bridge(Config)
+%%     ),
+%%     % NOTE
+%%     % Column `payload` has BINARY type, so we would certainly like to test it
+%%     % with `lists:seq(1, 127)`, but:
+%%     % 1. There's no way to insert zero byte in an SQL string, seems that TDengine's
+%%     %    parser[1] has no escaping sequence for it so a zero byte probably confuses
+%%     %    interpreter somewhere down the line.
+%%     % 2. Bytes > 127 come back as U+FFFDs (i.e. replacement characters) in UTF-8 for
+%%     %    some reason.
+%%     %
+%%     % [1]: https://github.com/taosdata/TDengine/blob/066cb34a/source/libs/parser/src/parUtil.c#L279-L301
+%%     Payload = list_to_binary(lists:seq(1, 127)),
+%%     Message = #{payload => Payload, timestamp => erlang:system_time(millisecond)},
+%%     {_, {ok, #{result := Result}}} =
+%%         ?wait_async_action(
+%%             send_message(Config, Message),
+%%             #{?snk_kind := buffer_worker_flush_ack},
+%%             2_000
+%%         ),
+%%     ?assertMatch(
+%%         {ok, #{<<"code">> := 0, <<"rows">> := 1}},
+%%         Result
+%%     ),
+%%     ?assertEqual(
+%%         Payload,
+%%         connect_and_get_payload(Config)
+%%     ).

 t_simple_insert(Config) ->
     connect_and_clear_table(Config),
@@ -576,7 +583,7 @@ t_simple_insert(Config) ->
         create_bridge(Config)
     ),

-    SentData = #{payload => ?PAYLOAD, timestamp => 1668602148000},
+    SentData = #{payload => ?PAYLOAD, timestamp => 1668602148000, second_ts => 1668602148010},
     Request = {send_message, SentData},
     {_, {ok, #{result := _Result}}} =
         ?wait_async_action(
@@ -585,7 +592,7 @@ t_simple_insert(Config) ->
             2_000
         ),
     ?assertMatch(
-        ?PAYLOAD,
+        [[?PAYLOAD], [?PAYLOAD]],
         connect_and_get_payload(Config)
     ).
@@ -602,7 +609,9 @@ t_batch_insert(Config) ->
         ?wait_async_action(
             lists:foreach(
                 fun(Idx) ->
-                    SentData = #{payload => ?PAYLOAD, timestamp => Ts + Idx},
+                    SentData = #{
+                        payload => ?PAYLOAD, timestamp => Ts + Idx, second_ts => Ts + Idx + 5000
+                    },
                     Request = {send_message, SentData},
                     query_resource(Config, Request)
                 end,
@@ -613,11 +622,12 @@ t_batch_insert(Config) ->
             2_000
         ),

+    DoubleSize = Size * 2,
     ?retry(
         _Sleep = 50,
         _Attempts = 30,
         ?assertMatch(
-            [[Size]],
+            [[DoubleSize]],
             connect_and_query(Config, "SELECT COUNT(1) FROM t_mqtt_msg")
         )
     ).
@@ -633,6 +643,7 @@ t_auto_create_simple_insert(Config0) ->
     SentData = #{
         payload => ?PAYLOAD,
         timestamp => 1668602148000,
+        second_ts => 1668602148000 + 100,
         clientid => ClientId
     },
     Request = {send_message, SentData},
@@ -647,9 +658,19 @@ t_auto_create_simple_insert(Config0) ->
         connect_and_query(Config, "SELECT payload FROM " ++ ClientId)
     ),

+    ?assertMatch(
+        [[?PAYLOAD]],
+        connect_and_query(Config, "SELECT payload FROM test_" ++ ClientId)
+    ),
+
     ?assertMatch(
         [[0]],
         connect_and_query(Config, "DROP TABLE " ++ ClientId)
+    ),
+
+    ?assertMatch(
+        [[0]],
+        connect_and_query(Config, "DROP TABLE test_" ++ ClientId)
     ).

 t_auto_create_batch_insert(Config0) ->
@@ -675,6 +696,7 @@ t_auto_create_batch_insert(Config0) ->
                     SentData = #{
                         payload => ?PAYLOAD,
                         timestamp => Ts + Idx + Offset,
+                        second_ts => Ts + Idx + Offset + 5000,
                         clientid => ClientId
                     },
                     Request = {send_message, SentData},
@@ -693,29 +715,28 @@ t_auto_create_batch_insert(Config0) ->
         _Sleep = 50,
         _Attempts = 30,
-        ?assertMatch(
-            [[Size1]],
-            connect_and_query(Config, "SELECT COUNT(1) FROM " ++ ClientId1)
+        lists:foreach(
+            fun({Table, Size}) ->
+                ?assertMatch(
+                    [[Size]],
+                    connect_and_query(Config, "SELECT COUNT(1) FROM " ++ Table)
+                )
+            end,
+            lists:zip(
+                [ClientId1, "test_" ++ ClientId1, ClientId2, "test_" ++ ClientId2],
+                [Size1, Size1, Size2, Size2]
+            )
         )
     ),

-    ?retry(
-        50,
-        30,
-        ?assertMatch(
-            [[Size2]],
-            connect_and_query(Config, "SELECT COUNT(1) FROM " ++ ClientId2)
-        )
-    ),
-    ?assertMatch(
-        [[0]],
-        connect_and_query(Config, "DROP TABLE " ++ ClientId1)
-    ),
-    ?assertMatch(
-        [[0]],
-        connect_and_query(Config, "DROP TABLE " ++ ClientId2)
+    lists:foreach(
+        fun(E) ->
+            ?assertMatch(
+                [[0]],
+                connect_and_query(Config, "DROP TABLE " ++ E)
+            )
+        end,
+        [ClientId1, ClientId2, "test_" ++ ClientId1, "test_" ++ ClientId2]
     ).

 to_bin(List) when is_list(List) ->

View File

@@ -332,8 +332,8 @@ load_etc_config_file() ->

 filter_readonly_config(Raw) ->
     SchemaMod = emqx_conf:schema_module(),
-    RawDefault = fill_defaults(Raw),
     try
+        RawDefault = fill_defaults(Raw),
         _ = emqx_config:check_config(SchemaMod, RawDefault),
         ReadOnlyKeys = [atom_to_binary(K) || K <- ?READONLY_KEYS],
         {ok, maps:without(ReadOnlyKeys, Raw)}

View File

@@ -86,8 +86,7 @@ roots() ->
             sc(
                 ?R_REF("node"),
                 #{
-                    translate_to => ["emqx"],
-                    converter => fun node_converter/2
+                    translate_to => ["emqx"]
                 }
             )},
         {"cluster",
@@ -446,9 +445,11 @@ fields("node") ->
             sc(
                 range(1024, 134217727),
                 #{
-                    mapping => "vm_args.+P",
+                    %% deprecated make sure it's disappeared in raw_conf user(HTTP API)
+                    %% but still in vm.args via translation/1
+                    %% ProcessLimit is always equal to MaxPort * 2 when translation/1.
+                    deprecated => true,
                     desc => ?DESC(process_limit),
-                    default => ?DEFAULT_MAX_PORTS * 2,
                     importance => ?IMPORTANCE_HIDDEN,
                     'readOnly' => true
                 }
@@ -1052,7 +1053,7 @@ desc("authorization") ->
 desc(_) ->
     undefined.

-translations() -> ["ekka", "kernel", "emqx", "gen_rpc", "prometheus"].
+translations() -> ["ekka", "kernel", "emqx", "gen_rpc", "prometheus", "vm_args"].

 translation("ekka") ->
     [{"cluster_discovery", fun tr_cluster_discovery/1}];
@@ -1079,8 +1080,15 @@ translation("prometheus") ->
         {"vm_system_info_collector_metrics", fun tr_vm_system_info_collector/1},
         {"vm_memory_collector_metrics", fun tr_vm_memory_collector/1},
         {"vm_msacc_collector_metrics", fun tr_vm_msacc_collector/1}
+    ];
+translation("vm_args") ->
+    [
+        {"+P", fun tr_vm_args_process_limit/1}
     ].

+tr_vm_args_process_limit(Conf) ->
+    2 * conf_get("node.max_ports", Conf, ?DEFAULT_MAX_PORTS).
+
 tr_vm_dist_collector(Conf) ->
     metrics_enabled(conf_get("prometheus.vm_dist_collector", Conf, enabled)).
@@ -1395,10 +1403,3 @@ ensure_unicode_path(Path, _) when is_list(Path) ->
     Path;
 ensure_unicode_path(Path, _) ->
     throw({"not_string", Path}).
-
-node_converter(#{<<"process_limit">> := _} = Conf, _Opts) ->
-    Conf;
-node_converter(#{<<"max_ports">> := MaxPorts} = Conf, _Opts) ->
-    Conf#{<<"process_limit">> => MaxPorts * 2};
-node_converter(Conf, _Opts) ->
-    Conf.

View File

@@ -25,6 +25,8 @@
         name = \"emqx1@127.0.0.1\"
         cookie = \"emqxsecretcookie\"
         data_dir = \"data\"
+        max_ports = 2048
+        process_limit = 10240
     }
     cluster {
         name = emqxcl
@@ -42,6 +44,12 @@ array_nodes_test() ->
         ConfFile = to_bin(?BASE_CONF, [Nodes, Nodes]),
         {ok, Conf} = hocon:binary(ConfFile, #{format => richmap}),
         ConfList = hocon_tconf:generate(emqx_conf_schema, Conf),
+
+        VMArgs = proplists:get_value(vm_args, ConfList),
+        ProcLimit = proplists:get_value('+P', VMArgs),
+        MaxPort = proplists:get_value('+Q', VMArgs),
+        ?assertEqual(2048, MaxPort),
+        ?assertEqual(MaxPort * 2, ProcLimit),
         ClusterDiscovery = proplists:get_value(
             cluster_discovery, proplists:get_value(ekka, ConfList)
         ),

View File

@@ -187,6 +187,8 @@ format(WhichNode, {{Topic, _Subscriber}, Options}) ->
         maps:with([qos, nl, rap, rh], Options)
     ).

+get_topic(Topic, #{share := <<"$queue">> = Group}) ->
+    emqx_topic:join([Group, Topic]);
 get_topic(Topic, #{share := Group}) ->
     emqx_topic:join([<<"$share">>, Group, Topic]);
 get_topic(Topic, _) ->

View File

@@ -23,7 +23,6 @@
 -export([
     apply_rule/3,
     apply_rules/3,
-    clear_rule_payload/0,
     inc_action_metrics/2
 ]).
@@ -196,18 +195,18 @@ select_and_transform([], _Columns, Action) ->
 select_and_transform(['*' | More], Columns, Action) ->
     select_and_transform(More, Columns, maps:merge(Action, Columns));
 select_and_transform([{as, Field, Alias} | More], Columns, Action) ->
-    Val = eval(Field, Columns),
+    Val = eval(Field, [Action, Columns]),
     select_and_transform(
         More,
-        nested_put(Alias, Val, Columns),
+        Columns,
         nested_put(Alias, Val, Action)
     );
 select_and_transform([Field | More], Columns, Action) ->
-    Val = eval(Field, Columns),
+    Val = eval(Field, [Action, Columns]),
     Key = alias(Field, Columns),
     select_and_transform(
         More,
-        nested_put(Key, Val, Columns),
+        Columns,
         nested_put(Key, Val, Action)
     ).
@@ -217,25 +216,25 @@ select_and_collect(Fields, Columns) ->
     select_and_collect(Fields, Columns, {#{}, {'item', []}}).

 select_and_collect([{as, Field, {_, A} = Alias}], Columns, {Action, _}) ->
-    Val = eval(Field, Columns),
+    Val = eval(Field, [Action, Columns]),
     {nested_put(Alias, Val, Action), {A, ensure_list(Val)}};
 select_and_collect([{as, Field, Alias} | More], Columns, {Action, LastKV}) ->
-    Val = eval(Field, Columns),
+    Val = eval(Field, [Action, Columns]),
     select_and_collect(
         More,
         nested_put(Alias, Val, Columns),
         {nested_put(Alias, Val, Action), LastKV}
     );
 select_and_collect([Field], Columns, {Action, _}) ->
-    Val = eval(Field, Columns),
+    Val = eval(Field, [Action, Columns]),
     Key = alias(Field, Columns),
     {nested_put(Key, Val, Action), {'item', ensure_list(Val)}};
 select_and_collect([Field | More], Columns, {Action, LastKV}) ->
-    Val = eval(Field, Columns),
+    Val = eval(Field, [Action, Columns]),
     Key = alias(Field, Columns),
     select_and_collect(
         More,
-        nested_put(Key, Val, Columns),
+        Columns,
         {nested_put(Key, Val, Action), LastKV}
     ).
@@ -368,6 +367,16 @@ do_handle_action(RuleId, #{mod := Mod, func := Func, args := Args}, Selected, En
     inc_action_metrics(RuleId, Result),
     Result.

+eval({Op, _} = Exp, Context) when is_list(Context) andalso (Op == path orelse Op == var) ->
+    case Context of
+        [Columns] ->
+            eval(Exp, Columns);
+        [Columns | Rest] ->
+            case eval(Exp, Columns) of
+                undefined -> eval(Exp, Rest);
+                Val -> Val
+            end
+    end;
 eval({path, [{key, <<"payload">>} | Path]}, #{payload := Payload}) ->
     nested_get({path, Path}, may_decode_payload(Payload));
 eval({path, [{key, <<"payload">>} | Path]}, #{<<"payload">> := Payload}) ->
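
In effect, the new list-context `eval/2` clause resolves a field first against the Action accumulator (the aliases selected so far) and falls back to the event Columns, which is what enables the "last write wins" aliasing exercised in the test suite below. A minimal sketch (hypothetical bindings; assuming `eval({var, Name}, Map)` looks `Name` up in `Map`, as the single-map clauses of this module do):

    Action = #{<<"p">> => 1},
    Columns = #{<<"p">> => 0, <<"q">> => 2},
    1 = eval({var, <<"p">>}, [Action, Columns]),  %% found in Action, shadows Columns
    2 = eval({var, <<"q">>}, [Action, Columns]).  %% not in Action, falls back to Columns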

View File

@@ -21,6 +21,7 @@
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").

 -include_lib("emqx/include/emqx.hrl").
@@ -583,6 +584,122 @@ t_ensure_action_removed(_) ->
 %% Test cases for rule runtime
 %%------------------------------------------------------------------------------

+t_json_payload_decoding(_Config) ->
+    {ok, C} = emqtt:start_link(),
+    on_exit(fun() -> emqtt:stop(C) end),
+    {ok, _} = emqtt:connect(C),
+    Cases =
+        [
+            #{
+                select_fields =>
+                    <<"payload.measurement, payload.data_type, payload.value, payload.device_id">>,
+                payload => emqx_utils_json:encode(#{
+                    measurement => <<"temp">>,
+                    data_type => <<"FLOAT">>,
+                    value => <<"32.12">>,
+                    device_id => <<"devid">>
+                }),
+                expected => #{
+                    payload => #{
+                        <<"measurement">> => <<"temp">>,
+                        <<"data_type">> => <<"FLOAT">>,
+                        <<"value">> => <<"32.12">>,
+                        <<"device_id">> => <<"devid">>
+                    }
+                }
+            },
+            %% "last write wins" examples
+            #{
+                select_fields => <<"payload as p, payload.f as p.answer">>,
+                payload => emqx_utils_json:encode(#{f => 42, keep => <<"that?">>}),
+                expected => #{
+                    <<"p">> => #{
+                        <<"answer">> => 42
+                    }
+                }
+            },
+            #{
+                select_fields => <<"payload as p, payload.f as p.jsonlike.f">>,
+                payload => emqx_utils_json:encode(#{
+                    jsonlike => emqx_utils_json:encode(#{a => 0}),
+                    f => <<"huh">>
+                }),
+                %% behavior from 4.4: jsonlike gets wiped without preserving old "keys"
+                %% here we overwrite it since we don't explicitly decode it
+                expected => #{
+                    <<"p">> => #{
+                        <<"jsonlike">> => #{<<"f">> => <<"huh">>}
+                    }
+                }
+            },
+            #{
+                select_fields =>
+                    <<"payload as p, 42 as p, payload.measurement as p.measurement, 51 as p">>,
+                payload => emqx_utils_json:encode(#{
+                    measurement => <<"temp">>,
+                    data_type => <<"FLOAT">>,
+                    value => <<"32.12">>,
+                    device_id => <<"devid">>
+                }),
+                expected => #{
+                    <<"p">> => 51
+                }
+            },
+            %% if selected field is already structured, new values are inserted into it
+            #{
+                select_fields =>
+                    <<"json_decode(payload) as p, payload.a as p.z">>,
+                payload => emqx_utils_json:encode(#{
+                    a => 1,
+                    b => <<"2">>
+                }),
+                expected => #{
+                    <<"p">> => #{
+                        <<"a">> => 1,
+                        <<"b">> => <<"2">>,
+                        <<"z">> => 1
+                    }
+                }
+            }
+        ],
+    ActionFn = <<(atom_to_binary(?MODULE))/binary, ":action_response">>,
+    Topic = <<"some/topic">>,
+    ok = snabbkaffe:start_trace(),
+    on_exit(fun() -> snabbkaffe:stop() end),
+    on_exit(fun() -> delete_rule(?TMP_RULEID) end),
+    lists:foreach(
+        fun(#{select_fields := Fs, payload := P, expected := E} = Case) ->
+            ct:pal("testing case ~p", [Case]),
+            SQL = <<"select ", Fs/binary, " from \"", Topic/binary, "\"">>,
+            delete_rule(?TMP_RULEID),
+            {ok, _Rule} = emqx_rule_engine:create_rule(
+                #{
+                    sql => SQL,
+                    id => ?TMP_RULEID,
+                    actions => [#{function => ActionFn}]
+                }
+            ),
+            {_, {ok, Event}} =
+                ?wait_async_action(
+                    emqtt:publish(C, Topic, P, 0),
+                    #{?snk_kind := action_response},
+                    5_000
+                ),
+            ?assertMatch(
+                #{selected := E},
+                Event,
+                #{payload => P, fields => Fs, expected => E}
+            ),
+            ok
+        end,
+        Cases
+    ),
+    snabbkaffe:stop(),
+    ok.
+
 t_events(_Config) ->
     {ok, Client} = emqtt:start_link(
         [
@@ -3065,6 +3182,14 @@ republish_action(Topic, Payload, UserProperties) ->
         }
     }.

+action_response(Selected, Envs, Args) ->
+    ?tp(action_response, #{
+        selected => Selected,
+        envs => Envs,
+        args => Args
+    }),
+    ok.
+
 make_simple_rule_with_ts(RuleId, Ts) when is_binary(RuleId) ->
     SQL = <<"select * from \"simple/topic\"">>,
     make_simple_rule(RuleId, SQL, Ts).

View File

@@ -71,7 +71,49 @@ t_nested_put_map(_) ->
     ?assertEqual(
         #{k => #{<<"t">> => #{<<"a">> => v1}}},
         nested_put(?path([k, t, <<"a">>]), v1, #{k => #{<<"t">> => v0}})
-    ).
+    ),
+    %% note: since we handle json-encoded binaries when evaluating the
+    %% rule rather than baking the decoding in `nested_put`, we test
+    %% this corner case that _would_ otherwise lose data to
+    %% demonstrate this behavior.
+    ?assertEqual(
+        #{payload => #{<<"a">> => v1}},
+        nested_put(
+            ?path([payload, <<"a">>]),
+            v1,
+            #{payload => emqx_utils_json:encode(#{b => <<"v2">>})}
+        )
+    ),
+    %% We have an asymmetry in the behavior here because `nested_put'
+    %% currently, at each key, will use `general_find' to get the
+    %% current value of the key, and that attempts JSON decoding
+    %% that value... So, in the cases below, `old' gets preserved
+    %% because it's in this direct path.
+    ?assertEqual(
+        #{payload => #{<<"a">> => #{<<"new">> => v1, <<"old">> => <<"v2">>}}},
+        nested_put(
+            ?path([payload, <<"a">>, <<"new">>]),
+            v1,
+            #{payload => emqx_utils_json:encode(#{a => #{old => <<"v2">>}})}
+        )
+    ),
+    ?assertEqual(
+        #{payload => #{<<"a">> => #{<<"new">> => v1, <<"old">> => <<"{}">>}}},
+        nested_put(
+            ?path([payload, <<"a">>, <<"new">>]),
+            v1,
+            #{payload => emqx_utils_json:encode(#{a => #{old => <<"{}">>}, b => <<"{}">>})}
+        )
+    ),
+    ?assertEqual(
+        #{payload => #{<<"a">> => #{<<"new">> => v1}}},
+        nested_put(
+            ?path([payload, <<"a">>, <<"new">>]),
+            v1,
+            #{payload => <<"{}">>}
+        )
+    ),
+    ok.

 t_nested_put_index(_) ->
     ?assertEqual([1, a, 3], nested_put(?path([{ic, 2}]), a, [1, 2, 3])),

View File

@@ -0,0 +1 @@
+Ensure that the range of percentage type is from 0% to 100%.
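
A minimal sketch of the behavior this enforces (hypothetical inputs; `to_percent/1` is the converter changed in the emqx_schema hunk above, with `hocon_postprocess:percent/1` mapping "75%" to 0.75):

    {ok, 0.75} = to_percent("75%"),
    {ok, 1.0} = to_percent("100%"),
    %% values outside 0%..100% are now rejected:
    {error, "150%"} = to_percent("150%").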

View File

@@ -0,0 +1 @@
+Fix a typo in the log: when EMQX received an abnormal `PUBREL` packet, the packet name was mistakenly logged as `pubrec` instead of `pubrel`.

View File

@@ -0,0 +1 @@
+Restored support for the special `$queue/` shared subscription.
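
For illustration, the parse results restored by this change (these mirror the emqx_topic test assertions in the diff above):

    %% `$queue/topic' becomes a shared subscription in the special `$queue' group:
    {<<"topic">>, #{share => <<"$queue">>}} = emqx_topic:parse(<<"$queue/topic">>),
    %% analogous to the explicit group form:
    {<<"topic">>, #{share => <<"group">>}} = emqx_topic:parse(<<"$share/group/topic">>).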

View File

@@ -0,0 +1,20 @@
+Fix and improve support for the TDengine `insert` syntax.
+
+1. Support inserting into multiple tables in one template.
+
+   For example:
+
+   `insert into table_1 values (${ts}, '${id}', '${topic}')
+    table_2 values (${ts}, '${id}', '${topic}')`
+
+2. Support mixing prefixes/suffixes and placeholders in the template.
+
+   For example:
+
+   `insert into table_${topic} values (${ts}, '${id}', '${topic}')`
+
+Note: This is a breaking change. Previously the values of string type were quoted automatically, but now they must be quoted explicitly.
+
+For example:
+
+`insert into table values (${ts}, '${a_string}')`
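
To illustrate the breaking change with hypothetical message data #{ts => 1668602148000, a_string => <<"hello">>}, the last template now expands verbatim to:

    insert into table values (1668602148000, 'hello')

Without the explicit single quotes around `${a_string}` the value would be interpolated unquoted, and TDengine would reject the statement.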

View File

@@ -41,7 +41,8 @@ local_topic.label:
 """Local Topic"""

 record_template.desc:
-"""The HStream Record template to be forwarded to the HStreamDB. Placeholders supported."""
+"""The HStream Record template to be forwarded to the HStreamDB. Placeholders supported.<br>
+NOTE: When you use `raw record` template (which means the data is not a valid JSON), you should use `read` or `subscription` in HStream to get the data."""

 record_template.label:
 """HStream Record"""

View File

@@ -19,31 +19,31 @@ name.label:
 """Connector Name"""

 url.desc:
-"""HStreamDB Server URL"""
+"""HStreamDB Server URL. Using gRPC http server address."""

 url.label:
 """HStreamDB Server URL"""

 stream_name.desc:
-"""HStreamDB Stream Name"""
+"""HStreamDB Stream Name."""

 stream_name.label:
 """HStreamDB Stream Name"""

 partition_key.desc:
-"""HStreamDB Ordering Key"""
+"""HStreamDB Partition Key. Placeholders supported."""

 partition_key.label:
-"""HStreamDB Ordering Key"""
+"""HStreamDB Partition Key"""

 pool_size.desc:
-"""HStreamDB Pool Size"""
+"""HStreamDB Pool Size."""

 pool_size.label:
 """HStreamDB Pool Size"""

 grpc_timeout.desc:
-"""HStreamDB gRPC Timeout"""
+"""HStreamDB gRPC Timeout."""

 grpc_timeout.label:
 """HStreamDB gRPC Timeout"""