Merge branch 'licence-conf-update' of https://github.com/zhongwencool/emqx into licence-conf-update

This commit is contained in:
某文 2023-05-29 21:54:16 +08:00
commit a55e50f1d7
71 changed files with 660 additions and 270 deletions

View File

@ -14,7 +14,6 @@ jobs:
builder:
- 5.0-35
otp:
- 24.3.4.2-3
- 25.1.2-3
# no need to use more than 1 version of Elixir, since tests
# run using only Erlang code. This is needed just to specify

View File

@ -15,8 +15,8 @@ endif
# Dashbord version
# from https://github.com/emqx/emqx-dashboard5
export EMQX_DASHBOARD_VERSION ?= v1.2.4-1
export EMQX_EE_DASHBOARD_VERSION ?= e1.0.7-beta.3
export EMQX_DASHBOARD_VERSION ?= v1.2.5
export EMQX_EE_DASHBOARD_VERSION ?= e1.0.7
# `:=` should be used here, otherwise the `$(shell ...)` will be executed every time when the variable is used
# In make 4.4+, for backward-compatibility the value from the original environment is used.

View File

@ -35,7 +35,7 @@
-define(EMQX_RELEASE_CE, "5.0.25").
%% Enterprise edition
-define(EMQX_RELEASE_EE, "5.0.4-alpha.2").
-define(EMQX_RELEASE_EE, "5.0.4").
%% the HTTP API version
-define(EMQX_API_VERSION, "5.0").

View File

