Merge branch 'release-50' into file-transfer
* release-50: fix(pulsar): use a binary duration as default `health_check_interval` docs: add changelog entry docs: clarify description of bridge username and password chore: bump to v5.0.25 fix(limiter): adjust type for compatibility fix(limiter): fix that update node-level limiter config will not working chore: upgrade dashboard to v1.2.4-1 for ce chore: upgarde rulesql to 0.1.6 to fix invaid utf8 input chore: add changelog for 10659 fix: crash when sysmon.os.mem_check_interval = disabled chore: bump influxdb version && update changes refactor(influxdb): move influxdb bridge into its own app chore: add listener default changelog fix: ocsp cache SUITE failed fix: ensure atom key for emqx_config:get fix: only fill cerf_file default in server side fix: authn init is empty fix: bad listeners default ssl_options
This commit is contained in:
commit
1a8cf0e392
2
Makefile
2
Makefile
|
@ -15,7 +15,7 @@ endif
|
||||||
|
|
||||||
# Dashbord version
|
# Dashbord version
|
||||||
# from https://github.com/emqx/emqx-dashboard5
|
# from https://github.com/emqx/emqx-dashboard5
|
||||||
export EMQX_DASHBOARD_VERSION ?= v1.2.4
|
export EMQX_DASHBOARD_VERSION ?= v1.2.4-1
|
||||||
export EMQX_EE_DASHBOARD_VERSION ?= e1.0.6
|
export EMQX_EE_DASHBOARD_VERSION ?= e1.0.6
|
||||||
|
|
||||||
# `:=` should be used here, otherwise the `$(shell ...)` will be executed every time when the variable is used
|
# `:=` should be used here, otherwise the `$(shell ...)` will be executed every time when the variable is used
|
||||||
|
|
|
@ -32,7 +32,7 @@
|
||||||
%% `apps/emqx/src/bpapi/README.md'
|
%% `apps/emqx/src/bpapi/README.md'
|
||||||
|
|
||||||
%% Community edition
|
%% Community edition
|
||||||
-define(EMQX_RELEASE_CE, "5.0.25-rc.1").
|
-define(EMQX_RELEASE_CE, "5.0.25").
|
||||||
|
|
||||||
%% Enterprise edition
|
%% Enterprise edition
|
||||||
-define(EMQX_RELEASE_EE, "5.0.4-alpha.1").
|
-define(EMQX_RELEASE_EE, "5.0.4-alpha.1").
|
||||||
|
|
|
@ -131,11 +131,9 @@ delete_root(Type) ->
|
||||||
delete_bucket(?ROOT_ID, Type).
|
delete_bucket(?ROOT_ID, Type).
|
||||||
|
|
||||||
post_config_update([limiter], _Config, NewConf, _OldConf, _AppEnvs) ->
|
post_config_update([limiter], _Config, NewConf, _OldConf, _AppEnvs) ->
|
||||||
Types = lists:delete(client, maps:keys(NewConf)),
|
Conf = emqx_limiter_schema:convert_node_opts(NewConf),
|
||||||
_ = [on_post_config_update(Type, NewConf) || Type <- Types],
|
_ = [on_post_config_update(Type, Cfg) || {Type, Cfg} <- maps:to_list(Conf)],
|
||||||
ok;
|
ok.
|
||||||
post_config_update([limiter, Type], _Config, NewConf, _OldConf, _AppEnvs) ->
|
|
||||||
on_post_config_update(Type, NewConf).
|
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% @doc
|
%% @doc
|
||||||
|
@ -279,8 +277,7 @@ format_status(_Opt, Status) ->
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Internal functions
|
%% Internal functions
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
on_post_config_update(Type, NewConf) ->
|
on_post_config_update(Type, Config) ->
|
||||||
Config = maps:get(Type, NewConf),
|
|
||||||
case emqx_limiter_server:whereis(Type) of
|
case emqx_limiter_server:whereis(Type) of
|
||||||
undefined ->
|
undefined ->
|
||||||
start_server(Type, Config);
|
start_server(Type, Config);
|
||||||
|
|
|
@ -38,7 +38,8 @@
|
||||||
default_client_config/0,
|
default_client_config/0,
|
||||||
short_paths_fields/1,
|
short_paths_fields/1,
|
||||||
get_listener_opts/1,
|
get_listener_opts/1,
|
||||||
get_node_opts/1
|
get_node_opts/1,
|
||||||
|
convert_node_opts/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-define(KILOBYTE, 1024).
|
-define(KILOBYTE, 1024).
|
||||||
|
@ -60,7 +61,7 @@
|
||||||
-type limiter_id() :: atom().
|
-type limiter_id() :: atom().
|
||||||
-type bucket_name() :: atom().
|
-type bucket_name() :: atom().
|
||||||
-type rate() :: infinity | float().
|
-type rate() :: infinity | float().
|
||||||
-type burst_rate() :: 0 | float().
|
-type burst_rate() :: number().
|
||||||
%% this is a compatible type for the deprecated field and type `capacity`.
|
%% this is a compatible type for the deprecated field and type `capacity`.
|
||||||
-type burst() :: burst_rate().
|
-type burst() :: burst_rate().
|
||||||
%% the capacity of the token bucket
|
%% the capacity of the token bucket
|
||||||
|
@ -309,6 +310,24 @@ get_node_opts(Type) ->
|
||||||
end
|
end
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
convert_node_opts(Conf) ->
|
||||||
|
DefBucket = default_bucket_config(),
|
||||||
|
ShorPaths = short_paths(),
|
||||||
|
Fun = fun
|
||||||
|
%% The `client` in the node options was deprecated
|
||||||
|
(client, _Value, Acc) ->
|
||||||
|
Acc;
|
||||||
|
(Name, Value, Acc) ->
|
||||||
|
case lists:member(Name, ShorPaths) of
|
||||||
|
true ->
|
||||||
|
Type = short_path_name_to_type(Name),
|
||||||
|
Acc#{Type => DefBucket#{rate => Value}};
|
||||||
|
_ ->
|
||||||
|
Acc#{Name => Value}
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
maps:fold(Fun, #{}, Conf).
|
||||||
|
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Internal functions
|
%% Internal functions
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
|
@ -23,8 +23,6 @@
|
||||||
-export([start_link/0]).
|
-export([start_link/0]).
|
||||||
|
|
||||||
-export([
|
-export([
|
||||||
get_mem_check_interval/0,
|
|
||||||
set_mem_check_interval/1,
|
|
||||||
get_sysmem_high_watermark/0,
|
get_sysmem_high_watermark/0,
|
||||||
set_sysmem_high_watermark/1,
|
set_sysmem_high_watermark/1,
|
||||||
get_procmem_high_watermark/0,
|
get_procmem_high_watermark/0,
|
||||||
|
@ -46,6 +44,9 @@
|
||||||
terminate/2,
|
terminate/2,
|
||||||
code_change/3
|
code_change/3
|
||||||
]).
|
]).
|
||||||
|
-ifdef(TEST).
|
||||||
|
-export([is_sysmem_check_supported/0]).
|
||||||
|
-endif.
|
||||||
|
|
||||||
-include("emqx.hrl").
|
-include("emqx.hrl").
|
||||||
|
|
||||||
|
@ -61,14 +62,6 @@ update(OS) ->
|
||||||
%% API
|
%% API
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
get_mem_check_interval() ->
|
|
||||||
memsup:get_check_interval().
|
|
||||||
|
|
||||||
set_mem_check_interval(Seconds) when Seconds < 60000 ->
|
|
||||||
memsup:set_check_interval(1);
|
|
||||||
set_mem_check_interval(Seconds) ->
|
|
||||||
memsup:set_check_interval(Seconds div 60000).
|
|
||||||
|
|
||||||
get_sysmem_high_watermark() ->
|
get_sysmem_high_watermark() ->
|
||||||
gen_server:call(?OS_MON, ?FUNCTION_NAME, infinity).
|
gen_server:call(?OS_MON, ?FUNCTION_NAME, infinity).
|
||||||
|
|
||||||
|
@ -103,11 +96,9 @@ init_os_monitor() ->
|
||||||
init_os_monitor(OS) ->
|
init_os_monitor(OS) ->
|
||||||
#{
|
#{
|
||||||
sysmem_high_watermark := SysHW,
|
sysmem_high_watermark := SysHW,
|
||||||
procmem_high_watermark := PHW,
|
procmem_high_watermark := PHW
|
||||||
mem_check_interval := MCI
|
|
||||||
} = OS,
|
} = OS,
|
||||||
set_procmem_high_watermark(PHW),
|
set_procmem_high_watermark(PHW),
|
||||||
set_mem_check_interval(MCI),
|
|
||||||
ok = update_mem_alarm_status(SysHW),
|
ok = update_mem_alarm_status(SysHW),
|
||||||
SysHW.
|
SysHW.
|
||||||
|
|
||||||
|
|
|
@ -2189,8 +2189,8 @@ filter(Opts) ->
|
||||||
|
|
||||||
%% @private This function defines the SSL opts which are commonly used by
|
%% @private This function defines the SSL opts which are commonly used by
|
||||||
%% SSL listener and client.
|
%% SSL listener and client.
|
||||||
-spec common_ssl_opts_schema(map()) -> hocon_schema:field_schema().
|
-spec common_ssl_opts_schema(map(), server | client) -> hocon_schema:field_schema().
|
||||||
common_ssl_opts_schema(Defaults) ->
|
common_ssl_opts_schema(Defaults, Type) ->
|
||||||
D = fun(Field) -> maps:get(to_atom(Field), Defaults, undefined) end,
|
D = fun(Field) -> maps:get(to_atom(Field), Defaults, undefined) end,
|
||||||
Df = fun(Field, Default) -> maps:get(to_atom(Field), Defaults, Default) end,
|
Df = fun(Field, Default) -> maps:get(to_atom(Field), Defaults, Default) end,
|
||||||
Collection = maps:get(versions, Defaults, tls_all_available),
|
Collection = maps:get(versions, Defaults, tls_all_available),
|
||||||
|
@ -2200,7 +2200,7 @@ common_ssl_opts_schema(Defaults) ->
|
||||||
sc(
|
sc(
|
||||||
binary(),
|
binary(),
|
||||||
#{
|
#{
|
||||||
default => D("cacertfile"),
|
default => cert_file("cacert.pem", Type),
|
||||||
required => false,
|
required => false,
|
||||||
desc => ?DESC(common_ssl_opts_schema_cacertfile)
|
desc => ?DESC(common_ssl_opts_schema_cacertfile)
|
||||||
}
|
}
|
||||||
|
@ -2209,7 +2209,7 @@ common_ssl_opts_schema(Defaults) ->
|
||||||
sc(
|
sc(
|
||||||
binary(),
|
binary(),
|
||||||
#{
|
#{
|
||||||
default => D("certfile"),
|
default => cert_file("cert.pem", Type),
|
||||||
required => false,
|
required => false,
|
||||||
desc => ?DESC(common_ssl_opts_schema_certfile)
|
desc => ?DESC(common_ssl_opts_schema_certfile)
|
||||||
}
|
}
|
||||||
|
@ -2218,7 +2218,7 @@ common_ssl_opts_schema(Defaults) ->
|
||||||
sc(
|
sc(
|
||||||
binary(),
|
binary(),
|
||||||
#{
|
#{
|
||||||
default => D("keyfile"),
|
default => cert_file("key.pem", Type),
|
||||||
required => false,
|
required => false,
|
||||||
desc => ?DESC(common_ssl_opts_schema_keyfile)
|
desc => ?DESC(common_ssl_opts_schema_keyfile)
|
||||||
}
|
}
|
||||||
|
@ -2305,7 +2305,7 @@ common_ssl_opts_schema(Defaults) ->
|
||||||
server_ssl_opts_schema(Defaults, IsRanchListener) ->
|
server_ssl_opts_schema(Defaults, IsRanchListener) ->
|
||||||
D = fun(Field) -> maps:get(to_atom(Field), Defaults, undefined) end,
|
D = fun(Field) -> maps:get(to_atom(Field), Defaults, undefined) end,
|
||||||
Df = fun(Field, Default) -> maps:get(to_atom(Field), Defaults, Default) end,
|
Df = fun(Field, Default) -> maps:get(to_atom(Field), Defaults, Default) end,
|
||||||
common_ssl_opts_schema(Defaults) ++
|
common_ssl_opts_schema(Defaults, server) ++
|
||||||
[
|
[
|
||||||
{"dhfile",
|
{"dhfile",
|
||||||
sc(
|
sc(
|
||||||
|
@ -2431,7 +2431,7 @@ crl_outer_validator(_SSLOpts) ->
|
||||||
%% @doc Make schema for SSL client.
|
%% @doc Make schema for SSL client.
|
||||||
-spec client_ssl_opts_schema(map()) -> hocon_schema:field_schema().
|
-spec client_ssl_opts_schema(map()) -> hocon_schema:field_schema().
|
||||||
client_ssl_opts_schema(Defaults) ->
|
client_ssl_opts_schema(Defaults) ->
|
||||||
common_ssl_opts_schema(Defaults) ++
|
common_ssl_opts_schema(Defaults, client) ++
|
||||||
[
|
[
|
||||||
{"enable",
|
{"enable",
|
||||||
sc(
|
sc(
|
||||||
|
@ -3251,13 +3251,10 @@ default_listener(ws) ->
|
||||||
};
|
};
|
||||||
default_listener(SSLListener) ->
|
default_listener(SSLListener) ->
|
||||||
%% The env variable is resolved in emqx_tls_lib by calling naive_env_interpolate
|
%% The env variable is resolved in emqx_tls_lib by calling naive_env_interpolate
|
||||||
CertFile = fun(Name) ->
|
|
||||||
iolist_to_binary("${EMQX_ETC_DIR}/" ++ filename:join(["certs", Name]))
|
|
||||||
end,
|
|
||||||
SslOptions = #{
|
SslOptions = #{
|
||||||
<<"cacertfile">> => CertFile(<<"cacert.pem">>),
|
<<"cacertfile">> => cert_file(<<"cacert.pem">>, server),
|
||||||
<<"certfile">> => CertFile(<<"cert.pem">>),
|
<<"certfile">> => cert_file(<<"cert.pem">>, server),
|
||||||
<<"keyfile">> => CertFile(<<"key.pem">>)
|
<<"keyfile">> => cert_file(<<"key.pem">>, server)
|
||||||
},
|
},
|
||||||
case SSLListener of
|
case SSLListener of
|
||||||
ssl ->
|
ssl ->
|
||||||
|
@ -3374,3 +3371,6 @@ ensure_default_listener(#{<<"default">> := _} = Map, _ListenerType) ->
|
||||||
ensure_default_listener(Map, ListenerType) ->
|
ensure_default_listener(Map, ListenerType) ->
|
||||||
NewMap = Map#{<<"default">> => default_listener(ListenerType)},
|
NewMap = Map#{<<"default">> => default_listener(ListenerType)},
|
||||||
keep_default_tombstone(NewMap, #{}).
|
keep_default_tombstone(NewMap, #{}).
|
||||||
|
|
||||||
|
cert_file(_File, client) -> undefined;
|
||||||
|
cert_file(File, server) -> iolist_to_binary(filename:join(["${EMQX_ETC_DIR}", "certs", File])).
|
||||||
|
|
|
@ -967,20 +967,11 @@ do_t_validations(_Config) ->
|
||||||
{error, {_, _, ResRaw3}} = update_listener_via_api(ListenerId, ListenerData3),
|
{error, {_, _, ResRaw3}} = update_listener_via_api(ListenerId, ListenerData3),
|
||||||
#{<<"code">> := <<"BAD_REQUEST">>, <<"message">> := MsgRaw3} =
|
#{<<"code">> := <<"BAD_REQUEST">>, <<"message">> := MsgRaw3} =
|
||||||
emqx_utils_json:decode(ResRaw3, [return_maps]),
|
emqx_utils_json:decode(ResRaw3, [return_maps]),
|
||||||
|
%% we can't remove certfile now, because it has default value.
|
||||||
?assertMatch(
|
?assertMatch(
|
||||||
#{
|
<<"{bad_ssl_config,#{file_read => enoent,pem_check => invalid_pem", _/binary>>,
|
||||||
<<"mismatches">> :=
|
MsgRaw3
|
||||||
#{
|
|
||||||
<<"listeners:ssl_not_required_bind">> :=
|
|
||||||
#{
|
|
||||||
<<"reason">> :=
|
|
||||||
<<"Server certificate must be defined when using OCSP stapling">>
|
|
||||||
}
|
|
||||||
}
|
|
||||||
},
|
|
||||||
emqx_utils_json:decode(MsgRaw3, [return_maps])
|
|
||||||
),
|
),
|
||||||
|
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
t_unknown_error_fetching_ocsp_response(_Config) ->
|
t_unknown_error_fetching_ocsp_response(_Config) ->
|
||||||
|
|
|
@ -43,8 +43,8 @@ init_per_testcase(t_cpu_check_alarm, Config) ->
|
||||||
{ok, _} = supervisor:restart_child(emqx_sys_sup, emqx_os_mon),
|
{ok, _} = supervisor:restart_child(emqx_sys_sup, emqx_os_mon),
|
||||||
Config;
|
Config;
|
||||||
init_per_testcase(t_sys_mem_check_alarm, Config) ->
|
init_per_testcase(t_sys_mem_check_alarm, Config) ->
|
||||||
case os:type() of
|
case emqx_os_mon:is_sysmem_check_supported() of
|
||||||
{unix, linux} ->
|
true ->
|
||||||
SysMon = emqx_config:get([sysmon, os], #{}),
|
SysMon = emqx_config:get([sysmon, os], #{}),
|
||||||
emqx_config:put([sysmon, os], SysMon#{
|
emqx_config:put([sysmon, os], SysMon#{
|
||||||
sysmem_high_watermark => 0.51,
|
sysmem_high_watermark => 0.51,
|
||||||
|
@ -54,7 +54,7 @@ init_per_testcase(t_sys_mem_check_alarm, Config) ->
|
||||||
ok = supervisor:terminate_child(emqx_sys_sup, emqx_os_mon),
|
ok = supervisor:terminate_child(emqx_sys_sup, emqx_os_mon),
|
||||||
{ok, _} = supervisor:restart_child(emqx_sys_sup, emqx_os_mon),
|
{ok, _} = supervisor:restart_child(emqx_sys_sup, emqx_os_mon),
|
||||||
Config;
|
Config;
|
||||||
_ ->
|
false ->
|
||||||
Config
|
Config
|
||||||
end;
|
end;
|
||||||
init_per_testcase(_, Config) ->
|
init_per_testcase(_, Config) ->
|
||||||
|
@ -63,12 +63,6 @@ init_per_testcase(_, Config) ->
|
||||||
Config.
|
Config.
|
||||||
|
|
||||||
t_api(_) ->
|
t_api(_) ->
|
||||||
?assertEqual(60000, emqx_os_mon:get_mem_check_interval()),
|
|
||||||
?assertEqual(ok, emqx_os_mon:set_mem_check_interval(30000)),
|
|
||||||
?assertEqual(60000, emqx_os_mon:get_mem_check_interval()),
|
|
||||||
?assertEqual(ok, emqx_os_mon:set_mem_check_interval(122000)),
|
|
||||||
?assertEqual(120000, emqx_os_mon:get_mem_check_interval()),
|
|
||||||
|
|
||||||
?assertEqual(0.7, emqx_os_mon:get_sysmem_high_watermark()),
|
?assertEqual(0.7, emqx_os_mon:get_sysmem_high_watermark()),
|
||||||
?assertEqual(ok, emqx_os_mon:set_sysmem_high_watermark(0.8)),
|
?assertEqual(ok, emqx_os_mon:set_sysmem_high_watermark(0.8)),
|
||||||
?assertEqual(0.8, emqx_os_mon:get_sysmem_high_watermark()),
|
?assertEqual(0.8, emqx_os_mon:get_sysmem_high_watermark()),
|
||||||
|
@ -86,12 +80,29 @@ t_api(_) ->
|
||||||
gen_server:stop(emqx_os_mon),
|
gen_server:stop(emqx_os_mon),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
t_sys_mem_check_disable(Config) ->
|
||||||
|
case emqx_os_mon:is_sysmem_check_supported() of
|
||||||
|
true -> do_sys_mem_check_disable(Config);
|
||||||
|
false -> skip
|
||||||
|
end.
|
||||||
|
|
||||||
|
do_sys_mem_check_disable(_Config) ->
|
||||||
|
MemRef0 = maps:get(mem_time_ref, sys:get_state(emqx_os_mon)),
|
||||||
|
?assertEqual(true, is_reference(MemRef0), MemRef0),
|
||||||
|
emqx_config:put([sysmon, os, mem_check_interval], 1000),
|
||||||
|
emqx_os_mon:update(emqx_config:get([sysmon, os])),
|
||||||
|
MemRef1 = maps:get(mem_time_ref, sys:get_state(emqx_os_mon)),
|
||||||
|
?assertEqual(true, is_reference(MemRef1), {MemRef0, MemRef1}),
|
||||||
|
?assertNotEqual(MemRef0, MemRef1),
|
||||||
|
emqx_config:put([sysmon, os, mem_check_interval], disabled),
|
||||||
|
emqx_os_mon:update(emqx_config:get([sysmon, os])),
|
||||||
|
?assertEqual(undefined, maps:get(mem_time_ref, sys:get_state(emqx_os_mon))),
|
||||||
|
ok.
|
||||||
|
|
||||||
t_sys_mem_check_alarm(Config) ->
|
t_sys_mem_check_alarm(Config) ->
|
||||||
case os:type() of
|
case emqx_os_mon:is_sysmem_check_supported() of
|
||||||
{unix, linux} ->
|
true -> do_sys_mem_check_alarm(Config);
|
||||||
do_sys_mem_check_alarm(Config);
|
false -> skip
|
||||||
_ ->
|
|
||||||
skip
|
|
||||||
end.
|
end.
|
||||||
|
|
||||||
do_sys_mem_check_alarm(_Config) ->
|
do_sys_mem_check_alarm(_Config) ->
|
||||||
|
|
|
@ -72,7 +72,7 @@ chain_configs() ->
|
||||||
[global_chain_config() | listener_chain_configs()].
|
[global_chain_config() | listener_chain_configs()].
|
||||||
|
|
||||||
global_chain_config() ->
|
global_chain_config() ->
|
||||||
{?GLOBAL, emqx:get_config([?EMQX_AUTHENTICATION_CONFIG_ROOT_NAME_BINARY], [])}.
|
{?GLOBAL, emqx:get_config([?EMQX_AUTHENTICATION_CONFIG_ROOT_NAME_ATOM], [])}.
|
||||||
|
|
||||||
listener_chain_configs() ->
|
listener_chain_configs() ->
|
||||||
lists:map(
|
lists:map(
|
||||||
|
@ -83,9 +83,11 @@ listener_chain_configs() ->
|
||||||
).
|
).
|
||||||
|
|
||||||
auth_config_path(ListenerID) ->
|
auth_config_path(ListenerID) ->
|
||||||
[<<"listeners">>] ++
|
Names = [
|
||||||
binary:split(atom_to_binary(ListenerID), <<":">>) ++
|
binary_to_existing_atom(N, utf8)
|
||||||
[?EMQX_AUTHENTICATION_CONFIG_ROOT_NAME_BINARY].
|
|| N <- binary:split(atom_to_binary(ListenerID), <<":">>)
|
||||||
|
],
|
||||||
|
[listeners] ++ Names ++ [?EMQX_AUTHENTICATION_CONFIG_ROOT_NAME_ATOM].
|
||||||
|
|
||||||
provider_types() ->
|
provider_types() ->
|
||||||
lists:map(fun({Type, _Module}) -> Type end, emqx_authn:providers()).
|
lists:map(fun({Type, _Module}) -> Type end, emqx_authn:providers()).
|
||||||
|
|
|
@ -54,13 +54,14 @@
|
||||||
|
|
||||||
-define(BRIDGE_NOT_FOUND(BRIDGE_TYPE, BRIDGE_NAME),
|
-define(BRIDGE_NOT_FOUND(BRIDGE_TYPE, BRIDGE_NAME),
|
||||||
?NOT_FOUND(
|
?NOT_FOUND(
|
||||||
<<"Bridge lookup failed: bridge named '", (BRIDGE_NAME)/binary, "' of type ",
|
<<"Bridge lookup failed: bridge named '", (bin(BRIDGE_NAME))/binary, "' of type ",
|
||||||
(bin(BRIDGE_TYPE))/binary, " does not exist.">>
|
(bin(BRIDGE_TYPE))/binary, " does not exist.">>
|
||||||
)
|
)
|
||||||
).
|
).
|
||||||
|
|
||||||
|
%% Don't turn bridge_name to atom, it's maybe not a existing atom.
|
||||||
-define(TRY_PARSE_ID(ID, EXPR),
|
-define(TRY_PARSE_ID(ID, EXPR),
|
||||||
try emqx_bridge_resource:parse_bridge_id(Id) of
|
try emqx_bridge_resource:parse_bridge_id(Id, #{atom_name => false}) of
|
||||||
{BridgeType, BridgeName} ->
|
{BridgeType, BridgeName} ->
|
||||||
EXPR
|
EXPR
|
||||||
catch
|
catch
|
||||||
|
|
|
@ -25,6 +25,7 @@
|
||||||
resource_id/2,
|
resource_id/2,
|
||||||
bridge_id/2,
|
bridge_id/2,
|
||||||
parse_bridge_id/1,
|
parse_bridge_id/1,
|
||||||
|
parse_bridge_id/2,
|
||||||
bridge_hookpoint/1,
|
bridge_hookpoint/1,
|
||||||
bridge_hookpoint_to_bridge_id/1
|
bridge_hookpoint_to_bridge_id/1
|
||||||
]).
|
]).
|
||||||
|
@ -86,11 +87,15 @@ bridge_id(BridgeType, BridgeName) ->
|
||||||
Type = bin(BridgeType),
|
Type = bin(BridgeType),
|
||||||
<<Type/binary, ":", Name/binary>>.
|
<<Type/binary, ":", Name/binary>>.
|
||||||
|
|
||||||
-spec parse_bridge_id(list() | binary() | atom()) -> {atom(), binary()}.
|
|
||||||
parse_bridge_id(BridgeId) ->
|
parse_bridge_id(BridgeId) ->
|
||||||
|
parse_bridge_id(BridgeId, #{atom_name => true}).
|
||||||
|
|
||||||
|
-spec parse_bridge_id(list() | binary() | atom(), #{atom_name => boolean()}) ->
|
||||||
|
{atom(), atom() | binary()}.
|
||||||
|
parse_bridge_id(BridgeId, Opts) ->
|
||||||
case string:split(bin(BridgeId), ":", all) of
|
case string:split(bin(BridgeId), ":", all) of
|
||||||
[Type, Name] ->
|
[Type, Name] ->
|
||||||
{to_type_atom(Type), validate_name(Name)};
|
{to_type_atom(Type), validate_name(Name, Opts)};
|
||||||
_ ->
|
_ ->
|
||||||
invalid_data(
|
invalid_data(
|
||||||
<<"should be of pattern {type}:{name}, but got ", BridgeId/binary>>
|
<<"should be of pattern {type}:{name}, but got ", BridgeId/binary>>
|
||||||
|
@ -105,13 +110,16 @@ bridge_hookpoint_to_bridge_id(?BRIDGE_HOOKPOINT(BridgeId)) ->
|
||||||
bridge_hookpoint_to_bridge_id(_) ->
|
bridge_hookpoint_to_bridge_id(_) ->
|
||||||
{error, bad_bridge_hookpoint}.
|
{error, bad_bridge_hookpoint}.
|
||||||
|
|
||||||
validate_name(Name0) ->
|
validate_name(Name0, Opts) ->
|
||||||
Name = unicode:characters_to_list(Name0, utf8),
|
Name = unicode:characters_to_list(Name0, utf8),
|
||||||
case is_list(Name) andalso Name =/= [] of
|
case is_list(Name) andalso Name =/= [] of
|
||||||
true ->
|
true ->
|
||||||
case lists:all(fun is_id_char/1, Name) of
|
case lists:all(fun is_id_char/1, Name) of
|
||||||
true ->
|
true ->
|
||||||
Name0;
|
case maps:get(atom_name, Opts, true) of
|
||||||
|
true -> list_to_existing_atom(Name);
|
||||||
|
false -> Name0
|
||||||
|
end;
|
||||||
false ->
|
false ->
|
||||||
invalid_data(<<"bad name: ", Name0/binary>>)
|
invalid_data(<<"bad name: ", Name0/binary>>)
|
||||||
end;
|
end;
|
||||||
|
|
|
@ -0,0 +1,2 @@
|
||||||
|
toxiproxy
|
||||||
|
influxdb
|
|
@ -0,0 +1,8 @@
|
||||||
|
{erl_opts, [debug_info]}.
|
||||||
|
|
||||||
|
{deps, [
|
||||||
|
{influxdb, {git, "https://github.com/emqx/influxdb-client-erl", {tag, "1.1.9"}}},
|
||||||
|
{emqx_connector, {path, "../../apps/emqx_connector"}},
|
||||||
|
{emqx_resource, {path, "../../apps/emqx_resource"}},
|
||||||
|
{emqx_bridge, {path, "../../apps/emqx_bridge"}}
|
||||||
|
]}.
|
|
@ -1,8 +1,8 @@
|
||||||
{application, emqx_bridge_influxdb, [
|
{application, emqx_bridge_influxdb, [
|
||||||
{description, "EMQX Enterprise InfluxDB Bridge"},
|
{description, "EMQX Enterprise InfluxDB Bridge"},
|
||||||
{vsn, "0.1.0"},
|
{vsn, "0.1.1"},
|
||||||
{registered, []},
|
{registered, []},
|
||||||
{applications, [kernel, stdlib]},
|
{applications, [kernel, stdlib, influxdb]},
|
||||||
{env, []},
|
{env, []},
|
||||||
{modules, []},
|
{modules, []},
|
||||||
{links, []}
|
{links, []}
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
-module(emqx_ee_bridge_influxdb).
|
-module(emqx_bridge_influxdb).
|
||||||
|
|
||||||
-include_lib("emqx/include/logger.hrl").
|
-include_lib("emqx/include/logger.hrl").
|
||||||
-include_lib("emqx_connector/include/emqx_connector.hrl").
|
-include_lib("emqx_connector/include/emqx_connector.hrl").
|
||||||
|
@ -134,7 +134,7 @@ influxdb_bridge_common_fields() ->
|
||||||
emqx_resource_schema:fields("resource_opts").
|
emqx_resource_schema:fields("resource_opts").
|
||||||
|
|
||||||
connector_fields(Type) ->
|
connector_fields(Type) ->
|
||||||
emqx_ee_connector_influxdb:fields(Type).
|
emqx_bridge_influxdb_connector:fields(Type).
|
||||||
|
|
||||||
type_name_fields(Type) ->
|
type_name_fields(Type) ->
|
||||||
[
|
[
|
||||||
|
@ -147,9 +147,9 @@ desc("config") ->
|
||||||
desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
|
desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
|
||||||
["Configuration for InfluxDB using `", string:to_upper(Method), "` method."];
|
["Configuration for InfluxDB using `", string:to_upper(Method), "` method."];
|
||||||
desc(influxdb_api_v1) ->
|
desc(influxdb_api_v1) ->
|
||||||
?DESC(emqx_ee_connector_influxdb, "influxdb_api_v1");
|
?DESC(emqx_bridge_influxdb_connector, "influxdb_api_v1");
|
||||||
desc(influxdb_api_v2) ->
|
desc(influxdb_api_v2) ->
|
||||||
?DESC(emqx_ee_connector_influxdb, "influxdb_api_v2");
|
?DESC(emqx_bridge_influxdb_connector, "influxdb_api_v2");
|
||||||
desc(_) ->
|
desc(_) ->
|
||||||
undefined.
|
undefined.
|
||||||
|
|
|
@ -1,9 +1,8 @@
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
-module(emqx_ee_connector_influxdb).
|
-module(emqx_bridge_influxdb_connector).
|
||||||
|
|
||||||
-include("emqx_ee_connector.hrl").
|
|
||||||
-include_lib("emqx_connector/include/emqx_connector.hrl").
|
-include_lib("emqx_connector/include/emqx_connector.hrl").
|
||||||
|
|
||||||
-include_lib("hocon/include/hoconsc.hrl").
|
-include_lib("hocon/include/hoconsc.hrl").
|
||||||
|
@ -40,6 +39,8 @@
|
||||||
|
|
||||||
-type ts_precision() :: ns | us | ms | s.
|
-type ts_precision() :: ns | us | ms | s.
|
||||||
|
|
||||||
|
-define(INFLUXDB_DEFAULT_PORT, 8086).
|
||||||
|
|
||||||
%% influxdb servers don't need parse
|
%% influxdb servers don't need parse
|
||||||
-define(INFLUXDB_HOST_OPTIONS, #{
|
-define(INFLUXDB_HOST_OPTIONS, #{
|
||||||
default_port => ?INFLUXDB_DEFAULT_PORT
|
default_port => ?INFLUXDB_DEFAULT_PORT
|
|
@ -1,7 +1,7 @@
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
-module(emqx_ee_bridge_influxdb_SUITE).
|
-module(emqx_bridge_influxdb_SUITE).
|
||||||
|
|
||||||
-compile(nowarn_export_all).
|
-compile(nowarn_export_all).
|
||||||
-compile(export_all).
|
-compile(export_all).
|
||||||
|
@ -583,7 +583,7 @@ t_start_already_started(Config) ->
|
||||||
emqx_bridge_schema, InfluxDBConfigString
|
emqx_bridge_schema, InfluxDBConfigString
|
||||||
),
|
),
|
||||||
?check_trace(
|
?check_trace(
|
||||||
emqx_ee_connector_influxdb:on_start(ResourceId, InfluxDBConfigMap),
|
emqx_bridge_influxdb_connector:on_start(ResourceId, InfluxDBConfigMap),
|
||||||
fun(Result, Trace) ->
|
fun(Result, Trace) ->
|
||||||
?assertMatch({ok, _}, Result),
|
?assertMatch({ok, _}, Result),
|
||||||
?assertMatch([_], ?of_kind(influxdb_connector_start_already_started, Trace)),
|
?assertMatch([_], ?of_kind(influxdb_connector_start_already_started, Trace)),
|
||||||
|
@ -985,7 +985,7 @@ t_write_failure(Config) ->
|
||||||
?assertMatch([_ | _], Trace),
|
?assertMatch([_ | _], Trace),
|
||||||
[#{result := Result} | _] = Trace,
|
[#{result := Result} | _] = Trace,
|
||||||
?assert(
|
?assert(
|
||||||
not emqx_ee_connector_influxdb:is_unrecoverable_error(Result),
|
not emqx_bridge_influxdb_connector:is_unrecoverable_error(Result),
|
||||||
#{got => Result}
|
#{got => Result}
|
||||||
);
|
);
|
||||||
async ->
|
async ->
|
||||||
|
@ -993,7 +993,7 @@ t_write_failure(Config) ->
|
||||||
?assertMatch([#{action := nack} | _], Trace),
|
?assertMatch([#{action := nack} | _], Trace),
|
||||||
[#{result := Result} | _] = Trace,
|
[#{result := Result} | _] = Trace,
|
||||||
?assert(
|
?assert(
|
||||||
not emqx_ee_connector_influxdb:is_unrecoverable_error(Result),
|
not emqx_bridge_influxdb_connector:is_unrecoverable_error(Result),
|
||||||
#{got => Result}
|
#{got => Result}
|
||||||
)
|
)
|
||||||
end,
|
end,
|
|
@ -2,16 +2,16 @@
|
||||||
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
|
|
||||||
-module(emqx_ee_connector_influxdb_SUITE).
|
-module(emqx_bridge_influxdb_connector_SUITE).
|
||||||
|
|
||||||
-compile(nowarn_export_all).
|
-compile(nowarn_export_all).
|
||||||
-compile(export_all).
|
-compile(export_all).
|
||||||
|
|
||||||
-include("emqx_connector.hrl").
|
-include_lib("emqx_connector/include/emqx_connector.hrl").
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
-include_lib("common_test/include/ct.hrl").
|
-include_lib("common_test/include/ct.hrl").
|
||||||
|
|
||||||
-define(INFLUXDB_RESOURCE_MOD, emqx_ee_connector_influxdb).
|
-define(INFLUXDB_RESOURCE_MOD, emqx_bridge_influxdb_connector).
|
||||||
|
|
||||||
all() ->
|
all() ->
|
||||||
emqx_common_test_helpers:all(?MODULE).
|
emqx_common_test_helpers:all(?MODULE).
|
||||||
|
@ -65,7 +65,7 @@ t_lifecycle(Config) ->
|
||||||
Host = ?config(influxdb_tcp_host, Config),
|
Host = ?config(influxdb_tcp_host, Config),
|
||||||
Port = ?config(influxdb_tcp_port, Config),
|
Port = ?config(influxdb_tcp_port, Config),
|
||||||
perform_lifecycle_check(
|
perform_lifecycle_check(
|
||||||
<<"emqx_ee_connector_influxdb_SUITE">>,
|
<<"emqx_bridge_influxdb_connector_SUITE">>,
|
||||||
influxdb_config(Host, Port, false, <<"verify_none">>)
|
influxdb_config(Host, Port, false, <<"verify_none">>)
|
||||||
).
|
).
|
||||||
|
|
||||||
|
@ -124,7 +124,7 @@ perform_lifecycle_check(PoolName, InitialConfig) ->
|
||||||
?assertEqual({error, not_found}, emqx_resource:get_instance(PoolName)).
|
?assertEqual({error, not_found}, emqx_resource:get_instance(PoolName)).
|
||||||
|
|
||||||
t_tls_verify_none(Config) ->
|
t_tls_verify_none(Config) ->
|
||||||
PoolName = <<"emqx_ee_connector_influxdb_SUITE">>,
|
PoolName = <<"emqx_bridge_influxdb_connector_SUITE">>,
|
||||||
Host = ?config(influxdb_tls_host, Config),
|
Host = ?config(influxdb_tls_host, Config),
|
||||||
Port = ?config(influxdb_tls_port, Config),
|
Port = ?config(influxdb_tls_port, Config),
|
||||||
InitialConfig = influxdb_config(Host, Port, true, <<"verify_none">>),
|
InitialConfig = influxdb_config(Host, Port, true, <<"verify_none">>),
|
||||||
|
@ -135,7 +135,7 @@ t_tls_verify_none(Config) ->
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
t_tls_verify_peer(Config) ->
|
t_tls_verify_peer(Config) ->
|
||||||
PoolName = <<"emqx_ee_connector_influxdb_SUITE">>,
|
PoolName = <<"emqx_bridge_influxdb_connector_SUITE">>,
|
||||||
Host = ?config(influxdb_tls_host, Config),
|
Host = ?config(influxdb_tls_host, Config),
|
||||||
Port = ?config(influxdb_tls_port, Config),
|
Port = ?config(influxdb_tls_port, Config),
|
||||||
InitialConfig = influxdb_config(Host, Port, true, <<"verify_peer">>),
|
InitialConfig = influxdb_config(Host, Port, true, <<"verify_peer">>),
|
|
@ -1,7 +1,7 @@
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
%%--------------------------------------------------------------------
|
%%--------------------------------------------------------------------
|
||||||
-module(emqx_ee_bridge_influxdb_tests).
|
-module(emqx_bridge_influxdb_tests).
|
||||||
|
|
||||||
-include_lib("eunit/include/eunit.hrl").
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
|
||||||
|
@ -192,7 +192,9 @@
|
||||||
fields => [{"field", "\"field\\4\""}],
|
fields => [{"field", "\"field\\4\""}],
|
||||||
timestamp => undefined
|
timestamp => undefined
|
||||||
}},
|
}},
|
||||||
{"m5\\,mA,tag=\\=tag5\\=,\\,tag_a\\,=tag\\ 5a,tag_b=tag5b \\ field\\ =field5,field\\ _\\ a=field5a,\\,field_b\\ =\\=\\,\\ field5b ${timestamp5}",
|
{
|
||||||
|
"m5\\,mA,tag=\\=tag5\\=,\\,tag_a\\,=tag\\ 5a,tag_b=tag5b \\ field\\ =field5,"
|
||||||
|
"field\\ _\\ a=field5a,\\,field_b\\ =\\=\\,\\ field5b ${timestamp5}",
|
||||||
#{
|
#{
|
||||||
measurement => "m5,mA",
|
measurement => "m5,mA",
|
||||||
tags => [{"tag", "=tag5="}, {",tag_a,", "tag 5a"}, {"tag_b", "tag5b"}],
|
tags => [{"tag", "=tag5="}, {",tag_a,", "tag 5a"}, {"tag_b", "tag5b"}],
|
||||||
|
@ -200,7 +202,8 @@
|
||||||
{" field ", "field5"}, {"field _ a", "field5a"}, {",field_b ", "=, field5b"}
|
{" field ", "field5"}, {"field _ a", "field5a"}, {",field_b ", "=, field5b"}
|
||||||
],
|
],
|
||||||
timestamp => "${timestamp5}"
|
timestamp => "${timestamp5}"
|
||||||
}},
|
}
|
||||||
|
},
|
||||||
{"m6,tag=tag6,tag_a=tag6a,tag_b=tag6b field=\"field6\",field_a=\"field6a\",field_b=\"field6b\"",
|
{"m6,tag=tag6,tag_a=tag6a,tag_b=tag6b field=\"field6\",field_a=\"field6a\",field_b=\"field6b\"",
|
||||||
#{
|
#{
|
||||||
measurement => "m6",
|
measurement => "m6",
|
||||||
|
@ -208,20 +211,26 @@
|
||||||
fields => [{"field", "field6"}, {"field_a", "field6a"}, {"field_b", "field6b"}],
|
fields => [{"field", "field6"}, {"field_a", "field6a"}, {"field_b", "field6b"}],
|
||||||
timestamp => undefined
|
timestamp => undefined
|
||||||
}},
|
}},
|
||||||
{"\\ \\ m7\\ \\ ,tag=\\ tag\\,7\\ ,tag_a=\"tag7a\",tag_b\\,tag1=tag7b field=\"field7\",field_a=field7a,field_b=\"field7b\\\\\n\"",
|
{
|
||||||
|
"\\ \\ m7\\ \\ ,tag=\\ tag\\,7\\ ,tag_a=\"tag7a\",tag_b\\,tag1=tag7b field=\"field7\","
|
||||||
|
"field_a=field7a,field_b=\"field7b\\\\\n\"",
|
||||||
#{
|
#{
|
||||||
measurement => " m7 ",
|
measurement => " m7 ",
|
||||||
tags => [{"tag", " tag,7 "}, {"tag_a", "\"tag7a\""}, {"tag_b,tag1", "tag7b"}],
|
tags => [{"tag", " tag,7 "}, {"tag_a", "\"tag7a\""}, {"tag_b,tag1", "tag7b"}],
|
||||||
fields => [{"field", "field7"}, {"field_a", "field7a"}, {"field_b", "field7b\\\n"}],
|
fields => [{"field", "field7"}, {"field_a", "field7a"}, {"field_b", "field7b\\\n"}],
|
||||||
timestamp => undefined
|
timestamp => undefined
|
||||||
}},
|
}
|
||||||
{"m8,tag=tag8,tag_a=\"tag8a\",tag_b=tag8b field=\"field8\",field_a=field8a,field_b=\"\\\"field\\\" = 8b\" ${timestamp8}",
|
},
|
||||||
|
{
|
||||||
|
"m8,tag=tag8,tag_a=\"tag8a\",tag_b=tag8b field=\"field8\",field_a=field8a,"
|
||||||
|
"field_b=\"\\\"field\\\" = 8b\" ${timestamp8}",
|
||||||
#{
|
#{
|
||||||
measurement => "m8",
|
measurement => "m8",
|
||||||
tags => [{"tag", "tag8"}, {"tag_a", "\"tag8a\""}, {"tag_b", "tag8b"}],
|
tags => [{"tag", "tag8"}, {"tag_a", "\"tag8a\""}, {"tag_b", "tag8b"}],
|
||||||
fields => [{"field", "field8"}, {"field_a", "field8a"}, {"field_b", "\"field\" = 8b"}],
|
fields => [{"field", "field8"}, {"field_a", "field8a"}, {"field_b", "\"field\" = 8b"}],
|
||||||
timestamp => "${timestamp8}"
|
timestamp => "${timestamp8}"
|
||||||
}},
|
}
|
||||||
|
},
|
||||||
{"m\\9,tag=tag9,tag_a=\"tag9a\",tag_b=tag9b field\\=field=\"field9\",field_a=field9a,field_b=\"\" ${timestamp9}",
|
{"m\\9,tag=tag9,tag_a=\"tag9a\",tag_b=tag9b field\\=field=\"field9\",field_a=field9a,field_b=\"\" ${timestamp9}",
|
||||||
#{
|
#{
|
||||||
measurement => "m\\9",
|
measurement => "m\\9",
|
||||||
|
@ -263,7 +272,9 @@
|
||||||
fields => [{"field", "\"field\\4\""}],
|
fields => [{"field", "\"field\\4\""}],
|
||||||
timestamp => undefined
|
timestamp => undefined
|
||||||
}},
|
}},
|
||||||
{" m5\\,mA,tag=\\=tag5\\=,\\,tag_a\\,=tag\\ 5a,tag_b=tag5b \\ field\\ =field5,field\\ _\\ a=field5a,\\,field_b\\ =\\=\\,\\ field5b ${timestamp5} ",
|
{
|
||||||
|
" m5\\,mA,tag=\\=tag5\\=,\\,tag_a\\,=tag\\ 5a,tag_b=tag5b \\ field\\ =field5,"
|
||||||
|
"field\\ _\\ a=field5a,\\,field_b\\ =\\=\\,\\ field5b ${timestamp5} ",
|
||||||
#{
|
#{
|
||||||
measurement => "m5,mA",
|
measurement => "m5,mA",
|
||||||
tags => [{"tag", "=tag5="}, {",tag_a,", "tag 5a"}, {"tag_b", "tag5b"}],
|
tags => [{"tag", "=tag5="}, {",tag_a,", "tag 5a"}, {"tag_b", "tag5b"}],
|
||||||
|
@ -271,7 +282,8 @@
|
||||||
{" field ", "field5"}, {"field _ a", "field5a"}, {",field_b ", "=, field5b"}
|
{" field ", "field5"}, {"field _ a", "field5a"}, {",field_b ", "=, field5b"}
|
||||||
],
|
],
|
||||||
timestamp => "${timestamp5}"
|
timestamp => "${timestamp5}"
|
||||||
}},
|
}
|
||||||
|
},
|
||||||
{" m6,tag=tag6,tag_a=tag6a,tag_b=tag6b field=\"field6\",field_a=\"field6a\",field_b=\"field6b\" ",
|
{" m6,tag=tag6,tag_a=tag6a,tag_b=tag6b field=\"field6\",field_a=\"field6a\",field_b=\"field6b\" ",
|
||||||
#{
|
#{
|
||||||
measurement => "m6",
|
measurement => "m6",
|
||||||
|
@ -330,7 +342,7 @@ to_influx_lines(RawLines) ->
|
||||||
try
|
try
|
||||||
%% mute error logs from this call
|
%% mute error logs from this call
|
||||||
emqx_logger:set_primary_log_level(none),
|
emqx_logger:set_primary_log_level(none),
|
||||||
emqx_ee_bridge_influxdb:to_influx_lines(RawLines)
|
emqx_bridge_influxdb:to_influx_lines(RawLines)
|
||||||
after
|
after
|
||||||
emqx_logger:set_primary_log_level(OldLevel)
|
emqx_logger:set_primary_log_level(OldLevel)
|
||||||
end.
|
end.
|
|
@ -583,7 +583,7 @@ config(Args0, More) ->
|
||||||
ct:pal("Running tests with conf:\n~p", [Conf]),
|
ct:pal("Running tests with conf:\n~p", [Conf]),
|
||||||
InstId = maps:get("instance_id", Args),
|
InstId = maps:get("instance_id", Args),
|
||||||
<<"bridge:", BridgeId/binary>> = InstId,
|
<<"bridge:", BridgeId/binary>> = InstId,
|
||||||
{Type, Name} = emqx_bridge_resource:parse_bridge_id(BridgeId),
|
{Type, Name} = emqx_bridge_resource:parse_bridge_id(BridgeId, #{atom_name => false}),
|
||||||
TypeBin = atom_to_binary(Type),
|
TypeBin = atom_to_binary(Type),
|
||||||
hocon_tconf:check_plain(
|
hocon_tconf:check_plain(
|
||||||
emqx_bridge_schema,
|
emqx_bridge_schema,
|
||||||
|
@ -596,7 +596,7 @@ config(Args0, More) ->
|
||||||
hocon_config(Args) ->
|
hocon_config(Args) ->
|
||||||
InstId = maps:get("instance_id", Args),
|
InstId = maps:get("instance_id", Args),
|
||||||
<<"bridge:", BridgeId/binary>> = InstId,
|
<<"bridge:", BridgeId/binary>> = InstId,
|
||||||
{_Type, Name} = emqx_bridge_resource:parse_bridge_id(BridgeId),
|
{_Type, Name} = emqx_bridge_resource:parse_bridge_id(BridgeId, #{atom_name => false}),
|
||||||
AuthConf = maps:get("authentication", Args),
|
AuthConf = maps:get("authentication", Args),
|
||||||
AuthTemplate = iolist_to_binary(hocon_config_template_authentication(AuthConf)),
|
AuthTemplate = iolist_to_binary(hocon_config_template_authentication(AuthConf)),
|
||||||
AuthConfRendered = bbmustache:render(AuthTemplate, AuthConf),
|
AuthConfRendered = bbmustache:render(AuthTemplate, AuthConf),
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
{application, emqx_bridge_pulsar, [
|
{application, emqx_bridge_pulsar, [
|
||||||
{description, "EMQX Pulsar Bridge"},
|
{description, "EMQX Pulsar Bridge"},
|
||||||
{vsn, "0.1.1"},
|
{vsn, "0.1.2"},
|
||||||
{registered, []},
|
{registered, []},
|
||||||
{applications, [
|
{applications, [
|
||||||
kernel,
|
kernel,
|
||||||
|
|
|
@ -140,7 +140,7 @@ fields(producer_resource_opts) ->
|
||||||
lists:filtermap(
|
lists:filtermap(
|
||||||
fun
|
fun
|
||||||
({health_check_interval = Field, MetaFn}) ->
|
({health_check_interval = Field, MetaFn}) ->
|
||||||
{true, {Field, override_default(MetaFn, 1_000)}};
|
{true, {Field, override_default(MetaFn, <<"1s">>)}};
|
||||||
({Field, _Meta}) ->
|
({Field, _Meta}) ->
|
||||||
lists:member(Field, SupportedOpts)
|
lists:member(Field, SupportedOpts)
|
||||||
end,
|
end,
|
||||||
|
|
|
@ -116,6 +116,87 @@ authn_validations_test() ->
|
||||||
),
|
),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
%% erlfmt-ignore
|
||||||
|
-define(LISTENERS,
|
||||||
|
"""
|
||||||
|
listeners.ssl.default.bind = 9999
|
||||||
|
listeners.wss.default.bind = 9998
|
||||||
|
listeners.wss.default.ssl_options.cacertfile = \"mytest/certs/cacert.pem\"
|
||||||
|
listeners.wss.new.bind = 9997
|
||||||
|
listeners.wss.new.websocket.mqtt_path = \"/my-mqtt\"
|
||||||
|
"""
|
||||||
|
).
|
||||||
|
|
||||||
|
listeners_test() ->
|
||||||
|
BaseConf = to_bin(?BASE_CONF, ["emqx1@127.0.0.1", "emqx1@127.0.0.1"]),
|
||||||
|
|
||||||
|
Conf = <<BaseConf/binary, ?LISTENERS>>,
|
||||||
|
{ok, ConfMap0} = hocon:binary(Conf, #{format => richmap}),
|
||||||
|
{_, ConfMap} = hocon_tconf:map_translate(emqx_conf_schema, ConfMap0, #{format => richmap}),
|
||||||
|
#{<<"listeners">> := Listeners} = hocon_util:richmap_to_map(ConfMap),
|
||||||
|
#{
|
||||||
|
<<"tcp">> := #{<<"default">> := Tcp},
|
||||||
|
<<"ws">> := #{<<"default">> := Ws},
|
||||||
|
<<"wss">> := #{<<"default">> := DefaultWss, <<"new">> := NewWss},
|
||||||
|
<<"ssl">> := #{<<"default">> := Ssl}
|
||||||
|
} = Listeners,
|
||||||
|
DefaultCacertFile = <<"${EMQX_ETC_DIR}/certs/cacert.pem">>,
|
||||||
|
DefaultCertFile = <<"${EMQX_ETC_DIR}/certs/cert.pem">>,
|
||||||
|
DefaultKeyFile = <<"${EMQX_ETC_DIR}/certs/key.pem">>,
|
||||||
|
?assertMatch(
|
||||||
|
#{
|
||||||
|
<<"bind">> := {{0, 0, 0, 0}, 1883},
|
||||||
|
<<"enabled">> := true
|
||||||
|
},
|
||||||
|
Tcp
|
||||||
|
),
|
||||||
|
?assertMatch(
|
||||||
|
#{
|
||||||
|
<<"bind">> := {{0, 0, 0, 0}, 8083},
|
||||||
|
<<"enabled">> := true,
|
||||||
|
<<"websocket">> := #{<<"mqtt_path">> := "/mqtt"}
|
||||||
|
},
|
||||||
|
Ws
|
||||||
|
),
|
||||||
|
?assertMatch(
|
||||||
|
#{
|
||||||
|
<<"bind">> := 9999,
|
||||||
|
<<"ssl_options">> := #{
|
||||||
|
<<"cacertfile">> := DefaultCacertFile,
|
||||||
|
<<"certfile">> := DefaultCertFile,
|
||||||
|
<<"keyfile">> := DefaultKeyFile
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Ssl
|
||||||
|
),
|
||||||
|
?assertMatch(
|
||||||
|
#{
|
||||||
|
<<"bind">> := 9998,
|
||||||
|
<<"websocket">> := #{<<"mqtt_path">> := "/mqtt"},
|
||||||
|
<<"ssl_options">> :=
|
||||||
|
#{
|
||||||
|
<<"cacertfile">> := <<"mytest/certs/cacert.pem">>,
|
||||||
|
<<"certfile">> := DefaultCertFile,
|
||||||
|
<<"keyfile">> := DefaultKeyFile
|
||||||
|
}
|
||||||
|
},
|
||||||
|
DefaultWss
|
||||||
|
),
|
||||||
|
?assertMatch(
|
||||||
|
#{
|
||||||
|
<<"bind">> := 9997,
|
||||||
|
<<"websocket">> := #{<<"mqtt_path">> := "/my-mqtt"},
|
||||||
|
<<"ssl_options">> :=
|
||||||
|
#{
|
||||||
|
<<"cacertfile">> := DefaultCacertFile,
|
||||||
|
<<"certfile">> := DefaultCertFile,
|
||||||
|
<<"keyfile">> := DefaultKeyFile
|
||||||
|
}
|
||||||
|
},
|
||||||
|
NewWss
|
||||||
|
),
|
||||||
|
ok.
|
||||||
|
|
||||||
doc_gen_test() ->
|
doc_gen_test() ->
|
||||||
%% the json file too large to encode.
|
%% the json file too large to encode.
|
||||||
{
|
{
|
||||||
|
|
|
@ -2,7 +2,7 @@
|
||||||
{application, emqx_rule_engine, [
|
{application, emqx_rule_engine, [
|
||||||
{description, "EMQX Rule Engine"},
|
{description, "EMQX Rule Engine"},
|
||||||
% strict semver, bump manually!
|
% strict semver, bump manually!
|
||||||
{vsn, "5.0.15"},
|
{vsn, "5.0.16"},
|
||||||
{modules, []},
|
{modules, []},
|
||||||
{registered, [emqx_rule_engine_sup, emqx_rule_engine]},
|
{registered, [emqx_rule_engine_sup, emqx_rule_engine]},
|
||||||
{applications, [kernel, stdlib, rulesql, getopt, emqx_ctl]},
|
{applications, [kernel, stdlib, rulesql, getopt, emqx_ctl]},
|
||||||
|
|
|
@ -341,7 +341,10 @@ get_basic_usage_info() ->
|
||||||
tally_referenced_bridges(BridgeIDs, Acc0) ->
|
tally_referenced_bridges(BridgeIDs, Acc0) ->
|
||||||
lists:foldl(
|
lists:foldl(
|
||||||
fun(BridgeID, Acc) ->
|
fun(BridgeID, Acc) ->
|
||||||
{BridgeType, _BridgeName} = emqx_bridge_resource:parse_bridge_id(BridgeID),
|
{BridgeType, _BridgeName} = emqx_bridge_resource:parse_bridge_id(
|
||||||
|
BridgeID,
|
||||||
|
#{atom_name => false}
|
||||||
|
),
|
||||||
maps:update_with(
|
maps:update_with(
|
||||||
BridgeType,
|
BridgeType,
|
||||||
fun(X) -> X + 1 end,
|
fun(X) -> X + 1 end,
|
||||||
|
|
|
@ -0,0 +1 @@
|
||||||
|
Fix the issue where emqx cannot start when `sysmon.os.mem_check_interval` is disabled.
|
|
@ -0,0 +1 @@
|
||||||
|
Enhanced clarity of the descriptions for the bridge configuration fields (username and password) to better guide users during setup.
|
|
@ -0,0 +1 @@
|
||||||
|
Refactor the directory structure of the InfluxDB data bridge.
|
|
@ -0,0 +1,2 @@
|
||||||
|
Fix the issue where the lack of a default value for ssl_options in listeners results in startup failure.
|
||||||
|
For example, such command(`EMQX_LISTENERS__WSS__DEFAULT__BIND='0.0.0.0:8089' ./bin/emqx console`) would have caused a crash before.
|
|
@ -1,5 +1,4 @@
|
||||||
toxiproxy
|
toxiproxy
|
||||||
influxdb
|
|
||||||
mongo
|
mongo
|
||||||
mongo_rs_sharded
|
mongo_rs_sharded
|
||||||
mysql
|
mysql
|
||||||
|
|
|
@ -16,7 +16,8 @@
|
||||||
emqx_bridge_sqlserver,
|
emqx_bridge_sqlserver,
|
||||||
emqx_bridge_rocketmq,
|
emqx_bridge_rocketmq,
|
||||||
emqx_bridge_rabbitmq,
|
emqx_bridge_rabbitmq,
|
||||||
emqx_bridge_tdengine
|
emqx_bridge_tdengine,
|
||||||
|
emqx_bridge_influxdb
|
||||||
]},
|
]},
|
||||||
{env, []},
|
{env, []},
|
||||||
{modules, []},
|
{modules, []},
|
||||||
|
|
|
@ -24,8 +24,8 @@ api_schemas(Method) ->
|
||||||
ref(emqx_ee_bridge_mongodb, Method ++ "_sharded"),
|
ref(emqx_ee_bridge_mongodb, Method ++ "_sharded"),
|
||||||
ref(emqx_ee_bridge_mongodb, Method ++ "_single"),
|
ref(emqx_ee_bridge_mongodb, Method ++ "_single"),
|
||||||
ref(emqx_ee_bridge_hstreamdb, Method),
|
ref(emqx_ee_bridge_hstreamdb, Method),
|
||||||
ref(emqx_ee_bridge_influxdb, Method ++ "_api_v1"),
|
ref(emqx_bridge_influxdb, Method ++ "_api_v1"),
|
||||||
ref(emqx_ee_bridge_influxdb, Method ++ "_api_v2"),
|
ref(emqx_bridge_influxdb, Method ++ "_api_v2"),
|
||||||
ref(emqx_ee_bridge_redis, Method ++ "_single"),
|
ref(emqx_ee_bridge_redis, Method ++ "_single"),
|
||||||
ref(emqx_ee_bridge_redis, Method ++ "_sentinel"),
|
ref(emqx_ee_bridge_redis, Method ++ "_sentinel"),
|
||||||
ref(emqx_ee_bridge_redis, Method ++ "_cluster"),
|
ref(emqx_ee_bridge_redis, Method ++ "_cluster"),
|
||||||
|
@ -49,7 +49,7 @@ schema_modules() ->
|
||||||
emqx_bridge_cassandra,
|
emqx_bridge_cassandra,
|
||||||
emqx_ee_bridge_hstreamdb,
|
emqx_ee_bridge_hstreamdb,
|
||||||
emqx_bridge_gcp_pubsub,
|
emqx_bridge_gcp_pubsub,
|
||||||
emqx_ee_bridge_influxdb,
|
emqx_bridge_influxdb,
|
||||||
emqx_ee_bridge_mongodb,
|
emqx_ee_bridge_mongodb,
|
||||||
emqx_ee_bridge_mysql,
|
emqx_ee_bridge_mysql,
|
||||||
emqx_ee_bridge_redis,
|
emqx_ee_bridge_redis,
|
||||||
|
@ -92,8 +92,8 @@ resource_type(mongodb_rs) -> emqx_ee_connector_mongodb;
|
||||||
resource_type(mongodb_sharded) -> emqx_ee_connector_mongodb;
|
resource_type(mongodb_sharded) -> emqx_ee_connector_mongodb;
|
||||||
resource_type(mongodb_single) -> emqx_ee_connector_mongodb;
|
resource_type(mongodb_single) -> emqx_ee_connector_mongodb;
|
||||||
resource_type(mysql) -> emqx_connector_mysql;
|
resource_type(mysql) -> emqx_connector_mysql;
|
||||||
resource_type(influxdb_api_v1) -> emqx_ee_connector_influxdb;
|
resource_type(influxdb_api_v1) -> emqx_bridge_influxdb_connector;
|
||||||
resource_type(influxdb_api_v2) -> emqx_ee_connector_influxdb;
|
resource_type(influxdb_api_v2) -> emqx_bridge_influxdb_connector;
|
||||||
resource_type(redis_single) -> emqx_ee_connector_redis;
|
resource_type(redis_single) -> emqx_ee_connector_redis;
|
||||||
resource_type(redis_sentinel) -> emqx_ee_connector_redis;
|
resource_type(redis_sentinel) -> emqx_ee_connector_redis;
|
||||||
resource_type(redis_cluster) -> emqx_ee_connector_redis;
|
resource_type(redis_cluster) -> emqx_ee_connector_redis;
|
||||||
|
@ -247,7 +247,7 @@ influxdb_structs() ->
|
||||||
[
|
[
|
||||||
{Protocol,
|
{Protocol,
|
||||||
mk(
|
mk(
|
||||||
hoconsc:map(name, ref(emqx_ee_bridge_influxdb, Protocol)),
|
hoconsc:map(name, ref(emqx_bridge_influxdb, Protocol)),
|
||||||
#{
|
#{
|
||||||
desc => <<"InfluxDB Bridge Config">>,
|
desc => <<"InfluxDB Bridge Config">>,
|
||||||
required => false
|
required => false
|
||||||
|
|
|
@ -1,5 +0,0 @@
|
||||||
%%--------------------------------------------------------------------
|
|
||||||
%% Copyright (c) 2022-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
||||||
%%-------------------------------------------------------------------
|
|
||||||
|
|
||||||
-define(INFLUXDB_DEFAULT_PORT, 8086).
|
|
|
@ -2,7 +2,6 @@
|
||||||
{erl_opts, [debug_info]}.
|
{erl_opts, [debug_info]}.
|
||||||
{deps, [
|
{deps, [
|
||||||
{hstreamdb_erl, {git, "https://github.com/hstreamdb/hstreamdb_erl.git", {tag, "0.2.5"}}},
|
{hstreamdb_erl, {git, "https://github.com/hstreamdb/hstreamdb_erl.git", {tag, "0.2.5"}}},
|
||||||
{influxdb, {git, "https://github.com/emqx/influxdb-client-erl", {tag, "1.1.9"}}},
|
|
||||||
{clickhouse, {git, "https://github.com/emqx/clickhouse-client-erl", {tag, "0.3"}}},
|
{clickhouse, {git, "https://github.com/emqx/clickhouse-client-erl", {tag, "0.3"}}},
|
||||||
{emqx, {path, "../../apps/emqx"}},
|
{emqx, {path, "../../apps/emqx"}},
|
||||||
{emqx_utils, {path, "../../apps/emqx_utils"}}
|
{emqx_utils, {path, "../../apps/emqx_utils"}}
|
||||||
|
|
|
@ -7,7 +7,6 @@
|
||||||
stdlib,
|
stdlib,
|
||||||
ecpool,
|
ecpool,
|
||||||
hstreamdb_erl,
|
hstreamdb_erl,
|
||||||
influxdb,
|
|
||||||
clickhouse
|
clickhouse
|
||||||
]},
|
]},
|
||||||
{env, []},
|
{env, []},
|
||||||
|
|
2
mix.exs
2
mix.exs
|
@ -65,7 +65,7 @@ defmodule EMQXUmbrella.MixProject do
|
||||||
# maybe forbid to fetch quicer
|
# maybe forbid to fetch quicer
|
||||||
{:emqtt,
|
{:emqtt,
|
||||||
github: "emqx/emqtt", tag: "1.8.5", override: true, system_env: maybe_no_quic_env()},
|
github: "emqx/emqtt", tag: "1.8.5", override: true, system_env: maybe_no_quic_env()},
|
||||||
{:rulesql, github: "emqx/rulesql", tag: "0.1.5"},
|
{:rulesql, github: "emqx/rulesql", tag: "0.1.6"},
|
||||||
{:observer_cli, "1.7.1"},
|
{:observer_cli, "1.7.1"},
|
||||||
{:system_monitor, github: "ieQu1/system_monitor", tag: "3.0.3"},
|
{:system_monitor, github: "ieQu1/system_monitor", tag: "3.0.3"},
|
||||||
{:telemetry, "1.1.0"},
|
{:telemetry, "1.1.0"},
|
||||||
|
|
|
@ -70,7 +70,7 @@
|
||||||
, {replayq, {git, "https://github.com/emqx/replayq.git", {tag, "0.3.7"}}}
|
, {replayq, {git, "https://github.com/emqx/replayq.git", {tag, "0.3.7"}}}
|
||||||
, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}
|
, {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}
|
||||||
, {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.8.5"}}}
|
, {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.8.5"}}}
|
||||||
, {rulesql, {git, "https://github.com/emqx/rulesql", {tag, "0.1.5"}}}
|
, {rulesql, {git, "https://github.com/emqx/rulesql", {tag, "0.1.6"}}}
|
||||||
, {observer_cli, "1.7.1"} % NOTE: depends on recon 2.5.x
|
, {observer_cli, "1.7.1"} % NOTE: depends on recon 2.5.x
|
||||||
, {system_monitor, {git, "https://github.com/ieQu1/system_monitor", {tag, "3.0.3"}}}
|
, {system_monitor, {git, "https://github.com/ieQu1/system_monitor", {tag, "3.0.3"}}}
|
||||||
, {getopt, "1.0.2"}
|
, {getopt, "1.0.2"}
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
emqx_ee_bridge_influxdb {
|
emqx_bridge_influxdb {
|
||||||
|
|
||||||
config_enable.desc:
|
config_enable.desc:
|
||||||
"""Enable or disable this bridge."""
|
"""Enable or disable this bridge."""
|
|
@ -1,4 +1,4 @@
|
||||||
emqx_ee_connector_influxdb {
|
emqx_bridge_influxdb_connector {
|
||||||
|
|
||||||
bucket.desc:
|
bucket.desc:
|
||||||
"""InfluxDB bucket name."""
|
"""InfluxDB bucket name."""
|
|
@ -13,7 +13,7 @@ database_desc.label:
|
||||||
"""Database Name"""
|
"""Database Name"""
|
||||||
|
|
||||||
password.desc:
|
password.desc:
|
||||||
"""EMQX's password in the external database."""
|
"""The password associated with the bridge, used for authentication with the external database."""
|
||||||
|
|
||||||
password.label:
|
password.label:
|
||||||
"""Password"""
|
"""Password"""
|
||||||
|
@ -37,7 +37,7 @@ ssl.label:
|
||||||
"""Enable SSL"""
|
"""Enable SSL"""
|
||||||
|
|
||||||
username.desc:
|
username.desc:
|
||||||
"""EMQX's username in the external database."""
|
"""The username associated with the bridge in the external database used for authentication or identification purposes."""
|
||||||
|
|
||||||
username.label:
|
username.label:
|
||||||
"""Username"""
|
"""Username"""
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
emqx_ee_bridge_influxdb {
|
emqx_bridge_influxdb {
|
||||||
|
|
||||||
config_enable.desc:
|
config_enable.desc:
|
||||||
"""启用/禁用桥接。"""
|
"""启用/禁用桥接。"""
|
|
@ -1,4 +1,4 @@
|
||||||
emqx_ee_connector_influxdb {
|
emqx_bridge_influxdb_connector {
|
||||||
|
|
||||||
bucket.desc:
|
bucket.desc:
|
||||||
"""InfluxDB bucket 名称。"""
|
"""InfluxDB bucket 名称。"""
|
Loading…
Reference in New Issue