diff --git a/.github/workflows/run_jmeter_tests.yaml b/.github/workflows/run_jmeter_tests.yaml index 536b70ac3..04866c7a9 100644 --- a/.github/workflows/run_jmeter_tests.yaml +++ b/.github/workflows/run_jmeter_tests.yaml @@ -17,8 +17,15 @@ jobs: - uses: erlef/setup-beam@v1.15.4 with: otp-version: 25.3.2 + - name: Cache Jmeter + id: cache-jmeter + uses: actions/cache@v3 + with: + path: /tmp/apache-jmeter.tgz + key: apache-jmeter-5.4.3.tgz - name: download jmeter - timeout-minutes: 3 + if: steps.cache-jmeter.outputs.cache-hit != 'true' + timeout-minutes: 15 env: JMETER_VERSION: 5.4.3 run: | diff --git a/Makefile b/Makefile index 745ba6061..d4a3d1a6d 100644 --- a/Makefile +++ b/Makefile @@ -16,7 +16,7 @@ endif # Dashboard version # from https://github.com/emqx/emqx-dashboard5 export EMQX_DASHBOARD_VERSION ?= v1.3.2 -export EMQX_EE_DASHBOARD_VERSION ?= e1.1.1-beta.3 +export EMQX_EE_DASHBOARD_VERSION ?= e1.1.1-beta.4 # `:=` should be used here, otherwise the `$(shell ...)` will be executed every time when the variable is used # In make 4.4+, for backward-compatibility the value from the original environment is used. diff --git a/apps/emqx/include/emqx.hrl b/apps/emqx/include/emqx.hrl index faf3f4828..ac9d297de 100644 --- a/apps/emqx/include/emqx.hrl +++ b/apps/emqx/include/emqx.hrl @@ -33,12 +33,15 @@ -define(ERTS_MINIMUM_REQUIRED, "10.0"). %%-------------------------------------------------------------------- -%% Topics' prefix: $SYS | $share +%% Topics' prefix: $SYS | $queue | $share %%-------------------------------------------------------------------- %% System topic -define(SYSTOP, <<"$SYS/">>). +%% Queue topic +-define(QUEUE, <<"$queue/">>). + %%-------------------------------------------------------------------- %% alarms %%-------------------------------------------------------------------- diff --git a/apps/emqx/include/emqx_release.hrl b/apps/emqx/include/emqx_release.hrl index 5446975d3..2412de99e 100644 --- a/apps/emqx/include/emqx_release.hrl +++ b/apps/emqx/include/emqx_release.hrl @@ -35,7 +35,7 @@ -define(EMQX_RELEASE_CE, "5.1.1"). %% Enterprise edition --define(EMQX_RELEASE_EE, "5.1.1-alpha.1"). +-define(EMQX_RELEASE_EE, "5.1.1-alpha.2"). %% The HTTP API version -define(EMQX_API_VERSION, "5.0"). diff --git a/apps/emqx/src/emqx_channel.erl b/apps/emqx/src/emqx_channel.erl index d879e5a2d..dfc17562c 100644 --- a/apps/emqx/src/emqx_channel.erl +++ b/apps/emqx/src/emqx_channel.erl @@ -455,7 +455,7 @@ handle_in( NChannel = Channel#channel{session = NSession}, handle_out(pubcomp, {PacketId, ?RC_SUCCESS}, NChannel); {error, RC = ?RC_PACKET_IDENTIFIER_NOT_FOUND} -> - ?SLOG(warning, #{msg => "pubrec_packetId_not_found", packetId => PacketId}), + ?SLOG(warning, #{msg => "pubrel_packetId_not_found", packetId => PacketId}), ok = emqx_metrics:inc('packets.pubrel.missed'), handle_out(pubcomp, {PacketId, RC}, Channel) end; diff --git a/apps/emqx/src/emqx_schema.erl b/apps/emqx/src/emqx_schema.erl index fb413405c..01c01c2be 100644 --- a/apps/emqx/src/emqx_schema.erl +++ b/apps/emqx/src/emqx_schema.erl @@ -2504,7 +2504,11 @@ to_integer(Str) -> end. to_percent(Str) -> - {ok, hocon_postprocess:percent(Str)}. + Percent = hocon_postprocess:percent(Str), + case is_number(Percent) andalso Percent >= 0.0 andalso Percent =< 1.0 of + true -> {ok, Percent}; + false -> {error, Str} + end. to_comma_separated_list(Str) -> {ok, string:tokens(Str, ", ")}. @@ -2732,15 +2736,17 @@ check_cpu_watermark(Conf) -> check_watermark("sysmon.os.cpu_low_watermark", "sysmon.os.cpu_high_watermark", Conf). check_watermark(LowKey, HighKey, Conf) -> - case hocon_maps:get(LowKey, Conf) of - undefined -> + case to_percent(hocon_maps:get(LowKey, Conf)) of + {error, undefined} -> true; - Low -> - High = hocon_maps:get(HighKey, Conf), - case Low < High of - true -> true; - false -> {bad_watermark, #{LowKey => Low, HighKey => High}} - end + {ok, Low} -> + case to_percent(hocon_maps:get(HighKey, Conf)) of + {ok, High} when High > Low -> true; + {ok, High} -> {bad_watermark, #{LowKey => Low, HighKey => High}}; + {error, HighVal} -> {bad_watermark, #{HighKey => HighVal}} + end; + {error, Low} -> + {bad_watermark, #{LowKey => Low}} end. str(A) when is_atom(A) -> diff --git a/apps/emqx/src/emqx_topic.erl b/apps/emqx/src/emqx_topic.erl index c1515e14b..6d232c68d 100644 --- a/apps/emqx/src/emqx_topic.erl +++ b/apps/emqx/src/emqx_topic.erl @@ -244,8 +244,12 @@ parse({TopicFilter, Options}) when is_binary(TopicFilter) -> parse(TopicFilter, Options). -spec parse(topic(), map()) -> {topic(), map()}. +parse(TopicFilter = <<"$queue/", _/binary>>, #{share := _Group}) -> + error({invalid_topic_filter, TopicFilter}); parse(TopicFilter = <<"$share/", _/binary>>, #{share := _Group}) -> error({invalid_topic_filter, TopicFilter}); +parse(<<"$queue/", TopicFilter/binary>>, Options) -> + parse(TopicFilter, Options#{share => <<"$queue">>}); parse(TopicFilter = <<"$share/", Rest/binary>>, Options) -> case binary:split(Rest, <<"/">>) of [_Any] -> diff --git a/apps/emqx/test/emqx_proper_types.erl b/apps/emqx/test/emqx_proper_types.erl index 6d1ced486..0e9d3032c 100644 --- a/apps/emqx/test/emqx_proper_types.erl +++ b/apps/emqx/test/emqx_proper_types.erl @@ -444,7 +444,7 @@ systopic_mon() -> sharetopic() -> ?LET( {Type, Grp, T}, - {oneof([<<"$share">>]), list(latin_char()), normal_topic()}, + {oneof([<<"$queue">>, <<"$share">>]), list(latin_char()), normal_topic()}, <> ). diff --git a/apps/emqx/test/emqx_shared_sub_SUITE.erl b/apps/emqx/test/emqx_shared_sub_SUITE.erl index e280f4fe5..4726f1111 100644 --- a/apps/emqx/test/emqx_shared_sub_SUITE.erl +++ b/apps/emqx/test/emqx_shared_sub_SUITE.erl @@ -20,8 +20,10 @@ -compile(nowarn_export_all). -include_lib("emqx/include/emqx.hrl"). +-include_lib("emqx/include/emqx_mqtt.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -define(SUITE, ?MODULE). @@ -986,6 +988,112 @@ t_session_kicked(Config) when is_list(Config) -> ?assertEqual([], collect_msgs(0)), ok. +%% FIXME: currently doesn't work +%% t_different_groups_same_topic({init, Config}) -> +%% TestName = atom_to_binary(?FUNCTION_NAME), +%% ClientId = <>, +%% {ok, C} = emqtt:start_link([{clientid, ClientId}, {proto_ver, v5}]), +%% {ok, _} = emqtt:connect(C), +%% [{client, C}, {clientid, ClientId} | Config]; +%% t_different_groups_same_topic({'end', Config}) -> +%% C = ?config(client, Config), +%% emqtt:stop(C), +%% ok; +%% t_different_groups_same_topic(Config) when is_list(Config) -> +%% C = ?config(client, Config), +%% ClientId = ?config(clientid, Config), +%% %% Subscribe and unsubscribe to both $queue and $shared topics +%% Topic = <<"t/1">>, +%% SharedTopic0 = <<"$share/aa/", Topic/binary>>, +%% SharedTopic1 = <<"$share/bb/", Topic/binary>>, +%% {ok, _, [2]} = emqtt:subscribe(C, {SharedTopic0, 2}), +%% {ok, _, [2]} = emqtt:subscribe(C, {SharedTopic1, 2}), + +%% Message0 = emqx_message:make(ClientId, _QoS = 2, Topic, <<"hi">>), +%% emqx:publish(Message0), +%% ?assertMatch([ {publish, #{payload := <<"hi">>}} +%% , {publish, #{payload := <<"hi">>}} +%% ], collect_msgs(5_000), #{routes => ets:tab2list(emqx_route)}), + +%% {ok, _, [0]} = emqtt:unsubscribe(C, SharedTopic0), +%% {ok, _, [0]} = emqtt:unsubscribe(C, SharedTopic1), + +%% ok. + +t_queue_subscription({init, Config}) -> + TestName = atom_to_binary(?FUNCTION_NAME), + ClientId = <>, + + {ok, C} = emqtt:start_link([{clientid, ClientId}, {proto_ver, v5}]), + {ok, _} = emqtt:connect(C), + + [{client, C}, {clientid, ClientId} | Config]; +t_queue_subscription({'end', Config}) -> + C = ?config(client, Config), + emqtt:stop(C), + ok; +t_queue_subscription(Config) when is_list(Config) -> + C = ?config(client, Config), + ClientId = ?config(clientid, Config), + %% Subscribe and unsubscribe to both $queue and $shared topics + Topic = <<"t/1">>, + QueueTopic = <<"$queue/", Topic/binary>>, + SharedTopic = <<"$share/aa/", Topic/binary>>, + {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(C, {QueueTopic, 2}), + {ok, _, [?RC_GRANTED_QOS_2]} = emqtt:subscribe(C, {SharedTopic, 2}), + + %% FIXME: we should actually see 2 routes, one for each group + %% ($queue and aa), but currently the latest subscription + %% overwrites the existing one. + ?retry( + _Sleep0 = 100, + _Attempts0 = 50, + begin + ct:pal("routes: ~p", [ets:tab2list(emqx_route)]), + %% FIXME: should ensure we have 2 subscriptions + true = emqx_router:has_routes(Topic) + end + ), + + %% now publish to the underlying topic + Message0 = emqx_message:make(ClientId, _QoS = 2, Topic, <<"hi">>), + emqx:publish(Message0), + ?assertMatch( + [ + {publish, #{payload := <<"hi">>}} + %% FIXME: should receive one message from each group + %% , {publish, #{payload := <<"hi">>}} + ], + collect_msgs(5_000) + ), + + {ok, _, [?RC_SUCCESS]} = emqtt:unsubscribe(C, QueueTopic), + %% FIXME: return code should be success instead of 17 ("no_subscription_existed") + {ok, _, [?RC_NO_SUBSCRIPTION_EXISTED]} = emqtt:unsubscribe(C, SharedTopic), + + %% FIXME: this should eventually be true, but currently we leak + %% the previous group subscription... + %% ?retry( + %% _Sleep0 = 100, + %% _Attempts0 = 50, + %% begin + %% ct:pal("routes: ~p", [ets:tab2list(emqx_route)]), + %% false = emqx_router:has_routes(Topic) + %% end + %% ), + ct:sleep(500), + + Message1 = emqx_message:make(ClientId, _QoS = 2, Topic, <<"hello">>), + emqx:publish(Message1), + %% FIXME: we should *not* receive any messages... + %% ?assertEqual([], collect_msgs(1_000), #{routes => ets:tab2list(emqx_route)}), + %% This is from the leaked group... + ?assertMatch([{publish, #{topic := Topic}}], collect_msgs(1_000), #{ + routes => ets:tab2list(emqx_route) + }), + + ok. + %%-------------------------------------------------------------------- %% help functions %%-------------------------------------------------------------------- diff --git a/apps/emqx/test/emqx_topic_SUITE.erl b/apps/emqx/test/emqx_topic_SUITE.erl index 521efe751..c49c93fb2 100644 --- a/apps/emqx/test/emqx_topic_SUITE.erl +++ b/apps/emqx/test/emqx_topic_SUITE.erl @@ -211,6 +211,10 @@ t_systop(_) -> ?assertEqual(SysTop2, systop(<<"abc">>)). t_feed_var(_) -> + ?assertEqual( + <<"$queue/client/clientId">>, + feed_var(<<"$c">>, <<"clientId">>, <<"$queue/client/$c">>) + ), ?assertEqual( <<"username/test/client/x">>, feed_var( @@ -232,6 +236,10 @@ long_topic() -> iolist_to_binary([[integer_to_list(I), "/"] || I <- lists:seq(0, 66666)]). t_parse(_) -> + ?assertError( + {invalid_topic_filter, <<"$queue/t">>}, + parse(<<"$queue/t">>, #{share => <<"g">>}) + ), ?assertError( {invalid_topic_filter, <<"$share/g/t">>}, parse(<<"$share/g/t">>, #{share => <<"g">>}) @@ -246,9 +254,11 @@ t_parse(_) -> ), ?assertEqual({<<"a/b/+/#">>, #{}}, parse(<<"a/b/+/#">>)), ?assertEqual({<<"a/b/+/#">>, #{qos => 1}}, parse({<<"a/b/+/#">>, #{qos => 1}})), + ?assertEqual({<<"topic">>, #{share => <<"$queue">>}}, parse(<<"$queue/topic">>)), ?assertEqual({<<"topic">>, #{share => <<"group">>}}, parse(<<"$share/group/topic">>)), %% The '$local' and '$fastlane' topics have been deprecated. ?assertEqual({<<"$local/topic">>, #{}}, parse(<<"$local/topic">>)), + ?assertEqual({<<"$local/$queue/topic">>, #{}}, parse(<<"$local/$queue/topic">>)), ?assertEqual({<<"$local/$share/group/a/b/c">>, #{}}, parse(<<"$local/$share/group/a/b/c">>)), ?assertEqual({<<"$fastlane/topic">>, #{}}, parse(<<"$fastlane/topic">>)). diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index 7056e6059..e1c3ee987 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -988,15 +988,10 @@ call_operation(NodeOrAll, OperFunc, Args = [_Nodes, BridgeType, BridgeName]) -> %% still on an older bpapi version that doesn't support it. maybe_try_restart(NodeOrAll, OperFunc, Args); {error, timeout} -> - ?SERVICE_UNAVAILABLE(<<"Request timeout">>); + ?BAD_REQUEST(<<"Request timeout">>); {error, {start_pool_failed, Name, Reason}} -> Msg = bin(io_lib:format("Failed to start ~p pool for reason ~p", [Name, Reason])), - case Reason of - nxdomain -> - ?BAD_REQUEST(Msg); - _ -> - ?SERVICE_UNAVAILABLE(Msg) - end; + ?BAD_REQUEST(Msg); {error, not_found} -> BridgeId = emqx_bridge_resource:bridge_id(BridgeType, BridgeName), ?SLOG(warning, #{ diff --git a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl index bc27afda2..d8e697987 100644 --- a/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl +++ b/apps/emqx_bridge/test/emqx_bridge_api_SUITE.erl @@ -834,7 +834,8 @@ do_start_stop_bridges(Type, Config) -> ), BadBridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_MQTT, BadName), ?assertMatch( - {ok, SC, _} when SC == 500 orelse SC == 503, + %% request from product: return 400 on such errors + {ok, SC, _} when SC == 500 orelse SC == 400, request(post, {operation, Type, start, BadBridgeID}, Config) ), ok = gen_tcp:close(Sock), diff --git a/apps/emqx_bridge/test/emqx_bridge_testlib.erl b/apps/emqx_bridge/test/emqx_bridge_testlib.erl index bd3de3561..651fd24ff 100644 --- a/apps/emqx_bridge/test/emqx_bridge_testlib.erl +++ b/apps/emqx_bridge/test/emqx_bridge_testlib.erl @@ -10,6 +10,8 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-import(emqx_common_test_helpers, [on_exit/1]). + %% ct setup helpers init_per_suite(Config, Apps) -> @@ -211,19 +213,27 @@ probe_bridge_api(BridgeType, BridgeName, BridgeConfig) -> Res. create_rule_and_action_http(BridgeType, RuleTopic, Config) -> + create_rule_and_action_http(BridgeType, RuleTopic, Config, _Opts = #{}). + +create_rule_and_action_http(BridgeType, RuleTopic, Config, Opts) -> BridgeName = ?config(bridge_name, Config), BridgeId = emqx_bridge_resource:bridge_id(BridgeType, BridgeName), + SQL = maps:get(sql, Opts, <<"SELECT * FROM \"", RuleTopic/binary, "\"">>), Params = #{ enable => true, - sql => <<"SELECT * FROM \"", RuleTopic/binary, "\"">>, + sql => SQL, actions => [BridgeId] }, Path = emqx_mgmt_api_test_util:api_path(["rules"]), AuthHeader = emqx_mgmt_api_test_util:auth_header_(), ct:pal("rule action params: ~p", [Params]), case emqx_mgmt_api_test_util:request_api(post, Path, "", AuthHeader, Params) of - {ok, Res} -> {ok, emqx_utils_json:decode(Res, [return_maps])}; - Error -> Error + {ok, Res0} -> + Res = #{<<"id">> := RuleId} = emqx_utils_json:decode(Res0, [return_maps]), + on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end), + {ok, Res}; + Error -> + Error end. %%------------------------------------------------------------------------------ diff --git a/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_connector.erl b/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_connector.erl index 16092f262..1e2ebbfb1 100644 --- a/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_connector.erl +++ b/apps/emqx_bridge_hstreamdb/src/emqx_bridge_hstreamdb_connector.erl @@ -119,10 +119,13 @@ roots() -> fields(config) -> [ - {url, mk(binary(), #{required => true, desc => ?DESC("url")})}, + {url, + mk(binary(), #{ + required => true, desc => ?DESC("url"), default => <<"http://127.0.0.1:6570">> + })}, {stream, mk(binary(), #{required => true, desc => ?DESC("stream_name")})}, {partition_key, mk(binary(), #{required => false, desc => ?DESC("partition_key")})}, - {pool_size, mk(pos_integer(), #{required => true, desc => ?DESC("pool_size")})}, + {pool_size, fun emqx_connector_schema_lib:pool_size/1}, {grpc_timeout, fun grpc_timeout/1} ] ++ emqx_connector_schema_lib:ssl_fields(). diff --git a/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl b/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl index dfd5fd07c..d29e38833 100644 --- a/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl +++ b/apps/emqx_bridge_iotdb/test/emqx_bridge_iotdb_impl_SUITE.erl @@ -379,6 +379,41 @@ t_sync_device_id_missing(Config) -> iotdb_bridge_on_query ). +t_extract_device_id_from_rule_engine_message(Config) -> + BridgeType = ?config(bridge_type, Config), + RuleTopic = <<"t/iotdb">>, + DeviceId = iotdb_device(Config), + Payload = make_iotdb_payload(DeviceId, "temp", "INT32", "12"), + Message = emqx_message:make(RuleTopic, emqx_utils_json:encode(Payload)), + ?check_trace( + begin + {ok, _} = emqx_bridge_testlib:create_bridge(Config), + SQL = << + "SELECT\n" + " payload.measurement, payload.data_type, payload.value, payload.device_id\n" + "FROM\n" + " \"", + RuleTopic/binary, + "\"" + >>, + Opts = #{sql => SQL}, + {ok, _} = emqx_bridge_testlib:create_rule_and_action_http( + BridgeType, RuleTopic, Config, Opts + ), + emqx:publish(Message), + ?block_until(handle_async_reply, 5_000), + ok + end, + fun(Trace) -> + ?assertMatch( + [#{action := ack, result := {ok, 200, _, _}}], + ?of_kind(handle_async_reply, Trace) + ), + ok + end + ), + ok. + t_sync_invalid_data(Config) -> emqx_bridge_testlib:t_sync_query( Config, diff --git a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine.erl b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine.erl index 0b618487d..da170e943 100644 --- a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine.erl +++ b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine.erl @@ -23,7 +23,7 @@ -define(DEFAULT_SQL, << "insert into t_mqtt_msg(ts, msgid, mqtt_topic, qos, payload, arrived) " - "values (${ts}, ${id}, ${topic}, ${qos}, ${payload}, ${timestamp})" + "values (${ts}, '${id}', '${topic}', ${qos}, '${payload}', ${timestamp})" >>). %% ------------------------------------------------------------------------------------------------- diff --git a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl index 826a03baa..47166f4e1 100644 --- a/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl +++ b/apps/emqx_bridge_tdengine/src/emqx_bridge_tdengine_connector.erl @@ -124,7 +124,7 @@ on_query(InstanceId, {query, SQL}, State) -> on_query(InstanceId, {Key, Data}, #{insert_tokens := InsertTksMap} = State) -> case maps:find(Key, InsertTksMap) of {ok, Tokens} when is_map(Data) -> - SQL = emqx_placeholder:proc_sql_param_str(Tokens, Data), + SQL = emqx_placeholder:proc_tmpl(Tokens, Data), do_query(InstanceId, SQL, State); _ -> {error, {unrecoverable_error, invalid_request}} @@ -209,31 +209,16 @@ execute(Conn, Query, Opts) -> tdengine:insert(Conn, Query, Opts). do_batch_insert(Conn, Tokens, BatchReqs, Opts) -> - Queries = aggregate_query(Tokens, BatchReqs), - SQL = maps:fold( - fun(InsertPart, Values, Acc) -> - lists:foldl( - fun(ValuePart, IAcc) -> - <> - end, - <>, - Values - ) - end, - <<"INSERT INTO">>, - Queries - ), + SQL = aggregate_query(Tokens, BatchReqs, <<"INSERT INTO">>), execute(Conn, SQL, Opts). -aggregate_query({InsertPartTks, ParamsPartTks}, BatchReqs) -> +aggregate_query(BatchTks, BatchReqs, Acc) -> lists:foldl( - fun({_, Data}, Acc) -> - InsertPart = emqx_placeholder:proc_sql_param_str(InsertPartTks, Data), - ParamsPart = emqx_placeholder:proc_sql_param_str(ParamsPartTks, Data), - Values = maps:get(InsertPart, Acc, []), - maps:put(InsertPart, [ParamsPart | Values], Acc) + fun({_, Data}, InAcc) -> + InsertPart = emqx_placeholder:proc_tmpl(BatchTks, Data), + <> end, - #{}, + Acc, BatchReqs ). @@ -260,13 +245,12 @@ parse_batch_prepare_sql([{Key, H} | T], InsertTksMap, BatchTksMap) -> InsertTks = emqx_placeholder:preproc_tmpl(H), H1 = string:trim(H, trailing, ";"), case split_insert_sql(H1) of - [_InsertStr, InsertPart, _ValuesStr, ParamsPart] -> - InsertPartTks = emqx_placeholder:preproc_tmpl(InsertPart), - ParamsPartTks = emqx_placeholder:preproc_tmpl(ParamsPart), + [_InsertPart, BatchDesc] -> + BatchTks = emqx_placeholder:preproc_tmpl(BatchDesc), parse_batch_prepare_sql( T, InsertTksMap#{Key => InsertTks}, - BatchTksMap#{Key => {InsertPartTks, ParamsPartTks}} + BatchTksMap#{Key => BatchTks} ); Result -> ?SLOG(error, #{msg => "split sql failed", sql => H, result => Result}), @@ -299,7 +283,7 @@ split_insert_sql(SQL0) -> {true, E1} end end, - re:split(SQL, "(?i)(insert into)|(?i)(values)") + re:split(SQL, "(?i)(insert into)") ). formalize_sql(Input) -> diff --git a/apps/emqx_bridge_tdengine/test/emqx_bridge_tdengine_SUITE.erl b/apps/emqx_bridge_tdengine/test/emqx_bridge_tdengine_SUITE.erl index 54744d806..92ad3a611 100644 --- a/apps/emqx_bridge_tdengine/test/emqx_bridge_tdengine_SUITE.erl +++ b/apps/emqx_bridge_tdengine/test/emqx_bridge_tdengine_SUITE.erl @@ -13,7 +13,8 @@ % SQL definitions -define(SQL_BRIDGE, - "insert into mqtt.t_mqtt_msg(ts, payload) values (${timestamp}, ${payload})" + "insert into t_mqtt_msg(ts, payload) values (${timestamp}, '${payload}')" + "t_mqtt_msg(ts, payload) values (${second_ts}, '${payload}')" ). -define(SQL_CREATE_DATABASE, "CREATE DATABASE IF NOT EXISTS mqtt; USE mqtt;"). @@ -29,7 +30,8 @@ -define(SQL_SELECT, "SELECT payload FROM t_mqtt_msg"). -define(AUTO_CREATE_BRIDGE, - "insert into ${clientid} USING s_tab TAGS (${clientid}) values (${timestamp}, ${payload})" + "insert into ${clientid} USING s_tab TAGS ('${clientid}') values (${timestamp}, '${payload}')" + "test_${clientid} USING s_tab TAGS ('${clientid}') values (${second_ts}, '${payload}')" ). -define(SQL_CREATE_STABLE, @@ -301,7 +303,7 @@ connect_and_clear_table(Config) -> connect_and_get_payload(Config) -> ?WITH_CON( - {ok, #{<<"code">> := 0, <<"data">> := [[Result]]}} = directly_query(Con, ?SQL_SELECT) + {ok, #{<<"code">> := 0, <<"data">> := Result}} = directly_query(Con, ?SQL_SELECT) ), Result. @@ -329,7 +331,7 @@ t_setup_via_config_and_publish(Config) -> {ok, _}, create_bridge(Config) ), - SentData = #{payload => ?PAYLOAD, timestamp => 1668602148000}, + SentData = #{payload => ?PAYLOAD, timestamp => 1668602148000, second_ts => 1668602148010}, ?check_trace( begin {_, {ok, #{result := Result}}} = @@ -342,7 +344,7 @@ t_setup_via_config_and_publish(Config) -> {ok, #{<<"code">> := 0, <<"rows">> := 1}}, Result ), ?assertMatch( - ?PAYLOAD, + [[?PAYLOAD], [?PAYLOAD]], connect_and_get_payload(Config) ), ok @@ -368,7 +370,8 @@ t_setup_via_http_api_and_publish(Config) -> {ok, _}, create_bridge_http(TDengineConfig) ), - SentData = #{payload => ?PAYLOAD, timestamp => 1668602148000}, + + SentData = #{payload => ?PAYLOAD, timestamp => 1668602148000, second_ts => 1668602148010}, ?check_trace( begin Request = {send_message, SentData}, @@ -386,7 +389,7 @@ t_setup_via_http_api_and_publish(Config) -> {ok, #{<<"code">> := 0, <<"rows">> := 1}}, Res0 ), ?assertMatch( - ?PAYLOAD, + [[?PAYLOAD], [?PAYLOAD]], connect_and_get_payload(Config) ), ok @@ -426,7 +429,7 @@ t_write_failure(Config) -> ProxyPort = ?config(proxy_port, Config), ProxyHost = ?config(proxy_host, Config), {ok, _} = create_bridge(Config), - SentData = #{payload => ?PAYLOAD, timestamp => 1668602148000}, + SentData = #{payload => ?PAYLOAD, timestamp => 1668602148000, second_ts => 1668602148010}, emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() -> {_, {ok, #{result := Result}}} = ?wait_async_action( @@ -461,7 +464,7 @@ t_write_timeout(Config) -> } } ), - SentData = #{payload => ?PAYLOAD, timestamp => 1668602148000}, + SentData = #{payload => ?PAYLOAD, timestamp => 1668602148000, second_ts => 1668602148010}, %% FIXME: TDengine connector hangs indefinetily during %% `call_query' while the connection is unresponsive. Should add %% a timeout to `APPLY_RESOURCE' in buffer worker?? @@ -486,7 +489,7 @@ t_simple_sql_query(Config) -> {ok, _}, create_bridge(Config) ), - Request = {query, <<"SELECT count(1) AS T">>}, + Request = {query, <<"SELECT 1 AS T">>}, {_, {ok, #{result := Result}}} = ?wait_async_action( query_resource(Config, Request), @@ -537,37 +540,41 @@ t_bad_sql_parameter(Config) -> ?assertMatch({error, {unrecoverable_error, invalid_request}}, Result), ok. -t_nasty_sql_string(Config) -> - ?assertMatch( - {ok, _}, - create_bridge(Config) - ), - % NOTE - % Column `payload` has BINARY type, so we would certainly like to test it - % with `lists:seq(1, 127)`, but: - % 1. There's no way to insert zero byte in an SQL string, seems that TDengine's - % parser[1] has no escaping sequence for it so a zero byte probably confuses - % interpreter somewhere down the line. - % 2. Bytes > 127 come back as U+FFFDs (i.e. replacement characters) in UTF-8 for - % some reason. - % - % [1]: https://github.com/taosdata/TDengine/blob/066cb34a/source/libs/parser/src/parUtil.c#L279-L301 - Payload = list_to_binary(lists:seq(1, 127)), - Message = #{payload => Payload, timestamp => erlang:system_time(millisecond)}, - {_, {ok, #{result := Result}}} = - ?wait_async_action( - send_message(Config, Message), - #{?snk_kind := buffer_worker_flush_ack}, - 2_000 - ), - ?assertMatch( - {ok, #{<<"code">> := 0, <<"rows">> := 1}}, - Result - ), - ?assertEqual( - Payload, - connect_and_get_payload(Config) - ). +%% TODO +%% For supporting to generate a subtable name by mixing prefixes/suffixes with placeholders, +%% the SQL quote(escape) is removed now, +%% we should introduce a new syntax for placeholders to allow some vars to keep unquote. +%% t_nasty_sql_string(Config) -> +%% ?assertMatch( +%% {ok, _}, +%% create_bridge(Config) +%% ), +%% % NOTE +%% % Column `payload` has BINARY type, so we would certainly like to test it +%% % with `lists:seq(1, 127)`, but: +%% % 1. There's no way to insert zero byte in an SQL string, seems that TDengine's +%% % parser[1] has no escaping sequence for it so a zero byte probably confuses +%% % interpreter somewhere down the line. +%% % 2. Bytes > 127 come back as U+FFFDs (i.e. replacement characters) in UTF-8 for +%% % some reason. +%% % +%% % [1]: https://github.com/taosdata/TDengine/blob/066cb34a/source/libs/parser/src/parUtil.c#L279-L301 +%% Payload = list_to_binary(lists:seq(1, 127)), +%% Message = #{payload => Payload, timestamp => erlang:system_time(millisecond)}, +%% {_, {ok, #{result := Result}}} = +%% ?wait_async_action( +%% send_message(Config, Message), +%% #{?snk_kind := buffer_worker_flush_ack}, +%% 2_000 +%% ), +%% ?assertMatch( +%% {ok, #{<<"code">> := 0, <<"rows">> := 1}}, +%% Result +%% ), +%% ?assertEqual( +%% Payload, +%% connect_and_get_payload(Config) +%% ). t_simple_insert(Config) -> connect_and_clear_table(Config), @@ -576,7 +583,7 @@ t_simple_insert(Config) -> create_bridge(Config) ), - SentData = #{payload => ?PAYLOAD, timestamp => 1668602148000}, + SentData = #{payload => ?PAYLOAD, timestamp => 1668602148000, second_ts => 1668602148010}, Request = {send_message, SentData}, {_, {ok, #{result := _Result}}} = ?wait_async_action( @@ -585,7 +592,7 @@ t_simple_insert(Config) -> 2_000 ), ?assertMatch( - ?PAYLOAD, + [[?PAYLOAD], [?PAYLOAD]], connect_and_get_payload(Config) ). @@ -602,7 +609,9 @@ t_batch_insert(Config) -> ?wait_async_action( lists:foreach( fun(Idx) -> - SentData = #{payload => ?PAYLOAD, timestamp => Ts + Idx}, + SentData = #{ + payload => ?PAYLOAD, timestamp => Ts + Idx, second_ts => Ts + Idx + 5000 + }, Request = {send_message, SentData}, query_resource(Config, Request) end, @@ -613,11 +622,12 @@ t_batch_insert(Config) -> 2_000 ), + DoubleSize = Size * 2, ?retry( _Sleep = 50, _Attempts = 30, ?assertMatch( - [[Size]], + [[DoubleSize]], connect_and_query(Config, "SELECT COUNT(1) FROM t_mqtt_msg") ) ). @@ -633,6 +643,7 @@ t_auto_create_simple_insert(Config0) -> SentData = #{ payload => ?PAYLOAD, timestamp => 1668602148000, + second_ts => 1668602148000 + 100, clientid => ClientId }, Request = {send_message, SentData}, @@ -647,9 +658,19 @@ t_auto_create_simple_insert(Config0) -> connect_and_query(Config, "SELECT payload FROM " ++ ClientId) ), + ?assertMatch( + [[?PAYLOAD]], + connect_and_query(Config, "SELECT payload FROM test_" ++ ClientId) + ), + ?assertMatch( [[0]], connect_and_query(Config, "DROP TABLE " ++ ClientId) + ), + + ?assertMatch( + [[0]], + connect_and_query(Config, "DROP TABLE test_" ++ ClientId) ). t_auto_create_batch_insert(Config0) -> @@ -675,6 +696,7 @@ t_auto_create_batch_insert(Config0) -> SentData = #{ payload => ?PAYLOAD, timestamp => Ts + Idx + Offset, + second_ts => Ts + Idx + Offset + 5000, clientid => ClientId }, Request = {send_message, SentData}, @@ -693,29 +715,28 @@ t_auto_create_batch_insert(Config0) -> _Sleep = 50, _Attempts = 30, - ?assertMatch( - [[Size1]], - connect_and_query(Config, "SELECT COUNT(1) FROM " ++ ClientId1) + lists:foreach( + fun({Table, Size}) -> + ?assertMatch( + [[Size]], + connect_and_query(Config, "SELECT COUNT(1) FROM " ++ Table) + ) + end, + lists:zip( + [ClientId1, "test_" ++ ClientId1, ClientId2, "test_" ++ ClientId2], + [Size1, Size1, Size2, Size2] + ) ) ), - ?retry( - 50, - 30, - ?assertMatch( - [[Size2]], - connect_and_query(Config, "SELECT COUNT(1) FROM " ++ ClientId2) - ) - ), - - ?assertMatch( - [[0]], - connect_and_query(Config, "DROP TABLE " ++ ClientId1) - ), - - ?assertMatch( - [[0]], - connect_and_query(Config, "DROP TABLE " ++ ClientId2) + lists:foreach( + fun(E) -> + ?assertMatch( + [[0]], + connect_and_query(Config, "DROP TABLE " ++ E) + ) + end, + [ClientId1, ClientId2, "test_" ++ ClientId1, "test_" ++ ClientId2] ). to_bin(List) when is_list(List) -> diff --git a/apps/emqx_conf/src/emqx_conf_cli.erl b/apps/emqx_conf/src/emqx_conf_cli.erl index 05eb60531..b0a1a414d 100644 --- a/apps/emqx_conf/src/emqx_conf_cli.erl +++ b/apps/emqx_conf/src/emqx_conf_cli.erl @@ -332,8 +332,8 @@ load_etc_config_file() -> filter_readonly_config(Raw) -> SchemaMod = emqx_conf:schema_module(), - RawDefault = fill_defaults(Raw), try + RawDefault = fill_defaults(Raw), _ = emqx_config:check_config(SchemaMod, RawDefault), ReadOnlyKeys = [atom_to_binary(K) || K <- ?READONLY_KEYS], {ok, maps:without(ReadOnlyKeys, Raw)} diff --git a/apps/emqx_conf/src/emqx_conf_schema.erl b/apps/emqx_conf/src/emqx_conf_schema.erl index 816e2f454..22c8c3c26 100644 --- a/apps/emqx_conf/src/emqx_conf_schema.erl +++ b/apps/emqx_conf/src/emqx_conf_schema.erl @@ -86,8 +86,7 @@ roots() -> sc( ?R_REF("node"), #{ - translate_to => ["emqx"], - converter => fun node_converter/2 + translate_to => ["emqx"] } )}, {"cluster", @@ -446,9 +445,11 @@ fields("node") -> sc( range(1024, 134217727), #{ - mapping => "vm_args.+P", + %% deprecated make sure it's disappeared in raw_conf user(HTTP API) + %% but still in vm.args via translation/1 + %% ProcessLimit is always equal to MaxPort * 2 when translation/1. + deprecated => true, desc => ?DESC(process_limit), - default => ?DEFAULT_MAX_PORTS * 2, importance => ?IMPORTANCE_HIDDEN, 'readOnly' => true } @@ -1052,7 +1053,7 @@ desc("authorization") -> desc(_) -> undefined. -translations() -> ["ekka", "kernel", "emqx", "gen_rpc", "prometheus"]. +translations() -> ["ekka", "kernel", "emqx", "gen_rpc", "prometheus", "vm_args"]. translation("ekka") -> [{"cluster_discovery", fun tr_cluster_discovery/1}]; @@ -1079,8 +1080,15 @@ translation("prometheus") -> {"vm_system_info_collector_metrics", fun tr_vm_system_info_collector/1}, {"vm_memory_collector_metrics", fun tr_vm_memory_collector/1}, {"vm_msacc_collector_metrics", fun tr_vm_msacc_collector/1} + ]; +translation("vm_args") -> + [ + {"+P", fun tr_vm_args_process_limit/1} ]. +tr_vm_args_process_limit(Conf) -> + 2 * conf_get("node.max_ports", Conf, ?DEFAULT_MAX_PORTS). + tr_vm_dist_collector(Conf) -> metrics_enabled(conf_get("prometheus.vm_dist_collector", Conf, enabled)). @@ -1395,10 +1403,3 @@ ensure_unicode_path(Path, _) when is_list(Path) -> Path; ensure_unicode_path(Path, _) -> throw({"not_string", Path}). - -node_converter(#{<<"process_limit">> := _} = Conf, _Opts) -> - Conf; -node_converter(#{<<"max_ports">> := MaxPorts} = Conf, _Opts) -> - Conf#{<<"process_limit">> => MaxPorts * 2}; -node_converter(Conf, _Opts) -> - Conf. diff --git a/apps/emqx_conf/test/emqx_conf_schema_tests.erl b/apps/emqx_conf/test/emqx_conf_schema_tests.erl index d5c864fab..ae74897bd 100644 --- a/apps/emqx_conf/test/emqx_conf_schema_tests.erl +++ b/apps/emqx_conf/test/emqx_conf_schema_tests.erl @@ -25,6 +25,8 @@ name = \"emqx1@127.0.0.1\" cookie = \"emqxsecretcookie\" data_dir = \"data\" + max_ports = 2048 + process_limit = 10240 } cluster { name = emqxcl @@ -42,6 +44,12 @@ array_nodes_test() -> ConfFile = to_bin(?BASE_CONF, [Nodes, Nodes]), {ok, Conf} = hocon:binary(ConfFile, #{format => richmap}), ConfList = hocon_tconf:generate(emqx_conf_schema, Conf), + VMArgs = proplists:get_value(vm_args, ConfList), + ProcLimit = proplists:get_value('+P', VMArgs), + MaxPort = proplists:get_value('+Q', VMArgs), + ?assertEqual(2048, MaxPort), + ?assertEqual(MaxPort * 2, ProcLimit), + ClusterDiscovery = proplists:get_value( cluster_discovery, proplists:get_value(ekka, ConfList) ), diff --git a/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl b/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl index dc35f5b97..e6e8bb475 100644 --- a/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl +++ b/apps/emqx_management/src/emqx_mgmt_api_subscriptions.erl @@ -187,6 +187,8 @@ format(WhichNode, {{Topic, _Subscriber}, Options}) -> maps:with([qos, nl, rap, rh], Options) ). +get_topic(Topic, #{share := <<"$queue">> = Group}) -> + emqx_topic:join([Group, Topic]); get_topic(Topic, #{share := Group}) -> emqx_topic:join([<<"$share">>, Group, Topic]); get_topic(Topic, _) -> diff --git a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl index f83aa2920..de1e92a3f 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl @@ -23,7 +23,6 @@ -export([ apply_rule/3, apply_rules/3, - clear_rule_payload/0, inc_action_metrics/2 ]). @@ -196,18 +195,18 @@ select_and_transform([], _Columns, Action) -> select_and_transform(['*' | More], Columns, Action) -> select_and_transform(More, Columns, maps:merge(Action, Columns)); select_and_transform([{as, Field, Alias} | More], Columns, Action) -> - Val = eval(Field, Columns), + Val = eval(Field, [Action, Columns]), select_and_transform( More, - nested_put(Alias, Val, Columns), + Columns, nested_put(Alias, Val, Action) ); select_and_transform([Field | More], Columns, Action) -> - Val = eval(Field, Columns), + Val = eval(Field, [Action, Columns]), Key = alias(Field, Columns), select_and_transform( More, - nested_put(Key, Val, Columns), + Columns, nested_put(Key, Val, Action) ). @@ -217,25 +216,25 @@ select_and_collect(Fields, Columns) -> select_and_collect(Fields, Columns, {#{}, {'item', []}}). select_and_collect([{as, Field, {_, A} = Alias}], Columns, {Action, _}) -> - Val = eval(Field, Columns), + Val = eval(Field, [Action, Columns]), {nested_put(Alias, Val, Action), {A, ensure_list(Val)}}; select_and_collect([{as, Field, Alias} | More], Columns, {Action, LastKV}) -> - Val = eval(Field, Columns), + Val = eval(Field, [Action, Columns]), select_and_collect( More, nested_put(Alias, Val, Columns), {nested_put(Alias, Val, Action), LastKV} ); select_and_collect([Field], Columns, {Action, _}) -> - Val = eval(Field, Columns), + Val = eval(Field, [Action, Columns]), Key = alias(Field, Columns), {nested_put(Key, Val, Action), {'item', ensure_list(Val)}}; select_and_collect([Field | More], Columns, {Action, LastKV}) -> - Val = eval(Field, Columns), + Val = eval(Field, [Action, Columns]), Key = alias(Field, Columns), select_and_collect( More, - nested_put(Key, Val, Columns), + Columns, {nested_put(Key, Val, Action), LastKV} ). @@ -368,6 +367,16 @@ do_handle_action(RuleId, #{mod := Mod, func := Func, args := Args}, Selected, En inc_action_metrics(RuleId, Result), Result. +eval({Op, _} = Exp, Context) when is_list(Context) andalso (Op == path orelse Op == var) -> + case Context of + [Columns] -> + eval(Exp, Columns); + [Columns | Rest] -> + case eval(Exp, Columns) of + undefined -> eval(Exp, Rest); + Val -> Val + end + end; eval({path, [{key, <<"payload">>} | Path]}, #{payload := Payload}) -> nested_get({path, Path}, may_decode_payload(Payload)); eval({path, [{key, <<"payload">>} | Path]}, #{<<"payload">> := Payload}) -> diff --git a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl index 822fac067..c8bebab99 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_engine_SUITE.erl @@ -21,6 +21,7 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("emqx/include/emqx.hrl"). @@ -583,6 +584,122 @@ t_ensure_action_removed(_) -> %% Test cases for rule runtime %%------------------------------------------------------------------------------ +t_json_payload_decoding(_Config) -> + {ok, C} = emqtt:start_link(), + on_exit(fun() -> emqtt:stop(C) end), + {ok, _} = emqtt:connect(C), + + Cases = + [ + #{ + select_fields => + <<"payload.measurement, payload.data_type, payload.value, payload.device_id">>, + payload => emqx_utils_json:encode(#{ + measurement => <<"temp">>, + data_type => <<"FLOAT">>, + value => <<"32.12">>, + device_id => <<"devid">> + }), + expected => #{ + payload => #{ + <<"measurement">> => <<"temp">>, + <<"data_type">> => <<"FLOAT">>, + <<"value">> => <<"32.12">>, + <<"device_id">> => <<"devid">> + } + } + }, + %% "last write wins" examples + #{ + select_fields => <<"payload as p, payload.f as p.answer">>, + payload => emqx_utils_json:encode(#{f => 42, keep => <<"that?">>}), + expected => #{ + <<"p">> => #{ + <<"answer">> => 42 + } + } + }, + #{ + select_fields => <<"payload as p, payload.f as p.jsonlike.f">>, + payload => emqx_utils_json:encode(#{ + jsonlike => emqx_utils_json:encode(#{a => 0}), + f => <<"huh">> + }), + %% behavior from 4.4: jsonlike gets wiped without preserving old "keys" + %% here we overwrite it since we don't explicitly decode it + expected => #{ + <<"p">> => #{ + <<"jsonlike">> => #{<<"f">> => <<"huh">>} + } + } + }, + #{ + select_fields => + <<"payload as p, 42 as p, payload.measurement as p.measurement, 51 as p">>, + payload => emqx_utils_json:encode(#{ + measurement => <<"temp">>, + data_type => <<"FLOAT">>, + value => <<"32.12">>, + device_id => <<"devid">> + }), + expected => #{ + <<"p">> => 51 + } + }, + %% if selected field is already structured, new values are inserted into it + #{ + select_fields => + <<"json_decode(payload) as p, payload.a as p.z">>, + payload => emqx_utils_json:encode(#{ + a => 1, + b => <<"2">> + }), + expected => #{ + <<"p">> => #{ + <<"a">> => 1, + <<"b">> => <<"2">>, + <<"z">> => 1 + } + } + } + ], + ActionFn = <<(atom_to_binary(?MODULE))/binary, ":action_response">>, + Topic = <<"some/topic">>, + + ok = snabbkaffe:start_trace(), + on_exit(fun() -> snabbkaffe:stop() end), + on_exit(fun() -> delete_rule(?TMP_RULEID) end), + lists:foreach( + fun(#{select_fields := Fs, payload := P, expected := E} = Case) -> + ct:pal("testing case ~p", [Case]), + SQL = <<"select ", Fs/binary, " from \"", Topic/binary, "\"">>, + delete_rule(?TMP_RULEID), + {ok, _Rule} = emqx_rule_engine:create_rule( + #{ + sql => SQL, + id => ?TMP_RULEID, + actions => [#{function => ActionFn}] + } + ), + {_, {ok, Event}} = + ?wait_async_action( + emqtt:publish(C, Topic, P, 0), + #{?snk_kind := action_response}, + 5_000 + ), + ?assertMatch( + #{selected := E}, + Event, + #{payload => P, fields => Fs, expected => E} + ), + ok + end, + Cases + ), + snabbkaffe:stop(), + + ok. + t_events(_Config) -> {ok, Client} = emqtt:start_link( [ @@ -3065,6 +3182,14 @@ republish_action(Topic, Payload, UserProperties) -> } }. +action_response(Selected, Envs, Args) -> + ?tp(action_response, #{ + selected => Selected, + envs => Envs, + args => Args + }), + ok. + make_simple_rule_with_ts(RuleId, Ts) when is_binary(RuleId) -> SQL = <<"select * from \"simple/topic\"">>, make_simple_rule(RuleId, SQL, Ts). diff --git a/apps/emqx_rule_engine/test/emqx_rule_maps_SUITE.erl b/apps/emqx_rule_engine/test/emqx_rule_maps_SUITE.erl index 9fdd60c56..f206e7fb1 100644 --- a/apps/emqx_rule_engine/test/emqx_rule_maps_SUITE.erl +++ b/apps/emqx_rule_engine/test/emqx_rule_maps_SUITE.erl @@ -71,7 +71,49 @@ t_nested_put_map(_) -> ?assertEqual( #{k => #{<<"t">> => #{<<"a">> => v1}}}, nested_put(?path([k, t, <<"a">>]), v1, #{k => #{<<"t">> => v0}}) - ). + ), + %% note: since we handle json-encoded binaries when evaluating the + %% rule rather than baking the decoding in `nested_put`, we test + %% this corner case that _would_ otherwise lose data to + %% demonstrate this behavior. + ?assertEqual( + #{payload => #{<<"a">> => v1}}, + nested_put( + ?path([payload, <<"a">>]), + v1, + #{payload => emqx_utils_json:encode(#{b => <<"v2">>})} + ) + ), + %% We have an asymmetry in the behavior here because `nested_put' + %% currently, at each key, will use `general_find' to get the + %% current value of the eky, and that attempts JSON decoding the + %% such value... So, the cases below, `old' gets preserved + %% because it's in this direct path. + ?assertEqual( + #{payload => #{<<"a">> => #{<<"new">> => v1, <<"old">> => <<"v2">>}}}, + nested_put( + ?path([payload, <<"a">>, <<"new">>]), + v1, + #{payload => emqx_utils_json:encode(#{a => #{old => <<"v2">>}})} + ) + ), + ?assertEqual( + #{payload => #{<<"a">> => #{<<"new">> => v1, <<"old">> => <<"{}">>}}}, + nested_put( + ?path([payload, <<"a">>, <<"new">>]), + v1, + #{payload => emqx_utils_json:encode(#{a => #{old => <<"{}">>}, b => <<"{}">>})} + ) + ), + ?assertEqual( + #{payload => #{<<"a">> => #{<<"new">> => v1}}}, + nested_put( + ?path([payload, <<"a">>, <<"new">>]), + v1, + #{payload => <<"{}">>} + ) + ), + ok. t_nested_put_index(_) -> ?assertEqual([1, a, 3], nested_put(?path([{ic, 2}]), a, [1, 2, 3])), diff --git a/changes/ce/fix-11271.en.md b/changes/ce/fix-11271.en.md new file mode 100644 index 000000000..af95d3863 --- /dev/null +++ b/changes/ce/fix-11271.en.md @@ -0,0 +1 @@ +Ensure that the range of percentage type is from 0% to 100%. diff --git a/changes/ce/fix-11272.en.md b/changes/ce/fix-11272.en.md new file mode 100644 index 000000000..fbcb0d8b8 --- /dev/null +++ b/changes/ce/fix-11272.en.md @@ -0,0 +1 @@ +Fix a typo in the log, when EMQX received an abnormal `PUBREL` packet, the `pubrel` was mistakenly typo as `pubrec`. diff --git a/changes/ce/fix-11281.en.md b/changes/ce/fix-11281.en.md new file mode 100644 index 000000000..a73159343 --- /dev/null +++ b/changes/ce/fix-11281.en.md @@ -0,0 +1 @@ +Restored support for the special `$queue/` shared subscription. diff --git a/changes/ee/fix-11266.en.md b/changes/ee/fix-11266.en.md new file mode 100644 index 000000000..161982a5b --- /dev/null +++ b/changes/ee/fix-11266.en.md @@ -0,0 +1,20 @@ +Fix and improve support for TDEngine `insert` syntax. + +1. Support inserting into multi-table in the template + + For example: + + `insert into table_1 values (${ts}, '${id}', '${topic}') + table_2 values (${ts}, '${id}', '${topic}')` + +2. Support mixing prefixes/suffixes and placeholders in the template + + For example: + + `insert into table_${topic} values (${ts}, '${id}', '${topic}')` + +Note: This is a breaking change. Previously the values of string type were quoted automatically, but now they must be quoted explicitly. + +For example: + +`insert into table values (${ts}, '${a_string}')` diff --git a/rel/i18n/emqx_bridge_hstreamdb.hocon b/rel/i18n/emqx_bridge_hstreamdb.hocon index 10700d4eb..809c60588 100644 --- a/rel/i18n/emqx_bridge_hstreamdb.hocon +++ b/rel/i18n/emqx_bridge_hstreamdb.hocon @@ -41,7 +41,8 @@ local_topic.label: """Local Topic""" record_template.desc: -"""The HStream Record template to be forwarded to the HStreamDB. Placeholders supported.""" +"""The HStream Record template to be forwarded to the HStreamDB. Placeholders supported.
+NOTE: When you use `raw record` template (which means the data is not a valid JSON), you should use `read` or `subscription` in HStream to get the data.""" record_template.label: """HStream Record""" diff --git a/rel/i18n/emqx_bridge_hstreamdb_connector.hocon b/rel/i18n/emqx_bridge_hstreamdb_connector.hocon index c0faa794c..8f7ac2edb 100644 --- a/rel/i18n/emqx_bridge_hstreamdb_connector.hocon +++ b/rel/i18n/emqx_bridge_hstreamdb_connector.hocon @@ -19,31 +19,31 @@ name.label: """Connector Name""" url.desc: -"""HStreamDB Server URL""" +"""HStreamDB Server URL. Using gRPC http server address.""" url.label: """HStreamDB Server URL""" stream_name.desc: -"""HStreamDB Stream Name""" +"""HStreamDB Stream Name.""" stream_name.label: """HStreamDB Stream Name""" partition_key.desc: -"""HStreamDB Ordering Key""" +"""HStreamDB Partition Key. Placeholders supported.""" partition_key.label: -"""HStreamDB Ordering Key""" +"""HStreamDB Partition Key""" pool_size.desc: -"""HStreamDB Pool Size""" +"""HStreamDB Pool Size.""" pool_size.label: """HStreamDB Pool Size""" grpc_timeout.desc: -"""HStreamDB gRPC Timeout""" +"""HStreamDB gRPC Timeout.""" grpc_timeout.label: """HStreamDB gRPC Timeout"""