@ -3,7 +3,7 @@
{id, "emqx"},
{description, "EMQX Core"},
% strict semver, bump manually!
{vsn, "5.0.26"},
{vsn, "5.0.27"},
{modules, []},
{registered, []},
{applications, [

View File

@ -147,7 +147,7 @@ unwrap_erpc({throw, A}) ->
{error, A};
unwrap_erpc({error, {exception, Err, _Stack}}) ->
{error, Err};
unwrap_erpc({error, {exit, Err}}) ->
unwrap_erpc({exit, Err}) ->
{error, Err};
unwrap_erpc({error, {erpc, Err}}) ->
{error, Err}.

View File

@ -106,7 +106,7 @@ get_enabled_authns() ->
AuthnTypes = lists:usort([
Type
|| #{authenticators := As} <- Chains,
#{id := Type} <- As
#{id := Type, enable := true} <- As
]),
OverriddenListeners =
lists:foldl(

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_authz, [
{description, "An OTP application"},
{vsn, "0.1.20"},
{vsn, "0.1.21"},
{registered, []},
{mod, {emqx_authz_app, []}},
{applications, [

View File

@ -417,7 +417,7 @@ do_authorize(
end.
get_enabled_authzs() ->
lists:usort([Type || #{type := Type} <- lookup()]).
lists:usort([Type || #{type := Type, enable := true} <- lookup()]).
%%--------------------------------------------------------------------
%% Internal function

View File

@ -423,8 +423,8 @@ users(get, #{query_string := QueryString}) ->
of
{error, page_limit_invalid} ->
{400, #{code => <<"INVALID_PARAMETER">>, message => <<"page_limit_invalid">>}};
{error, Node, {badrpc, R}} ->
Message = list_to_binary(io_lib:format("bad rpc call ~p, Reason ~p", [Node, R])),
{error, Node, Error} ->
Message = list_to_binary(io_lib:format("bad rpc call ~p, Reason ~p", [Node, Error])),
{500, #{code => <<"NODE_DOWN">>, message => Message}};
Result ->
{200, Result}
@ -459,8 +459,8 @@ clients(get, #{query_string := QueryString}) ->
of
{error, page_limit_invalid} ->
{400, #{code => <<"INVALID_PARAMETER">>, message => <<"page_limit_invalid">>}};
{error, Node, {badrpc, R}} ->
Message = list_to_binary(io_lib:format("bad rpc call ~p, Reason ~p", [Node, R])),
{error, Node, Error} ->
Message = list_to_binary(io_lib:format("bad rpc call ~p, Reason ~p", [Node, Error])),
{500, #{code => <<"NODE_DOWN">>, message => Message}};
Result ->
{200, Result}

View File

@ -366,7 +366,7 @@ t_get_enabled_authzs_none_enabled(_Config) ->
?assertEqual([], emqx_authz:get_enabled_authzs()).
t_get_enabled_authzs_some_enabled(_Config) ->
{ok, _} = emqx_authz:update(?CMD_REPLACE, [?SOURCE4]),
{ok, _} = emqx_authz:update(?CMD_REPLACE, [?SOURCE4, ?SOURCE5#{<<"enable">> := false}]),
?assertEqual([postgresql], emqx_authz:get_enabled_authzs()).
t_subscribe_deny_disconnect_publishes_last_will_testament(_Config) ->

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_bridge, [
{description, "EMQX bridges"},
{vsn, "0.1.19"},
{vsn, "0.1.20"},
{registered, [emqx_bridge_sup]},
{mod, {emqx_bridge_app, []}},
{applications, [

View File

@ -760,7 +760,14 @@ format_bridge_info([FirstBridge | _] = Bridges) ->
}).
format_bridge_metrics(Bridges) ->
NodeMetrics = collect_metrics(Bridges),
FilteredBridges = lists:filter(
fun
({_Node, Metric}) when is_map(Metric) -> true;
(_) -> false
end,
Bridges
),
NodeMetrics = collect_metrics(FilteredBridges),
#{
metrics => aggregate_metrics(NodeMetrics),
node_metrics => NodeMetrics
@ -919,9 +926,6 @@ fill_defaults(Type, RawConf) ->
pack_bridge_conf(Type, RawConf) ->
#{<<"bridges">> => #{bin(Type) => #{<<"foo">> => RawConf}}}.
%% Hide webhook's resource_opts.request_timeout from user.
filter_raw_conf(<<"webhook">>, RawConf0) ->
emqx_utils_maps:deep_remove([<<"resource_opts">>, <<"request_timeout">>], RawConf0);
filter_raw_conf(_TypeBin, RawConf) ->
RawConf.

View File

@ -57,11 +57,6 @@
(TYPE) =:= <<"kafka_consumer">> orelse ?IS_BI_DIR_BRIDGE(TYPE)
).
%% [FIXME] this has no place here, it's used in parse_confs/3, which should
%% rather delegate to a behavior callback than implementing domain knowledge
%% here (reversed dependency)
-define(INSERT_TABLET_PATH, "/rest/v2/insertTablet").
-if(?EMQX_RELEASE_EDITION == ee).
bridge_to_resource_type(<<"mqtt">>) -> emqx_connector_mqtt;
bridge_to_resource_type(mqtt) -> emqx_connector_mqtt;
@ -316,7 +311,6 @@ parse_confs(
url := Url,
method := Method,
headers := Headers,
request_timeout := ReqTimeout,
max_retries := Retry
} = Conf
) ->
@ -330,6 +324,10 @@ parse_confs(
Reason1 = emqx_utils:readable_error_msg(Reason),
invalid_data(<<"Invalid URL: ", Url1/binary, ", details: ", Reason1/binary>>)
end,
RequestTimeout = emqx_utils_maps:deep_get(
[resource_opts, request_timeout],
Conf
),
Conf#{
base_url => BaseUrl1,
request =>
@ -338,11 +336,16 @@ parse_confs(
method => Method,
body => maps:get(body, Conf, undefined),
headers => Headers,
request_timeout => ReqTimeout,
request_timeout => RequestTimeout,
max_retries => Retry
}
};
parse_confs(<<"iotdb">>, Name, Conf) ->
%% [FIXME] this has no place here, it's used in parse_confs/3, which should
%% rather delegate to a behavior callback than implementing domain knowledge
%% here (reversed dependency)
InsertTabletPathV1 = <<"rest/v1/insertTablet">>,
InsertTabletPathV2 = <<"rest/v2/insertTablet">>,
#{
base_url := BaseURL,
authentication :=
@ -352,10 +355,21 @@ parse_confs(<<"iotdb">>, Name, Conf) ->
}
} = Conf,
BasicToken = base64:encode(<<Username/binary, ":", Password/binary>>),
%% This version atom correspond to the macro ?VSN_1_1_X in
%% emqx_bridge_iotdb.hrl. It would be better to use the macro directly, but
%% this cannot be done without introducing a dependency on the
%% emqx_iotdb_bridge app (which is an EE app).
DefaultIOTDBBridge = 'v1.1.x',
Version = maps:get(iotdb_version, Conf, DefaultIOTDBBridge),
InsertTabletPath =
case Version of
DefaultIOTDBBridge -> InsertTabletPathV2;
_ -> InsertTabletPathV1
end,
WebhookConfig =
Conf#{
method => <<"post">>,
url => <<BaseURL/binary, ?INSERT_TABLET_PATH>>,
url => <<BaseURL/binary, InsertTabletPath/binary>>,
headers => [
{<<"Content-type">>, <<"application/json">>},
{<<"Authorization">>, BasicToken}

View File

@ -251,25 +251,6 @@ node_name() ->
{"node", mk(binary(), #{desc => ?DESC("desc_node_name"), example => "emqx@127.0.0.1"})}.
webhook_bridge_converter(Conf0, _HoconOpts) ->
Conf1 = emqx_bridge_compatible_config:upgrade_pre_ee(
emqx_bridge_compatible_config:upgrade_pre_ee(
Conf0, fun emqx_bridge_compatible_config:webhook_maybe_upgrade/1
),
case Conf1 of
undefined ->
undefined;
_ ->
maps:map(
fun(_Name, Conf) ->
do_convert_webhook_config(Conf)
end,
Conf1
)
end.
%% We hide resource_opts.request_timeout from user.
do_convert_webhook_config(
#{<<"request_timeout">> := ReqT, <<"resource_opts">> := ResOpts} = Conf
) ->
Conf#{<<"resource_opts">> => ResOpts#{<<"request_timeout">> => ReqT}};
do_convert_webhook_config(Conf) ->
Conf.
).

View File

@ -40,15 +40,12 @@ fields("put") ->
fields("get") ->
emqx_bridge_schema:status_fields() ++ fields("post");
fields("creation_opts") ->
[
hidden_request_timeout()
| lists:filter(
fun({K, _V}) ->
not lists:member(K, unsupported_opts())
end,
emqx_resource_schema:fields("creation_opts")
)
].
lists:filter(
fun({K, _V}) ->
not lists:member(K, unsupported_opts())
end,
emqx_resource_schema:fields("creation_opts")
).
desc("config") ->
?DESC("desc_config");
@ -144,6 +141,7 @@ request_config() ->
emqx_schema:duration_ms(),
#{
default => <<"15s">>,
deprecated => {since, "v5.0.26"},
desc => ?DESC("config_request_timeout")
}
)}
@ -166,8 +164,7 @@ unsupported_opts() ->
[
enable_batch,
batch_size,
batch_time,
request_timeout
batch_time
].
%%======================================================================================
@ -194,13 +191,3 @@ name_field() ->
method() ->
enum([post, put, get, delete]).
hidden_request_timeout() ->
{request_timeout,
mk(
hoconsc:union([infinity, emqx_schema:duration_ms()]),
#{
required => false,
importance => ?IMPORTANCE_HIDDEN
}
)}.

View File

@ -79,7 +79,8 @@ groups() ->
SingleOnlyTests = [
t_broken_bpapi_vsn,
t_old_bpapi_vsn,
t_bridges_probe
t_bridges_probe,
t_auto_restart_interval
],
ClusterLaterJoinOnlyTCs = [t_cluster_later_join_metrics],
[
@ -551,6 +552,89 @@ t_http_crud_apis(Config) ->
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), Config).
t_auto_restart_interval(Config) ->
Port = ?config(port, Config),
%% assert we there's no bridges at first
{ok, 200, []} = request_json(get, uri(["bridges"]), Config),
meck:new(emqx_resource, [passthrough]),
meck:expect(emqx_resource, call_start, fun(_, _, _) -> {error, fake_error} end),
%% then we add a webhook bridge, using POST
%% POST /bridges/ will create a bridge
URL1 = ?URL(Port, "path1"),
Name = ?BRIDGE_NAME,
BridgeID = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_HTTP, Name),
BridgeParams = ?HTTP_BRIDGE(URL1, Name)#{
<<"resource_opts">> => #{<<"auto_restart_interval">> => "1s"}
},
?check_trace(
begin
?assertMatch(
{ok, 201, #{
<<"type">> := ?BRIDGE_TYPE_HTTP,
<<"name">> := Name,
<<"enable">> := true,
<<"status">> := _,
<<"node_status">> := [_ | _],
<<"url">> := URL1
}},
request_json(
post,
uri(["bridges"]),
BridgeParams,
Config
)
),
{ok, _} = ?block_until(#{?snk_kind := resource_disconnected_enter}),
{ok, _} = ?block_until(#{?snk_kind := resource_auto_reconnect}, 1500)
end,
fun(Trace0) ->
Trace = ?of_kind(resource_auto_reconnect, Trace0),
?assertMatch([#{}], Trace),
ok
end
),
%% delete the bridge
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), Config),
{ok, 200, []} = request_json(get, uri(["bridges"]), Config),
%% auto_retry_interval=infinity
BridgeParams1 = BridgeParams#{
<<"resource_opts">> => #{<<"auto_restart_interval">> => "infinity"}
},
?check_trace(
begin
?assertMatch(
{ok, 201, #{
<<"type">> := ?BRIDGE_TYPE_HTTP,
<<"name">> := Name,
<<"enable">> := true,
<<"status">> := _,
<<"node_status">> := [_ | _],
<<"url">> := URL1
}},
request_json(
post,
uri(["bridges"]),
BridgeParams1,
Config
)
),
{ok, _} = ?block_until(#{?snk_kind := resource_disconnected_enter}),
?assertEqual(timeout, ?block_until(#{?snk_kind := resource_auto_reconnect}, 1500))
end,
fun(Trace0) ->
Trace = ?of_kind(resource_auto_reconnect, Trace0),
?assertMatch([], Trace),
ok
end
),
%% delete the bridge
{ok, 204, <<>>} = request(delete, uri(["bridges", BridgeID]), Config),
{ok, 200, []} = request_json(get, uri(["bridges"]), Config),
meck:unload(emqx_resource).
t_http_bridges_local_topic(Config) ->
Port = ?config(port, Config),
%% assert we there's no bridges at first
@ -1307,18 +1391,20 @@ t_inconsistent_webhook_request_timeouts(Config) ->
<<"resource_opts">> => #{<<"request_timeout">> => <<"2s">>}
}
),
{ok, 201, #{
<<"request_timeout">> := <<"1s">>,
<<"resource_opts">> := ResourceOpts
}} =
%% root request_timeout is deprecated for bridge.
{ok, 201,
#{
<<"resource_opts">> := ResourceOpts
} = Response} =
request_json(
post,
uri(["bridges"]),
BadBridgeParams,
Config
),
?assertNot(maps:is_key(<<"request_timeout">>, ResourceOpts)),
validate_resource_request_timeout(proplists:get_value(group, Config), 1000, Name),
?assertNot(maps:is_key(<<"request_timeout">>, Response)),
?assertMatch(#{<<"request_timeout">> := <<"2s">>}, ResourceOpts),
validate_resource_request_timeout(proplists:get_value(group, Config), 2000, Name),
ok.
t_cluster_later_join_metrics(Config) ->

View File

@ -65,15 +65,13 @@ webhook_config_test() ->
<<"the_name">> :=
#{
<<"method">> := get,
<<"request_timeout">> := RequestTime,
<<"resource_opts">> := ResourceOpts,
<<"body">> := <<"${payload}">>
}
}
}
} = check(Conf3),
?assertEqual(60_000, RequestTime),
?assertMatch(#{<<"request_timeout">> := 60_000}, ResourceOpts),
?assertMatch(#{<<"request_timeout">> := infinity}, ResourceOpts),
ok.
up(#{<<"bridges">> := Bridges0} = Conf0) ->
@ -196,7 +194,7 @@ full_webhook_v5019_hocon() ->
" pool_type = \"random\"\n"
" request_timeout = \"1m\"\n"
" resource_opts = {\n"
" request_timeout = \"7s\"\n"
" request_timeout = \"infinity\"\n"
" }\n"
" ssl {\n"
" ciphers = \"\"\n"

View File

@ -5,7 +5,8 @@
-ifndef(EMQX_BRIDGE_IOTDB_HRL).
-define(EMQX_BRIDGE_IOTDB_HRL, true).
-define(VSN_1_X, 'v1.x').
-define(VSN_1_1_X, 'v1.1.x').
-define(VSN_1_0_X, 'v1.0.x').
-define(VSN_0_13_X, 'v0.13.x').
-endif.

View File

@ -109,10 +109,10 @@ basic_config() ->
)},
{iotdb_version,
mk(
hoconsc:enum([?VSN_1_X, ?VSN_0_13_X]),
hoconsc:enum([?VSN_1_1_X, ?VSN_1_0_X, ?VSN_0_13_X]),
#{
desc => ?DESC("config_iotdb_version"),
default => ?VSN_1_X
default => ?VSN_1_1_X
}
)}
] ++ resource_creation_opts() ++
@ -130,6 +130,7 @@ request_config() ->
mk(
emqx_schema:url(),
#{
required => true,
desc => ?DESC("config_base_url")
}
)},
@ -217,7 +218,7 @@ conn_bridge_example(_Method, Type) ->
is_aligned => false,
device_id => <<"my_device">>,
base_url => <<"http://iotdb.local:18080/">>,
iotdb_version => ?VSN_1_X,
iotdb_version => ?VSN_1_1_X,
connect_timeout => <<"15s">>,
pool_type => <<"random">>,
pool_size => 8,

View File

@ -280,7 +280,7 @@ make_iotdb_insert_request(MessageUnparsedPayload, State) ->
Message = maps:update_with(payload, fun make_parsed_payload/1, MessageUnparsedPayload),
IsAligned = maps:get(is_aligned, State, false),
DeviceId = device_id(Message, State),
IotDBVsn = maps:get(iotdb_version, State, ?VSN_1_X),
IotDBVsn = maps:get(iotdb_version, State, ?VSN_1_1_X),
Payload = make_list(maps:get(payload, Message)),
PreProcessedData = preproc_data_list(Payload),
DataList = proc_data(PreProcessedData, Message),
@ -349,15 +349,21 @@ insert_value(1, Data, [Value | Values]) ->
insert_value(Index, Data, [Value | Values]) ->
[[null | Value] | insert_value(Index - 1, Data, Values)].
iotdb_field_key(is_aligned, ?VSN_1_X) ->
iotdb_field_key(is_aligned, ?VSN_1_1_X) ->
<<"is_aligned">>;
iotdb_field_key(is_aligned, ?VSN_1_0_X) ->
<<"is_aligned">>;
iotdb_field_key(is_aligned, ?VSN_0_13_X) ->
<<"isAligned">>;
iotdb_field_key(device_id, ?VSN_1_X) ->
iotdb_field_key(device_id, ?VSN_1_1_X) ->
<<"device">>;
iotdb_field_key(device_id, ?VSN_1_0_X) ->
<<"device">>;
iotdb_field_key(device_id, ?VSN_0_13_X) ->
<<"deviceId">>;
iotdb_field_key(data_types, ?VSN_1_X) ->
iotdb_field_key(data_types, ?VSN_1_1_X) ->
<<"data_types">>;
iotdb_field_key(data_types, ?VSN_1_0_X) ->
<<"data_types">>;
iotdb_field_key(data_types, ?VSN_0_13_X) ->
<<"dataTypes">>.

View File

@ -994,36 +994,33 @@ reconstruct_assignments_from_events(KafkaTopic, Events0, Acc0) ->
Assignments
).
setup_group_subscriber_spy(Node) ->
setup_group_subscriber_spy_fn() ->
TestPid = self(),
ok = erpc:call(
Node,
fun() ->
ok = meck:new(brod_group_subscriber_v2, [
passthrough, no_link, no_history, non_strict
]),
ok = meck:expect(
brod_group_subscriber_v2,
assignments_received,
fun(Pid, MemberId, GenerationId, TopicAssignments) ->
?tp(
kafka_assignment,
#{
node => node(),
pid => Pid,
member_id => MemberId,
generation_id => GenerationId,
topic_assignments => TopicAssignments
}
),
TestPid !
{kafka_assignment, node(), {Pid, MemberId, GenerationId, TopicAssignments}},
meck:passthrough([Pid, MemberId, GenerationId, TopicAssignments])
end
),
ok
end
).
fun() ->
ok = meck:new(brod_group_subscriber_v2, [
passthrough, no_link, no_history, non_strict
]),
ok = meck:expect(
brod_group_subscriber_v2,
assignments_received,
fun(Pid, MemberId, GenerationId, TopicAssignments) ->
?tp(
kafka_assignment,
#{
node => node(),
pid => Pid,
member_id => MemberId,
generation_id => GenerationId,
topic_assignments => TopicAssignments
}
),
TestPid !
{kafka_assignment, node(), {Pid, MemberId, GenerationId, TopicAssignments}},
meck:passthrough([Pid, MemberId, GenerationId, TopicAssignments])
end
),
ok
end.
wait_for_cluster_rpc(Node) ->
%% need to wait until the config handler is ready after
@ -1067,6 +1064,7 @@ cluster(Config) ->
_ ->
ct_slave
end,
ExtraEnvHandlerHook = setup_group_subscriber_spy_fn(),
Cluster = emqx_common_test_helpers:emqx_cluster(
[core, core],
[
@ -1080,6 +1078,7 @@ cluster(Config) ->
{env_handler, fun
(emqx) ->
application:set_env(emqx, boot_modules, [broker, router]),
ExtraEnvHandlerHook(),
ok;
(emqx_conf) ->
ok;
@ -1680,7 +1679,6 @@ t_cluster_group(Config) ->
Nodes
)
end),
lists:foreach(fun setup_group_subscriber_spy/1, Nodes),
{ok, SRef0} = snabbkaffe:subscribe(
?match_event(#{?snk_kind := kafka_consumer_subscriber_started}),
length(Nodes),
@ -1757,7 +1755,6 @@ t_node_joins_existing_cluster(Config) ->
ct:pal("stopping ~p", [N1]),
ok = emqx_common_test_helpers:stop_slave(N1)
end),
setup_group_subscriber_spy(N1),
{{ok, _}, {ok, _}} =
?wait_async_action(
erpc:call(N1, fun() ->
@ -1801,7 +1798,6 @@ t_node_joins_existing_cluster(Config) ->
ct:pal("stopping ~p", [N2]),
ok = emqx_common_test_helpers:stop_slave(N2)
end),
setup_group_subscriber_spy(N2),
Nodes = [N1, N2],
wait_for_cluster_rpc(N2),
@ -1902,7 +1898,6 @@ t_cluster_node_down(Config) ->
Nodes
)
end),
lists:foreach(fun setup_group_subscriber_spy/1, Nodes),
{ok, SRef0} = snabbkaffe:subscribe(
?match_event(#{?snk_kind := kafka_consumer_subscriber_started}),
length(Nodes),

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_oracle, [
{description, "EMQX Enterprise Oracle Database Bridge"},
{vsn, "0.1.0"},
{vsn, "0.1.1"},
{registered, []},
{applications, [
kernel,

View File

@ -20,7 +20,7 @@
]).
-define(DEFAULT_SQL, <<
"insert into t_mqtt_msg(msgid, topic, qos, payload)"
"insert into t_mqtt_msgs(msgid, topic, qos, payload) "
"values (${id}, ${topic}, ${qos}, ${payload})"
>>).
@ -41,7 +41,7 @@ values(_Method) ->
name => <<"foo">>,
server => <<"127.0.0.1:1521">>,
pool_size => 8,
database => <<"ORCL">>,
service_name => <<"ORCL">>,
sid => <<"ORCL">>,
username => <<"root">>,
password => <<"******">>,

View File

@ -14,7 +14,7 @@
-define(BRIDGE_TYPE_BIN, <<"oracle">>).
-define(APPS, [emqx_bridge, emqx_resource, emqx_rule_engine, emqx_oracle, emqx_bridge_oracle]).
-define(DATABASE, "XE").
-define(SID, "XE").
-define(RULE_TOPIC, "mqtt/rule").
% -define(RULE_TOPIC_BIN, <<?RULE_TOPIC>>).
@ -196,7 +196,6 @@ oracle_config(TestCase, _ConnectionType, Config) ->
io_lib:format(
"bridges.oracle.~s {\n"
" enable = true\n"
" database = \"~s\"\n"
" sid = \"~s\"\n"
" server = \"~s\"\n"
" username = \"system\"\n"
@ -215,8 +214,7 @@ oracle_config(TestCase, _ConnectionType, Config) ->
"}\n",
[
Name,
?DATABASE,
?DATABASE,
?SID,
ServerURL,
sql_insert_template_for_bridge()
]

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_pulsar, [
{description, "EMQX Pulsar Bridge"},
{vsn, "0.1.2"},
{vsn, "0.1.3"},
{registered, []},
{applications, [
kernel,

View File

@ -1,6 +1,6 @@
{application, emqx_conf, [
{description, "EMQX configuration management"},
{vsn, "0.1.20"},
{vsn, "0.1.21"},
{registered, []},
{mod, {emqx_conf_app, []}},
{applications, [kernel, stdlib, emqx_ctl]},

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_connector, [
{description, "EMQX Data Integration Connectors"},
{vsn, "0.1.23"},
{vsn, "0.1.24"},
{registered, []},
{mod, {emqx_connector_app, []}},
{applications, [

View File

@ -53,6 +53,7 @@
]).
-define(DEFAULT_PIPELINE_SIZE, 100).
-define(DEFAULT_REQUEST_TIMEOUT_MS, 30_000).
%%=====================================================================
%% Hocon schema
@ -467,7 +468,7 @@ preprocess_request(
path => emqx_plugin_libs_rule:preproc_tmpl(Path),
body => maybe_preproc_tmpl(body, Req),
headers => wrap_auth_header(preproc_headers(Headers)),
request_timeout => maps:get(request_timeout, Req, 30000),
request_timeout => maps:get(request_timeout, Req, ?DEFAULT_REQUEST_TIMEOUT_MS),
max_retries => maps:get(max_retries, Req, 2)
}.

View File

@ -1,6 +1,6 @@
{application, emqx_ctl, [
{description, "Backend for emqx_ctl script"},
{vsn, "0.1.0"},
{vsn, "0.1.1"},
{registered, []},
{mod, {emqx_ctl_app, []}},
{applications, [

View File

@ -228,7 +228,7 @@ handle_call({register_command, Cmd, MF, Opts}, _From, State = #state{seq = Seq})
ets:insert(?CMD_TAB, {{Seq, Cmd}, MF, Opts}),
{reply, ok, next_seq(State)};
[[OriginSeq] | _] ->
?LOG_WARNING(#{msg => "CMD_overidden", cmd => Cmd, mf => MF}),
?LOG_WARNING(#{msg => "CMD_overridden", cmd => Cmd, mf => MF}),
true = ets:insert(?CMD_TAB, {{OriginSeq, Cmd}, MF, Opts}),
{reply, ok, State}
end;

View File

@ -2,7 +2,7 @@
{application, emqx_dashboard, [
{description, "EMQX Web Dashboard"},
% strict semver, bump manually!
{vsn, "5.0.21"},
{vsn, "5.0.22"},
{modules, []},
{registered, [emqx_dashboard_sup]},
{applications, [kernel, stdlib, mnesia, minirest, emqx, emqx_ctl]},

View File

@ -21,11 +21,12 @@
-export([init/2]).
init(Req0, State) ->
?SLOG(warning, #{msg => "unexpected_api_access", request => Req0}),
RedactedReq = emqx_utils:redact(Req0),
?SLOG(warning, #{msg => "unexpected_api_access", request => RedactedReq}),
Req = cowboy_req:reply(
404,
#{<<"content-type">> => <<"application/json">>},
<<"{\"code\": \"API_NOT_EXIST\", \"message\": \"Request Path Not Found\"}">>,
Req0
RedactedReq
),
{ok, Req, State}.

View File

@ -1,6 +1,6 @@
{application, emqx_ft, [
{description, "EMQX file transfer over MQTT"},
{vsn, "0.1.0"},
{vsn, "0.1.1"},
{registered, []},
{mod, {emqx_ft_app, []}},
{applications, [

View File

@ -58,7 +58,7 @@ prop_coverage_likely_incomplete() ->
{filesize_t(), segsizes_t(), filesize_t()},
?FORALL(
Fragments,
noshrink(segments_t(Filesize, Segsizes, Hole)),
noshrink(segments_t(Filesize, Segsizes, (Hole rem max(Filesize, 1)))),
?TIMEOUT(
?COVERAGE_TIMEOUT,
begin
@ -174,7 +174,7 @@ segment_t(Filesize, Segsizes, Hole) ->
?SUCHTHATMAYBE(
{Offset, Size},
segment_t(Filesize, Segsizes),
(Hole rem Filesize) =< Offset orelse (Hole rem Filesize) > (Offset + Size)
Hole =< Offset orelse Hole > (Offset + Size)
).
segment_t(Filesize, Segsizes) ->

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_gateway, [
{description, "The Gateway management application"},
{vsn, "0.1.17"},
{vsn, "0.1.18"},
{registered, []},
{mod, {emqx_gateway_app, []}},
{applications, [kernel, stdlib, emqx, emqx_authn, emqx_ctl]},

View File

@ -133,8 +133,10 @@ clients(get, #{
case Result of
{error, page_limit_invalid} ->
{400, #{code => <<"INVALID_PARAMETER">>, message => <<"page_limit_invalid">>}};
{error, Node, {badrpc, R}} ->
Message = list_to_binary(io_lib:format("bad rpc call ~p, Reason ~p", [Node, R])),
{error, Node, Error} ->
Message = list_to_binary(
io_lib:format("bad rpc call ~p, Reason ~p", [Node, Error])
),
{500, #{code => <<"NODE_DOWN">>, message => Message}};
Response ->
{200, Response}

View File

@ -3,7 +3,7 @@
{id, "emqx_machine"},
{description, "The EMQX Machine"},
% strict semver, bump manually!
{vsn, "0.2.4"},
{vsn, "0.2.5"},
{modules, []},
{registered, []},
{applications, [kernel, stdlib, emqx_ctl]},

View File

@ -24,6 +24,7 @@
-export([stop_port_apps/0]).
-dialyzer({no_match, [basic_reboot_apps/0]}).
-dialyzer({no_match, [basic_reboot_apps_edition/1]}).
-ifdef(TEST).
-export([sorted_reboot_apps/1, reboot_apps/0]).
@ -126,39 +127,40 @@ reboot_apps() ->
BaseRebootApps ++ ConfigApps.
basic_reboot_apps() ->
CE =
?BASIC_REBOOT_APPS ++
[
emqx_prometheus,
emqx_modules,
emqx_dashboard,
emqx_connector,
emqx_gateway,
emqx_resource,
emqx_rule_engine,
emqx_bridge,
emqx_plugin_libs,
emqx_management,
emqx_retainer,
emqx_exhook,
emqx_authn,
emqx_authz,
emqx_slow_subs,
emqx_auto_subscribe,
emqx_plugins
],
case emqx_release:edition() of
ce ->
CE ++ [emqx_telemetry];
ee ->
CE ++
[
emqx_s3,
emqx_ft,
emqx_eviction_agent,
emqx_node_rebalance
]
end.
?BASIC_REBOOT_APPS ++
[
emqx_prometheus,
emqx_modules,
emqx_dashboard,
emqx_connector,
emqx_gateway,
emqx_resource,
emqx_rule_engine,
emqx_bridge,
emqx_plugin_libs,
emqx_management,
emqx_retainer,
emqx_exhook,
emqx_authn,
emqx_authz,
emqx_slow_subs,
emqx_auto_subscribe,
emqx_plugins
] ++ basic_reboot_apps_edition(emqx_release:edition()).
basic_reboot_apps_edition(ce) ->
[emqx_telemetry];
basic_reboot_apps_edition(ee) ->
[
emqx_license,
emqx_s3,
emqx_ft,
emqx_eviction_agent,
emqx_node_rebalance
];
%% unexcepted edition, should not happen
basic_reboot_apps_edition(_) ->
[].
sorted_reboot_apps() ->
Apps = [{App, app_deps(App)} || App <- reboot_apps()],

View File

@ -2,7 +2,7 @@
{application, emqx_management, [
{description, "EMQX Management API and CLI"},
% strict semver, bump manually!
{vsn, "5.0.22"},
{vsn, "5.0.23"},
{modules, []},
{registered, [emqx_management_sup]},
{applications, [kernel, stdlib, emqx_plugins, minirest, emqx, emqx_ctl]},

View File

@ -134,8 +134,8 @@ do_node_query(
ResultAcc
) ->
case do_query(Node, QueryState) of
{error, {badrpc, R}} ->
{error, Node, {badrpc, R}};
{error, Error} ->
{error, Node, Error};
{Rows, NQueryState = #{complete := Complete}} ->
case accumulate_query_rows(Node, Rows, NQueryState, ResultAcc) of
{enough, NResultAcc} ->
@ -179,8 +179,8 @@ do_cluster_query(
ResultAcc
) ->
case do_query(Node, QueryState) of
{error, {badrpc, R}} ->
{error, Node, {badrpc, R}};
{error, Error} ->
{error, Node, Error};
{Rows, NQueryState = #{complete := Complete}} ->
case accumulate_query_rows(Node, Rows, NQueryState, ResultAcc) of
{enough, NResultAcc} ->
@ -275,7 +275,7 @@ do_query(Node, QueryState) when Node =:= node() ->
do_select(Node, QueryState);
do_query(Node, QueryState) ->
case
rpc:call(
catch rpc:call(
Node,
?MODULE,
do_query,
@ -284,6 +284,7 @@ do_query(Node, QueryState) ->
)
of
{badrpc, _} = R -> {error, R};
{'EXIT', _} = R -> {error, R};
Ret -> Ret
end.
@ -298,15 +299,24 @@ do_select(
) ->
QueryState = maybe_apply_total_query(Node, QueryState0),
Result =
case maps:get(continuation, QueryState, undefined) of
undefined ->
ets:select(Tab, Ms, Limit);
Continuation ->
%% XXX: Repair is necessary because we pass Continuation back
%% and forth through the nodes in the `do_cluster_query`
ets:select(ets:repair_continuation(Continuation, Ms))
try
case maps:get(continuation, QueryState, undefined) of
undefined ->
ets:select(Tab, Ms, Limit);
Continuation ->
%% XXX: Repair is necessary because we pass Continuation back
%% and forth through the nodes in the `do_cluster_query`
ets:select(ets:repair_continuation(Continuation, Ms))
end
catch
exit:_ = Exit ->
{error, Exit};
Type:Reason:Stack ->
{error, #{exception => Type, reason => Reason, stacktrace => Stack}}
end,
case Result of
{error, _} ->
{[], mark_complete(QueryState)};
{Rows, '$end_of_table'} ->
NRows = maybe_apply_fuzzy_filter(Rows, QueryState),
{NRows, mark_complete(QueryState)};
@ -354,7 +364,11 @@ counting_total_fun(_QueryState = #{match_spec := Ms, fuzzy_fun := undefined}) ->
[{MatchHead, Conditions, _Return}] = Ms,
CountingMs = [{MatchHead, Conditions, [true]}],
fun(Tab) ->
ets:select_count(Tab, CountingMs)
try
ets:select_count(Tab, CountingMs)
catch
_Type:_Reason -> 0
end
end;
counting_total_fun(_QueryState = #{fuzzy_fun := FuzzyFun}) when FuzzyFun =/= undefined ->
%% XXX: Calculating the total number for a fuzzy searching is very very expensive

View File

@ -123,8 +123,8 @@ alarms(get, #{query_string := QString}) ->
of
{error, page_limit_invalid} ->
{400, #{code => <<"INVALID_PARAMETER">>, message => <<"page_limit_invalid">>}};
{error, Node, {badrpc, R}} ->
Message = list_to_binary(io_lib:format("bad rpc call ~p, Reason ~p", [Node, R])),
{error, Node, Error} ->
Message = list_to_binary(io_lib:format("bad rpc call ~p, Reason ~p", [Node, Error])),
{500, #{code => <<"NODE_DOWN">>, message => Message}};
Response ->
{200, Response}

View File

@ -120,8 +120,8 @@ do_list(Params) ->
of
{error, page_limit_invalid} ->
{400, #{code => <<"INVALID_PARAMETER">>, message => <<"page_limit_invalid">>}};
{error, Node, {badrpc, R}} ->
Message = list_to_binary(io_lib:format("bad rpc call ~p, Reason ~p", [Node, R])),
{error, Node, Error} ->
Message = list_to_binary(io_lib:format("bad rpc call ~p, Reason ~p", [Node, Error])),
{500, #{code => <<"NODE_DOWN">>, message => Message}};
Response ->
{200, Response}

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang; -*-
{erl_opts, [debug_info]}.
{deps, [ {jamdb_oracle, {git, "https://github.com/emqx/jamdb_oracle", {tag, "0.4.9.4"}}}
{deps, [ {jamdb_oracle, {git, "https://github.com/emqx/jamdb_oracle", {tag, "0.4.9.5"}}}
, {emqx_connector, {path, "../../apps/emqx_connector"}}
, {emqx_resource, {path, "../../apps/emqx_resource"}}
]}.

View File

@ -1,6 +1,6 @@
{application, emqx_oracle, [
{description, "EMQX Enterprise Oracle Database Connector"},
{vsn, "0.1.0"},
{vsn, "0.1.1"},
{registered, []},
{applications, [
kernel,

View File

@ -75,8 +75,6 @@ on_start(
InstId,
#{
server := Server,
database := DB,
sid := Sid,
username := User
} = Config
) ->
@ -91,15 +89,19 @@ on_start(
jamdb_oracle_conn:set_max_cursors_number(?MAX_CURSORS),
#{hostname := Host, port := Port} = emqx_schema:parse_server(Server, oracle_host_options()),
ServiceName = maps:get(<<"service_name">>, Config, Sid),
Sid = maps:get(sid, Config, ""),
ServiceName =
case maps:get(service_name, Config, undefined) of
undefined -> undefined;
ServiceName0 -> emqx_plugin_libs_rule:str(ServiceName0)
end,
Options = [
{host, Host},
{port, Port},
{user, emqx_plugin_libs_rule:str(User)},
{password, emqx_secret:wrap(maps:get(password, Config, ""))},
{password, jamdb_secret:wrap(maps:get(password, Config, ""))},
{sid, emqx_plugin_libs_rule:str(Sid)},
{service_name, emqx_plugin_libs_rule:str(ServiceName)},
{database, DB},
{service_name, ServiceName},
{pool_size, maps:get(<<"pool_size">>, Config, ?DEFAULT_POOL_SIZE)},
{timeout, ?OPT_TIMEOUT},
{app_name, "EMQX Data To Oracle Database Action"}
@ -258,9 +260,7 @@ oracle_host_options() ->
?ORACLE_HOST_OPTIONS.
connect(Opts) ->
Password = emqx_secret:unwrap(proplists:get_value(password, Opts)),
NewOpts = lists:keyreplace(password, 1, Opts, {password, Password}),
jamdb_oracle:start_link(NewOpts).
jamdb_oracle:start_link(Opts).
sql_query_to_str(SqlQuery) ->
emqx_plugin_libs_rule:str(SqlQuery).

View File

@ -19,9 +19,11 @@ roots() ->
[{config, #{type => hoconsc:ref(?REF_MODULE, config)}}].
fields(config) ->
[{server, server()}, {sid, fun sid/1}] ++
emqx_connector_schema_lib:relational_db_fields() ++
emqx_connector_schema_lib:prepare_statement_fields().
Fields =
[{server, server()}, {sid, fun sid/1}, {service_name, fun service_name/1}] ++
emqx_connector_schema_lib:relational_db_fields() ++
emqx_connector_schema_lib:prepare_statement_fields(),
proplists:delete(database, Fields).
server() ->
Meta = #{desc => ?DESC(?REF_MODULE, "server")},
@ -29,5 +31,10 @@ server() ->
sid(type) -> binary();
sid(desc) -> ?DESC(?REF_MODULE, "sid");
sid(required) -> true;
sid(required) -> false;
sid(_) -> undefined.
service_name(type) -> binary();
service_name(desc) -> ?DESC(?REF_MODULE, "service_name");
service_name(required) -> false;
service_name(_) -> undefined.

View File

@ -66,7 +66,7 @@
start_after_created => boolean(),
%% If the resource disconnected, we can set to retry starting the resource
%% periodically.
auto_restart_interval => pos_integer(),
auto_restart_interval => pos_integer() | infinity,
batch_size => pos_integer(),
batch_time => pos_integer(),
max_buffer_bytes => pos_integer(),

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_resource, [
{description, "Manager for all external resources"},
{vsn, "0.1.16"},
{vsn, "0.1.17"},
{registered, []},
{mod, {emqx_resource_app, []}},
{applications, [

View File

@ -389,8 +389,10 @@ handle_event(state_timeout, health_check, connected, Data) ->
%% State: DISCONNECTED
handle_event(enter, _OldState, disconnected = State, Data) ->
ok = log_state_consistency(State, Data),
?tp(resource_disconnected_enter, #{}),
{keep_state_and_data, retry_actions(Data)};
handle_event(state_timeout, auto_retry, disconnected, Data) ->
?tp(resource_auto_reconnect, #{}),
start_resource(Data, undefined);
%% State: STOPPED
%% The stopped state is entered after the resource has been explicitly stopped
@ -450,6 +452,8 @@ retry_actions(Data) ->
case maps:get(auto_restart_interval, Data#data.opts, ?AUTO_RESTART_INTERVAL) of
undefined ->
[];
infinity ->
[];
RetryInterval ->
[{state_timeout, RetryInterval, auto_retry}]
end.

View File

@ -102,12 +102,14 @@ health_check_interval_range(HealthCheckInterval) when
HealthCheckInterval =< ?HEALTH_CHECK_INTERVAL_RANGE_MAX
->
ok;
health_check_interval_range(_HealthCheckInterval) ->
{error, #{
msg => <<"Health Check Interval out of range">>,
min => ?HEALTH_CHECK_INTERVAL_RANGE_MIN,
max => ?HEALTH_CHECK_INTERVAL_RANGE_MAX
}}.
health_check_interval_range(HealthCheckInterval) ->
Message = get_out_of_range_msg(
<<"Health Check Interval">>,
HealthCheckInterval,
?HEALTH_CHECK_INTERVAL_RANGE_MIN,
?HEALTH_CHECK_INTERVAL_RANGE_MAX
),
{error, Message}.
start_after_created(type) -> boolean();
start_after_created(desc) -> ?DESC("start_after_created");
@ -128,18 +130,22 @@ auto_restart_interval(required) -> false;
auto_restart_interval(validator) -> fun auto_restart_interval_range/1;
auto_restart_interval(_) -> undefined.
auto_restart_interval_range(infinity) ->
ok;
auto_restart_interval_range(AutoRestartInterval) when
is_integer(AutoRestartInterval) andalso
AutoRestartInterval >= ?AUTO_RESTART_INTERVAL_RANGE_MIN andalso
AutoRestartInterval =< ?AUTO_RESTART_INTERVAL_RANGE_MAX
->
ok;
auto_restart_interval_range(_AutoRestartInterval) ->
{error, #{
msg => <<"Auto Restart Interval out of range">>,
min => ?AUTO_RESTART_INTERVAL_RANGE_MIN,
max => ?AUTO_RESTART_INTERVAL_RANGE_MAX
}}.
auto_restart_interval_range(AutoRestartInterval) ->
Message = get_out_of_range_msg(
<<"Auto Restart Interval">>,
AutoRestartInterval,
?AUTO_RESTART_INTERVAL_RANGE_MIN,
?AUTO_RESTART_INTERVAL_RANGE_MAX
),
{error, Message}.
query_mode(type) -> enum([sync, async]);
query_mode(desc) -> ?DESC("query_mode");
@ -206,3 +212,15 @@ buffer_seg_bytes(importance) -> ?IMPORTANCE_HIDDEN;
buffer_seg_bytes(_) -> undefined.
desc("creation_opts") -> ?DESC("creation_opts").
get_value_with_unit(Value) when is_integer(Value) ->
<<(erlang:integer_to_binary(Value))/binary, "ms">>;
get_value_with_unit(Value) ->
Value.
get_out_of_range_msg(Field, Value, Min, Max) ->
ValueStr = get_value_with_unit(Value),
MinStr = get_value_with_unit(Min),
MaxStr = get_value_with_unit(Max),
<<Field/binary, " (", ValueStr/binary, ") is out of range (", MinStr/binary, " to ",
MaxStr/binary, ")">>.

View File

@ -2,7 +2,7 @@
{application, emqx_rule_engine, [
{description, "EMQX Rule Engine"},
% strict semver, bump manually!
{vsn, "5.0.17"},
{vsn, "5.0.18"},
{modules, []},
{registered, [emqx_rule_engine_sup, emqx_rule_engine]},
{applications, [kernel, stdlib, rulesql, getopt, emqx_ctl]},

View File

@ -339,6 +339,9 @@ param_path_id() ->
of
{error, page_limit_invalid} ->
{400, #{code => 'BAD_REQUEST', message => <<"page_limit_invalid">>}};
{error, Node, Error} ->
Message = list_to_binary(io_lib:format("bad rpc call ~p, Reason ~p", [Node, Error])),
{500, #{code => <<"NODE_DOWN">>, message => Message}};
Result ->
{200, Result}
end;

View File

@ -1,6 +1,6 @@
{application, emqx_s3, [
{description, "EMQX S3"},
{vsn, "5.0.6"},
{vsn, "5.0.7"},
{modules, []},
{registered, [emqx_s3_sup]},
{applications, [

View File

@ -121,7 +121,7 @@ start_reporting() ->
%% @doc Stop the reporting timer.
%% This is an async notification which never fails.
%% This is a no-op in enterprise eidtion.
%% This is a no-op in enterprise edition.
stop_reporting() ->
gen_server:cast(?MODULE, stop_reporting).

View File

@ -32,11 +32,12 @@
-export([
on_server_start/0,
on_server_stop/0,
is_official_version/1
is_official_version/1,
is_official_version/0
]).
is_enabled() ->
IsOfficial = ?MODULE:is_official_version(emqx_release:version()),
IsOfficial = ?MODULE:is_official_version(),
emqx_conf:get([telemetry, enable], IsOfficial).
on_server_start() ->
@ -53,7 +54,9 @@ set_telemetry_status(Status) ->
end.
pre_config_update(_, {set_telemetry_status, Status}, RawConf) ->
{ok, RawConf#{<<"enable">> => Status}}.
{ok, RawConf#{<<"enable">> => Status}};
pre_config_update(_, NewConf, _OldConf) ->
{ok, NewConf}.
post_config_update(
_,
@ -65,6 +68,11 @@ post_config_update(
case Status of
true -> emqx_telemetry:start_reporting();
false -> emqx_telemetry:stop_reporting()
end;
post_config_update(_, _UpdateReq, NewConf, _OldConf, _AppEnvs) ->
case maps:get(enable, NewConf, ?MODULE:is_official_version()) of
true -> emqx_telemetry:start_reporting();
false -> emqx_telemetry:stop_reporting()
end.
cfg_update(Path, Action, Params) ->
@ -74,6 +82,9 @@ cfg_update(Path, Action, Params) ->
#{override_to => cluster}
).
is_official_version() ->
is_official_version(emqx_release:version()).
is_official_version(Version) ->
Pt = "^\\d+\\.\\d+(?:\\.\\d+)?(?:(-(?:alpha|beta|rc)\\.[1-9][0-9]*))?$",
match =:= re:run(Version, Pt, [{capture, none}]).

View File

@ -30,6 +30,7 @@ all() ->
init_per_suite(Config) ->
ok = emqx_common_test_helpers:load_config(emqx_modules_schema, ?BASE_CONF),
ok = emqx_common_test_helpers:load_config(emqx_telemetry_schema, ?BASE_CONF),
ok = emqx_mgmt_api_test_util:init_suite(
[emqx_conf, emqx_authn, emqx_authz, emqx_telemetry],
fun set_special_configs/1
@ -52,32 +53,26 @@ end_per_suite(_Config) ->
ok.
init_per_testcase(t_status_non_official, Config) ->
meck:new(emqx_telemetry, [non_strict, passthrough]),
meck:expect(emqx_telemetry, official_version, 1, false),
meck:new(emqx_telemetry_config, [non_strict, passthrough]),
meck:expect(emqx_telemetry_config, is_official_version, 0, false),
%% check non-official telemetry is disable by default
{ok, _} = emqx:update_config([telemetry], #{}),
Config;
init_per_testcase(t_status, Config) ->
meck:new(emqx_telemetry, [non_strict, passthrough]),
meck:expect(emqx_telemetry, enable, fun() -> ok end),
{ok, _, _} =
request(
put,
uri(["telemetry", "status"]),
#{<<"enable">> => true}
),
init_per_testcase(t_status_official, Config) ->
meck:new(emqx_telemetry_config, [non_strict, passthrough]),
meck:expect(emqx_telemetry_config, is_official_version, 0, true),
%% check official telemetry is enable by default
{ok, _} = emqx:update_config([telemetry], #{}),
Config;
init_per_testcase(_TestCase, Config) ->
{ok, _, _} =
request(
put,
uri(["telemetry", "status"]),
#{<<"enable">> => true}
),
%% Force enable telemetry to check data.
{ok, _} = emqx:update_config([telemetry], #{<<"enable">> => true}),
Config.
end_per_testcase(t_status_non_official, _Config) ->
meck:unload(emqx_telemetry);
meck:unload(emqx_telemetry_config);
end_per_testcase(t_status, _Config) ->
meck:unload(emqx_telemetry);
meck:unload(emqx_telemetry_config);
end_per_testcase(_TestCase, _Config) ->
ok.
@ -95,39 +90,50 @@ set_special_configs(_App) ->
%% Tests
%%------------------------------------------------------------------------------
t_status(_) ->
%% official's telemetry is enabled by default
t_status_official(_) ->
check_status(true).
%% non official's telemetry is disabled by default
t_status_non_official(_) ->
check_status(false).
check_status(Default) ->
ct:pal("Check telemetry status:~p~n", [emqx_telemetry_config:is_official_version()]),
?assertEqual(Default, is_telemetry_process_enabled()),
?assertMatch(
{ok, 200, _},
request(
put,
uri(["telemetry", "status"]),
#{<<"enable">> => false}
#{<<"enable">> => (not Default)}
)
),
{ok, 200, Result0} =
request(get, uri(["telemetry", "status"])),
?assertEqual(
#{<<"enable">> => false},
#{<<"enable">> => (not Default)},
emqx_utils_json:decode(Result0)
),
?assertEqual((not Default), is_telemetry_process_enabled()),
?assertMatch(
{ok, 400, _},
request(
put,
uri(["telemetry", "status"]),
#{<<"enable">> => false}
#{<<"enable">> => (not Default)}
)
),
?assertEqual((not Default), is_telemetry_process_enabled()),
?assertMatch(
{ok, 200, _},
request(
put,
uri(["telemetry", "status"]),
#{<<"enable">> => true}
#{<<"enable">> => Default}
)
),
@ -135,30 +141,24 @@ t_status(_) ->
request(get, uri(["telemetry", "status"])),
?assertEqual(
#{<<"enable">> => true},
#{<<"enable">> => Default},
emqx_utils_json:decode(Result1)
),
?assertEqual(Default, is_telemetry_process_enabled()),
?assertMatch(
{ok, 400, _},
request(
put,
uri(["telemetry", "status"]),
#{<<"enable">> => true}
#{<<"enable">> => Default}
)
).
t_status_non_official(_) ->
?assertMatch(
{ok, 200, _},
request(
put,
uri(["telemetry", "status"]),
#{<<"enable">> => false}
)
).
),
?assertEqual(Default, is_telemetry_process_enabled()),
ok.
t_data(_) ->
?assert(is_telemetry_process_enabled()),
{ok, 200, Result} =
request(get, uri(["telemetry", "data"])),
@ -191,3 +191,23 @@ t_data(_) ->
request(get, uri(["telemetry", "data"])),
ok.
%% Support emqx:update_config([telemetry], Conf).
t_conf_update(_) ->
Conf = emqx:get_raw_config([telemetry]),
?assert(is_telemetry_process_enabled()),
{ok, 200, Result1} = request(get, uri(["telemetry", "status"])),
?assertEqual(#{<<"enable">> => true}, emqx_utils_json:decode(Result1)),
{ok, _} = emqx:update_config([telemetry], Conf#{<<"enable">> => false}),
{ok, 200, Result2} = request(get, uri(["telemetry", "status"])),
?assertEqual(#{<<"enable">> => false}, emqx_utils_json:decode(Result2)),
?assertNot(is_telemetry_process_enabled()),
%% reset to true
{ok, _} = emqx:update_config([telemetry], Conf#{<<"enable">> => true}),
?assert(is_telemetry_process_enabled()),
ok.
is_telemetry_process_enabled() ->
%% timer is not undefined.
Timer = element(6, sys:get_state(emqx_telemetry)),
is_reference(Timer).

View File

@ -2,7 +2,7 @@
{application, emqx_utils, [
{description, "Miscellaneous utilities for EMQX apps"},
% strict semver, bump manually!
{vsn, "5.0.1"},
{vsn, "5.0.2"},
{modules, [
emqx_utils,
emqx_utils_api,

View File

@ -575,6 +575,9 @@ try_to_existing_atom(Convert, Data, Encoding) ->
is_sensitive_key(token) -> true;
is_sensitive_key("token") -> true;
is_sensitive_key(<<"token">>) -> true;
is_sensitive_key(authorization) -> true;
is_sensitive_key("authorization") -> true;
is_sensitive_key(<<"authorization">>) -> true;
is_sensitive_key(password) -> true;
is_sensitive_key("password") -> true;
is_sensitive_key(<<"password">>) -> true;

View File

@ -0,0 +1 @@
Fix Internal Error 500 that occurred sometimes when bridge statistics page was updated while a node was (re)joining the cluster.

View File

@ -0,0 +1 @@
Fix the error of not being able to configure `auto_restart_interval` as infinity

View File

@ -0,0 +1,6 @@
In case the cluster updated license before the new node join in. The new node will not apply the updated license.
After this change, the new joined node will use the cluster's license key.
Sometimes the new node must start with a outdated license.
e.g. use emqx-operator deployed and needed to scale up after license expired.
At the time the cluster's license key already updated by API/CLI, but the new node won't use it.

View File

@ -0,0 +1 @@
Only include enabled authenticators and authorizers in telemetry report, not all of them.

View File

@ -0,0 +1 @@
Obfuscated sensitive data in the bad API logging.

177
changes/e5.0.4.en.md Normal file
View File

@ -0,0 +1,177 @@
# e5.0.4
## Enhancements
- [#10389](https://github.com/emqx/emqx/pull/10389) Unified the configuration formats for `cluster.core_nodes` and `cluster.statics.seeds`. Now they both support formats in array `["emqx1@127.0.0.1", "emqx2@127.0.0.1"]` and the comma-separated string `"emqx1@127.0.0.1,emqx2@127.0.0.1"`.
- [#10392](https://github.com/emqx/emqx/pull/10392) Introduced a new function to convert a formatted date to an integer timestamp: date_to_unix_ts/3.
`date_to_unix_ts(TimeUnit, FormatString, InputDateTimeString)`
- [#10426](https://github.com/emqx/emqx/pull/10426) Optimized the configuration priority mechanism to fix the issue where the configuration changes made to `etc/emqx.conf` do not take effect after restarting EMQX.
More information about the new mechanism: [Configure Override Rules](https://www.emqx.io/docs/en/v5.0/configuration/configuration.html#configure-override-rules)
- [#10457](https://github.com/emqx/emqx/pull/10457) Deprecated the integration with StatsD.
- [#10458](https://github.com/emqx/emqx/pull/10458) Set the level of plugin configuration options to low, users usually manage the plugins through the dashboard, rarely modify them manually, so we lowered the level.
- [#10491](https://github.com/emqx/emqx/pull/10491) Renamed `etcd.ssl` to `etcd.ssl_options` to keep all SSL options consistent in the configuration file.
- [#10512](https://github.com/emqx/emqx/pull/10512) Improved the storage format of Unicode characters in data files, Now we can store Unicode characters. For example: `SELECT * FROM "t/1" WHERE clientid = "-测试专用-"`.
- [#10568](https://github.com/emqx/emqx/pull/10568) Added `shutdown_counter` printout to `emqx ctl listeners` command.
- [#10588](https://github.com/emqx/emqx/pull/10588) Increased the time precision of trace logs from second to microsecond. For example, change from `2023-05-02T08:43:50+00:00` to `2023-05-02T08:43:50.237945+00:00`.
- [#10623](https://github.com/emqx/emqx/pull/10623) Renamed `max_message_queue_len` to `max_mailbox_size` in the `force_shutdown` configuration. The old name is kept as an alias, so this change is backward compatible.
- [#10713](https://github.com/emqx/emqx/pull/10713) Hide the `resource_option.request_timeout` of the webhook and it will use the value of `http` `request_timeout`.
- [#10075](https://github.com/emqx/emqx/pull/10075) Added node rebalance/node evacuation functionality. See also: [EIP doc](https://github.com/emqx/eip/blob/main/active/0020-node-rebalance.md)
- [#10378](https://github.com/emqx/emqx/pull/10378) Implemented Pulsar Producer Bridge and only producer role is supported now.
- [#10408](https://github.com/emqx/emqx/pull/10408) Introduced 3 built-in functions in the rule engine SQL-like language for creating values of the MongoDB date type.
- [#10409](https://github.com/emqx/emqx/pull/10409) [#10337](#10337) Supported [Protocol Buffers](https://protobuf.dev/) and [Apache Avro](https://avro.apache.org/) schemas in Schema Registry.
- [#10425](https://github.com/emqx/emqx/pull/10425) Implemented OpenTSDB data bridge.
- [#10498](https://github.com/emqx/emqx/pull/10498) Implemented Oracle Database Bridge.
- [#10560](https://github.com/emqx/emqx/pull/10560) Added enterprise data bridge for Apache IoTDB.
- [#10417](https://github.com/emqx/emqx/pull/10417) Improved get config items performance by eliminating temporary references.
- [#10430](https://github.com/emqx/emqx/pull/10430) Simplified the configuration of the `retainer` feature. Marked `flow_control` as a non-importance field.
- [#10511](https://github.com/emqx/emqx/pull/10511) Improved the security and privacy of some resource logs by masking sensitive information in the log.
- [#10525](https://github.com/emqx/emqx/pull/10525) Reduced resource usage per MQTT packet handling.
- [#10528](https://github.com/emqx/emqx/pull/10528) Reduced memory footprint in hot code path. The hot path includes the code that is frequently executed in core functionalities such as message handling, connection management, authentication, and authorization.
- [#10591](https://github.com/emqx/emqx/pull/10591) [#10625](https://github.com/emqx/emqx/pull/10625) Improved the configuration of the limiter.
- Reduced the complexity of the limiter's configuration.
- Updated the `configs/limiter` API to suit this refactor.
- Reduced the memory usage of the limiter configuration.
- [#10487](https://github.com/emqx/emqx/pull/10487) Optimized the instance of limiter for whose rate is `infinity` to reduce memory and CPU usage.
- [#10490](https://github.com/emqx/emqx/pull/10490) Removed the default limit of connect rate which used to be `1000/s`.
- [#10077](https://github.com/emqx/emqx/pull/10077) Added support for QUIC TLS password-protected certificate file.
## Bug Fixes
- [#10340](https://github.com/emqx/emqx/pull/10340) Fixed the issue that could lead to crash logs being printed when stopping EMQX via `systemd`.
`2023-03-29T16:43:25.915761+08:00 [error] Generic server memsup terminating. Reason: {port_died,normal}. Last message: {'EXIT',<0.2117.0>,{port_died,normal}}. State: [{data,[{"Timeout",60000}]},{items,{"Memory Usage",[{"Allocated",929959936},{"Total",3832242176}]}},{items,{"Worst Memory User",[{"Pid",<0.2031.0>},{"Memory",4720472}]}}]. 2023-03-29T16:43:25.924764+08:00 [error] crasher: initial call: memsup:init/1, pid: <0.2116.0>, registered_name: memsup, exit: {{port_died,normal},[{gen_server,handle_common_reply,8,[{file,"gen_server.erl"},{line,811}]},{proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,226}]}]}, ancestors: [os_mon_sup,<0.2114.0>], message_queue_len: 0, messages: [], links: [<0.2115.0>], dictionary: [], trap_exit: true, status: running, heap_size: 4185, stack_size: 29, reductions: 187637; neighbours: 2023-03-29T16:43:25.924979+08:00 [error] Supervisor: {local,os_mon_sup}. Context: child_terminated. Reason: {port_died,normal}. Offender: id=memsup,pid=<0.2116.0>.`
- [#10369](https://github.com/emqx/emqx/pull/10369) Fixed error in `/api/v5/monitor_current` API endpoint that happens when some EMQX nodes are down.
Prior to this fix, sometimes the request returned HTTP code 500 and the following message:
`{"code":"INTERNAL_ERROR","message":"error, badarg, [{erlang,'++',[{error,nodedown},[{node,'emqx@10.42.0.150'}]], ...`
- [#10407](https://github.com/emqx/emqx/pull/10407) Fixed the crash issue of the alarm system.
- Leverage Mnesia dirty operations and circumvent extraneous calls to enhance 'emqx_alarm' performance.
- Use 'emqx_resource_manager' for reactivating alarms that have already been triggered.
- Implement the newly developed, fail-safe 'emqx_alarm' API to control the activation and deactivation of alarms, thus preventing 'emqx_resource_manager' from crashing due to alarm timeouts.
- The alarm system is susceptible to crashing under these concurrent conditions:
- A significant number of resources fail, such as when bridges continuously attempt to trigger alarms due to recurring errors.
- The system is under an extremely high load.
- [#10420](https://github.com/emqx/emqx/pull/10420) Fixed HTTP path handling when composing the URL for the HTTP requests in authentication and authorization modules.
- Avoid unnecessary URL normalization since we cannot assume that external servers treat original and normalized URLs equally. This led to bugs like [#10411](https://github.com/emqx/emqx/issues/10411).
- Fixed the issue that path segments could be HTTP encoded twice.
- [#10422](https://github.com/emqx/emqx/pull/10422) Fixed a bug where external plugins could not be configured via environment variables in a lone-node cluster.
- [#10448](https://github.com/emqx/emqx/pull/10448) Fixed a compatibility issue of limiter configuration introduced by e5.0.3 which broke the upgrade from previous versions if the `capacity` is `infinity`.
In e5.0.3 we have replaced `capacity` with `burst`. After this fix, a `capacity = infinity` config will be automatically converted to equivalent `burst = 0`.
- [#10462](https://github.com/emqx/emqx/pull/10462) Deprecated config `broker.shared_dispatch_ack_enabled`. This was designed to avoid dispatching messages to a shared-subscription session that has the client disconnected. However, since e5.0.0, this feature is no longer helpful because the shared-subscription messages in an expired session will be redispatched to other sessions in the group. See also: <https://github.com/emqx/emqx/pull/9104> .
- [#10463](https://github.com/emqx/emqx/pull/10463) Improved bridges API error handling. If Webhook bridge URL is not valid, the bridges API will return '400' error instead of '500'.
- [#10484](https://github.com/emqx/emqx/pull/10484) Fixed the issue that the priority of the configuration cannot be set during the rolling upgrade. For example, when authorization is modified in e5.0.2 and then upgraded e5.0.3 through the rolling upgrade, the authorization will be restored to the default.
- [#10495](https://github.com/emqx/emqx/pull/10495) Added the limiter API `/configs/limiter` which was deleted by mistake back.
- [#10500](https://github.com/emqx/emqx/pull/10500) Added several fixes, enhancements, and features in Mria:
- Protect `mria:join/1,2` with a global lock to prevent conflicts between two nodes trying to join each other simultaneously [Mria PR](https://github.com/emqx/mria/pull/137)
- Implement new function `mria:sync_transaction/4,3,2`, which blocks the caller until a transaction is imported to the local node (if the local node is a replicant, otherwise, it behaves exactly the same as `mria:transaction/3,2`) [Mria PR](https://github.com/emqx/mria/pull/136)
- Optimize `mria:running_nodes/0` [Mria PR](https://github.com/emqx/mria/pull/135)
- Optimize `mria:ro_transaction/2` when called on a replicant node [Mria PR](https://github.com/emqx/mria/pull/134).
- [#10518](https://github.com/emqx/emqx/pull/10518) Added the following fixes and features in Mria:
- Call `mria_rlog:role/1` safely in mria_membership to ensure that mria_membership gen_server won't crash if RPC to another node fails [Mria PR](https://github.com/emqx/mria/pull/139)
- Add an extra field to `?rlog_sync` table to facilitate extending this functionality in future [Mria PR](https://github.com/emqx/mria/pull/138).
- [#10556](https://github.com/emqx/emqx/pull/10556) Wrapped potentially sensitive data in `emqx_connector_http` if `Authorization` headers are being passed at initialization.
- [#10571](https://github.com/emqx/emqx/pull/10571) Stopped emitting useless crash report when EMQX stops.
- [#10659](https://github.com/emqx/emqx/pull/10659) Fixed the issue where EMQX cannot start when `sysmon.os.mem_check_interval` is disabled.
- [#10717](https://github.com/emqx/emqx/pull/10717) Fixed an issue where the buffering layer processes could use a lot of CPU when inflight window is full.
- [#10724](https://github.com/emqx/emqx/pull/10724) A summary has been added for all endpoints in the HTTP API documentation (accessible at "http://<emqx_host_name>:18083/api-docs").
- [#10726](https://github.com/emqx/emqx/pull/10726) Health Check Interval and Auto Restart Interval now support the range from 1ms to 1 hour.
- [#10728](https://github.com/emqx/emqx/pull/10728) Fixed an issue where the rule engine was unable to access variables exported by `FOREACH` - `DO` clause.
Given a payload: `{"date": "2023-05-06", "array": ["a"]}`, as well as the following SQL statement:
`FOREACH payload.date as date, payload.array as elem DO date, elem FROM "t/#" -- {"date": "2023-05-06", "array": ["a"]}`
Prior to the fix, the `date` variable exported by `FOREACH` could not be accessed in the `DO` clause of the above SQL, resulting in the following output for the SQL statement: `[{"elem": "a","date": "undefined"}]`.
- [#10742](https://github.com/emqx/emqx/pull/10742) Correctness check of the rules is enforced before saving the authorization file source. Previously, Saving wrong rules could lead to EMQX restart failure.
- [#10743](https://github.com/emqx/emqx/pull/10743) Fixed an issue where trying to get bridge info or metrics could result in a crash when a node is joining a cluster.
- [#10755](https://github.com/emqx/emqx/pull/10755) Fixed data bridge resource update race condition.
In the 'delete + create' process for EMQX resource updates, long bridge creation times could cause dashboard request timeouts. If a bridge resource update was initiated before completion of its creation, it led to an erroneous deletion from the runtime, despite being present in the config file.
This fix addresses the race condition in bridge resource updates, ensuring the accurate identification and addition of new resources, and maintaining consistency between runtime and configuration file statuses.
- [#10761](https://github.com/emqx/emqx/pull/10761) Fixed the issue where the default value of SSL certificate for Dashboard Listener was not correctly interpolated, which caused HTTPS to be inaccessible when `verify_peer` and `cacertfile` were using the default configuration.
- [#10672](https://github.com/emqx/emqx/pull/10672) Fixed the issue where the lack of a default value for `ssl_options` in listeners results in startup failure. For example, such command(`EMQX_LISTENERS__WSS__DEFAULT__BIND='0.0.0.0:8089' ./bin/emqx console`) would have caused a crash before.
- [#10738](https://github.com/emqx/emqx/pull/10738) TDEngine data bridge now supports "Supertable" and "Create Tables Automatically". Before this fix, an insert with a supertable in the template will fail, like this:
- `insert into ${clientid} using msg TAGS (${clientid}) values (${ts},${msg})`.
- [#10746](https://github.com/emqx/emqx/pull/10746) Add missing support of the event `$events/delivery_dropped` into the rule engine test API `rule_test`.
- [#10747](https://github.com/emqx/emqx/pull/10747) Ported some time formating fixes in Rule-Engine functions from version 4.4.
- [#10760](https://github.com/emqx/emqx/pull/10760) Fix "internal error 500" when getting bridge statistics page while a node is joining the cluster.
- [#10801](https://github.com/emqx/emqx/pull/10801) Avoid double percent-decode for topic name in API `/topics/{topic}` and `/topics`.
- [#10817](https://github.com/emqx/emqx/pull/10817) Fix a config value handling for bridge resource option `auto_restart_interval`, now it can be set to `infinity`.

View File

@ -0,0 +1 @@
Fix password leaking on stacktrace for Oracle Database.

View File

@ -1,6 +1,6 @@
{application, emqx_ee_bridge, [
{description, "EMQX Enterprise data bridges"},
{vsn, "0.1.13"},
{vsn, "0.1.14"},
{registered, []},
{applications, [
kernel,

View File

@ -53,3 +53,29 @@ t_update(_Config) ->
_ = emqx_license_cli:license(["update", LicenseValue]),
_ = emqx_license_cli:license(["reload"]),
_ = emqx_license_cli:license(["update", "Invalid License Value"]).
t_conf_update(_Config) ->
ok = persistent_term:put(
emqx_license_test_pubkey,
emqx_license_test_lib:public_key_pem()
),
LicenseKey = emqx_license_test_lib:make_license(#{max_connections => "123"}),
Conf = #{
<<"connection_high_watermark">> => <<"50%">>,
<<"connection_low_watermark">> => <<"45%">>,
<<"key">> => LicenseKey
},
?assertMatch({ok, _}, emqx:update_config([license], Conf)),
?assertEqual(
#{
connection_high_watermark => 0.5,
connection_low_watermark => 0.45,
key => LicenseKey
},
emqx:get_config([license])
),
?assertMatch(
#{max_connections := 123},
maps:from_list(emqx_license_checker:dump())
),
ok.

View File

@ -29,6 +29,11 @@ emqx_bridge_oracle {
label = "Oracle Database Sid."
}
service_name {
desc = "Service Name for Oracle Database."
label = "Oracle Database Service Name"
}
config_enable {
desc = "Enable or disable this bridge"
label = "Enable Or Disable Bridge"

View File

@ -12,4 +12,9 @@ emqx_oracle {
label = "Oracle Database Sid"
}
service_name {
desc = "Service Name for Oracle Database."
label = "Oracle Database Service Name"
}
}

View File

@ -28,6 +28,11 @@ emqx_bridge_oracle {
label = "Oracle Database Sid"
}
service_name {
desc = "Oracle Database 服务名称。"
label = "Oracle Database 服务名称"
}
config_enable {
desc = "启用/禁用桥接"
label = "启用/禁用桥接"

View File

@ -12,4 +12,9 @@ emqx_oracle {
label = "Oracle Database Sid"
}
service_name {
desc = "Oracle Database 服务名称。"
label = "Oracle Database 服务名称"
}
}