Merge pull request #11360 from id/0727-sync-release-51-to-master
This commit is contained in:
commit
5ac01c9b85
2
Makefile
2
Makefile
|
@ -16,7 +16,7 @@ endif
|
||||||
# Dashboard version
|
# Dashboard version
|
||||||
# from https://github.com/emqx/emqx-dashboard5
|
# from https://github.com/emqx/emqx-dashboard5
|
||||||
export EMQX_DASHBOARD_VERSION ?= v1.3.2
|
export EMQX_DASHBOARD_VERSION ?= v1.3.2
|
||||||
export EMQX_EE_DASHBOARD_VERSION ?= e1.1.1-beta.4
|
export EMQX_EE_DASHBOARD_VERSION ?= e1.1.1
|
||||||
|
|
||||||
# `:=` should be used here, otherwise the `$(shell ...)` will be executed every time when the variable is used
|
# `:=` should be used here, otherwise the `$(shell ...)` will be executed every time when the variable is used
|
||||||
# In make 4.4+, for backward-compatibility the value from the original environment is used.
|
# In make 4.4+, for backward-compatibility the value from the original environment is used.
|
||||||
|
|
|
@ -60,6 +60,28 @@
|
||||||
end)()
|
end)()
|
||||||
).
|
).
|
||||||
|
|
||||||
|
-define(assertNotReceive(PATTERN),
|
||||||
|
?assertNotReceive(PATTERN, 300)
|
||||||
|
).
|
||||||
|
|
||||||
|
-define(assertNotReceive(PATTERN, TIMEOUT),
|
||||||
|
(fun() ->
|
||||||
|
receive
|
||||||
|
X__V = PATTERN ->
|
||||||
|
erlang:error(
|
||||||
|
{assertNotReceive, [
|
||||||
|
{module, ?MODULE},
|
||||||
|
{line, ?LINE},
|
||||||
|
{expression, (??PATTERN)},
|
||||||
|
{message, X__V}
|
||||||
|
]}
|
||||||
|
)
|
||||||
|
after TIMEOUT ->
|
||||||
|
ok
|
||||||
|
end
|
||||||
|
end)()
|
||||||
|
).
|
||||||
|
|
||||||
-define(retrying(CONFIG, NUM_RETRIES, TEST_BODY_FN), begin
|
-define(retrying(CONFIG, NUM_RETRIES, TEST_BODY_FN), begin
|
||||||
__TEST_CASE = ?FUNCTION_NAME,
|
__TEST_CASE = ?FUNCTION_NAME,
|
||||||
(fun
|
(fun
|
||||||
|
|
|
@ -35,7 +35,7 @@
|
||||||
-define(EMQX_RELEASE_CE, "5.1.2").
|
-define(EMQX_RELEASE_CE, "5.1.2").
|
||||||
|
|
||||||
%% Enterprise edition
|
%% Enterprise edition
|
||||||
-define(EMQX_RELEASE_EE, "5.1.1-alpha.2").
|
-define(EMQX_RELEASE_EE, "5.1.1").
|
||||||
|
|
||||||
%% The HTTP API version
|
%% The HTTP API version
|
||||||
-define(EMQX_API_VERSION, "5.0").
|
-define(EMQX_API_VERSION, "5.0").
|
||||||
|
|
|
@ -488,7 +488,7 @@ handle_in(
|
||||||
ok ->
|
ok ->
|
||||||
TopicFilters0 = parse_topic_filters(TopicFilters),
|
TopicFilters0 = parse_topic_filters(TopicFilters),
|
||||||
TopicFilters1 = enrich_subopts_subid(Properties, TopicFilters0),
|
TopicFilters1 = enrich_subopts_subid(Properties, TopicFilters0),
|
||||||
TupleTopicFilters0 = check_sub_authzs(SubPkt, TopicFilters1, Channel),
|
TupleTopicFilters0 = check_sub_authzs(TopicFilters1, Channel),
|
||||||
HasAuthzDeny = lists:any(
|
HasAuthzDeny = lists:any(
|
||||||
fun({_TopicFilter, ReasonCode}) ->
|
fun({_TopicFilter, ReasonCode}) ->
|
||||||
ReasonCode =:= ?RC_NOT_AUTHORIZED
|
ReasonCode =:= ?RC_NOT_AUTHORIZED
|
||||||
|
@ -1804,9 +1804,7 @@ authz_action(#mqtt_packet{
|
||||||
header = #mqtt_packet_header{qos = QoS, retain = Retain}, variable = #mqtt_packet_publish{}
|
header = #mqtt_packet_header{qos = QoS, retain = Retain}, variable = #mqtt_packet_publish{}
|
||||||
}) ->
|
}) ->
|
||||||
?AUTHZ_PUBLISH(QoS, Retain);
|
?AUTHZ_PUBLISH(QoS, Retain);
|
||||||
authz_action(#mqtt_packet{
|
authz_action({_Topic, #{qos := QoS} = _SubOpts} = _TopicFilter) ->
|
||||||
header = #mqtt_packet_header{qos = QoS}, variable = #mqtt_packet_subscribe{}
|
|
||||||
}) ->
|
|
||||||
?AUTHZ_SUBSCRIBE(QoS);
|
?AUTHZ_SUBSCRIBE(QoS);
|
||||||
%% Will message
|
%% Will message
|
||||||
authz_action(#message{qos = QoS, flags = #{retain := Retain}}) ->
|
authz_action(#message{qos = QoS, flags = #{retain := Retain}}) ->
|
||||||
|
@ -1847,23 +1845,22 @@ check_pub_caps(
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Check Sub Authorization
|
%% Check Sub Authorization
|
||||||
|
|
||||||
check_sub_authzs(Packet, TopicFilters, Channel) ->
|
check_sub_authzs(TopicFilters, Channel) ->
|
||||||
Action = authz_action(Packet),
|
check_sub_authzs(TopicFilters, Channel, []).
|
||||||
check_sub_authzs(Action, TopicFilters, Channel, []).
|
|
||||||
|
|
||||||
check_sub_authzs(
|
check_sub_authzs(
|
||||||
Action,
|
|
||||||
[TopicFilter = {Topic, _} | More],
|
[TopicFilter = {Topic, _} | More],
|
||||||
Channel = #channel{clientinfo = ClientInfo},
|
Channel = #channel{clientinfo = ClientInfo},
|
||||||
Acc
|
Acc
|
||||||
) ->
|
) ->
|
||||||
|
Action = authz_action(TopicFilter),
|
||||||
case emqx_access_control:authorize(ClientInfo, Action, Topic) of
|
case emqx_access_control:authorize(ClientInfo, Action, Topic) of
|
||||||
allow ->
|
allow ->
|
||||||
check_sub_authzs(Action, More, Channel, [{TopicFilter, ?RC_SUCCESS} | Acc]);
|
check_sub_authzs(More, Channel, [{TopicFilter, ?RC_SUCCESS} | Acc]);
|
||||||
deny ->
|
deny ->
|
||||||
check_sub_authzs(Action, More, Channel, [{TopicFilter, ?RC_NOT_AUTHORIZED} | Acc])
|
check_sub_authzs(More, Channel, [{TopicFilter, ?RC_NOT_AUTHORIZED} | Acc])
|
||||||
end;
|
end;
|
||||||
check_sub_authzs(_Action, [], _Channel, Acc) ->
|
check_sub_authzs([], _Channel, Acc) ->
|
||||||
lists:reverse(Acc).
|
lists:reverse(Acc).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
|
@ -908,8 +908,7 @@ t_check_pub_alias(_) ->
|
||||||
t_check_sub_authzs(_) ->
|
t_check_sub_authzs(_) ->
|
||||||
emqx_config:put_zone_conf(default, [authorization, enable], true),
|
emqx_config:put_zone_conf(default, [authorization, enable], true),
|
||||||
TopicFilter = {<<"t">>, ?DEFAULT_SUBOPTS},
|
TopicFilter = {<<"t">>, ?DEFAULT_SUBOPTS},
|
||||||
Subscribe = ?SUBSCRIBE_PACKET(1, [TopicFilter]),
|
[{TopicFilter, 0}] = emqx_channel:check_sub_authzs([TopicFilter], channel()).
|
||||||
[{TopicFilter, 0}] = emqx_channel:check_sub_authzs(Subscribe, [TopicFilter], channel()).
|
|
||||||
|
|
||||||
t_enrich_connack_caps(_) ->
|
t_enrich_connack_caps(_) ->
|
||||||
ok = meck:new(emqx_mqtt_caps, [passthrough, no_history]),
|
ok = meck:new(emqx_mqtt_caps, [passthrough, no_history]),
|
||||||
|
|
|
@ -0,0 +1,138 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%
|
||||||
|
%% Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
%% you may not use this file except in compliance with the License.
|
||||||
|
%% You may obtain a copy of the License at
|
||||||
|
%% http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
%%
|
||||||
|
%% Unless required by applicable law or agreed to in writing, software
|
||||||
|
%% distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
%% See the License for the specific language governing permissions and
|
||||||
|
%% limitations under the License.
|
||||||
|
%%
|
||||||
|
%% @doc Test suite verifies that MQTT retain and qos parameters
|
||||||
|
%% correctly reach the authorization.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
|
-module(emqx_authz_rich_actions_SUITE).
|
||||||
|
|
||||||
|
-compile(nowarn_export_all).
|
||||||
|
-compile(export_all).
|
||||||
|
|
||||||
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
-include_lib("common_test/include/ct.hrl").
|
||||||
|
-include_lib("emqx/include/asserts.hrl").
|
||||||
|
|
||||||
|
all() ->
|
||||||
|
emqx_common_test_helpers:all(?MODULE).
|
||||||
|
|
||||||
|
groups() ->
|
||||||
|
[].
|
||||||
|
|
||||||
|
init_per_testcase(TestCase, Config) ->
|
||||||
|
Apps = emqx_cth_suite:start(
|
||||||
|
[
|
||||||
|
{emqx_conf, "authorization.no_match = deny, authorization.cache.enable = false"},
|
||||||
|
emqx_authz
|
||||||
|
],
|
||||||
|
#{work_dir => filename:join(?config(priv_dir, Config), TestCase)}
|
||||||
|
),
|
||||||
|
[{tc_apps, Apps} | Config].
|
||||||
|
|
||||||
|
end_per_testcase(_TestCase, Config) ->
|
||||||
|
emqx_cth_suite:stop(?config(tc_apps, Config)),
|
||||||
|
_ = emqx_authz:set_feature_available(rich_actions, true).
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% Testcases
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
t_rich_actions_subscribe(_Config) ->
|
||||||
|
ok = setup_config(#{
|
||||||
|
<<"type">> => <<"file">>,
|
||||||
|
<<"enable">> => true,
|
||||||
|
<<"rules">> =>
|
||||||
|
<<
|
||||||
|
"{allow, {user, \"username\"}, {subscribe, [{qos, 1}]}, [\"t1\"]}."
|
||||||
|
"\n{allow, {user, \"username\"}, {subscribe, [{qos, 2}]}, [\"t2\"]}."
|
||||||
|
>>
|
||||||
|
}),
|
||||||
|
|
||||||
|
{ok, C} = emqtt:start_link([{username, <<"username">>}]),
|
||||||
|
{ok, _} = emqtt:connect(C),
|
||||||
|
|
||||||
|
?assertMatch(
|
||||||
|
{ok, _, [1]},
|
||||||
|
emqtt:subscribe(C, <<"t1">>, 1)
|
||||||
|
),
|
||||||
|
|
||||||
|
?assertMatch(
|
||||||
|
{ok, _, [1, 2]},
|
||||||
|
emqtt:subscribe(C, #{}, [{<<"t1">>, [{qos, 1}]}, {<<"t2">>, [{qos, 2}]}])
|
||||||
|
),
|
||||||
|
|
||||||
|
?assertMatch(
|
||||||
|
{ok, _, [128, 128]},
|
||||||
|
emqtt:subscribe(C, #{}, [{<<"t1">>, [{qos, 2}]}, {<<"t2">>, [{qos, 1}]}])
|
||||||
|
),
|
||||||
|
|
||||||
|
ok = emqtt:stop(C).
|
||||||
|
|
||||||
|
t_rich_actions_publish(_Config) ->
|
||||||
|
ok = setup_config(#{
|
||||||
|
<<"type">> => <<"file">>,
|
||||||
|
<<"enable">> => true,
|
||||||
|
<<"rules">> =>
|
||||||
|
<<
|
||||||
|
"{allow, {user, \"publisher\"}, {publish, [{qos, 0}]}, [\"t0\"]}."
|
||||||
|
"\n{allow, {user, \"publisher\"}, {publish, [{qos, 1}, {retain, true}]}, [\"t1\"]}."
|
||||||
|
"\n{allow, {user, \"subscriber\"}, subscribe, [\"#\"]}."
|
||||||
|
>>
|
||||||
|
}),
|
||||||
|
|
||||||
|
{ok, PC} = emqtt:start_link([{username, <<"publisher">>}]),
|
||||||
|
{ok, _} = emqtt:connect(PC),
|
||||||
|
|
||||||
|
{ok, SC} = emqtt:start_link([{username, <<"subscriber">>}]),
|
||||||
|
{ok, _} = emqtt:connect(SC),
|
||||||
|
{ok, _, _} = emqtt:subscribe(SC, <<"#">>, 1),
|
||||||
|
|
||||||
|
_ = emqtt:publish(PC, <<"t0">>, <<"qos0">>, [{qos, 0}]),
|
||||||
|
_ = emqtt:publish(PC, <<"t1">>, <<"qos1-retain">>, [{qos, 1}, {retain, true}]),
|
||||||
|
|
||||||
|
_ = emqtt:publish(PC, <<"t0">>, <<"qos1">>, [{qos, 1}]),
|
||||||
|
_ = emqtt:publish(PC, <<"t1">>, <<"qos1-noretain">>, [{qos, 1}, {retain, false}]),
|
||||||
|
|
||||||
|
?assertReceive(
|
||||||
|
{publish, #{topic := <<"t0">>, payload := <<"qos0">>}}
|
||||||
|
),
|
||||||
|
|
||||||
|
?assertReceive(
|
||||||
|
{publish, #{topic := <<"t1">>, payload := <<"qos1-retain">>}}
|
||||||
|
),
|
||||||
|
|
||||||
|
?assertNotReceive(
|
||||||
|
{publish, #{topic := <<"t0">>, payload := <<"qos1">>}}
|
||||||
|
),
|
||||||
|
|
||||||
|
?assertNotReceive(
|
||||||
|
{publish, #{topic := <<"t1">>, payload := <<"qos1-noretain">>}}
|
||||||
|
),
|
||||||
|
|
||||||
|
ok = emqtt:stop(PC),
|
||||||
|
ok = emqtt:stop(SC).
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% Helpers
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
setup_config(Params) ->
|
||||||
|
emqx_authz_test_lib:setup_config(
|
||||||
|
Params,
|
||||||
|
#{}
|
||||||
|
).
|
||||||
|
|
||||||
|
stop_apps(Apps) ->
|
||||||
|
lists:foreach(fun application:stop/1, Apps).
|
|
@ -32,7 +32,8 @@ api_schemas(Method) ->
|
||||||
api_ref(emqx_bridge_mongodb, <<"mongodb_rs">>, Method ++ "_rs"),
|
api_ref(emqx_bridge_mongodb, <<"mongodb_rs">>, Method ++ "_rs"),
|
||||||
api_ref(emqx_bridge_mongodb, <<"mongodb_sharded">>, Method ++ "_sharded"),
|
api_ref(emqx_bridge_mongodb, <<"mongodb_sharded">>, Method ++ "_sharded"),
|
||||||
api_ref(emqx_bridge_mongodb, <<"mongodb_single">>, Method ++ "_single"),
|
api_ref(emqx_bridge_mongodb, <<"mongodb_single">>, Method ++ "_single"),
|
||||||
api_ref(emqx_bridge_hstreamdb, <<"hstreamdb">>, Method),
|
%% TODO: un-hide for e5.2.0...
|
||||||
|
%%api_ref(emqx_bridge_hstreamdb, <<"hstreamdb">>, Method),
|
||||||
api_ref(emqx_bridge_influxdb, <<"influxdb_api_v1">>, Method ++ "_api_v1"),
|
api_ref(emqx_bridge_influxdb, <<"influxdb_api_v1">>, Method ++ "_api_v1"),
|
||||||
api_ref(emqx_bridge_influxdb, <<"influxdb_api_v2">>, Method ++ "_api_v2"),
|
api_ref(emqx_bridge_influxdb, <<"influxdb_api_v2">>, Method ++ "_api_v2"),
|
||||||
api_ref(emqx_bridge_redis, <<"redis_single">>, Method ++ "_single"),
|
api_ref(emqx_bridge_redis, <<"redis_single">>, Method ++ "_single"),
|
||||||
|
@ -146,7 +147,8 @@ fields(bridges) ->
|
||||||
hoconsc:map(name, ref(emqx_bridge_hstreamdb, "config")),
|
hoconsc:map(name, ref(emqx_bridge_hstreamdb, "config")),
|
||||||
#{
|
#{
|
||||||
desc => <<"HStreamDB Bridge Config">>,
|
desc => <<"HStreamDB Bridge Config">>,
|
||||||
required => false
|
required => false,
|
||||||
|
importance => ?IMPORTANCE_HIDDEN
|
||||||
}
|
}
|
||||||
)},
|
)},
|
||||||
{mysql,
|
{mysql,
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
{application, emqx_bridge_gcp_pubsub, [
|
{application, emqx_bridge_gcp_pubsub, [
|
||||||
{description, "EMQX Enterprise GCP Pub/Sub Bridge"},
|
{description, "EMQX Enterprise GCP Pub/Sub Bridge"},
|
||||||
{vsn, "0.1.4"},
|
{vsn, "0.1.5"},
|
||||||
{registered, []},
|
{registered, []},
|
||||||
{applications, [
|
{applications, [
|
||||||
kernel,
|
kernel,
|
||||||
|
|
|
@ -40,13 +40,13 @@ groups() ->
|
||||||
|
|
||||||
init_per_suite(_Config) ->
|
init_per_suite(_Config) ->
|
||||||
emqx_common_test_helpers:render_and_load_app_config(emqx_conf),
|
emqx_common_test_helpers:render_and_load_app_config(emqx_conf),
|
||||||
ok = emqx_mgmt_api_test_util:init_suite([emqx_conf, emqx_bridge]),
|
ok = emqx_mgmt_api_test_util:init_suite([emqx_conf, emqx_bridge, emqx_rule_engine]),
|
||||||
ok = emqx_connector_test_helpers:start_apps([emqx_resource]),
|
ok = emqx_connector_test_helpers:start_apps([emqx_resource]),
|
||||||
{ok, _} = application:ensure_all_started(emqx_connector),
|
{ok, _} = application:ensure_all_started(emqx_connector),
|
||||||
[].
|
[].
|
||||||
|
|
||||||
end_per_suite(_Config) ->
|
end_per_suite(_Config) ->
|
||||||
ok = emqx_mgmt_api_test_util:end_suite([emqx_conf, emqx_bridge]),
|
ok = emqx_mgmt_api_test_util:end_suite([emqx_rule_engine, emqx_bridge, emqx_conf]),
|
||||||
ok = emqx_connector_test_helpers:stop_apps([emqx_resource]),
|
ok = emqx_connector_test_helpers:stop_apps([emqx_resource]),
|
||||||
_ = application:stop(emqx_connector),
|
_ = application:stop(emqx_connector),
|
||||||
_ = application:stop(emqx_bridge),
|
_ = application:stop(emqx_bridge),
|
||||||
|
@ -77,13 +77,19 @@ init_per_testcase(t_too_many_requests, Config) ->
|
||||||
),
|
),
|
||||||
ok = emqx_bridge_http_connector_test_server:set_handler(too_many_requests_http_handler()),
|
ok = emqx_bridge_http_connector_test_server:set_handler(too_many_requests_http_handler()),
|
||||||
[{http_server, #{port => HTTPPort, path => HTTPPath}} | Config];
|
[{http_server, #{port => HTTPPort, path => HTTPPath}} | Config];
|
||||||
|
init_per_testcase(t_rule_action_expired, Config) ->
|
||||||
|
[
|
||||||
|
{bridge_name, ?BRIDGE_NAME}
|
||||||
|
| Config
|
||||||
|
];
|
||||||
init_per_testcase(_TestCase, Config) ->
|
init_per_testcase(_TestCase, Config) ->
|
||||||
Server = start_http_server(#{response_delay_ms => 0}),
|
Server = start_http_server(#{response_delay_ms => 0}),
|
||||||
[{http_server, Server} | Config].
|
[{http_server, Server} | Config].
|
||||||
|
|
||||||
end_per_testcase(TestCase, _Config) when
|
end_per_testcase(TestCase, _Config) when
|
||||||
TestCase =:= t_path_not_found;
|
TestCase =:= t_path_not_found;
|
||||||
TestCase =:= t_too_many_requests
|
TestCase =:= t_too_many_requests;
|
||||||
|
TestCase =:= t_rule_action_expired
|
||||||
->
|
->
|
||||||
ok = emqx_bridge_http_connector_test_server:stop(),
|
ok = emqx_bridge_http_connector_test_server:stop(),
|
||||||
persistent_term:erase({?MODULE, times_called}),
|
persistent_term:erase({?MODULE, times_called}),
|
||||||
|
@ -202,6 +208,7 @@ parse_http_request_assertive(ReqStr0) ->
|
||||||
bridge_async_config(#{port := Port} = Config) ->
|
bridge_async_config(#{port := Port} = Config) ->
|
||||||
Type = maps:get(type, Config, ?BRIDGE_TYPE),
|
Type = maps:get(type, Config, ?BRIDGE_TYPE),
|
||||||
Name = maps:get(name, Config, ?BRIDGE_NAME),
|
Name = maps:get(name, Config, ?BRIDGE_NAME),
|
||||||
|
Host = maps:get(host, Config, "localhost"),
|
||||||
Path = maps:get(path, Config, ""),
|
Path = maps:get(path, Config, ""),
|
||||||
PoolSize = maps:get(pool_size, Config, 1),
|
PoolSize = maps:get(pool_size, Config, 1),
|
||||||
QueryMode = maps:get(query_mode, Config, "async"),
|
QueryMode = maps:get(query_mode, Config, "async"),
|
||||||
|
@ -219,7 +226,7 @@ bridge_async_config(#{port := Port} = Config) ->
|
||||||
end,
|
end,
|
||||||
ConfigString = io_lib:format(
|
ConfigString = io_lib:format(
|
||||||
"bridges.~s.~s {\n"
|
"bridges.~s.~s {\n"
|
||||||
" url = \"http://localhost:~p~s\"\n"
|
" url = \"http://~s:~p~s\"\n"
|
||||||
" connect_timeout = \"~p\"\n"
|
" connect_timeout = \"~p\"\n"
|
||||||
" enable = true\n"
|
" enable = true\n"
|
||||||
%% local_topic
|
%% local_topic
|
||||||
|
@ -249,6 +256,7 @@ bridge_async_config(#{port := Port} = Config) ->
|
||||||
[
|
[
|
||||||
Type,
|
Type,
|
||||||
Name,
|
Name,
|
||||||
|
Host,
|
||||||
Port,
|
Port,
|
||||||
Path,
|
Path,
|
||||||
ConnectTimeout,
|
ConnectTimeout,
|
||||||
|
@ -550,6 +558,61 @@ t_too_many_requests(Config) ->
|
||||||
),
|
),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
t_rule_action_expired(Config) ->
|
||||||
|
?check_trace(
|
||||||
|
begin
|
||||||
|
RuleTopic = <<"t/webhook/rule">>,
|
||||||
|
BridgeConfig = bridge_async_config(#{
|
||||||
|
type => ?BRIDGE_TYPE,
|
||||||
|
name => ?BRIDGE_NAME,
|
||||||
|
host => "non.existent.host",
|
||||||
|
port => 9999,
|
||||||
|
path => <<"/some/path">>,
|
||||||
|
resume_interval => "100ms",
|
||||||
|
connect_timeout => "1s",
|
||||||
|
request_timeout => "100ms",
|
||||||
|
resource_request_ttl => "100ms"
|
||||||
|
}),
|
||||||
|
{ok, _} = emqx_bridge:create(?BRIDGE_TYPE, ?BRIDGE_NAME, BridgeConfig),
|
||||||
|
{ok, #{<<"id">> := RuleId}} =
|
||||||
|
emqx_bridge_testlib:create_rule_and_action_http(?BRIDGE_TYPE, RuleTopic, Config),
|
||||||
|
Msg = emqx_message:make(RuleTopic, <<"timeout">>),
|
||||||
|
emqx:publish(Msg),
|
||||||
|
?retry(
|
||||||
|
_Interval = 500,
|
||||||
|
_NAttempts = 20,
|
||||||
|
?assertMatch(
|
||||||
|
#{
|
||||||
|
counters := #{
|
||||||
|
matched := 1,
|
||||||
|
failed := 0,
|
||||||
|
dropped := 1
|
||||||
|
}
|
||||||
|
},
|
||||||
|
emqx_bridge:get_metrics(?BRIDGE_TYPE, ?BRIDGE_NAME)
|
||||||
|
)
|
||||||
|
),
|
||||||
|
?retry(
|
||||||
|
_Interval = 500,
|
||||||
|
_NAttempts = 20,
|
||||||
|
?assertMatch(
|
||||||
|
#{
|
||||||
|
counters := #{
|
||||||
|
matched := 1,
|
||||||
|
'actions.failed' := 1,
|
||||||
|
'actions.failed.unknown' := 1,
|
||||||
|
'actions.total' := 1
|
||||||
|
}
|
||||||
|
},
|
||||||
|
emqx_metrics_worker:get_metrics(rule_metrics, RuleId)
|
||||||
|
)
|
||||||
|
),
|
||||||
|
ok
|
||||||
|
end,
|
||||||
|
[]
|
||||||
|
),
|
||||||
|
ok.
|
||||||
|
|
||||||
%% helpers
|
%% helpers
|
||||||
do_t_async_retries(TestContext, Error, Fn) ->
|
do_t_async_retries(TestContext, Error, Fn) ->
|
||||||
#{error_attempts := ErrorAttempts} = TestContext,
|
#{error_attempts := ErrorAttempts} = TestContext,
|
||||||
|
|
|
@ -165,6 +165,9 @@ sql_insert_template_for_bridge() ->
|
||||||
sql_insert_template_with_nested_token_for_bridge() ->
|
sql_insert_template_with_nested_token_for_bridge() ->
|
||||||
"INSERT INTO mqtt_test(topic, msgid, payload, retain) VALUES (${topic}, ${id}, ${payload.msg}, ${retain})".
|
"INSERT INTO mqtt_test(topic, msgid, payload, retain) VALUES (${topic}, ${id}, ${payload.msg}, ${retain})".
|
||||||
|
|
||||||
|
sql_insert_template_with_inconsistent_datatype() ->
|
||||||
|
"INSERT INTO mqtt_test(topic, msgid, payload, retain) VALUES (${topic}, ${id}, ${payload}, ${flags})".
|
||||||
|
|
||||||
sql_create_table() ->
|
sql_create_table() ->
|
||||||
"CREATE TABLE mqtt_test (topic VARCHAR2(255), msgid VARCHAR2(64), payload NCLOB, retain NUMBER(1))".
|
"CREATE TABLE mqtt_test (topic VARCHAR2(255), msgid VARCHAR2(64), payload NCLOB, retain NUMBER(1))".
|
||||||
|
|
||||||
|
@ -333,10 +336,11 @@ update_bridge_api(Config, Overrides) ->
|
||||||
probe_bridge_api(Config) ->
|
probe_bridge_api(Config) ->
|
||||||
probe_bridge_api(Config, _Overrides = #{}).
|
probe_bridge_api(Config, _Overrides = #{}).
|
||||||
|
|
||||||
probe_bridge_api(Config, _Overrides) ->
|
probe_bridge_api(Config, Overrides) ->
|
||||||
TypeBin = ?BRIDGE_TYPE_BIN,
|
TypeBin = ?BRIDGE_TYPE_BIN,
|
||||||
Name = ?config(oracle_name, Config),
|
Name = ?config(oracle_name, Config),
|
||||||
OracleConfig = ?config(oracle_config, Config),
|
OracleConfig0 = ?config(oracle_config, Config),
|
||||||
|
OracleConfig = emqx_utils_maps:deep_merge(OracleConfig0, Overrides),
|
||||||
Params = OracleConfig#{<<"type">> => TypeBin, <<"name">> => Name},
|
Params = OracleConfig#{<<"type">> => TypeBin, <<"name">> => Name},
|
||||||
Path = emqx_mgmt_api_test_util:api_path(["bridges_probe"]),
|
Path = emqx_mgmt_api_test_util:api_path(["bridges_probe"]),
|
||||||
AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
|
AuthHeader = emqx_mgmt_api_test_util:auth_header_(),
|
||||||
|
@ -482,6 +486,8 @@ t_create_via_http(Config) ->
|
||||||
end,
|
end,
|
||||||
[]
|
[]
|
||||||
),
|
),
|
||||||
|
ResourceId = resource_id(Config),
|
||||||
|
?assertMatch(1, length(ecpool:workers(ResourceId))),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
t_start_stop(Config) ->
|
t_start_stop(Config) ->
|
||||||
|
@ -537,6 +543,14 @@ t_start_stop(Config) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
t_probe_with_nested_tokens(Config) ->
|
t_probe_with_nested_tokens(Config) ->
|
||||||
|
ProbeRes0 = probe_bridge_api(
|
||||||
|
Config,
|
||||||
|
#{<<"sql">> => sql_insert_template_with_nested_token_for_bridge()}
|
||||||
|
),
|
||||||
|
?assertMatch({ok, {{_, 204, _}, _Headers, _Body}}, ProbeRes0).
|
||||||
|
|
||||||
|
t_message_with_nested_tokens(Config) ->
|
||||||
|
BridgeId = bridge_id(Config),
|
||||||
ResourceId = resource_id(Config),
|
ResourceId = resource_id(Config),
|
||||||
reset_table(Config),
|
reset_table(Config),
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
|
@ -551,7 +565,34 @@ t_probe_with_nested_tokens(Config) ->
|
||||||
_Sleep = 1_000,
|
_Sleep = 1_000,
|
||||||
_Attempts = 20,
|
_Attempts = 20,
|
||||||
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
|
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
|
||||||
).
|
),
|
||||||
|
MsgId = erlang:unique_integer(),
|
||||||
|
Data = binary_to_list(?config(oracle_name, Config)),
|
||||||
|
Params = #{
|
||||||
|
topic => ?config(mqtt_topic, Config),
|
||||||
|
id => MsgId,
|
||||||
|
payload => emqx_utils_json:encode(#{<<"msg">> => Data}),
|
||||||
|
retain => false
|
||||||
|
},
|
||||||
|
emqx_bridge:send_message(BridgeId, Params),
|
||||||
|
?retry(
|
||||||
|
_Sleep = 1_000,
|
||||||
|
_Attempts = 20,
|
||||||
|
?assertMatch(
|
||||||
|
{ok, [{result_set, [<<"PAYLOAD">>], _, [[Data]]}]},
|
||||||
|
emqx_resource:simple_sync_query(
|
||||||
|
ResourceId, {query, "SELECT payload FROM mqtt_test"}
|
||||||
|
)
|
||||||
|
)
|
||||||
|
),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_probe_with_inconsistent_datatype(Config) ->
|
||||||
|
ProbeRes0 = probe_bridge_api(
|
||||||
|
Config,
|
||||||
|
#{<<"sql">> => sql_insert_template_with_inconsistent_datatype()}
|
||||||
|
),
|
||||||
|
?assertMatch({ok, {{_, 204, _}, _Headers, _Body}}, ProbeRes0).
|
||||||
|
|
||||||
t_on_get_status(Config) ->
|
t_on_get_status(Config) ->
|
||||||
ProxyPort = ?config(proxy_port, Config),
|
ProxyPort = ?config(proxy_port, Config),
|
||||||
|
|
|
@ -700,3 +700,31 @@ t_table_removed(Config) ->
|
||||||
),
|
),
|
||||||
connect_and_create_table(Config),
|
connect_and_create_table(Config),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
t_concurrent_health_checks(Config) ->
|
||||||
|
Name = ?config(pgsql_name, Config),
|
||||||
|
BridgeType = ?config(pgsql_bridge_type, Config),
|
||||||
|
ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
|
||||||
|
?check_trace(
|
||||||
|
begin
|
||||||
|
connect_and_create_table(Config),
|
||||||
|
?assertMatch({ok, _}, create_bridge(Config)),
|
||||||
|
?retry(
|
||||||
|
_Sleep = 1_000,
|
||||||
|
_Attempts = 20,
|
||||||
|
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID))
|
||||||
|
),
|
||||||
|
emqx_utils:pmap(
|
||||||
|
fun(_) ->
|
||||||
|
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceID))
|
||||||
|
end,
|
||||||
|
lists:seq(1, 20)
|
||||||
|
),
|
||||||
|
ok
|
||||||
|
end,
|
||||||
|
fun(Trace) ->
|
||||||
|
?assertEqual([], ?of_kind(postgres_connector_bad_parse2, Trace)),
|
||||||
|
ok
|
||||||
|
end
|
||||||
|
),
|
||||||
|
ok.
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
%% -*- mode: erlang -*-
|
%% -*- mode: erlang -*-
|
||||||
{application, emqx_connector, [
|
{application, emqx_connector, [
|
||||||
{description, "EMQX Data Integration Connectors"},
|
{description, "EMQX Data Integration Connectors"},
|
||||||
{vsn, "0.1.27"},
|
{vsn, "0.1.28"},
|
||||||
{registered, []},
|
{registered, []},
|
||||||
{mod, {emqx_connector_app, []}},
|
{mod, {emqx_connector_app, []}},
|
||||||
{applications, [
|
{applications, [
|
||||||
|
|
|
@ -62,6 +62,11 @@
|
||||||
prepare_statement := epgsql:statement()
|
prepare_statement := epgsql:statement()
|
||||||
}.
|
}.
|
||||||
|
|
||||||
|
%% FIXME: add `{error, sync_required}' to `epgsql:execute_batch'
|
||||||
|
%% We want to be able to call sync if any message from the backend leaves the driver in an
|
||||||
|
%% inconsistent state needing sync.
|
||||||
|
-dialyzer({nowarn_function, [execute_batch/3]}).
|
||||||
|
|
||||||
%%=====================================================================
|
%%=====================================================================
|
||||||
|
|
||||||
roots() ->
|
roots() ->
|
||||||
|
@ -252,6 +257,8 @@ on_sql_query(InstId, PoolName, Type, NameOrSQL, Data) ->
|
||||||
reason => Reason
|
reason => Reason
|
||||||
}),
|
}),
|
||||||
case Reason of
|
case Reason of
|
||||||
|
sync_required ->
|
||||||
|
{error, {recoverable_error, Reason}};
|
||||||
ecpool_empty ->
|
ecpool_empty ->
|
||||||
{error, {recoverable_error, Reason}};
|
{error, {recoverable_error, Reason}};
|
||||||
{error, error, _, undefined_table, _, _} ->
|
{error, error, _, undefined_table, _, _} ->
|
||||||
|
@ -307,28 +314,13 @@ do_check_prepares(
|
||||||
prepare_sql := #{<<"send_message">> := SQL}
|
prepare_sql := #{<<"send_message">> := SQL}
|
||||||
} = State
|
} = State
|
||||||
) ->
|
) ->
|
||||||
% it's already connected. Verify if target table still exists
|
WorkerPids = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
|
||||||
Workers = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
|
case validate_table_existence(WorkerPids, SQL) of
|
||||||
lists:foldl(
|
ok ->
|
||||||
fun
|
ok;
|
||||||
(WorkerPid, ok) ->
|
{error, undefined_table} ->
|
||||||
case ecpool_worker:client(WorkerPid) of
|
{error, {undefined_table, State}}
|
||||||
{ok, Conn} ->
|
|
||||||
case epgsql:parse2(Conn, "get_status", SQL, []) of
|
|
||||||
{error, {_, _, _, undefined_table, _, _}} ->
|
|
||||||
{error, {undefined_table, State}};
|
|
||||||
_ ->
|
|
||||||
ok
|
|
||||||
end;
|
end;
|
||||||
_ ->
|
|
||||||
ok
|
|
||||||
end;
|
|
||||||
(_, Acc) ->
|
|
||||||
Acc
|
|
||||||
end,
|
|
||||||
ok,
|
|
||||||
Workers
|
|
||||||
);
|
|
||||||
do_check_prepares(#{prepare_sql := Prepares}) when is_map(Prepares) ->
|
do_check_prepares(#{prepare_sql := Prepares}) when is_map(Prepares) ->
|
||||||
ok;
|
ok;
|
||||||
do_check_prepares(State = #{pool_name := PoolName, prepare_sql := {error, Prepares}}) ->
|
do_check_prepares(State = #{pool_name := PoolName, prepare_sql := {error, Prepares}}) ->
|
||||||
|
@ -344,6 +336,30 @@ do_check_prepares(State = #{pool_name := PoolName, prepare_sql := {error, Prepar
|
||||||
{error, Error}
|
{error, Error}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
-spec validate_table_existence([pid()], binary()) -> ok | {error, undefined_table}.
|
||||||
|
validate_table_existence([WorkerPid | Rest], SQL) ->
|
||||||
|
try ecpool_worker:client(WorkerPid) of
|
||||||
|
{ok, Conn} ->
|
||||||
|
case epgsql:parse2(Conn, "", SQL, []) of
|
||||||
|
{error, {_, _, _, undefined_table, _, _}} ->
|
||||||
|
{error, undefined_table};
|
||||||
|
Res when is_tuple(Res) andalso ok == element(1, Res) ->
|
||||||
|
ok;
|
||||||
|
Res ->
|
||||||
|
?tp(postgres_connector_bad_parse2, #{result => Res}),
|
||||||
|
validate_table_existence(Rest, SQL)
|
||||||
|
end;
|
||||||
|
_ ->
|
||||||
|
validate_table_existence(Rest, SQL)
|
||||||
|
catch
|
||||||
|
exit:{noproc, _} ->
|
||||||
|
validate_table_existence(Rest, SQL)
|
||||||
|
end;
|
||||||
|
validate_table_existence([], _SQL) ->
|
||||||
|
%% All workers either replied an unexpected error; we will retry
|
||||||
|
%% on the next health check.
|
||||||
|
ok.
|
||||||
|
|
||||||
%% ===================================================================
|
%% ===================================================================
|
||||||
|
|
||||||
connect(Opts) ->
|
connect(Opts) ->
|
||||||
|
@ -358,13 +374,31 @@ connect(Opts) ->
|
||||||
end.
|
end.
|
||||||
|
|
||||||
query(Conn, SQL, Params) ->
|
query(Conn, SQL, Params) ->
|
||||||
epgsql:equery(Conn, SQL, Params).
|
case epgsql:equery(Conn, SQL, Params) of
|
||||||
|
{error, sync_required} = Res ->
|
||||||
|
ok = epgsql:sync(Conn),
|
||||||
|
Res;
|
||||||
|
Res ->
|
||||||
|
Res
|
||||||
|
end.
|
||||||
|
|
||||||
prepared_query(Conn, Name, Params) ->
|
prepared_query(Conn, Name, Params) ->
|
||||||
epgsql:prepared_query2(Conn, Name, Params).
|
case epgsql:prepared_query2(Conn, Name, Params) of
|
||||||
|
{error, sync_required} = Res ->
|
||||||
|
ok = epgsql:sync(Conn),
|
||||||
|
Res;
|
||||||
|
Res ->
|
||||||
|
Res
|
||||||
|
end.
|
||||||
|
|
||||||
execute_batch(Conn, Statement, Params) ->
|
execute_batch(Conn, Statement, Params) ->
|
||||||
epgsql:execute_batch(Conn, Statement, Params).
|
case epgsql:execute_batch(Conn, Statement, Params) of
|
||||||
|
{error, sync_required} = Res ->
|
||||||
|
ok = epgsql:sync(Conn),
|
||||||
|
Res;
|
||||||
|
Res ->
|
||||||
|
Res
|
||||||
|
end.
|
||||||
|
|
||||||
conn_opts(Opts) ->
|
conn_opts(Opts) ->
|
||||||
conn_opts(Opts, []).
|
conn_opts(Opts, []).
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
%% -*- mode: erlang -*-
|
%% -*- mode: erlang -*-
|
||||||
{application, emqx_gateway, [
|
{application, emqx_gateway, [
|
||||||
{description, "The Gateway management application"},
|
{description, "The Gateway management application"},
|
||||||
{vsn, "0.1.21"},
|
{vsn, "0.1.22"},
|
||||||
{registered, []},
|
{registered, []},
|
||||||
{mod, {emqx_gateway_app, []}},
|
{mod, {emqx_gateway_app, []}},
|
||||||
{applications, [kernel, stdlib, emqx, emqx_authn, emqx_ctl]},
|
{applications, [kernel, stdlib, emqx, emqx_authn, emqx_ctl]},
|
||||||
|
|
|
@ -172,7 +172,7 @@ t_authz_cache(_) ->
|
||||||
|
|
||||||
{ok, C} = emqtt:start_link(#{clientid => ClientId}),
|
{ok, C} = emqtt:start_link(#{clientid => ClientId}),
|
||||||
{ok, _} = emqtt:connect(C),
|
{ok, _} = emqtt:connect(C),
|
||||||
{ok, _, _} = emqtt:subscribe(C, <<"topic/1">>, 0),
|
{ok, _, _} = emqtt:subscribe(C, <<"topic/1">>, 1),
|
||||||
|
|
||||||
ClientAuthzCachePath = emqx_mgmt_api_test_util:api_path([
|
ClientAuthzCachePath = emqx_mgmt_api_test_util:api_path([
|
||||||
"clients",
|
"clients",
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
%% -*- mode: erlang -*-
|
%% -*- mode: erlang -*-
|
||||||
{application, emqx_modules, [
|
{application, emqx_modules, [
|
||||||
{description, "EMQX Modules"},
|
{description, "EMQX Modules"},
|
||||||
{vsn, "5.0.18"},
|
{vsn, "5.0.19"},
|
||||||
{modules, []},
|
{modules, []},
|
||||||
{applications, [kernel, stdlib, emqx, emqx_ctl]},
|
{applications, [kernel, stdlib, emqx, emqx_ctl]},
|
||||||
{mod, {emqx_modules_app, []}},
|
{mod, {emqx_modules_app, []}},
|
||||||
|
|
|
@ -28,6 +28,7 @@
|
||||||
-define(MAX_RULES_LIMIT, 20).
|
-define(MAX_RULES_LIMIT, 20).
|
||||||
|
|
||||||
-define(EXCEED_LIMIT, 'EXCEED_LIMIT').
|
-define(EXCEED_LIMIT, 'EXCEED_LIMIT').
|
||||||
|
-define(BAD_REQUEST, 'BAD_REQUEST').
|
||||||
|
|
||||||
api_spec() ->
|
api_spec() ->
|
||||||
emqx_dashboard_swagger:spec(?MODULE).
|
emqx_dashboard_swagger:spec(?MODULE).
|
||||||
|
@ -62,6 +63,10 @@ schema("/mqtt/topic_rewrite") ->
|
||||||
hoconsc:array(hoconsc:ref(emqx_modules_schema, "rewrite")),
|
hoconsc:array(hoconsc:ref(emqx_modules_schema, "rewrite")),
|
||||||
#{desc => ?DESC(update_topic_rewrite_api)}
|
#{desc => ?DESC(update_topic_rewrite_api)}
|
||||||
),
|
),
|
||||||
|
400 => emqx_dashboard_swagger:error_codes(
|
||||||
|
[?BAD_REQUEST],
|
||||||
|
?DESC(update_topic_rewrite_api_response400)
|
||||||
|
),
|
||||||
413 => emqx_dashboard_swagger:error_codes(
|
413 => emqx_dashboard_swagger:error_codes(
|
||||||
[?EXCEED_LIMIT],
|
[?EXCEED_LIMIT],
|
||||||
?DESC(update_topic_rewrite_api_response413)
|
?DESC(update_topic_rewrite_api_response413)
|
||||||
|
@ -75,11 +80,30 @@ topic_rewrite(get, _Params) ->
|
||||||
topic_rewrite(put, #{body := Body}) ->
|
topic_rewrite(put, #{body := Body}) ->
|
||||||
case length(Body) < ?MAX_RULES_LIMIT of
|
case length(Body) < ?MAX_RULES_LIMIT of
|
||||||
true ->
|
true ->
|
||||||
|
try
|
||||||
ok = emqx_rewrite:update(Body),
|
ok = emqx_rewrite:update(Body),
|
||||||
{200, emqx_rewrite:list()};
|
{200, emqx_rewrite:list()}
|
||||||
|
catch
|
||||||
|
throw:#{
|
||||||
|
kind := validation_error,
|
||||||
|
reason := #{
|
||||||
|
msg := "cannot_use_wildcard_for_destination_topic",
|
||||||
|
invalid_topics := InvalidTopics
|
||||||
|
}
|
||||||
|
} ->
|
||||||
|
Message = get_invalid_wildcard_topic_msg(InvalidTopics),
|
||||||
|
{400, #{code => ?BAD_REQUEST, message => Message}}
|
||||||
|
end;
|
||||||
_ ->
|
_ ->
|
||||||
Message = iolist_to_binary(
|
Message = iolist_to_binary(
|
||||||
io_lib:format("Max rewrite rules count is ~p", [?MAX_RULES_LIMIT])
|
io_lib:format("Max rewrite rules count is ~p", [?MAX_RULES_LIMIT])
|
||||||
),
|
),
|
||||||
{413, #{code => ?EXCEED_LIMIT, message => Message}}
|
{413, #{code => ?EXCEED_LIMIT, message => Message}}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
get_invalid_wildcard_topic_msg(InvalidTopics) ->
|
||||||
|
iolist_to_binary(
|
||||||
|
io_lib:format("Cannot use wildcard for destination topic. Invalid topics: ~p", [
|
||||||
|
InvalidTopics
|
||||||
|
])
|
||||||
|
).
|
||||||
|
|
|
@ -130,7 +130,7 @@ t_mqtt_topic_rewrite_wildcard(_) ->
|
||||||
lists:foreach(
|
lists:foreach(
|
||||||
fun(Rule) ->
|
fun(Rule) ->
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
{ok, 500, _},
|
{ok, 400, _},
|
||||||
request(
|
request(
|
||||||
put,
|
put,
|
||||||
uri(["mqtt", "topic_rewrite"]),
|
uri(["mqtt", "topic_rewrite"]),
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
{application, emqx_oracle, [
|
{application, emqx_oracle, [
|
||||||
{description, "EMQX Enterprise Oracle Database Connector"},
|
{description, "EMQX Enterprise Oracle Database Connector"},
|
||||||
{vsn, "0.1.4"},
|
{vsn, "0.1.5"},
|
||||||
{registered, []},
|
{registered, []},
|
||||||
{applications, [
|
{applications, [
|
||||||
kernel,
|
kernel,
|
||||||
|
|
|
@ -98,7 +98,7 @@ on_start(
|
||||||
{password, jamdb_secret:wrap(maps:get(password, Config, ""))},
|
{password, jamdb_secret:wrap(maps:get(password, Config, ""))},
|
||||||
{sid, emqx_utils_conv:str(Sid)},
|
{sid, emqx_utils_conv:str(Sid)},
|
||||||
{service_name, ServiceName},
|
{service_name, ServiceName},
|
||||||
{pool_size, maps:get(<<"pool_size">>, Config, ?DEFAULT_POOL_SIZE)},
|
{pool_size, maps:get(pool_size, Config, ?DEFAULT_POOL_SIZE)},
|
||||||
{timeout, ?OPT_TIMEOUT},
|
{timeout, ?OPT_TIMEOUT},
|
||||||
{app_name, "EMQX Data To Oracle Database Action"}
|
{app_name, "EMQX Data To Oracle Database Action"}
|
||||||
],
|
],
|
||||||
|
@ -433,9 +433,32 @@ check_if_table_exists(Conn, SQL, Tokens0) ->
|
||||||
case jamdb_oracle:sql_query(Conn, {SqlQuery, Params}) of
|
case jamdb_oracle:sql_query(Conn, {SqlQuery, Params}) of
|
||||||
{ok, [{proc_result, 0, _Description}]} ->
|
{ok, [{proc_result, 0, _Description}]} ->
|
||||||
ok;
|
ok;
|
||||||
{ok, [{proc_result, 6550, _Description}]} ->
|
{ok, [{proc_result, 942, _Description}]} ->
|
||||||
%% Target table is not created
|
%% Target table is not created
|
||||||
{error, undefined_table};
|
{error, undefined_table};
|
||||||
|
{ok, [{proc_result, _, Description}]} ->
|
||||||
|
% only the last result is returned, so we need to check on description if it
|
||||||
|
% contains the "Table doesn't exist" error as it can not be the last one.
|
||||||
|
% (for instance, the ORA-06550 can be the result value when table does not exist)
|
||||||
|
ErrorCodes =
|
||||||
|
case re:run(Description, <<"(ORA-[0-9]+)">>, [global, {capture, first, binary}]) of
|
||||||
|
{match, OraCodes} -> OraCodes;
|
||||||
|
_ -> []
|
||||||
|
end,
|
||||||
|
OraMap = maps:from_keys([ErrorCode || [ErrorCode] <- ErrorCodes], true),
|
||||||
|
case OraMap of
|
||||||
|
_ when is_map_key(<<"ORA-00942">>, OraMap) ->
|
||||||
|
% ORA-00942: table or view does not exist
|
||||||
|
{error, undefined_table};
|
||||||
|
_ when is_map_key(<<"ORA-00932">>, OraMap) ->
|
||||||
|
% ORA-00932: inconsistent datatypes
|
||||||
|
% There is a some type inconsistency with table definition but
|
||||||
|
% table does exist. Probably this inconsistency was caused by
|
||||||
|
% token discarding in this test query.
|
||||||
|
ok;
|
||||||
|
_ ->
|
||||||
|
{error, Description}
|
||||||
|
end;
|
||||||
Reason ->
|
Reason ->
|
||||||
{error, Reason}
|
{error, Reason}
|
||||||
end.
|
end.
|
||||||
|
|
|
@ -24,7 +24,11 @@
|
||||||
-type callback_mode() :: always_sync | async_if_possible.
|
-type callback_mode() :: always_sync | async_if_possible.
|
||||||
-type query_mode() :: simple_sync | simple_async | sync | async | no_queries.
|
-type query_mode() :: simple_sync | simple_async | sync | async | no_queries.
|
||||||
-type result() :: term().
|
-type result() :: term().
|
||||||
-type reply_fun() :: {fun((result(), Args :: term()) -> any()), Args :: term()} | undefined.
|
-type reply_fun() ::
|
||||||
|
{fun((result(), Args :: term()) -> any()), Args :: term()}
|
||||||
|
| {fun((result(), Args :: term()) -> any()), Args :: term(), reply_context()}
|
||||||
|
| undefined.
|
||||||
|
-type reply_context() :: #{reply_dropped => boolean()}.
|
||||||
-type query_opts() :: #{
|
-type query_opts() :: #{
|
||||||
%% The key used for picking a resource worker
|
%% The key used for picking a resource worker
|
||||||
pick_key => term(),
|
pick_key => term(),
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
%% -*- mode: erlang -*-
|
%% -*- mode: erlang -*-
|
||||||
{application, emqx_resource, [
|
{application, emqx_resource, [
|
||||||
{description, "Manager for all external resources"},
|
{description, "Manager for all external resources"},
|
||||||
{vsn, "0.1.20"},
|
{vsn, "0.1.21"},
|
||||||
{registered, []},
|
{registered, []},
|
||||||
{mod, {emqx_resource_app, []}},
|
{mod, {emqx_resource_app, []}},
|
||||||
{applications, [
|
{applications, [
|
||||||
|
|
|
@ -366,6 +366,7 @@ resume_from_blocked(Data) ->
|
||||||
true -> #{dropped_expired => length(Batch)};
|
true -> #{dropped_expired => length(Batch)};
|
||||||
false -> #{}
|
false -> #{}
|
||||||
end,
|
end,
|
||||||
|
batch_reply_dropped(Batch, {error, request_expired}),
|
||||||
NData = aggregate_counters(Data, Counters),
|
NData = aggregate_counters(Data, Counters),
|
||||||
?tp(buffer_worker_retry_expired, #{expired => Batch}),
|
?tp(buffer_worker_retry_expired, #{expired => Batch}),
|
||||||
resume_from_blocked(NData);
|
resume_from_blocked(NData);
|
||||||
|
@ -378,6 +379,7 @@ resume_from_blocked(Data) ->
|
||||||
{batch, Ref, NotExpired, Expired} ->
|
{batch, Ref, NotExpired, Expired} ->
|
||||||
NumExpired = length(Expired),
|
NumExpired = length(Expired),
|
||||||
ok = update_inflight_item(InflightTID, Ref, NotExpired, NumExpired),
|
ok = update_inflight_item(InflightTID, Ref, NotExpired, NumExpired),
|
||||||
|
batch_reply_dropped(Expired, {error, request_expired}),
|
||||||
NData = aggregate_counters(Data, #{dropped_expired => NumExpired}),
|
NData = aggregate_counters(Data, #{dropped_expired => NumExpired}),
|
||||||
?tp(buffer_worker_retry_expired, #{expired => Expired}),
|
?tp(buffer_worker_retry_expired, #{expired => Expired}),
|
||||||
%% We retry msgs in inflight window sync, as if we send them
|
%% We retry msgs in inflight window sync, as if we send them
|
||||||
|
@ -484,6 +486,9 @@ do_reply_caller({F, Args}, {async_return, Result}) ->
|
||||||
%% decision has to be made by the caller
|
%% decision has to be made by the caller
|
||||||
do_reply_caller({F, Args}, Result);
|
do_reply_caller({F, Args}, Result);
|
||||||
do_reply_caller({F, Args}, Result) when is_function(F) ->
|
do_reply_caller({F, Args}, Result) when is_function(F) ->
|
||||||
|
_ = erlang:apply(F, Args ++ [Result]),
|
||||||
|
ok;
|
||||||
|
do_reply_caller({F, Args, _Context}, Result) when is_function(F) ->
|
||||||
_ = erlang:apply(F, Args ++ [Result]),
|
_ = erlang:apply(F, Args ++ [Result]),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
@ -537,11 +542,13 @@ flush(Data0) ->
|
||||||
{[], _AllExpired} ->
|
{[], _AllExpired} ->
|
||||||
ok = replayq:ack(Q1, QAckRef),
|
ok = replayq:ack(Q1, QAckRef),
|
||||||
NumExpired = length(Batch),
|
NumExpired = length(Batch),
|
||||||
|
batch_reply_dropped(Batch, {error, request_expired}),
|
||||||
Data3 = aggregate_counters(Data2, #{dropped_expired => NumExpired}),
|
Data3 = aggregate_counters(Data2, #{dropped_expired => NumExpired}),
|
||||||
?tp(buffer_worker_flush_all_expired, #{batch => Batch}),
|
?tp(buffer_worker_flush_all_expired, #{batch => Batch}),
|
||||||
flush(Data3);
|
flush(Data3);
|
||||||
{NotExpired, Expired} ->
|
{NotExpired, Expired} ->
|
||||||
NumExpired = length(Expired),
|
NumExpired = length(Expired),
|
||||||
|
batch_reply_dropped(Expired, {error, request_expired}),
|
||||||
Data3 = aggregate_counters(Data2, #{dropped_expired => NumExpired}),
|
Data3 = aggregate_counters(Data2, #{dropped_expired => NumExpired}),
|
||||||
IsBatch = (BatchSize > 1),
|
IsBatch = (BatchSize > 1),
|
||||||
%% We *must* use the new queue, because we currently can't
|
%% We *must* use the new queue, because we currently can't
|
||||||
|
@ -809,6 +816,28 @@ reply_caller_defer_metrics(Id, ?REPLY(ReplyTo, HasBeenSent, Result), QueryOpts)
|
||||||
end,
|
end,
|
||||||
{ShouldAck, PostFn, DeltaCounters}.
|
{ShouldAck, PostFn, DeltaCounters}.
|
||||||
|
|
||||||
|
%% This is basically used only by rule actions. To avoid rule action metrics from
|
||||||
|
%% becoming inconsistent when we drop messages, we need a way to signal rule engine that
|
||||||
|
%% this action has reached a conclusion.
|
||||||
|
-spec reply_dropped(reply_fun(), {error, late_reply | request_expired}) -> ok.
|
||||||
|
reply_dropped(_ReplyTo = {Fn, Args, #{reply_dropped := true}}, Result) when
|
||||||
|
is_function(Fn), is_list(Args)
|
||||||
|
->
|
||||||
|
%% We want to avoid bumping metrics inside the buffer worker, since it's costly.
|
||||||
|
emqx_pool:async_submit(Fn, Args ++ [Result]),
|
||||||
|
ok;
|
||||||
|
reply_dropped(_ReplyTo, _Result) ->
|
||||||
|
ok.
|
||||||
|
|
||||||
|
-spec batch_reply_dropped([queue_query()], {error, late_reply | request_expired}) -> ok.
|
||||||
|
batch_reply_dropped(Batch, Result) ->
|
||||||
|
lists:foreach(
|
||||||
|
fun(?QUERY(ReplyTo, _CoreReq, _HasBeenSent, _ExpireAt)) ->
|
||||||
|
reply_dropped(ReplyTo, Result)
|
||||||
|
end,
|
||||||
|
Batch
|
||||||
|
).
|
||||||
|
|
||||||
%% This is only called by `simple_{,a}sync_query', so we can bump the
|
%% This is only called by `simple_{,a}sync_query', so we can bump the
|
||||||
%% counters here.
|
%% counters here.
|
||||||
handle_query_result(Id, Result, HasBeenSent) ->
|
handle_query_result(Id, Result, HasBeenSent) ->
|
||||||
|
@ -1164,7 +1193,7 @@ handle_async_reply1(
|
||||||
inflight_tid := InflightTID,
|
inflight_tid := InflightTID,
|
||||||
resource_id := Id,
|
resource_id := Id,
|
||||||
buffer_worker := BufferWorkerPid,
|
buffer_worker := BufferWorkerPid,
|
||||||
min_query := ?QUERY(_, _, _, ExpireAt) = _Query
|
min_query := ?QUERY(ReplyTo, _, _, ExpireAt) = _Query
|
||||||
} = ReplyContext,
|
} = ReplyContext,
|
||||||
Result
|
Result
|
||||||
) ->
|
) ->
|
||||||
|
@ -1178,7 +1207,11 @@ handle_async_reply1(
|
||||||
IsAcked = ack_inflight(InflightTID, Ref, BufferWorkerPid),
|
IsAcked = ack_inflight(InflightTID, Ref, BufferWorkerPid),
|
||||||
%% evalutate metrics call here since we're not inside
|
%% evalutate metrics call here since we're not inside
|
||||||
%% buffer worker
|
%% buffer worker
|
||||||
IsAcked andalso emqx_resource_metrics:late_reply_inc(Id),
|
IsAcked andalso
|
||||||
|
begin
|
||||||
|
emqx_resource_metrics:late_reply_inc(Id),
|
||||||
|
reply_dropped(ReplyTo, {error, late_reply})
|
||||||
|
end,
|
||||||
?tp(handle_async_reply_expired, #{expired => [_Query]}),
|
?tp(handle_async_reply_expired, #{expired => [_Query]}),
|
||||||
ok;
|
ok;
|
||||||
false ->
|
false ->
|
||||||
|
@ -1292,6 +1325,7 @@ handle_async_batch_reply2([Inflight], ReplyContext, Result, Now) ->
|
||||||
%% evalutate metrics call here since we're not inside buffer
|
%% evalutate metrics call here since we're not inside buffer
|
||||||
%% worker
|
%% worker
|
||||||
emqx_resource_metrics:late_reply_inc(Id, NumExpired),
|
emqx_resource_metrics:late_reply_inc(Id, NumExpired),
|
||||||
|
batch_reply_dropped(RealExpired, {error, late_reply}),
|
||||||
case RealNotExpired of
|
case RealNotExpired of
|
||||||
[] ->
|
[] ->
|
||||||
%% all expired, no need to update back the inflight batch
|
%% all expired, no need to update back the inflight batch
|
||||||
|
|
|
@ -2,7 +2,7 @@
|
||||||
{application, emqx_rule_engine, [
|
{application, emqx_rule_engine, [
|
||||||
{description, "EMQX Rule Engine"},
|
{description, "EMQX Rule Engine"},
|
||||||
% strict semver, bump manually!
|
% strict semver, bump manually!
|
||||||
{vsn, "5.0.21"},
|
{vsn, "5.0.22"},
|
||||||
{modules, []},
|
{modules, []},
|
||||||
{registered, [emqx_rule_engine_sup, emqx_rule_engine]},
|
{registered, [emqx_rule_engine_sup, emqx_rule_engine]},
|
||||||
{applications, [kernel, stdlib, rulesql, getopt, emqx_ctl, uuid]},
|
{applications, [kernel, stdlib, rulesql, getopt, emqx_ctl, uuid]},
|
||||||
|
|
|
@ -350,7 +350,7 @@ do_handle_action(RuleId, {bridge, BridgeType, BridgeName, ResId}, Selected, _Env
|
||||||
"bridge_action",
|
"bridge_action",
|
||||||
#{bridge_id => emqx_bridge_resource:bridge_id(BridgeType, BridgeName)}
|
#{bridge_id => emqx_bridge_resource:bridge_id(BridgeType, BridgeName)}
|
||||||
),
|
),
|
||||||
ReplyTo = {fun ?MODULE:inc_action_metrics/2, [RuleId]},
|
ReplyTo = {fun ?MODULE:inc_action_metrics/2, [RuleId], #{reply_dropped => true}},
|
||||||
case
|
case
|
||||||
emqx_bridge:send_message(BridgeType, BridgeName, ResId, Selected, #{reply_to => ReplyTo})
|
emqx_bridge:send_message(BridgeType, BridgeName, ResId, Selected, #{reply_to => ReplyTo})
|
||||||
of
|
of
|
||||||
|
@ -378,9 +378,9 @@ eval({Op, _} = Exp, Context) when is_list(Context) andalso (Op == path orelse Op
|
||||||
end
|
end
|
||||||
end;
|
end;
|
||||||
eval({path, [{key, <<"payload">>} | Path]}, #{payload := Payload}) ->
|
eval({path, [{key, <<"payload">>} | Path]}, #{payload := Payload}) ->
|
||||||
nested_get({path, Path}, may_decode_payload(Payload));
|
nested_get({path, Path}, maybe_decode_payload(Payload));
|
||||||
eval({path, [{key, <<"payload">>} | Path]}, #{<<"payload">> := Payload}) ->
|
eval({path, [{key, <<"payload">>} | Path]}, #{<<"payload">> := Payload}) ->
|
||||||
nested_get({path, Path}, may_decode_payload(Payload));
|
nested_get({path, Path}, maybe_decode_payload(Payload));
|
||||||
eval({path, _} = Path, Columns) ->
|
eval({path, _} = Path, Columns) ->
|
||||||
nested_get(Path, Columns);
|
nested_get(Path, Columns);
|
||||||
eval({range, {Begin, End}}, _Columns) ->
|
eval({range, {Begin, End}}, _Columns) ->
|
||||||
|
@ -410,6 +410,16 @@ eval({'case', CaseOn, CaseClauses, ElseClauses}, Columns) ->
|
||||||
eval({'fun', {_, Name}, Args}, Columns) ->
|
eval({'fun', {_, Name}, Args}, Columns) ->
|
||||||
apply_func(Name, [eval(Arg, Columns) || Arg <- Args], Columns).
|
apply_func(Name, [eval(Arg, Columns) || Arg <- Args], Columns).
|
||||||
|
|
||||||
|
%% the payload maybe is JSON data, decode it to a `map` first for nested put
|
||||||
|
ensure_decoded_payload({path, [{key, payload} | _]}, #{payload := Payload} = Columns) ->
|
||||||
|
Columns#{payload => maybe_decode_payload(Payload)};
|
||||||
|
ensure_decoded_payload(
|
||||||
|
{path, [{key, <<"payload">>} | _]}, #{<<"payload">> := Payload} = Columns
|
||||||
|
) ->
|
||||||
|
Columns#{<<"payload">> => maybe_decode_payload(Payload)};
|
||||||
|
ensure_decoded_payload(_, Columns) ->
|
||||||
|
Columns.
|
||||||
|
|
||||||
alias({var, Var}, _Columns) ->
|
alias({var, Var}, _Columns) ->
|
||||||
{var, Var};
|
{var, Var};
|
||||||
alias({const, Val}, _Columns) when is_binary(Val) ->
|
alias({const, Val}, _Columns) when is_binary(Val) ->
|
||||||
|
@ -497,12 +507,12 @@ add_metadata(Columns, Metadata) when is_map(Columns), is_map(Metadata) ->
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
%% Internal Functions
|
%% Internal Functions
|
||||||
%%------------------------------------------------------------------------------
|
%%------------------------------------------------------------------------------
|
||||||
may_decode_payload(Payload) when is_binary(Payload) ->
|
maybe_decode_payload(Payload) when is_binary(Payload) ->
|
||||||
case get_cached_payload() of
|
case get_cached_payload() of
|
||||||
undefined -> safe_decode_and_cache(Payload);
|
undefined -> safe_decode_and_cache(Payload);
|
||||||
DecodedP -> DecodedP
|
DecodedP -> DecodedP
|
||||||
end;
|
end;
|
||||||
may_decode_payload(Payload) ->
|
maybe_decode_payload(Payload) ->
|
||||||
Payload.
|
Payload.
|
||||||
|
|
||||||
get_cached_payload() ->
|
get_cached_payload() ->
|
||||||
|
@ -522,7 +532,8 @@ safe_decode_and_cache(MaybeJson) ->
|
||||||
ensure_list(List) when is_list(List) -> List;
|
ensure_list(List) when is_list(List) -> List;
|
||||||
ensure_list(_NotList) -> [].
|
ensure_list(_NotList) -> [].
|
||||||
|
|
||||||
nested_put(Alias, Val, Columns) ->
|
nested_put(Alias, Val, Columns0) ->
|
||||||
|
Columns = ensure_decoded_payload(Alias, Columns0),
|
||||||
emqx_rule_maps:nested_put(Alias, Val, Columns).
|
emqx_rule_maps:nested_put(Alias, Val, Columns).
|
||||||
|
|
||||||
inc_action_metrics(RuleId, Result) ->
|
inc_action_metrics(RuleId, Result) ->
|
||||||
|
|
|
@ -104,7 +104,8 @@ groups() ->
|
||||||
t_sqlparse_true_false,
|
t_sqlparse_true_false,
|
||||||
t_sqlparse_undefined_variable,
|
t_sqlparse_undefined_variable,
|
||||||
t_sqlparse_new_map,
|
t_sqlparse_new_map,
|
||||||
t_sqlparse_invalid_json
|
t_sqlparse_invalid_json,
|
||||||
|
t_sqlselect_as_put
|
||||||
]},
|
]},
|
||||||
{events, [], [
|
{events, [], [
|
||||||
t_events,
|
t_events,
|
||||||
|
@ -1587,6 +1588,45 @@ t_sqlselect_message_publish_event_keep_original_props_2(_Config) ->
|
||||||
emqtt:stop(Client1),
|
emqtt:stop(Client1),
|
||||||
delete_rule(TopicRule).
|
delete_rule(TopicRule).
|
||||||
|
|
||||||
|
t_sqlselect_as_put(_Config) ->
|
||||||
|
%% Verify SELECT with 'AS' to update the payload
|
||||||
|
Sql =
|
||||||
|
"select payload, "
|
||||||
|
"'STEVE' as payload.data[1].name "
|
||||||
|
"from \"t/#\" ",
|
||||||
|
PayloadMap = #{
|
||||||
|
<<"f1">> => <<"f1">>,
|
||||||
|
<<"f2">> => <<"f2">>,
|
||||||
|
<<"data">> => [
|
||||||
|
#{<<"name">> => <<"n1">>, <<"idx">> => 1},
|
||||||
|
#{<<"name">> => <<"n2">>, <<"idx">> => 2}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
PayloadBin = emqx_utils_json:encode(PayloadMap),
|
||||||
|
SqlResult = emqx_rule_sqltester:test(
|
||||||
|
#{
|
||||||
|
sql => Sql,
|
||||||
|
context =>
|
||||||
|
#{
|
||||||
|
payload => PayloadBin,
|
||||||
|
topic => <<"t/a">>
|
||||||
|
}
|
||||||
|
}
|
||||||
|
),
|
||||||
|
?assertMatch({ok, #{<<"payload">> := _}}, SqlResult),
|
||||||
|
{ok, #{<<"payload">> := PayloadMap2}} = SqlResult,
|
||||||
|
?assertMatch(
|
||||||
|
#{
|
||||||
|
<<"f1">> := <<"f1">>,
|
||||||
|
<<"f2">> := <<"f2">>,
|
||||||
|
<<"data">> := [
|
||||||
|
#{<<"name">> := <<"STEVE">>, <<"idx">> := 1},
|
||||||
|
#{<<"name">> := <<"n2">>, <<"idx">> := 2}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
PayloadMap2
|
||||||
|
).
|
||||||
|
|
||||||
t_sqlparse_event_1(_Config) ->
|
t_sqlparse_event_1(_Config) ->
|
||||||
Sql =
|
Sql =
|
||||||
"select topic as tp "
|
"select topic as tp "
|
||||||
|
|
|
@ -0,0 +1 @@
|
||||||
|
Fixed rule action metrics inconsistency where dropped requests were not accounted for.
|
|
@ -0,0 +1 @@
|
||||||
|
Fix HTTP API error when a publish topic rewrite rule targets a topic with wildcards. Now it returns error 400 (Bad Match) instead of error 500 (Internal Error).
|
|
@ -0,0 +1,127 @@
|
||||||
|
## e5.1.1
|
||||||
|
|
||||||
|
## Enhancements
|
||||||
|
|
||||||
|
- [#10667](https://github.com/emqx/emqx/pull/10667) The MongoDB connector and bridge have been refactored into a separate app to improve the code structure.
|
||||||
|
- [#11115](https://github.com/emqx/emqx/pull/11115) Added info logs to indicate when buffered messages are dropped due to time-to-live (TTL) expiration.
|
||||||
|
- [#11133](https://github.com/emqx/emqx/pull/11133) Renamed `deliver_rate` to `delivery_rate` in the configuration of `retainer`, while being compatible with the previous `deliver_rate`.
|
||||||
|
- [#11137](https://github.com/emqx/emqx/pull/11137) Refactored the Dashboard listener configuration to use a nested `ssl_options` field for SSL settings.
|
||||||
|
- [#11138](https://github.com/emqx/emqx/pull/11138) Changed the default value of k8s `api_server` from `http://127.0.0.1:9091` to `https://kubernetes.default.svc:443`.
|
||||||
|
- `emqx_ctl conf show cluster` no longer displays irrelevant configuration items when `discovery_strategy=static`.
|
||||||
|
Configuration information related to `etcd/k8s/dns` will not be shown.
|
||||||
|
- Removed `zones `(deprecated config key) from `emqx_ctl conf show_keys`.
|
||||||
|
- [#11165](https://github.com/emqx/emqx/pull/11165) Removed the `/configs/limiter` API from `swagger.json`. Only the API documentation was removed,
|
||||||
|
and the `/configs/limiter` API functionalities remain unchanged.
|
||||||
|
- [#11166](https://github.com/emqx/emqx/pull/11166) Added 3 random SQL functions to the rule engine:
|
||||||
|
- `random()`: Generates a random number between 0 and 1 (0.0 =< X < 1.0).
|
||||||
|
- `uuid_v4()`: Generates a random UUID (version 4) string.
|
||||||
|
- `uuid_v4_no_hyphen()`: Generates a random UUID (version 4) string without hyphens.
|
||||||
|
- [#11180](https://github.com/emqx/emqx/pull/11180) Added a new configuration API `/configs` (GET/PUT) that supports reloading the HOCON format configuration file.
|
||||||
|
- [#11226](https://github.com/emqx/emqx/pull/11226) Unified the listener switch to `enable`, while being compatible with the previous `enabled`.
|
||||||
|
- [#11249](https://github.com/emqx/emqx/pull/11249) Added `/license/setting` REST API endpoint to read and update licensed connections usage alarm watermark.
|
||||||
|
- [#11251](https://github.com/emqx/emqx/pull/11251) Added the `/cluster/topology` REST API endpoint:
|
||||||
|
A `GET` request to this endpoint returns the cluster topology, showing connections between RLOG core and replicant nodes.
|
||||||
|
- [#11253](https://github.com/emqx/emqx/pull/11253) The Webhook/HTTP bridge has been refactored into its own Erlang application. This allows for more flexibility in the future and allows the bridge to be run as a standalone application.
|
||||||
|
- [#11079](https://github.com/emqx/emqx/pull/11079) Added support for custom headers in messages for Kafka bridge producer mode.
|
||||||
|
- [#11132](https://github.com/emqx/emqx/pull/11132) Added support for MQTT action authorization based on QoS level and Retain flag values.
|
||||||
|
Now, EMQX can verify whether clients have the permission to publish/subscribe using specific QoS levels, and whether they have the permission to publish retained messages.
|
||||||
|
- [#11207](https://github.com/emqx/emqx/pull/11207) Updated the driver versions of multiple data bridges to enhance security and ensure that sensitive data will not be leaked. This includes:
|
||||||
|
- TDengine
|
||||||
|
- MongoDB
|
||||||
|
- MySQL
|
||||||
|
- Clickhouse
|
||||||
|
- [#11241](https://github.com/emqx/emqx/pull/11241) Schema Registry has been refactored into its own Erlang application. This allows for more flexibility in the future.
|
||||||
|
- [#11020](https://github.com/emqx/emqx/pull/11020) Upgraded emqtt dependency to prevent sensitive data leakage in the debug log.
|
||||||
|
- [#11135](https://github.com/emqx/emqx/pull/11135) Improved time offset parser in rule engine and return uniform error codes.
|
||||||
|
- [#11236](https://github.com/emqx/emqx/pull/11236) Improved the speed of clients querying in REST API `/clients` endpoint with default parameters.
|
||||||
|
|
||||||
|
## Bug Fixes
|
||||||
|
|
||||||
|
- [#11004](https://github.com/emqx/emqx/pull/11004) Wildcards are no longer allowed for the destination topic in topic rewrite.
|
||||||
|
- [#11026](https://github.com/emqx/emqx/pull/11026) Addressed an inconsistency in the usage of `div` and `mod` operations within the rule engine. Previously, the `div'` operation could only be used as an infix operation, and `mod` could only be applied through a function call. Now, both `div` and `mod` can be used via function call syntax and infix syntax.
|
||||||
|
- [#11037](https://github.com/emqx/emqx/pull/11037) When starting an HTTP connector, EMQX now returns a descriptive error in case the system is unable to connect to the remote target system.
|
||||||
|
- [#11039](https://github.com/emqx/emqx/pull/11039) Fixed database number validation for Redis connector. Previously, negative numbers were accepted as valid database numbers.
|
||||||
|
- [#11074](https://github.com/emqx/emqx/pull/11074) Fixed a bug to adhere to Protocol spec MQTT-5.0 [MQTT-3.8.3-4].
|
||||||
|
- [#11077](https://github.com/emqx/emqx/pull/11077) Fixed a crash when updating listener binding with a non-integer port.
|
||||||
|
- [#11094](https://github.com/emqx/emqx/pull/11094) Fixed an issue where connection errors in Kafka Producer would not be reported when reconnecting the bridge.
|
||||||
|
- [#11103](https://github.com/emqx/emqx/pull/11103) Updated `erlcloud` dependency.
|
||||||
|
- [#11106](https://github.com/emqx/emqx/pull/11106) Added validation for the maximum number of `worker_pool_size` of a bridge resource.
|
||||||
|
Now the maximum amount is 1024 to avoid large memory consumption from an unreasonable number of workers.
|
||||||
|
- [#11118](https://github.com/emqx/emqx/pull/11118) Ensured that validation errors in REST API responses are slightly less confusing. Now, if there are out-of-range errors, they will be presented as `{"value": 42, "reason": {"expected": "1..10"}, ...}`, replacing the previous usage of `expected_type` with `expected`.
|
||||||
|
- [#11126](https://github.com/emqx/emqx/pull/11126) Rule metrics for async mode bridges will set failure counters correctly now.
|
||||||
|
- [#11134](https://github.com/emqx/emqx/pull/11134) Fixed the value of the uppercase `authorization` header not being obfuscated in the log.
|
||||||
|
- [#11139](https://github.com/emqx/emqx/pull/11139) The Redis connector has been refactored into its own Erlang application to improve the code structure.
|
||||||
|
- [#11145](https://github.com/emqx/emqx/pull/11145) Added several fixes and improvements in Ekka and Mria.
|
||||||
|
Ekka:
|
||||||
|
- Improved cluster discovery log messages to consistently describe actual events
|
||||||
|
[Ekka PR](https://github.com/emqx/ekka/pull/204).
|
||||||
|
- Removed deprecated cluster auto-clean configuration parameter (it has been moved to Mria)
|
||||||
|
[Ekka PR](https://github.com/emqx/ekka/pull/203).
|
||||||
|
Mria:
|
||||||
|
- Ping now only runs on replicant nodes. Previously, `mria_lb` was trying to ping both stopped and running
|
||||||
|
replicant nodes, which could result in timeout errors.
|
||||||
|
[Mria PR](https://github.com/emqx/mria/pull/146)
|
||||||
|
- Used `null_copies` storage when copying `$mria_rlog_sync` table.
|
||||||
|
This fix has no effect on EMQX for now, as `$mria_rlog_sync` is only used in `mria:sync_transaction/2,3,4`,
|
||||||
|
which is not utilized by EMQX.
|
||||||
|
[Mria PR](https://github.com/emqx/mria/pull/144)
|
||||||
|
- [#11148](https://github.com/emqx/emqx/pull/11148) Fixed an issue when nodes tried to synchronize configuration update operations to a node which has already left the cluster.
|
||||||
|
- [#11150](https://github.com/emqx/emqx/pull/11150) Wait for Mria table when emqx_psk app is being started to ensure that PSK data is synced to replicant nodes even if they don't have init PSK file.
|
||||||
|
- [#11151](https://github.com/emqx/emqx/pull/11151) The MySQL connector has been refactored into its own Erlang application to improve the code structure.
|
||||||
|
- [#11158](https://github.com/emqx/emqx/pull/11158) Wait for Mria table when the mnesia backend of retainer starts to avoid a possible error of the retainer when joining a cluster.
|
||||||
|
- [#11162](https://github.com/emqx/emqx/pull/11162) Fixed an issue in webhook bridge where, in async query mode, HTTP status codes like 4XX and 5XX would be treated as successes in the bridge metrics.
|
||||||
|
- [#11164](https://github.com/emqx/emqx/pull/11164) Reintroduced support for nested (i.e.: `${payload.a.b.c}`) placeholders for extracting data from rule action messages without the need for calling `json_decode(payload)` first.
|
||||||
|
- [#11172](https://github.com/emqx/emqx/pull/11172) Fixed the `payload` field in rule engine SQL being duplicated in the below situations:
|
||||||
|
- When using a `foreach` sentence without the `as` sub-expression and selecting all fields (using the `*` or omitting the `do` sub-expression).
|
||||||
|
For example:
|
||||||
|
`FOREACH payload.sensors FROM "t/#"`
|
||||||
|
- When selecting the `payload` field and all fields.
|
||||||
|
For example:
|
||||||
|
`SELECT payload.sensors, * FROM "t/#"`
|
||||||
|
- [#11174](https://github.com/emqx/emqx/pull/11174) Fixed the encoding of the `server` key coming from an ingress MQTT bridge.
|
||||||
|
Before the fix, it was encoded as a list of integers corresponding to the ASCII characters of the server string.
|
||||||
|
- [#11184](https://github.com/emqx/emqx/pull/11184) Config value for `mqtt.max_packet_size` now has a max value of 256MB as defined by the protocol.
|
||||||
|
- [#11192](https://github.com/emqx/emqx/pull/11192) Fixed an issue with producing invalid HOCON file when an atom type was used. Also removed unnecessary `"` around keys and latin1 strings from HOCON file.
|
||||||
|
- [#11195](https://github.com/emqx/emqx/pull/11195) Fixed an issue where the REST API could create duplicate subscriptions for specified clients of the Stomp gateway.
|
||||||
|
- [#11206](https://github.com/emqx/emqx/pull/11206) Made the `username` and `password` params of CoAP client optional in connection mode.
|
||||||
|
- [#11208](https://github.com/emqx/emqx/pull/11208) Fixed the issue of abnormal data statistics for LwM2M clients.
|
||||||
|
- [#11211](https://github.com/emqx/emqx/pull/11211) HTTP API `DELETE` operations on non-existent resources now consistently return `404`.
|
||||||
|
- [#11214](https://github.com/emqx/emqx/pull/11214) Fixed a bug where node configuration may fail to synchronize correctly when the node joins the cluster.
|
||||||
|
- [#11229](https://github.com/emqx/emqx/pull/11229) Fixed an issue that prevented plugins from starting/stopping after changing configuration via `emqx ctl conf load`.
|
||||||
|
- [#11237](https://github.com/emqx/emqx/pull/11237) The `headers` default value in /prometheus API should be a map instead of a list.
|
||||||
|
- [#11250](https://github.com/emqx/emqx/pull/11250) Fixed a bug where the order of MQTT packets within a WebSocket packet would be reversed.
|
||||||
|
- [#11271](https://github.com/emqx/emqx/pull/11271) Ensured that the range of all percentage type configurations is from 0% to 100% in the REST API and configuration. For example, `sysmon.os.sysmem_high_watermark=101%` is invalid now.
|
||||||
|
- [#11272](https://github.com/emqx/emqx/pull/11272) Fixed a typo in the log, where an abnormal `PUBREL` packet was mistakenly referred to as `pubrec`.
|
||||||
|
- [#11281](https://github.com/emqx/emqx/pull/11281) Restored support for the special `$queue/` shared subscription topic prefix.
|
||||||
|
- [#11294](https://github.com/emqx/emqx/pull/11294) Fixed `emqx ctl cluster join`, `leave`, and `status` commands.
|
||||||
|
- [#11306](https://github.com/emqx/emqx/pull/11306) Fixed rule action metrics inconsistency where dropped requests were not accounted for.
|
||||||
|
- [#11309](https://github.com/emqx/emqx/pull/11309) Improved startup order of EMQX applications. Simplified build scripts and improved code reuse.
|
||||||
|
- [#11322](https://github.com/emqx/emqx/pull/11322) Added support for importing additional configurations from EMQX backup file (`emqx ctl import` command):
|
||||||
|
- rule_engine (previously not imported due to a bug)
|
||||||
|
- topic_metrics (previously not implemented)
|
||||||
|
- slow_subs (previously not implemented).
|
||||||
|
- [#10645](https://github.com/emqx/emqx/pull/10645) Changed health check for Oracle Database, PostgreSQL, MySQL and Kafka Producer data bridges to ensure target table/topic exists.
|
||||||
|
- [#11107](https://github.com/emqx/emqx/pull/11107) MongoDB bridge health check now returns the failure reason.
|
||||||
|
- [#11139](https://github.com/emqx/emqx/pull/11139) The Redis bridge has been refactored into its own Erlang application to improve the code structure and to make it easier to maintain.
|
||||||
|
- [#11151](https://github.com/emqx/emqx/pull/11151) The MySQL bridge has been refactored into its own Erlang application to improve the code structure and to make it easier to maintain.
|
||||||
|
- [#11163](https://github.com/emqx/emqx/pull/11163) Hid `topology.pool_size` in MongoDB bridges and fixed it to 1 to avoid confusion.
|
||||||
|
- [#11175](https://github.com/emqx/emqx/pull/11175) Now when using a nonexistent hostname for connecting to MySQL, a 400 error is returned rather than 503 in the REST API.
|
||||||
|
- [#11198](https://github.com/emqx/emqx/pull/11198) Fixed global rebalance status evaluation on replicant nodes. Previously, `/api/v5/load_rebalance/global_status` API method could return incomplete results if handled by a replicant node.
|
||||||
|
- [#11223](https://github.com/emqx/emqx/pull/11223) In InfluxDB bridging, mixing decimals and integers in a field may lead to serialization failure in the Influx Line Protocol, resulting in the inability to write to the InfluxDB bridge (when the fractional part is 0, InfluxDB mistakenly interprets the value as an integer).
|
||||||
|
See also: [InfluxDB v2.7 Line-Protocol](https://docs.influxdata.com/influxdb/v2.7/reference/syntax/line-protocol/#float).
|
||||||
|
- [#11225](https://github.com/emqx/emqx/pull/11225) The `username` field in PostgreSQL/Timescale/MatrixDB bridges configuration is now a required one.
|
||||||
|
- [#11242](https://github.com/emqx/emqx/pull/11242) Restarted emqx_ee_schema_registry when a node joins a cluster. As emqx_ee_schema_registry uses Mria tables, a node joining a cluster needs to restart this application in order to start relevant Mria shard processes, ensuring a correct behaviour in Core/Replicant mode.
|
||||||
|
- [#11266](https://github.com/emqx/emqx/pull/11266) Fixed and improved support for TDengine `insert` syntax:
|
||||||
|
1. Added support for inserting into multi-table in the template.
|
||||||
|
For example:
|
||||||
|
`insert into table_1 values (${ts}, '${id}', '${topic}')
|
||||||
|
table_2 values (${ts}, '${id}', '${topic}')`
|
||||||
|
2. Added support for mixing prefixes/suffixes and placeholders in the template.
|
||||||
|
For example:
|
||||||
|
`insert into table_${topic} values (${ts}, '${id}', '${topic}')`
|
||||||
|
Note: This is a breaking change. Previously, the values of string type were quoted automatically, but now they must be quoted explicitly.
|
||||||
|
For example:
|
||||||
|
`insert into table values (${ts}, '${a_string}')`
|
||||||
|
- [#11307](https://github.com/emqx/emqx/pull/11307) Fixed check for table existence to return a more friendly message in the Oracle bridge.
|
||||||
|
- [#11316](https://github.com/emqx/emqx/pull/11316) Fixed Pool Size value not being considered in Oracle Bridge.
|
||||||
|
- [#11326](https://github.com/emqx/emqx/pull/11326) Fixed return error checking on table validation in the Oracle bridge.
|
|
@ -1 +0,0 @@
|
||||||
Add HStreamDB bridge support, adapted to the HStreamDB `v0.15.0`.
|
|
|
@ -0,0 +1 @@
|
||||||
|
Fixed Pool Size value not being considered in Oracle Bridge.
|
|
@ -0,0 +1 @@
|
||||||
|
Fixed return error checking on table validation in the Oracle bridge.
|
|
@ -0,0 +1 @@
|
||||||
|
Fixed an issue where the PostgreSQL bridge connection could crash under high message rates.
|
|
@ -37,7 +37,7 @@ local_topic.label:
|
||||||
template.desc:
|
template.desc:
|
||||||
"""Template, the default value is empty. When this value is empty the whole message will be stored in the database.<br>
|
"""Template, the default value is empty. When this value is empty the whole message will be stored in the database.<br>
|
||||||
The template can be any valid json with placeholders and make sure all keys for table are here, example:<br>
|
The template can be any valid json with placeholders and make sure all keys for table are here, example:<br>
|
||||||
{"id" : ${id}, "clientid" : ${clientid}, "data" : ${payload}}"""
|
{"id" : "${id}", "clientid" : "${clientid}", "data" : "${payload.data}"}"""
|
||||||
|
|
||||||
template.label:
|
template.label:
|
||||||
"""Template"""
|
"""Template"""
|
||||||
|
|
|
@ -15,4 +15,9 @@ update_topic_rewrite_api_response413.desc:
|
||||||
update_topic_rewrite_api_response413.label:
|
update_topic_rewrite_api_response413.label:
|
||||||
"""Rules count exceed limit"""
|
"""Rules count exceed limit"""
|
||||||
|
|
||||||
|
update_topic_rewrite_api_response400.desc:
|
||||||
|
"""Bad request"""
|
||||||
|
update_topic_rewrite_api_response400.label:
|
||||||
|
"""Bad request"""
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -703,7 +703,12 @@ fields_mqtt_quic_listener_minimum_mtu.label:
|
||||||
"""Minimum MTU"""
|
"""Minimum MTU"""
|
||||||
|
|
||||||
sys_msg_interval.desc:
|
sys_msg_interval.desc:
|
||||||
"""Time interval of publishing `$SYS` messages."""
|
"""Time interval for publishing following system messages:
|
||||||
|
- `$SYS/brokers`
|
||||||
|
- `$SYS/brokers/<node>/version`
|
||||||
|
- `$SYS/brokers/<node>/sysdescr`
|
||||||
|
- `$SYS/brokers/<node>/stats/<name>`
|
||||||
|
- `$SYS/brokers/<node>/metrics/<name>`"""
|
||||||
|
|
||||||
mqtt_await_rel_timeout.desc:
|
mqtt_await_rel_timeout.desc:
|
||||||
"""For client to broker QoS 2 message, the time limit for the broker to wait before the `PUBREL` message is received. The wait is aborted after timed out, meaning the packet ID is freed for new `PUBLISH` requests. Receiving a stale `PUBREL` causes a warning level log. Note, the message is delivered to subscribers before entering the wait for PUBREL."""
|
"""For client to broker QoS 2 message, the time limit for the broker to wait before the `PUBREL` message is received. The wait is aborted after timed out, meaning the packet ID is freed for new `PUBLISH` requests. Receiving a stale `PUBREL` causes a warning level log. Note, the message is delivered to subscribers before entering the wait for PUBREL."""
|
||||||
|
|
Loading…
Reference in New Issue