Merge remote-tracking branch 'upstream/master' into refactor-gw-dir

This commit is contained in:
JianBo He 2023-04-06 10:44:59 +08:00
commit 702ecbcb6e
169 changed files with 1131 additions and 693 deletions

View File

@ -469,8 +469,8 @@ concurrent_materialized_view_writes: 32
# accepting writes when the limit is exceeded until a flush completes, # accepting writes when the limit is exceeded until a flush completes,
# and will trigger a flush based on memtable_cleanup_threshold # and will trigger a flush based on memtable_cleanup_threshold
# If omitted, Cassandra will set both to 1/4 the size of the heap. # If omitted, Cassandra will set both to 1/4 the size of the heap.
# memtable_heap_space_in_mb: 2048 memtable_heap_space_in_mb: 2048
# memtable_offheap_space_in_mb: 2048 memtable_offheap_space_in_mb: 2048
# memtable_cleanup_threshold is deprecated. The default calculation # memtable_cleanup_threshold is deprecated. The default calculation
# is the only reasonable choice. See the comments on memtable_flush_writers # is the only reasonable choice. See the comments on memtable_flush_writers

View File

@ -12,6 +12,8 @@ services:
environment: environment:
CASSANDRA_BROADCAST_ADDRESS: "1.2.3.4" CASSANDRA_BROADCAST_ADDRESS: "1.2.3.4"
CASSANDRA_RPC_ADDRESS: "0.0.0.0" CASSANDRA_RPC_ADDRESS: "0.0.0.0"
HEAP_NEWSIZE: "128M"
MAX_HEAP_SIZE: "2048M"
volumes: volumes:
- ./certs:/certs - ./certs:/certs
#ports: #ports:

View File

@ -82,7 +82,7 @@ ct: $(REBAR) merge-config
static_checks: static_checks:
@$(REBAR) as check do xref, dialyzer @$(REBAR) as check do xref, dialyzer
@if [ "$${PROFILE}" = 'emqx-enterprise' ]; then $(REBAR) ct --suite apps/emqx/test/emqx_static_checks --readable $(CT_READABLE); fi @if [ "$${PROFILE}" = 'emqx-enterprise' ]; then $(REBAR) ct --suite apps/emqx/test/emqx_static_checks --readable $(CT_READABLE); fi
@if [ "$${PROFILE}" = 'emqx-enterprise' ]; then ./scripts/check-i18n-style.sh; fi ./scripts/check-i18n-style.sh
APPS=$(shell $(SCRIPTS)/find-apps.sh) APPS=$(shell $(SCRIPTS)/find-apps.sh)

View File

@ -35,7 +35,7 @@
-define(EMQX_RELEASE_CE, "5.0.21"). -define(EMQX_RELEASE_CE, "5.0.21").
%% Enterprise edition %% Enterprise edition
-define(EMQX_RELEASE_EE, "5.0.2-rc.2"). -define(EMQX_RELEASE_EE, "5.0.2-rc.4").
%% the HTTP API version %% the HTTP API version
-define(EMQX_API_VERSION, "5.0"). -define(EMQX_API_VERSION, "5.0").

View File

@ -1880,7 +1880,9 @@ mqtt_listener(Bind) ->
default => <<"3s">> default => <<"3s">>
} }
)}, )},
{?EMQX_AUTHENTICATION_CONFIG_ROOT_NAME, authentication(listener)} {?EMQX_AUTHENTICATION_CONFIG_ROOT_NAME, (authentication(listener))#{
importance => ?IMPORTANCE_HIDDEN
}}
]. ].
base_listener(Bind) -> base_listener(Bind) ->
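The hunk above reuses the schema map returned by `authentication(listener)` and overrides one key via Erlang's map-update syntax. A minimal sketch of that idiom (values illustrative; `?IMPORTANCE_HIDDEN` is a schema macro):

%% Base#{K => V} returns a copy of Base with K added or replaced.
Base = #{desc => <<"Listener authentication">>, required => false},
Hidden = Base#{importance => hidden},
%% Hidden carries all of Base's entries plus importance => hidden.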

View File

@ -154,7 +154,12 @@ t_session_taken(_) ->
{clean_start, false}, {clean_start, false},
{properties, #{'Session-Expiry-Interval' => 120}} {properties, #{'Session-Expiry-Interval' => 120}}
]), ]),
{ok, _} = emqtt:connect(C), case emqtt:connect(C) of
{ok, _} ->
ok;
{error, econnrefused} ->
throw(mqtt_listener_not_ready)
end,
{ok, _, [0]} = emqtt:subscribe(C, Topic, []), {ok, _, [0]} = emqtt:subscribe(C, Topic, []),
C C
end, end,
@ -168,9 +173,21 @@ t_session_taken(_) ->
lists:seq(1, MsgNum) lists:seq(1, MsgNum)
) )
end, end,
emqx_common_test_helpers:wait_for(
C1 = Connect(), ?FUNCTION_NAME,
ok = emqtt:disconnect(C1), ?LINE,
fun() ->
try
C = Connect(),
emqtt:disconnect(C),
true
catch
throw:mqtt_listener_not_ready ->
false
end
end,
3000
),
Publish(), Publish(),
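The wrapper above polls the whole connect/disconnect round trip until the listener is ready. A minimal sketch of the same poll-until-true idiom (illustrative only; the suite relies on the real `emqx_common_test_helpers:wait_for/4`):

wait_until(_Fun, RemainingMs) when RemainingMs =< 0 ->
    error(timeout);
wait_until(Fun, RemainingMs) ->
    case Fun() of
        true -> ok;
        false ->
            timer:sleep(100),                   %% poll interval
            wait_until(Fun, RemainingMs - 100)  %% remaining budget in ms
    end.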

View File

@ -660,6 +660,7 @@ start_slave(Name, Opts) when is_list(Opts) ->
start_slave(Name, Opts) when is_map(Opts) -> start_slave(Name, Opts) when is_map(Opts) ->
SlaveMod = maps:get(peer_mod, Opts, ct_slave), SlaveMod = maps:get(peer_mod, Opts, ct_slave),
Node = node_name(Name), Node = node_name(Name),
put_peer_mod(Node, SlaveMod),
DoStart = DoStart =
fun() -> fun() ->
case SlaveMod of case SlaveMod of
@ -669,8 +670,8 @@ start_slave(Name, Opts) when is_map(Opts) ->
[ [
{kill_if_fail, true}, {kill_if_fail, true},
{monitor_master, true}, {monitor_master, true},
{init_timeout, 10000}, {init_timeout, 20_000},
{startup_timeout, 10000}, {startup_timeout, 20_000},
{erl_flags, erl_flags()} {erl_flags, erl_flags()}
] ]
); );
@ -687,7 +688,6 @@ start_slave(Name, Opts) when is_map(Opts) ->
throw(Other) throw(Other)
end, end,
pong = net_adm:ping(Node), pong = net_adm:ping(Node),
put_peer_mod(Node, SlaveMod),
setup_node(Node, Opts), setup_node(Node, Opts),
ok = snabbkaffe:forward_trace(Node), ok = snabbkaffe:forward_trace(Node),
Node. Node.

View File

@ -884,7 +884,20 @@ t_revoked(Config) ->
{port, 8883} {port, 8883}
]), ]),
process_flag(trap_exit, true), process_flag(trap_exit, true),
?assertMatch({error, {{shutdown, {tls_alert, {certificate_revoked, _}}}, _}}, emqtt:connect(C)), Res = emqtt:connect(C),
%% apparently, sometimes there's some race condition in
%% `emqtt_sock:ssl_upgrade' when it calls
%% `ssl:controlling_process' and a bad match happens at that
%% point.
case Res of
{error, {{shutdown, {tls_alert, {certificate_revoked, _}}}, _}} ->
ok;
{error, closed} ->
%% race condition?
ok;
_ ->
ct:fail("unexpected result: ~p", [Res])
end,
ok. ok.
t_revoke_then_refresh(Config) -> t_revoke_then_refresh(Config) ->

View File

@ -65,7 +65,7 @@ terminate(_Reason, #{callbacks := Callbacks}) ->
handle_call({push, Callback}, _From, State = #{callbacks := Callbacks}) -> handle_call({push, Callback}, _From, State = #{callbacks := Callbacks}) ->
{reply, ok, State#{callbacks := [Callback | Callbacks]}}; {reply, ok, State#{callbacks := [Callback | Callbacks]}};
handle_call(terminate, _From, State = #{callbacks := Callbacks}) -> handle_call(terminate, _From, State = #{callbacks := Callbacks}) ->
lists:foreach(fun(Fun) -> Fun() end, Callbacks), lists:foreach(fun(Fun) -> catch Fun() end, Callbacks),
{stop, normal, ok, State}; {stop, normal, ok, State};
handle_call(_Req, _From, State) -> handle_call(_Req, _From, State) ->
{reply, error, State}. {reply, error, State}.

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
{application, emqx_authn, [ {application, emqx_authn, [
{description, "EMQX Authentication"}, {description, "EMQX Authentication"},
{vsn, "0.1.15"}, {vsn, "0.1.16"},
{modules, []}, {modules, []},
{registered, [emqx_authn_sup, emqx_authn_registry]}, {registered, [emqx_authn_sup, emqx_authn_registry]},
{applications, [kernel, stdlib, emqx_resource, emqx_connector, ehttpc, epgsql, mysql, jose]}, {applications, [kernel, stdlib, emqx_resource, emqx_connector, ehttpc, epgsql, mysql, jose]},

View File

@ -1419,14 +1419,14 @@ request_user_create_examples() ->
summary => <<"Regular user">>, summary => <<"Regular user">>,
value => #{ value => #{
user_id => <<"user1">>, user_id => <<"user1">>,
password => <<"secret">> password => <<"******">>
} }
}, },
super_user => #{ super_user => #{
summary => <<"Superuser">>, summary => <<"Superuser">>,
value => #{ value => #{
user_id => <<"user2">>, user_id => <<"user2">>,
password => <<"secret">>, password => <<"******">>,
is_superuser => true is_superuser => true
} }
} }
@ -1437,13 +1437,13 @@ request_user_update_examples() ->
regular_user => #{ regular_user => #{
summary => <<"Update regular user">>, summary => <<"Update regular user">>,
value => #{ value => #{
password => <<"newsecret">> password => <<"******">>
} }
}, },
super_user => #{ super_user => #{
summary => <<"Update user and promote to superuser">>, summary => <<"Update user and promote to superuser">>,
value => #{ value => #{
password => <<"newsecret">>, password => <<"******">>,
is_superuser => true is_superuser => true
} }
} }

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
{application, emqx_authz, [ {application, emqx_authz, [
{description, "An OTP application"}, {description, "An OTP application"},
{vsn, "0.1.15"}, {vsn, "0.1.16"},
{registered, []}, {registered, []},
{mod, {emqx_authz_app, []}}, {mod, {emqx_authz_app, []}},
{applications, [ {applications, [

View File

@ -492,7 +492,9 @@ authz_fields() ->
?ARRAY(?UNION(UnionMemberSelector)), ?ARRAY(?UNION(UnionMemberSelector)),
#{ #{
default => [], default => [],
desc => ?DESC(sources) desc => ?DESC(sources),
%% doc_lift forces a root-level reference instead of nesting sub-structs
extra => #{doc_lift => true}
} }
)} )}
]. ].

View File

@ -235,7 +235,7 @@ mqtt_main_example() ->
server => <<"127.0.0.1:1883">>, server => <<"127.0.0.1:1883">>,
proto_ver => <<"v4">>, proto_ver => <<"v4">>,
username => <<"foo">>, username => <<"foo">>,
password => <<"bar">>, password => <<"******">>,
clean_start => true, clean_start => true,
keepalive => <<"300s">>, keepalive => <<"300s">>,
retry_interval => <<"15s">>, retry_interval => <<"15s">>,

View File

@ -270,9 +270,6 @@ fast_forward_to_commit(Node, ToTnxId) ->
%% @private %% @private
init([Node, RetryMs]) -> init([Node, RetryMs]) ->
%% Workaround for https://github.com/emqx/mria/issues/94:
_ = mria_rlog:wait_for_shards([?CLUSTER_RPC_SHARD], 1000),
_ = mria:wait_for_tables([?CLUSTER_MFA, ?CLUSTER_COMMIT]),
{ok, _} = mnesia:subscribe({table, ?CLUSTER_MFA, simple}), {ok, _} = mnesia:subscribe({table, ?CLUSTER_MFA, simple}),
State = #{node => Node, retry_interval => RetryMs}, State = #{node => Node, retry_interval => RetryMs},
%% The init transaction ID is set in emqx_conf_app after %% The init transaction ID is set in emqx_conf_app after
@ -286,6 +283,9 @@ init([Node, RetryMs]) ->
%% @private %% @private
handle_continue(?CATCH_UP, State) -> handle_continue(?CATCH_UP, State) ->
%% emqx app must be started before
%% trying to catch up on the rpc commit logs
ok = wait_for_emqx_ready(),
{noreply, State, catch_up(State)}. {noreply, State, catch_up(State)}.
handle_call(reset, _From, State) -> handle_call(reset, _From, State) ->
@ -572,3 +572,37 @@ maybe_init_tnx_id(_Node, TnxId) when TnxId < 0 -> ok;
maybe_init_tnx_id(Node, TnxId) -> maybe_init_tnx_id(Node, TnxId) ->
{atomic, _} = transaction(fun ?MODULE:commit/2, [Node, TnxId]), {atomic, _} = transaction(fun ?MODULE:commit/2, [Node, TnxId]),
ok. ok.
%% @private Cannot proceed until the emqx app is ready.
%% Otherwise the committed transaction catch-up may fail.
wait_for_emqx_ready() ->
%% wait 10 seconds for emqx to start
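%% note: the ok-match below turns a timeout into a badmatch crash,
%% failing the caller loudly instead of proceeding half-ready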
ok = do_wait_for_emqx_ready(10).
%% Wait for emqx app to be ready,
%% write a log message every 1 second
do_wait_for_emqx_ready(0) ->
timeout;
do_wait_for_emqx_ready(N) ->
%% check interval is 100ms
%% makes the total wait time 1 second
case do_wait_for_emqx_ready2(10) of
ok ->
ok;
timeout ->
?SLOG(warning, #{msg => "stil_waiting_for_emqx_app_to_be_ready"}),
do_wait_for_emqx_ready(N - 1)
end.
%% Wait for emqx app to be ready,
%% check interval is 100ms
do_wait_for_emqx_ready2(0) ->
timeout;
do_wait_for_emqx_ready2(N) ->
case emqx:is_running() of
true ->
ok;
false ->
timer:sleep(100),
do_wait_for_emqx_ready2(N - 1)
end.

View File

@ -13,7 +13,9 @@
%% See the License for the specific language governing permissions and %% See the License for the specific language governing permissions and
%% limitations under the License. %% limitations under the License.
%%-------------------------------------------------------------------- %%--------------------------------------------------------------------
-module(emqx_cluster_rpc_handler).
%% @doc This module is responsible for cleaning up the cluster RPC MFA.
-module(emqx_cluster_rpc_cleaner).
-behaviour(gen_server). -behaviour(gen_server).

View File

@ -95,19 +95,22 @@ init_load() ->
-endif. -endif.
init_conf() -> init_conf() ->
%% Workaround for https://github.com/emqx/mria/issues/94:
_ = mria_rlog:wait_for_shards([?CLUSTER_RPC_SHARD], 1000),
_ = mria:wait_for_tables([?CLUSTER_MFA, ?CLUSTER_COMMIT]),
{ok, TnxId} = copy_override_conf_from_core_node(), {ok, TnxId} = copy_override_conf_from_core_node(),
_ = emqx_app:set_init_tnx_id(TnxId), _ = emqx_app:set_init_tnx_id(TnxId),
ok = init_load(), ok = init_load(),
ok = emqx_app:set_init_config_load_done(). ok = emqx_app:set_init_config_load_done().
cluster_nodes() -> cluster_nodes() ->
maps:get(running_nodes, ekka_cluster:info()) -- [node()]. mria:cluster_nodes(cores) -- [node()].
copy_override_conf_from_core_node() -> copy_override_conf_from_core_node() ->
case cluster_nodes() of case cluster_nodes() of
%% The first core node is self. %% The first core node is self.
[] -> [] ->
?SLOG(debug, #{msg => "skip_copy_overide_conf_from_core_node"}), ?SLOG(debug, #{msg => "skip_copy_override_conf_from_core_node"}),
{ok, ?DEFAULT_INIT_TXN_ID}; {ok, ?DEFAULT_INIT_TXN_ID};
Nodes -> Nodes ->
{Results, Failed} = emqx_conf_proto_v2:get_override_config_file(Nodes), {Results, Failed} = emqx_conf_proto_v2:get_override_config_file(Nodes),
@ -141,7 +144,7 @@ copy_override_conf_from_core_node() ->
%% finish the boot sequence and load the %% finish the boot sequence and load the
%% config for other nodes to copy it. %% config for other nodes to copy it.
?SLOG(info, #{ ?SLOG(info, #{
msg => "skip_copy_overide_conf_from_core_node", msg => "skip_copy_override_conf_from_core_node",
loading_from_disk => true, loading_from_disk => true,
nodes => Nodes, nodes => Nodes,
failed => Failed, failed => Failed,
@ -153,7 +156,7 @@ copy_override_conf_from_core_node() ->
Jitter = rand:uniform(2_000), Jitter = rand:uniform(2_000),
Timeout = 10_000 + Jitter, Timeout = 10_000 + Jitter,
?SLOG(info, #{ ?SLOG(info, #{
msg => "copy_overide_conf_from_core_node_retry", msg => "copy_override_conf_from_core_node_retry",
timeout => Timeout, timeout => Timeout,
nodes => Nodes, nodes => Nodes,
failed => Failed, failed => Failed,
@ -166,7 +169,7 @@ copy_override_conf_from_core_node() ->
[{ok, Info} | _] = lists:sort(fun conf_sort/2, Ready), [{ok, Info} | _] = lists:sort(fun conf_sort/2, Ready),
#{node := Node, conf := RawOverrideConf, tnx_id := TnxId} = Info, #{node := Node, conf := RawOverrideConf, tnx_id := TnxId} = Info,
?SLOG(debug, #{ ?SLOG(debug, #{
msg => "copy_overide_conf_from_core_node_success", msg => "copy_override_conf_from_core_node_success",
node => Node, node => Node,
cluster_override_conf_file => application:get_env( cluster_override_conf_file => application:get_env(
emqx, cluster_override_conf_file emqx, cluster_override_conf_file

View File

@ -36,7 +36,7 @@ init([]) ->
ChildSpecs = ChildSpecs =
[ [
child_spec(emqx_cluster_rpc, []), child_spec(emqx_cluster_rpc, []),
child_spec(emqx_cluster_rpc_handler, []) child_spec(emqx_cluster_rpc_cleaner, [])
], ],
{ok, {SupFlags, ChildSpecs}}. {ok, {SupFlags, ChildSpecs}}.

View File

@ -43,6 +43,7 @@ groups() -> [].
init_per_suite(Config) -> init_per_suite(Config) ->
application:load(emqx_conf), application:load(emqx_conf),
ok = ekka:start(), ok = ekka:start(),
ok = emqx_common_test_helpers:start_apps([]),
ok = mria_rlog:wait_for_shards([?CLUSTER_RPC_SHARD], infinity), ok = mria_rlog:wait_for_shards([?CLUSTER_RPC_SHARD], infinity),
ok = emqx_config:put([node, cluster_call, retry_interval], 1000), ok = emqx_config:put([node, cluster_call, retry_interval], 1000),
meck:new(emqx_alarm, [non_strict, passthrough, no_link]), meck:new(emqx_alarm, [non_strict, passthrough, no_link]),
@ -53,6 +54,7 @@ init_per_suite(Config) ->
Config. Config.
end_per_suite(_Config) -> end_per_suite(_Config) ->
ok = emqx_common_test_helpers:stop_apps([]),
ekka:stop(), ekka:stop(),
mria:stop(), mria:stop(),
meck:unload(mria), meck:unload(mria),
@ -255,13 +257,13 @@ t_fast_forward_commit(_Config) ->
), ),
ok. ok.
t_handler_unexpected_msg(_Config) -> t_cleaner_unexpected_msg(_Config) ->
Handler = emqx_cluster_rpc_handler, Cleaner = emqx_cluster_rpc_cleaner,
OldPid = erlang:whereis(Handler), OldPid = erlang:whereis(Cleaner),
ok = gen_server:cast(Handler, unexpected_cast_msg), ok = gen_server:cast(Cleaner, unexpected_cast_msg),
ignore = gen_server:call(Handler, unexpected_cast_msg), ignore = gen_server:call(Cleaner, unexpected_cast_msg),
erlang:send(Handler, unexpected_info_msg), erlang:send(Cleaner, unexpected_info_msg),
NewPid = erlang:whereis(Handler), NewPid = erlang:whereis(Cleaner),
?assertEqual(OldPid, NewPid), ?assertEqual(OldPid, NewPid),
ok. ok.
@ -279,8 +281,8 @@ start() ->
{ok, Pid1} = emqx_cluster_rpc:start_link(), {ok, Pid1} = emqx_cluster_rpc:start_link(),
{ok, Pid2} = emqx_cluster_rpc:start_link({node(), ?NODE2}, ?NODE2, 500), {ok, Pid2} = emqx_cluster_rpc:start_link({node(), ?NODE2}, ?NODE2, 500),
{ok, Pid3} = emqx_cluster_rpc:start_link({node(), ?NODE3}, ?NODE3, 500), {ok, Pid3} = emqx_cluster_rpc:start_link({node(), ?NODE3}, ?NODE3, 500),
{ok, Pid4} = emqx_cluster_rpc_handler:start_link(100, 500), {ok, Pid4} = emqx_cluster_rpc_cleaner:start_link(100, 500),
true = erlang:register(emqx_cluster_rpc_handler, Pid4), true = erlang:register(emqx_cluster_rpc_cleaner, Pid4),
{ok, [Pid1, Pid2, Pid3, Pid4]}. {ok, [Pid1, Pid2, Pid3, Pid4]}.
stop() -> stop() ->
@ -296,7 +298,7 @@ stop() ->
end end
|| N <- [?NODE1, ?NODE2, ?NODE3] || N <- [?NODE1, ?NODE2, ?NODE3]
], ],
gen_server:stop(emqx_cluster_rpc_handler, normal, 5000). gen_server:stop(emqx_cluster_rpc_cleaner, normal, 5000).
receive_msg(0, _Msg) -> receive_msg(0, _Msg) ->
ok; ok;

View File

@ -172,10 +172,15 @@ on_query(
%% not return result, next loop will try again %% not return result, next loop will try again
on_query(InstId, {TypeOrKey, SQLOrKey, Params, Timeout}, State); on_query(InstId, {TypeOrKey, SQLOrKey, Params, Timeout}, State);
{error, Reason} -> {error, Reason} ->
LogMeta = #{connector => InstId, sql => SQLOrKey, state => State}, ?tp(
?SLOG(
error, error,
LogMeta#{msg => "mysql_connector_do_prepare_failed", reason => Reason} "mysql_connector_do_prepare_failed",
#{
connector => InstId,
sql => SQLOrKey,
state => State,
reason => Reason
}
), ),
{error, Reason} {error, Reason}
end; end;
@ -417,12 +422,10 @@ on_sql_query(
), ),
do_sql_query(SQLFunc, Conn, SQLOrKey, Params, Timeout, LogMeta); do_sql_query(SQLFunc, Conn, SQLOrKey, Params, Timeout, LogMeta);
{error, disconnected} -> {error, disconnected} ->
?SLOG( ?tp(
error, error,
LogMeta#{ "mysql_connector_do_sql_query_failed",
msg => "mysql_connector_do_sql_query_failed", LogMeta#{reason => worker_is_disconnected}
reason => worker_is_disconnected
}
), ),
{error, {recoverable_error, disconnected}} {error, {recoverable_error, disconnected}}
end. end.
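Switching from `?SLOG` to `?tp` turns these failure logs into structured trace points that tests can assert on with snabbkaffe. A hedged sketch of such an assertion (`ResourceId` and `Request` are placeholders; the surrounding test scaffolding is assumed):

?check_trace(
    begin
        _ = emqx_resource:query(ResourceId, Request, #{timeout => 1_000}),
        ok
    end,
    fun(Trace) ->
        %% expect at least one failure event of this kind, carrying a reason
        ?assertMatch(
            [#{reason := _} | _],
            ?of_kind("mysql_connector_do_sql_query_failed", Trace)
        )
    end
).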

View File

@ -44,7 +44,8 @@
execute_batch/3 execute_batch/3
]). ]).
-export([do_get_status/1]). %% for use by ecpool workers
-export([do_get_status/1, prepare_sql_to_conn/2]).
-define(PGSQL_HOST_OPTIONS, #{ -define(PGSQL_HOST_OPTIONS, #{
default_port => ?PGSQL_DEFAULT_PORT default_port => ?PGSQL_DEFAULT_PORT

View File

@ -91,20 +91,20 @@ fields(clientinfo_override) ->
]; ];
fields(udp_listeners) -> fields(udp_listeners) ->
[ [
{udp, sc(map(name, ref(udp_listener)), #{desc => ?DESC(udp_listener)})}, {udp, sc(map(name, ref(udp_listener)), #{desc => ?DESC(listener_name_to_settings_map)})},
{dtls, sc(map(name, ref(dtls_listener)), #{desc => ?DESC(dtls_listener)})} {dtls, sc(map(name, ref(dtls_listener)), #{desc => ?DESC(listener_name_to_settings_map)})}
]; ];
fields(tcp_listeners) -> fields(tcp_listeners) ->
[ [
{tcp, sc(map(name, ref(tcp_listener)), #{desc => ?DESC(tcp_listener)})}, {tcp, sc(map(name, ref(tcp_listener)), #{desc => ?DESC(listener_name_to_settings_map)})},
{ssl, sc(map(name, ref(ssl_listener)), #{desc => ?DESC(ssl_listener)})} {ssl, sc(map(name, ref(ssl_listener)), #{desc => ?DESC(listener_name_to_settings_map)})}
]; ];
fields(tcp_udp_listeners) -> fields(tcp_udp_listeners) ->
[ [
{tcp, sc(map(name, ref(tcp_listener)), #{desc => ?DESC(tcp_listener)})}, {tcp, sc(map(name, ref(tcp_listener)), #{desc => ?DESC(listener_name_to_settings_map)})},
{ssl, sc(map(name, ref(ssl_listener)), #{desc => ?DESC(ssl_listener)})}, {ssl, sc(map(name, ref(ssl_listener)), #{desc => ?DESC(listener_name_to_settings_map)})},
{udp, sc(map(name, ref(udp_listener)), #{desc => ?DESC(udp_listener)})}, {udp, sc(map(name, ref(udp_listener)), #{desc => ?DESC(listener_name_to_settings_map)})},
{dtls, sc(map(name, ref(dtls_listener)), #{desc => ?DESC(dtls_listener)})} {dtls, sc(map(name, ref(dtls_listener)), #{desc => ?DESC(listener_name_to_settings_map)})}
]; ];
fields(tcp_listener) -> fields(tcp_listener) ->
%% some special configs for tcp listener %% some special configs for tcp listener
@ -167,19 +167,19 @@ desc(udp_listeners) ->
desc(tcp_listeners) -> desc(tcp_listeners) ->
"Settings for the TCP listeners."; "Settings for the TCP listeners.";
desc(tcp_udp_listeners) -> desc(tcp_udp_listeners) ->
"Settings for the listeners."; "Settings for TCP and UDP listeners.";
desc(tcp_listener) -> desc(tcp_listener) ->
"Settings for the TCP listener."; "Settings for TCP listener.";
desc(ssl_listener) -> desc(ssl_listener) ->
"Settings for the SSL listener."; "Settings for SSL listener.";
desc(udp_listener) -> desc(udp_listener) ->
"Settings for the UDP listener."; "Settings for UDP listener.";
desc(dtls_listener) -> desc(dtls_listener) ->
"Settings for the DTLS listener."; "Settings for DTLS listener.";
desc(udp_opts) -> desc(udp_opts) ->
"Settings for the UDP sockets."; "Settings for UDP sockets.";
desc(dtls_opts) -> desc(dtls_opts) ->
"Settings for the DTLS protocol."; "Settings for DTLS protocol.";
desc(_) -> desc(_) ->
undefined. undefined.
@ -189,6 +189,8 @@ authentication_schema() ->
#{ #{
required => {false, recursively}, required => {false, recursively},
desc => ?DESC(gateway_common_authentication), desc => ?DESC(gateway_common_authentication),
%% we do not expose this to the user for now
importance => ?IMPORTANCE_HIDDEN,
examples => emqx_authn_api:authenticator_examples() examples => emqx_authn_api:authenticator_examples()
} }
). ).
@ -234,7 +236,7 @@ mountpoint(Default) ->
binary(), binary(),
#{ #{
default => iolist_to_binary(Default), default => iolist_to_binary(Default),
desc => ?DESC(gateway_common_mountpoint) desc => ?DESC(gateway_mountpoint)
} }
). ).
@ -283,7 +285,7 @@ common_listener_opts() ->
binary(), binary(),
#{ #{
default => undefined, default => undefined,
desc => ?DESC(gateway_common_listener_mountpoint) desc => ?DESC(gateway_mountpoint)
} }
)}, )},
{access_rules, {access_rules,

View File

@ -1,12 +1,42 @@
# emqx-management
EMQX Management API

## How to Design RESTful API?
http://restful-api-design.readthedocs.io/en/latest/scope.html
default application see:
header:
authorization: Basic YWRtaW46cHVibGlj

# EMQX Management
EMQX Management offers various interfaces for administrators to interact with
the system, either by a remote console attached to a running node, a CLI (i.e.
`./emqx ctl`), or through its rich CRUD-style REST API (mostly used by EMQX's
dashboard). The system enables administrators to modify both cluster and
individual node configurations, and provides the ability to view and reset
various statistics and metrics.

## Functionality
Among other things, it allows administrators to manage
* Alarms
* API Keys
* Banned clients, users, or hosts
* Clients (and sessions), including their topic subscriptions
* Configurations
* Plugins
* Fixed subscriptions
* Topics

Moreover, it lets you
* modify hot and non-hot updatable configuration values,
* publish messages, as well as bulk messages,
* create trace files,
* and, last but not least, monitor system status.

## Implementation Notes
API endpoints are implemented using the `minirest` framework in combination with
HOCON schema and OpenAPI 3.0 specifications.

## TODO/FIXME
In its current state, there are some reverse dependencies from other applications
that call directly into `emqx_mgmt`.
Also, and somewhat related, its bpapi proto modules call directly into
other applications.
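A minimal sketch of driving the REST API from Erlang; the listener address and the API key/secret pair below are illustrative assumptions:

```erlang
%% assumes a local node with the HTTP API on 127.0.0.1:18083 and an
%% existing API key/secret pair -- both are assumptions for illustration
{ok, _} = application:ensure_all_started(inets),
Auth = "Basic " ++ base64:encode_to_string(<<"my_api_key:my_api_secret">>),
{ok, {{_, 200, _}, _Headers, Body}} =
    httpc:request(
        get,
        {"http://127.0.0.1:18083/api/v5/clients", [{"authorization", Auth}]},
        [],
        []
    ),
io:format("~s~n", [Body]).
```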

View File

@ -52,7 +52,7 @@
-define(INVALID_NODE, 'INVALID_NODE'). -define(INVALID_NODE, 'INVALID_NODE').
api_spec() -> api_spec() ->
emqx_dashboard_swagger:spec(?MODULE). emqx_dashboard_swagger:spec(?MODULE, #{check_schema => true}).
paths() -> paths() ->
[ [
@ -202,9 +202,9 @@ delayed_message(get, #{bindings := #{node := NodeBin, msgid := HexId}}) ->
{200, Message#{payload => base64:encode(Payload)}} {200, Message#{payload => base64:encode(Payload)}}
end; end;
{error, not_found} -> {error, not_found} ->
{404, generate_http_code_map(not_found, Id)}; {404, generate_http_code_map(not_found, HexId)};
{badrpc, _} -> {badrpc, _} ->
{400, generate_http_code_map(invalid_node, Id)} {400, generate_http_code_map(invalid_node, NodeBin)}
end end
end end
); );
@ -271,19 +271,19 @@ generate_http_code_map(id_schema_error, Id) ->
#{ #{
code => ?MESSAGE_ID_SCHEMA_ERROR, code => ?MESSAGE_ID_SCHEMA_ERROR,
message => message =>
iolist_to_binary(io_lib:format("Message ID ~p schema error", [Id])) iolist_to_binary(io_lib:format("Message ID ~s schema error", [Id]))
}; };
generate_http_code_map(not_found, Id) -> generate_http_code_map(not_found, Id) ->
#{ #{
code => ?MESSAGE_ID_NOT_FOUND, code => ?MESSAGE_ID_NOT_FOUND,
message => message =>
iolist_to_binary(io_lib:format("Message ID ~p not found", [Id])) iolist_to_binary(io_lib:format("Message ID ~s not found", [Id]))
}; };
generate_http_code_map(invalid_node, Node) -> generate_http_code_map(invalid_node, Node) ->
#{ #{
code => ?INVALID_NODE, code => ?INVALID_NODE,
message => message =>
iolist_to_binary(io_lib:format("The node name ~p is invalid", [Node])) iolist_to_binary(io_lib:format("The node name ~s is invalid", [Node]))
}. }.
make_maybe(X, Error, Fun) -> make_maybe(X, Error, Fun) ->
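The `~p` to `~s` change above affects how binaries render in these user-facing messages; standard `io_lib` behavior, for reference:

%% ~p prints the term representation, ~s prints the contents:
lists:flatten(io_lib:format("Message ID ~p not found", [<<"abc">>])),
%% -> "Message ID <<\"abc\">> not found"
lists:flatten(io_lib:format("Message ID ~s not found", [<<"abc">>])).
%% -> "Message ID abc not found"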

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*- %% -*- mode: erlang -*-
{application, emqx_modules, [ {application, emqx_modules, [
{description, "EMQX Modules"}, {description, "EMQX Modules"},
{vsn, "5.0.11"}, {vsn, "5.0.12"},
{modules, []}, {modules, []},
{applications, [kernel, stdlib, emqx, emqx_ctl]}, {applications, [kernel, stdlib, emqx, emqx_ctl]},
{mod, {emqx_modules_app, []}}, {mod, {emqx_modules_app, []}},

View File

@ -356,7 +356,14 @@ is_buffer_supported(Module) ->
-spec call_start(manager_id(), module(), resource_config()) -> -spec call_start(manager_id(), module(), resource_config()) ->
{ok, resource_state()} | {error, Reason :: term()}. {ok, resource_state()} | {error, Reason :: term()}.
call_start(MgrId, Mod, Config) -> call_start(MgrId, Mod, Config) ->
?SAFE_CALL(Mod:on_start(MgrId, Config)). try
Mod:on_start(MgrId, Config)
catch
throw:Error ->
{error, Error};
Kind:Error:Stacktrace ->
{error, #{exception => Kind, reason => Error, stacktrace => Stacktrace}}
end.
-spec call_health_check(manager_id(), module(), resource_state()) -> -spec call_health_check(manager_id(), module(), resource_state()) ->
resource_status() resource_status()
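With the explicit try/catch above, a callback that throws is surfaced as an `{error, _}` tuple instead of crashing the caller. A hedged sketch with a hypothetical callback module:

%% hypothetical module, for illustration only
-module(my_connector).
-export([on_start/2]).

on_start(_MgrId, #{host := _} = _Config) ->
    {ok, #{}};
on_start(_MgrId, _BadConfig) ->
    throw(<<"missing host">>).

%% call_start(Id, my_connector, #{}) then returns {error, <<"missing host">>},
%% while an unexpected crash yields {error, #{exception => Kind, ...}}.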

View File

@ -30,18 +30,6 @@ namespace() -> "resource_schema".
roots() -> []. roots() -> [].
fields("resource_opts_sync_only") ->
[
{resource_opts,
mk(
ref(?MODULE, "creation_opts_sync_only"),
resource_opts_meta()
)}
];
fields("creation_opts_sync_only") ->
Fields = fields("creation_opts"),
QueryMod = {query_mode, fun query_mode_sync_only/1},
lists:keyreplace(query_mode, 1, Fields, QueryMod);
fields("resource_opts") -> fields("resource_opts") ->
[ [
{resource_opts, {resource_opts,
@ -82,7 +70,7 @@ worker_pool_size(required) -> false;
worker_pool_size(_) -> undefined. worker_pool_size(_) -> undefined.
resume_interval(type) -> emqx_schema:duration_ms(); resume_interval(type) -> emqx_schema:duration_ms();
resume_interval(importance) -> hidden; resume_interval(importance) -> ?IMPORTANCE_HIDDEN;
resume_interval(desc) -> ?DESC("resume_interval"); resume_interval(desc) -> ?DESC("resume_interval");
resume_interval(required) -> false; resume_interval(required) -> false;
resume_interval(_) -> undefined. resume_interval(_) -> undefined.
@ -117,12 +105,6 @@ query_mode(default) -> async;
query_mode(required) -> false; query_mode(required) -> false;
query_mode(_) -> undefined. query_mode(_) -> undefined.
query_mode_sync_only(type) -> enum([sync]);
query_mode_sync_only(desc) -> ?DESC("query_mode_sync_only");
query_mode_sync_only(default) -> sync;
query_mode_sync_only(required) -> false;
query_mode_sync_only(_) -> undefined.
request_timeout(type) -> hoconsc:union([infinity, emqx_schema:duration_ms()]); request_timeout(type) -> hoconsc:union([infinity, emqx_schema:duration_ms()]);
request_timeout(desc) -> ?DESC("request_timeout"); request_timeout(desc) -> ?DESC("request_timeout");
request_timeout(default) -> <<"15s">>; request_timeout(default) -> <<"15s">>;
@ -167,7 +149,4 @@ max_queue_bytes(default) -> ?DEFAULT_QUEUE_SIZE_RAW;
max_queue_bytes(required) -> false; max_queue_bytes(required) -> false;
max_queue_bytes(_) -> undefined. max_queue_bytes(_) -> undefined.
desc("creation_opts") -> desc("creation_opts") -> ?DESC("creation_opts").
?DESC("creation_opts");
desc("creation_opts_sync_only") ->
?DESC("creation_opts").

View File

@ -508,12 +508,12 @@ nested_put(Alias, Val, Columns0) ->
emqx_rule_maps:nested_put(Alias, Val, Columns). emqx_rule_maps:nested_put(Alias, Val, Columns).
-define(IS_RES_DOWN(R), R == stopped; R == not_connected; R == not_found). -define(IS_RES_DOWN(R), R == stopped; R == not_connected; R == not_found).
inc_action_metrics(ok, RuleId) ->
emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.success');
inc_action_metrics({error, {recoverable_error, _}}, RuleId) -> inc_action_metrics({error, {recoverable_error, _}}, RuleId) ->
emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.out_of_service'); emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.out_of_service');
inc_action_metrics(?RESOURCE_ERROR_M(R, _), RuleId) when ?IS_RES_DOWN(R) -> inc_action_metrics(?RESOURCE_ERROR_M(R, _), RuleId) when ?IS_RES_DOWN(R) ->
emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.out_of_service'); emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed.out_of_service');
inc_action_metrics({error, {unrecoverable_error, _}}, RuleId) ->
emqx_metrics_worker:inc(rule_metrics, RuleId, 'actions.failed');
inc_action_metrics(R, RuleId) -> inc_action_metrics(R, RuleId) ->
case is_ok_result(R) of case is_ok_result(R) of
false -> false ->
@ -527,5 +527,5 @@ is_ok_result(ok) ->
true; true;
is_ok_result(R) when is_tuple(R) -> is_ok_result(R) when is_tuple(R) ->
ok == erlang:element(1, R); ok == erlang:element(1, R);
is_ok_result(ok) -> is_ok_result(_) ->
false. false.

View File

@ -687,10 +687,6 @@ t_jq(_) ->
got_timeout got_timeout
end, end,
ConfigRootKey = emqx_rule_engine_schema:namespace(), ConfigRootKey = emqx_rule_engine_schema:namespace(),
DefaultTimeOut = emqx_config:get([
ConfigRootKey,
jq_function_default_timeout
]),
?assertThrow( ?assertThrow(
{jq_exception, {timeout, _}}, {jq_exception, {timeout, _}},
apply_func(jq, [TOProgram, <<"-2">>]) apply_func(jq, [TOProgram, <<"-2">>])

View File

@ -0,0 +1,3 @@
Add support for `async` query mode for most bridges.
Before this change, some bridges (Cassandra, MongoDB, MySQL, Postgres, Redis, RocketMQ, TDengine) were only allowed to be created with a `sync` query mode.
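A hedged illustration (bridge type, name, and connection details are placeholders):

```erlang
{ok, _} = emqx_bridge:create(mysql, my_mysql_bridge, #{
    <<"server">> => <<"127.0.0.1:3306">>,
    <<"database">> => <<"mqtt">>,
    <<"resource_opts">> => #{<<"query_mode">> => <<"async">>}
}).
```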

View File

@ -0,0 +1,2 @@
Ensure that when a core or replicant node starts, the `cluster-override.conf` file is only copied from a core node.
Previously, when nodes were sorted by startup time, a core node might have copied this file from a replicant node.

View File

@ -0,0 +1,2 @@
Ensure that when a core or replicant node starts, the `cluster-override.conf` file is copied only from a core node.
Previously, when nodes were sorted by startup time, a core node might have copied this file from a replicant node.

View File

@ -0,0 +1 @@
Fix a crash when checking the `limit` and `page` parameters in the `/mqtt/delayed/messages` API call.

View File

@ -0,0 +1 @@
Do not expose listener-level authentication before extensive verification.

View File

@ -0,0 +1 @@
Do not expose listener-level authentication until extensive verification is complete.

View File

@ -0,0 +1,2 @@
For security reasons, the value of the `password` field in the API examples is replaced with `******`.

View File

@ -0,0 +1,2 @@
For security reasons, the values of the `password` field in the API examples are uniformly replaced with `******`.

View File

@ -0,0 +1,4 @@
Don't increment the 'actions.failed.unknown' rule metrics counter upon receiving unrecoverable bridge errors.
This counter is displayed on the dashboard's rule overview tab ('Action statistics' - 'Unknown').
The fix only applies to synchronous bridges, as all rule actions for asynchronous bridges
are counted as successful (they increment 'actions.success', which is displayed as 'Action statistics' - 'Success').

View File

@ -0,0 +1 @@
In TDengine, removed the redundant database name from the SQL template.

View File

@ -0,0 +1 @@
In the SQL template of the TDengine bridge, removed the redundant database name.

View File

@ -1 +0,0 @@
Clickhouse got a fix that makes the error message clearer when users click the test button in the settings dialog.

View File

@ -1 +0,0 @@
Clickhouse got a fix so that the error message is clearer when users click the test button in the settings dialog.

View File

@ -1,109 +0,0 @@
emqx_ee_bridge_clickhouse {
local_topic {
desc {
en: """The MQTT topic filter to be forwarded to Clickhouse. All MQTT 'PUBLISH' messages with the topic
matching the local_topic will be forwarded.</br>
NOTE: if this bridge is used as the action of a rule (EMQX rule engine), and also local_topic is
configured, then both the data got from the rule and the MQTT messages that match local_topic
will be forwarded.
"""
zh: """发送到 'local_topic' 的消息都会转发到 Clickhouse。 </br>
注意:如果这个 Bridge 被用作规则EMQX 规则引擎)的输出,同时也配置了 'local_topic' ,那么这两部分的消息都会被转发。
"""
}
label {
en: "Local Topic"
zh: "本地 Topic"
}
}
sql_template {
desc {
en: """SQL Template. The template string can contain placeholders
for message metadata and payload field. The placeholders are inserted
without any checking and special formatting, so it is important to
ensure that the inserted values are formatted and escaped correctly."""
zh:
"""SQL模板。模板字符串可以包含消息元数据和有效载荷字段的占位符。占位符
的插入不需要任何检查和特殊格式化,因此必须确保插入的数值格式化和转义正确。模板字符串可以包含占位符
模板字符串可以包含消息元数据和有效载荷字段的占位符。这些占位符被插入
所以必须确保插入的值的格式正确。因此,确保插入的值格式化和转义正确是非常重要的。模板字符串可以包含占位符
模板字符串可以包含消息元数据和有效载荷字段的占位符。这些占位符被插入
所以必须确保插入的值的格式正确。确保插入的值被正确地格式化和转义。"""
}
label {
en: "SQL Template"
zh: "SQL 模板"
}
}
batch_value_separator {
desc {
en: """The bridge repeats what comes after the VALUES or FORMAT FormatType in the
SQL template to form a batch request. The value specified with
this parameter will be inserted between the values. The default
value ',' works for the VALUES format, but other values
might be needed if you specify some other format with the
clickhouse FORMAT syntax.
See https://clickhouse.com/docs/en/sql-reference/statements/insert-into/ and
https://clickhouse.com/docs/en/interfaces/formats#formats for more information about
the format syntax and the available formats."""
zh: """桥接会重复 VALUES 或 FORMAT 格式类型之后的内容。中 VALUES 或
FORMAT FormatType 后面的内容,以形成一个批处理请求。用这个参数指定的值
这个参数指定的值将被插入到这些值之间。默认的
默认值','适用于VALUES格式但是如果你指定了其他的格式可能需要其他的值。可能需要其他值如果你用
"clickhouse FORMAT "语法指定其他格式。语法指定其他格式。
参见https://clickhouse.com/docs/en/sql-reference/statements/insert-into/ 和
https://clickhouse.com/docs/en/interfaces/formats#formats 了解更多关于
格式语法和可用的格式。"""
}
label {
en: "Batch Value Separator"
zh: "批量值分离器"
}
}
config_enable {
desc {
en: """Enable or disable this bridge"""
zh: """启用/禁用桥接"""
}
label {
en: "Enable Or Disable Bridge"
zh: "启用/禁用桥接"
}
}
desc_config {
desc {
en: """Configuration for a Clickhouse bridge."""
zh: """Clickhouse 桥接配置"""
}
label: {
en: "Clickhouse Bridge Configuration"
zh: "Clickhouse 桥接配置"
}
}
desc_type {
desc {
en: """The Bridge Type"""
zh: """Bridge 类型"""
}
label {
en: "Bridge Type"
zh: "桥接类型"
}
}
desc_name {
desc {
en: """Bridge name."""
zh: """桥接名字"""
}
label {
en: "Bridge Name"
zh: "桥接名字"
}
}
}

View File

@ -86,21 +86,10 @@ fields("config") ->
mk( mk(
binary(), binary(),
#{desc => ?DESC("local_topic"), default => undefined} #{desc => ?DESC("local_topic"), default => undefined}
)},
{resource_opts,
mk(
ref(?MODULE, "creation_opts"),
#{
required => false,
default => #{},
desc => ?DESC(emqx_resource_schema, <<"resource_opts">>)
}
)} )}
] ++ ] ++ emqx_resource_schema:fields("resource_opts") ++
(emqx_ee_connector_cassa:fields(config) -- (emqx_ee_connector_cassa:fields(config) --
emqx_connector_schema_lib:prepare_statement_fields()); emqx_connector_schema_lib:prepare_statement_fields());
fields("creation_opts") ->
emqx_resource_schema:fields("creation_opts_sync_only");
fields("post") -> fields("post") ->
fields("post", cassandra); fields("post", cassandra);
fields("put") -> fields("put") ->
@ -115,8 +104,6 @@ desc("config") ->
?DESC("desc_config"); ?DESC("desc_config");
desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
["Configuration for Cassandra using `", string:to_upper(Method), "` method."]; ["Configuration for Cassandra using `", string:to_upper(Method), "` method."];
desc("creation_opts" = Name) ->
emqx_resource_schema:desc(Name);
desc(_) -> desc(_) ->
undefined. undefined.

View File

@ -50,7 +50,7 @@ values(_Method, Type) ->
database => <<"mqtt">>, database => <<"mqtt">>,
pool_size => 8, pool_size => 8,
username => <<"default">>, username => <<"default">>,
password => <<"public">>, password => <<"******">>,
sql => ?DEFAULT_SQL, sql => ?DEFAULT_SQL,
batch_value_separator => ?DEFAULT_BATCH_VALUE_SEPARATOR, batch_value_separator => ?DEFAULT_BATCH_VALUE_SEPARATOR,
local_topic => <<"local/topic/#">>, local_topic => <<"local/topic/#">>,

View File

@ -46,7 +46,7 @@ values(_Method) ->
database => <<"mqtt">>, database => <<"mqtt">>,
pool_size => 8, pool_size => 8,
username => <<"root">>, username => <<"root">>,
password => <<"public">>, password => <<"******">>,
template => ?DEFAULT_TEMPLATE, template => ?DEFAULT_TEMPLATE,
local_topic => <<"local/topic/#">>, local_topic => <<"local/topic/#">>,
resource_opts => #{ resource_opts => #{

View File

@ -61,7 +61,7 @@ values("influxdb_api_v1", post) ->
TypeOpts = #{ TypeOpts = #{
database => <<"example_database">>, database => <<"example_database">>,
username => <<"example_username">>, username => <<"example_username">>,
password => <<"examlpe_password">>, password => <<"******">>,
server => <<"127.0.0.1:8086">> server => <<"127.0.0.1:8086">>
}, },
values(common, "influxdb_api_v1", SupportUint, TypeOpts); values(common, "influxdb_api_v1", SupportUint, TypeOpts);

View File

@ -64,7 +64,7 @@ values(common_config) ->
authentication => #{ authentication => #{
mechanism => <<"plain">>, mechanism => <<"plain">>,
username => <<"username">>, username => <<"username">>,
password => <<"password">> password => <<"******">>
}, },
bootstrap_hosts => <<"localhost:9092">>, bootstrap_hosts => <<"localhost:9092">>,
connect_timeout => <<"5s">>, connect_timeout => <<"5s">>,
@ -233,7 +233,7 @@ fields(socket_opts) ->
boolean(), boolean(),
#{ #{
default => true, default => true,
hidden => true, importance => ?IMPORTANCE_HIDDEN,
desc => ?DESC(socket_nodelay) desc => ?DESC(socket_nodelay)
} }
)} )}

View File

@ -38,7 +38,7 @@ fields("config") ->
{enable, mk(boolean(), #{desc => ?DESC("enable"), default => true})}, {enable, mk(boolean(), #{desc => ?DESC("enable"), default => true})},
{collection, mk(binary(), #{desc => ?DESC("collection"), default => <<"mqtt">>})}, {collection, mk(binary(), #{desc => ?DESC("collection"), default => <<"mqtt">>})},
{payload_template, mk(binary(), #{required => false, desc => ?DESC("payload_template")})} {payload_template, mk(binary(), #{required => false, desc => ?DESC("payload_template")})}
] ++ emqx_resource_schema:fields("resource_opts_sync_only"); ] ++ emqx_resource_schema:fields("resource_opts");
fields(mongodb_rs) -> fields(mongodb_rs) ->
emqx_connector_mongo:fields(rs) ++ fields("config"); emqx_connector_mongo:fields(rs) ++ fields("config");
fields(mongodb_sharded) -> fields(mongodb_sharded) ->
@ -149,7 +149,7 @@ values(common, MongoType, Method, TypeOpts) ->
srv_record => false, srv_record => false,
pool_size => 8, pool_size => 8,
username => <<"myuser">>, username => <<"myuser">>,
password => <<"mypass">> password => <<"******">>
}, },
MethodVals = method_values(MongoType, Method), MethodVals = method_values(MongoType, Method),
Vals0 = maps:merge(MethodVals, Common), Vals0 = maps:merge(MethodVals, Common),

View File

@ -47,7 +47,7 @@ values(_Method) ->
database => <<"test">>, database => <<"test">>,
pool_size => 8, pool_size => 8,
username => <<"root">>, username => <<"root">>,
password => <<"">>, password => <<"******">>,
sql => ?DEFAULT_SQL, sql => ?DEFAULT_SQL,
local_topic => <<"local/topic/#">>, local_topic => <<"local/topic/#">>,
resource_opts => #{ resource_opts => #{
@ -79,21 +79,10 @@ fields("config") ->
mk( mk(
binary(), binary(),
#{desc => ?DESC("local_topic"), default => undefined} #{desc => ?DESC("local_topic"), default => undefined}
)},
{resource_opts,
mk(
ref(?MODULE, "creation_opts"),
#{
required => false,
default => #{},
desc => ?DESC(emqx_resource_schema, <<"resource_opts">>)
}
)} )}
] ++ ] ++ emqx_resource_schema:fields("resource_opts") ++
(emqx_connector_mysql:fields(config) -- (emqx_connector_mysql:fields(config) --
emqx_connector_schema_lib:prepare_statement_fields()); emqx_connector_schema_lib:prepare_statement_fields());
fields("creation_opts") ->
emqx_resource_schema:fields("creation_opts_sync_only");
fields("post") -> fields("post") ->
[type_field(), name_field() | fields("config")]; [type_field(), name_field() | fields("config")];
fields("put") -> fields("put") ->
@ -105,8 +94,6 @@ desc("config") ->
?DESC("desc_config"); ?DESC("desc_config");
desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
["Configuration for MySQL using `", string:to_upper(Method), "` method."]; ["Configuration for MySQL using `", string:to_upper(Method), "` method."];
desc("creation_opts" = Name) ->
emqx_resource_schema:desc(Name);
desc(_) -> desc(_) ->
undefined. undefined.

View File

@ -49,7 +49,7 @@ values(_Method, Type) ->
database => <<"mqtt">>, database => <<"mqtt">>,
pool_size => 8, pool_size => 8,
username => <<"root">>, username => <<"root">>,
password => <<"public">>, password => <<"******">>,
sql => ?DEFAULT_SQL, sql => ?DEFAULT_SQL,
local_topic => <<"local/topic/#">>, local_topic => <<"local/topic/#">>,
resource_opts => #{ resource_opts => #{
@ -81,21 +81,10 @@ fields("config") ->
mk( mk(
binary(), binary(),
#{desc => ?DESC("local_topic"), default => undefined} #{desc => ?DESC("local_topic"), default => undefined}
)},
{resource_opts,
mk(
ref(?MODULE, "creation_opts"),
#{
required => false,
default => #{},
desc => ?DESC(emqx_resource_schema, <<"resource_opts">>)
}
)} )}
] ++ ] ++ emqx_resource_schema:fields("resource_opts") ++
(emqx_connector_pgsql:fields(config) -- (emqx_connector_pgsql:fields(config) --
emqx_connector_schema_lib:prepare_statement_fields()); emqx_connector_schema_lib:prepare_statement_fields());
fields("creation_opts") ->
emqx_resource_schema:fields("creation_opts_sync_only");
fields("post") -> fields("post") ->
fields("post", pgsql); fields("post", pgsql);
fields("put") -> fields("put") ->
@ -110,8 +99,6 @@ desc("config") ->
?DESC("desc_config"); ?DESC("desc_config");
desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
["Configuration for PostgreSQL using `", string:to_upper(Method), "` method."]; ["Configuration for PostgreSQL using `", string:to_upper(Method), "` method."];
desc("creation_opts" = Name) ->
emqx_resource_schema:desc(Name);
desc(_) -> desc(_) ->
undefined. undefined.

View File

@ -77,7 +77,7 @@ values(common, RedisType, SpecificOpts) ->
enable => true, enable => true,
local_topic => <<"local/topic/#">>, local_topic => <<"local/topic/#">>,
pool_size => 8, pool_size => 8,
password => <<"secret">>, password => <<"******">>,
command_template => [<<"LPUSH">>, <<"MSGS">>, <<"${payload}">>], command_template => [<<"LPUSH">>, <<"MSGS">>, <<"${payload}">>],
resource_opts => values(resource_opts, RedisType, #{}), resource_opts => values(resource_opts, RedisType, #{}),
ssl => #{enable => false} ssl => #{enable => false}
@ -180,10 +180,10 @@ resource_fields(Type) ->
resource_creation_fields("redis_cluster") -> resource_creation_fields("redis_cluster") ->
% TODO % TODO
% Cluster bridge is currently incompatible with batching. % Cluster bridge is currently incompatible with batching.
Fields = emqx_resource_schema:fields("creation_opts_sync_only"), Fields = emqx_resource_schema:fields("creation_opts"),
lists:foldl(fun proplists:delete/2, Fields, [batch_size, batch_time, enable_batch]); lists:foldl(fun proplists:delete/2, Fields, [batch_size, batch_time, enable_batch]);
resource_creation_fields(_) -> resource_creation_fields(_) ->
emqx_resource_schema:fields("creation_opts_sync_only"). emqx_resource_schema:fields("creation_opts").
desc("config") -> desc("config") ->
?DESC("desc_config"); ?DESC("desc_config");

View File

@ -80,21 +80,10 @@ fields("config") ->
mk( mk(
binary(), binary(),
#{desc => ?DESC("local_topic"), required => false} #{desc => ?DESC("local_topic"), required => false}
)},
{resource_opts,
mk(
ref(?MODULE, "creation_opts"),
#{
required => false,
default => #{<<"request_timeout">> => ?DEFFAULT_REQ_TIMEOUT},
desc => ?DESC(emqx_resource_schema, <<"resource_opts">>)
}
)} )}
] ++ ] ++ emqx_resource_schema:fields("resource_opts") ++
(emqx_ee_connector_rocketmq:fields(config) -- (emqx_ee_connector_rocketmq:fields(config) --
emqx_connector_schema_lib:prepare_statement_fields()); emqx_connector_schema_lib:prepare_statement_fields());
fields("creation_opts") ->
emqx_resource_schema:fields("creation_opts_sync_only");
fields("post") -> fields("post") ->
[type_field(), name_field() | fields("config")]; [type_field(), name_field() | fields("config")];
fields("put") -> fields("put") ->
@ -106,8 +95,6 @@ desc("config") ->
?DESC("desc_config"); ?DESC("desc_config");
desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
["Configuration for RocketMQ using `", string:to_upper(Method), "` method."]; ["Configuration for RocketMQ using `", string:to_upper(Method), "` method."];
desc("creation_opts" = Name) ->
emqx_resource_schema:desc(Name);
desc(_) -> desc(_) ->
undefined. undefined.

View File

@ -22,7 +22,7 @@
]). ]).
-define(DEFAULT_SQL, << -define(DEFAULT_SQL, <<
"insert into mqtt.t_mqtt_msg(ts, msgid, mqtt_topic, qos, payload, arrived) " "insert into t_mqtt_msg(ts, msgid, mqtt_topic, qos, payload, arrived) "
"values (${ts}, ${id}, ${topic}, ${qos}, ${payload}, ${timestamp})" "values (${ts}, ${id}, ${topic}, ${qos}, ${payload}, ${timestamp})"
>>). >>).
@ -80,19 +80,8 @@ fields("config") ->
mk( mk(
binary(), binary(),
#{desc => ?DESC("local_topic"), default => undefined} #{desc => ?DESC("local_topic"), default => undefined}
)},
{resource_opts,
mk(
ref(?MODULE, "creation_opts"),
#{
required => false,
default => #{},
desc => ?DESC(emqx_resource_schema, <<"resource_opts">>)
}
)} )}
] ++ emqx_ee_connector_tdengine:fields(config); ] ++ emqx_resource_schema:fields("resource_opts") ++ emqx_ee_connector_tdengine:fields(config);
fields("creation_opts") ->
emqx_resource_schema:fields("creation_opts_sync_only");
fields("post") -> fields("post") ->
[type_field(), name_field() | fields("config")]; [type_field(), name_field() | fields("config")];
fields("put") -> fields("put") ->
@ -104,8 +93,6 @@ desc("config") ->
?DESC("desc_config"); ?DESC("desc_config");
desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
["Configuration for TDengine using `", string:to_upper(Method), "` method."]; ["Configuration for TDengine using `", string:to_upper(Method), "` method."];
desc("creation_opts" = Name) ->
emqx_resource_schema:desc(Name);
desc(_) -> desc(_) ->
undefined. undefined.

View File

@ -95,6 +95,11 @@
commit_fun => brod_group_subscriber_v2:commit_fun() commit_fun => brod_group_subscriber_v2:commit_fun()
}. }.
-define(CLIENT_DOWN_MESSAGE,
"Failed to start Kafka client. Please check the logs for errors and check"
" the connection parameters."
).
%%------------------------------------------------------------------------------------- %%-------------------------------------------------------------------------------------
%% `emqx_resource' API %% `emqx_resource' API
%%------------------------------------------------------------------------------------- %%-------------------------------------------------------------------------------------
@ -152,7 +157,7 @@ on_start(InstanceId, Config) ->
kafka_hosts => BootstrapHosts, kafka_hosts => BootstrapHosts,
reason => emqx_misc:redact(Reason) reason => emqx_misc:redact(Reason)
}), }),
throw(failed_to_start_kafka_client) throw(?CLIENT_DOWN_MESSAGE)
end, end,
start_consumer(Config, InstanceId, ClientID). start_consumer(Config, InstanceId, ClientID).
@ -173,7 +178,7 @@ on_get_status(_InstanceID, State) ->
kafka_client_id := ClientID, kafka_client_id := ClientID,
kafka_topics := KafkaTopics kafka_topics := KafkaTopics
} = State, } = State,
do_get_status(ClientID, KafkaTopics, SubscriberId). do_get_status(State, ClientID, KafkaTopics, SubscriberId).
%%------------------------------------------------------------------------------------- %%-------------------------------------------------------------------------------------
%% `brod_group_subscriber' API %% `brod_group_subscriber' API
@ -370,22 +375,41 @@ stop_client(ClientID) ->
), ),
ok. ok.
do_get_status(ClientID, [KafkaTopic | RestTopics], SubscriberId) -> do_get_status(State, ClientID, [KafkaTopic | RestTopics], SubscriberId) ->
case brod:get_partitions_count(ClientID, KafkaTopic) of case brod:get_partitions_count(ClientID, KafkaTopic) of
{ok, NPartitions} -> {ok, NPartitions} ->
case do_get_status(ClientID, KafkaTopic, SubscriberId, NPartitions) of case do_get_status1(ClientID, KafkaTopic, SubscriberId, NPartitions) of
connected -> do_get_status(ClientID, RestTopics, SubscriberId); connected -> do_get_status(State, ClientID, RestTopics, SubscriberId);
disconnected -> disconnected disconnected -> disconnected
end; end;
{error, {client_down, Context}} ->
case infer_client_error(Context) of
auth_error ->
Message = "Authentication error. " ++ ?CLIENT_DOWN_MESSAGE,
{disconnected, State, Message};
{auth_error, Message0} ->
Message = binary_to_list(Message0) ++ "; " ++ ?CLIENT_DOWN_MESSAGE,
{disconnected, State, Message};
connection_refused ->
Message = "Connection refused. " ++ ?CLIENT_DOWN_MESSAGE,
{disconnected, State, Message};
_ ->
{disconnected, State, ?CLIENT_DOWN_MESSAGE}
end;
{error, leader_not_available} ->
Message =
"Leader connection not available. Please check the Kafka topic used,"
" the connection parameters and Kafka cluster health",
{disconnected, State, Message};
_ -> _ ->
disconnected disconnected
end; end;
do_get_status(_ClientID, _KafkaTopics = [], _SubscriberId) -> do_get_status(_State, _ClientID, _KafkaTopics = [], _SubscriberId) ->
connected. connected.
-spec do_get_status(brod:client_id(), binary(), subscriber_id(), pos_integer()) -> -spec do_get_status1(brod:client_id(), binary(), subscriber_id(), pos_integer()) ->
connected | disconnected. connected | disconnected.
do_get_status(ClientID, KafkaTopic, SubscriberId, NPartitions) -> do_get_status1(ClientID, KafkaTopic, SubscriberId, NPartitions) ->
Results = Results =
lists:map( lists:map(
fun(N) -> fun(N) ->
@ -504,3 +528,15 @@ encode(Value, base64) ->
to_bin(B) when is_binary(B) -> B; to_bin(B) when is_binary(B) -> B;
to_bin(A) when is_atom(A) -> atom_to_binary(A, utf8). to_bin(A) when is_atom(A) -> atom_to_binary(A, utf8).
infer_client_error(Error) ->
case Error of
[{_BrokerEndpoint, {econnrefused, _}} | _] ->
connection_refused;
[{_BrokerEndpoint, {{sasl_auth_error, Message}, _}} | _] when is_binary(Message) ->
{auth_error, Message};
[{_BrokerEndpoint, {{sasl_auth_error, _}, _}} | _] ->
auth_error;
_ ->
undefined
end.
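For reference, the input shape the clauses above expect (the broker endpoint tuple layout is an assumption inferred from the patterns):

%% a refused TCP connection reported by the client:
Ctx = [{{"localhost", 9092}, {econnrefused, []}}],
connection_refused = infer_client_error(Ctx).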

View File

@ -114,7 +114,10 @@ on_start(InstId, Config) ->
client_id => ClientId client_id => ClientId
} }
), ),
throw(failed_to_start_kafka_producer) throw(
"Failed to start Kafka client. Please check the logs for errors and check"
" the connection parameters."
)
end. end.
on_stop(_InstanceID, #{client_id := ClientID, producers := Producers, resource_id := ResourceID}) -> on_stop(_InstanceID, #{client_id := ClientID, producers := Producers, resource_id := ResourceID}) ->

View File

@ -388,7 +388,9 @@ end_per_testcase(_Testcase, Config) ->
maps:values(ProducersMapping) maps:values(ProducersMapping)
), ),
ok = wolff:stop_and_delete_supervised_client(KafkaProducerClientId), ok = wolff:stop_and_delete_supervised_client(KafkaProducerClientId),
emqx_common_test_helpers:call_janitor(), %% in CI, apparently this needs more time since the
%% machines struggle with all the containers running...
emqx_common_test_helpers:call_janitor(60_000),
ok = snabbkaffe:stop(), ok = snabbkaffe:stop(),
ok ok
end. end.
@ -1664,7 +1666,7 @@ t_cluster_group(Config) ->
|| {Name, Opts} <- Cluster || {Name, Opts} <- Cluster
], ],
on_exit(fun() -> on_exit(fun() ->
lists:foreach( emqx_misc:pmap(
fun(N) -> fun(N) ->
ct:pal("stopping ~p", [N]), ct:pal("stopping ~p", [N]),
ok = emqx_common_test_helpers:stop_slave(N) ok = emqx_common_test_helpers:stop_slave(N)
@ -1875,7 +1877,7 @@ t_cluster_node_down(Config) ->
Cluster Cluster
), ),
on_exit(fun() -> on_exit(fun() ->
lists:foreach( emqx_misc:pmap(
fun(N) -> fun(N) ->
ct:pal("stopping ~p", [N]), ct:pal("stopping ~p", [N]),
ok = emqx_common_test_helpers:stop_slave(N) ok = emqx_common_test_helpers:stop_slave(N)
@ -1894,10 +1896,14 @@ t_cluster_node_down(Config) ->
{ok, _} = snabbkaffe:receive_events(SRef0), {ok, _} = snabbkaffe:receive_events(SRef0),
lists:foreach( lists:foreach(
fun(N) -> fun(N) ->
?assertMatch( ?retry(
{ok, _}, _Sleep1 = 100,
erpc:call(N, emqx_bridge, lookup, [BridgeId]), _Attempts1 = 50,
#{node => N} ?assertMatch(
{ok, _},
erpc:call(N, emqx_bridge, lookup, [BridgeId]),
#{node => N}
)
) )
end, end,
Nodes Nodes

View File

@ -9,6 +9,7 @@
-include_lib("eunit/include/eunit.hrl"). -include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl"). -include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("brod/include/brod.hrl"). -include_lib("brod/include/brod.hrl").
-define(PRODUCER, emqx_bridge_impl_kafka_producer). -define(PRODUCER, emqx_bridge_impl_kafka_producer).
@ -415,9 +416,11 @@ t_failed_creation_then_fix(Config) ->
Type, erlang:list_to_atom(Name), WrongConf Type, erlang:list_to_atom(Name), WrongConf
), ),
WrongConfigAtom = WrongConfigAtom1#{bridge_name => Name}, WrongConfigAtom = WrongConfigAtom1#{bridge_name => Name},
?assertThrow(failed_to_start_kafka_producer, ?PRODUCER:on_start(ResourceId, WrongConfigAtom)), ?assertThrow(Reason when is_list(Reason), ?PRODUCER:on_start(ResourceId, WrongConfigAtom)),
%% before throwing, it should cleanup the client process. %% before throwing, it should clean up the client process. We
?assertEqual([], supervisor:which_children(wolff_client_sup)), %% retry because the supervisor might need some time to really
%% remove it from its tree.
?retry(50, 10, ?assertEqual([], supervisor:which_children(wolff_client_sup))),
%% must succeed with correct config %% must succeed with correct config
{ok, #{config := ValidConfigAtom1}} = emqx_bridge:create( {ok, #{config := ValidConfigAtom1}} = emqx_bridge:create(
Type, erlang:list_to_atom(Name), ValidConf Type, erlang:list_to_atom(Name), ValidConf

View File

@@ -73,15 +73,16 @@
 groups() ->
     TCs = emqx_common_test_helpers:all(?MODULE),
     NonBatchCases = [t_write_timeout],
+    QueryModeGroups = [{group, async}, {group, sync}],
+    BatchingGroups = [
+        %{group, with_batch},
+        {group, without_batch}
+    ],
     [
-        {tcp, [
-            %{group, with_batch},
-            {group, without_batch}
-        ]},
-        {tls, [
-            %{group, with_batch},
-            {group, without_batch}
-        ]},
+        {tcp, QueryModeGroups},
+        {tls, QueryModeGroups},
+        {async, BatchingGroups},
+        {sync, BatchingGroups},
         {with_batch, TCs -- NonBatchCases},
         {without_batch, TCs}
     ].
@@ -93,7 +94,6 @@ init_per_group(tcp, Config) ->
         {cassa_host, Host},
         {cassa_port, Port},
         {enable_tls, false},
-        {query_mode, sync},
         {proxy_name, "cassa_tcp"}
         | Config
     ];
@@ -104,10 +104,13 @@ init_per_group(tls, Config) ->
         {cassa_host, Host},
         {cassa_port, Port},
         {enable_tls, true},
-        {query_mode, sync},
         {proxy_name, "cassa_tls"}
         | Config
     ];
+init_per_group(async, Config) ->
+    [{query_mode, async} | Config];
+init_per_group(sync, Config) ->
+    [{query_mode, sync} | Config];
 init_per_group(with_batch, Config0) ->
     Config = [{enable_batch, true} | Config0],
     common_init(Config);
@@ -139,14 +142,15 @@
 init_per_testcase(_Testcase, Config) ->
     connect_and_clear_table(Config),
     delete_bridge(Config),
+    snabbkaffe:start_trace(),
     Config.
 
 end_per_testcase(_Testcase, Config) ->
     ProxyHost = ?config(proxy_host, Config),
     ProxyPort = ?config(proxy_port, Config),
     emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
-    connect_and_clear_table(Config),
     ok = snabbkaffe:stop(),
+    connect_and_clear_table(Config),
     delete_bridge(Config),
     ok.
@@ -171,6 +175,7 @@ common_init(Config0) ->
     ok = emqx_common_test_helpers:start_apps([emqx_conf, emqx_bridge]),
     emqx_mgmt_api_test_util:init_suite(),
     % Connect to cassnadra directly and create the table
+    catch connect_and_drop_table(Config0),
     connect_and_create_table(Config0),
     {Name, CassaConf} = cassa_config(BridgeType, Config0),
     Config =
@@ -250,9 +255,13 @@ parse_and_check(ConfigString, BridgeType, Name) ->
     Config.
 
 create_bridge(Config) ->
+    create_bridge(Config, _Overrides = #{}).
+
+create_bridge(Config, Overrides) ->
     BridgeType = ?config(cassa_bridge_type, Config),
     Name = ?config(cassa_name, Config),
-    BridgeConfig = ?config(cassa_config, Config),
+    BridgeConfig0 = ?config(cassa_config, Config),
+    BridgeConfig = emqx_map_lib:deep_merge(BridgeConfig0, Overrides),
     emqx_bridge:create(BridgeType, Name, BridgeConfig).
 
 delete_bridge(Config) ->
@@ -288,6 +297,27 @@ query_resource(Config, Request) ->
     ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
     emqx_resource:query(ResourceID, Request, #{timeout => 1_000}).
 
+query_resource_async(Config, Request) ->
+    Name = ?config(cassa_name, Config),
+    BridgeType = ?config(cassa_bridge_type, Config),
+    Ref = alias([reply]),
+    AsyncReplyFun = fun(Result) -> Ref ! {result, Ref, Result} end,
+    ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
+    Return = emqx_resource:query(ResourceID, Request, #{
+        timeout => 500, async_reply_fun => {AsyncReplyFun, []}
+    }),
+    {Return, Ref}.
+
+receive_result(Ref, Timeout) when is_reference(Ref) ->
+    receive
+        {result, Ref, Result} ->
+            {ok, Result};
+        {Ref, Result} ->
+            {ok, Result}
+    after Timeout ->
+        timeout
+    end.
+
 connect_direct_cassa(Config) ->
     Opts = #{
         nodes => [{?config(cassa_host, Config), ?config(cassa_port, Config)}],
@@ -546,15 +576,27 @@
 % ok.
 
 t_simple_sql_query(Config) ->
+    EnableBatch = ?config(enable_batch, Config),
+    QueryMode = ?config(query_mode, Config),
     ?assertMatch(
         {ok, _},
         create_bridge(Config)
     ),
     Request = {query, <<"SELECT count(1) AS T FROM system.local">>},
-    Result = query_resource(Config, Request),
-    case ?config(enable_batch, Config) of
-        true -> ?assertEqual({error, {unrecoverable_error, batch_prepare_not_implemented}}, Result);
-        false -> ?assertMatch({ok, {<<"system.local">>, _, [[1]]}}, Result)
+    Result =
+        case QueryMode of
+            sync ->
+                query_resource(Config, Request);
+            async ->
+                {_, Ref} = query_resource_async(Config, Request),
+                {ok, Res} = receive_result(Ref, 2_000),
+                Res
+        end,
+    case EnableBatch of
+        true ->
+            ?assertEqual({error, {unrecoverable_error, batch_prepare_not_implemented}}, Result);
+        false ->
+            ?assertMatch({ok, {<<"system.local">>, _, [[1]]}}, Result)
     end,
     ok.
@@ -565,22 +607,56 @@ t_missing_data(Config) ->
     ),
     %% emqx_ee_connector_cassa will send missed data as a `null` atom
     %% to ecql driver
-    Result = send_message(Config, #{}),
+    {_, {ok, Event}} =
+        ?wait_async_action(
+            send_message(Config, #{}),
+            #{?snk_kind := buffer_worker_flush_ack},
+            2_000
+        ),
     ?assertMatch(
         %% TODO: match error msgs
-        {error, {unrecoverable_error, {8704, <<"Expected 8 or 0 byte long for date (4)">>}}},
-        Result
+        #{
            result :=
+                {error, {unrecoverable_error, {8704, <<"Expected 8 or 0 byte long for date (4)">>}}}
+        },
+        Event
     ),
     ok.
 
 t_bad_sql_parameter(Config) ->
+    QueryMode = ?config(query_mode, Config),
+    EnableBatch = ?config(enable_batch, Config),
+    Name = ?config(cassa_name, Config),
+    ResourceId = emqx_bridge_resource:resource_id(cassandra, Name),
     ?assertMatch(
         {ok, _},
-        create_bridge(Config)
+        create_bridge(
+            Config,
+            #{
+                <<"resource_opts">> => #{
+                    <<"request_timeout">> => 500,
+                    <<"resume_interval">> => 100,
+                    <<"health_check_interval">> => 100
+                }
+            }
+        )
     ),
     Request = {query, <<"">>, [bad_parameter]},
-    Result = query_resource(Config, Request),
-    case ?config(enable_batch, Config) of
+    Result =
+        case QueryMode of
+            sync ->
+                query_resource(Config, Request);
+            async ->
+                {_, Ref} = query_resource_async(Config, Request),
+                case receive_result(Ref, 5_000) of
+                    {ok, Res} ->
+                        Res;
+                    timeout ->
+                        ct:pal("mailbox:\n  ~p", [process_info(self(), messages)]),
+                        ct:fail("no response received")
+                end
+        end,
+    case EnableBatch of
         true ->
             ?assertEqual({error, {unrecoverable_error, invalid_request}}, Result);
         false ->

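The query_resource_async/receive_result helpers added above (and repeated in several suites below) build on process aliases from OTP 24: alias([reply]) creates a reference that remote code can send one reply through, after which the alias deactivates on its own. Condensed into a single round-trip function (the name is ours; the calls are the same ones the suite uses):

%% Sketch of the async round trip behind the new helpers. `ResourceID'
%% and the `async_reply_fun' query option come from the suite code above.
await_async_query(ResourceID, Request) ->
    Ref = alias([reply]),  %% OTP 24+; deactivates after the first reply
    ReplyFun = fun(Result) -> Ref ! {result, Ref, Result} end,
    _Return = emqx_resource:query(ResourceID, Request, #{
        timeout => 500, async_reply_fun => {ReplyFun, []}
    }),
    receive
        {result, Ref, Result} -> {ok, Result}
    after 2_000 ->
        unalias(Ref),  %% housekeeping: stop accepting late replies
        timeout
    end.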
View File

@@ -9,6 +9,7 @@
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
 %%------------------------------------------------------------------------------
 %% CT boilerplate
@@ -16,9 +17,8 @@
 all() ->
     [
-        {group, rs},
-        {group, sharded},
-        {group, single}
+        {group, async},
+        {group, sync}
         | (emqx_common_test_helpers:all(?MODULE) -- group_tests())
     ].
@@ -31,12 +31,23 @@ group_tests() ->
     ].
 
 groups() ->
+    TypeGroups = [
+        {group, rs},
+        {group, sharded},
+        {group, single}
+    ],
     [
+        {async, TypeGroups},
+        {sync, TypeGroups},
         {rs, group_tests()},
         {sharded, group_tests()},
         {single, group_tests()}
     ].
 
+init_per_group(async, Config) ->
+    [{query_mode, async} | Config];
+init_per_group(sync, Config) ->
+    [{query_mode, sync} | Config];
 init_per_group(Type = rs, Config) ->
     MongoHost = os:getenv("MONGO_RS_HOST", "mongo1"),
     MongoPort = list_to_integer(os:getenv("MONGO_RS_PORT", "27017")),
@@ -44,7 +55,7 @@ init_per_group(Type = rs, Config) ->
         true ->
             ok = start_apps(),
             emqx_mgmt_api_test_util:init_suite(),
-            {Name, MongoConfig} = mongo_config(MongoHost, MongoPort, Type),
+            {Name, MongoConfig} = mongo_config(MongoHost, MongoPort, Type, Config),
             [
                 {mongo_host, MongoHost},
                 {mongo_port, MongoPort},
@@ -63,7 +74,7 @@ init_per_group(Type = sharded, Config) ->
         true ->
             ok = start_apps(),
             emqx_mgmt_api_test_util:init_suite(),
-            {Name, MongoConfig} = mongo_config(MongoHost, MongoPort, Type),
+            {Name, MongoConfig} = mongo_config(MongoHost, MongoPort, Type, Config),
             [
                 {mongo_host, MongoHost},
                 {mongo_port, MongoPort},
@@ -82,7 +93,7 @@ init_per_group(Type = single, Config) ->
         true ->
             ok = start_apps(),
             emqx_mgmt_api_test_util:init_suite(),
-            {Name, MongoConfig} = mongo_config(MongoHost, MongoPort, Type),
+            {Name, MongoConfig} = mongo_config(MongoHost, MongoPort, Type, Config),
             [
                 {mongo_host, MongoHost},
                 {mongo_port, MongoPort},
@@ -99,6 +110,7 @@ end_per_group(_Type, _Config) ->
     ok.
 
 init_per_suite(Config) ->
+    emqx_common_test_helpers:clear_screen(),
     Config.
 
 end_per_suite(_Config) ->
@@ -109,11 +121,13 @@ end_per_suite(_Config) ->
 init_per_testcase(_Testcase, Config) ->
     catch clear_db(Config),
     delete_bridge(Config),
+    snabbkaffe:start_trace(),
     Config.
 
 end_per_testcase(_Testcase, Config) ->
     catch clear_db(Config),
     delete_bridge(Config),
+    snabbkaffe:stop(),
     ok.
 
 %%------------------------------------------------------------------------------
@@ -140,7 +154,8 @@ mongo_type_bin(sharded) ->
 mongo_type_bin(single) ->
     <<"mongodb_single">>.
 
-mongo_config(MongoHost, MongoPort0, rs = Type) ->
+mongo_config(MongoHost, MongoPort0, rs = Type, Config) ->
+    QueryMode = ?config(query_mode, Config),
     MongoPort = integer_to_list(MongoPort0),
     Servers = MongoHost ++ ":" ++ MongoPort,
     Name = atom_to_binary(?MODULE),
@@ -154,13 +169,19 @@ mongo_config(MongoHost, MongoPort0, rs = Type) ->
             "  w_mode = safe\n"
             "  database = mqtt\n"
             "  resource_opts = {\n"
+            "    query_mode = ~s\n"
             "    worker_pool_size = 1\n"
             "  }\n"
             "}",
-            [Name, Servers]
+            [
+                Name,
+                Servers,
+                QueryMode
+            ]
         ),
     {Name, parse_and_check(ConfigString, Type, Name)};
-mongo_config(MongoHost, MongoPort0, sharded = Type) ->
+mongo_config(MongoHost, MongoPort0, sharded = Type, Config) ->
+    QueryMode = ?config(query_mode, Config),
     MongoPort = integer_to_list(MongoPort0),
     Servers = MongoHost ++ ":" ++ MongoPort,
     Name = atom_to_binary(?MODULE),
@@ -173,13 +194,19 @@ mongo_config(MongoHost, MongoPort0, sharded = Type) ->
             "  w_mode = safe\n"
             "  database = mqtt\n"
             "  resource_opts = {\n"
+            "    query_mode = ~s\n"
             "    worker_pool_size = 1\n"
             "  }\n"
             "}",
-            [Name, Servers]
+            [
+                Name,
+                Servers,
+                QueryMode
+            ]
         ),
     {Name, parse_and_check(ConfigString, Type, Name)};
-mongo_config(MongoHost, MongoPort0, single = Type) ->
+mongo_config(MongoHost, MongoPort0, single = Type, Config) ->
+    QueryMode = ?config(query_mode, Config),
     MongoPort = integer_to_list(MongoPort0),
     Server = MongoHost ++ ":" ++ MongoPort,
     Name = atom_to_binary(?MODULE),
@@ -192,10 +219,15 @@ mongo_config(MongoHost, MongoPort0, single = Type) ->
             "  w_mode = safe\n"
             "  database = mqtt\n"
             "  resource_opts = {\n"
+            "    query_mode = ~s\n"
             "    worker_pool_size = 1\n"
             "  }\n"
             "}",
-            [Name, Server]
+            [
+                Name,
+                Server,
+                QueryMode
+            ]
         ),
     {Name, parse_and_check(ConfigString, Type, Name)}.
@@ -248,7 +280,7 @@ find_all(Config) ->
     Name = ?config(mongo_name, Config),
     #{<<"collection">> := Collection} = ?config(mongo_config, Config),
     ResourceID = emqx_bridge_resource:resource_id(Type, Name),
-    emqx_resource:query(ResourceID, {find, Collection, #{}, #{}}).
+    emqx_resource:simple_sync_query(ResourceID, {find, Collection, #{}, #{}}).
 
 send_message(Config, Payload) ->
     Name = ?config(mongo_name, Config),
@@ -266,7 +298,12 @@ t_setup_via_config_and_publish(Config) ->
         create_bridge(Config)
     ),
     Val = erlang:unique_integer(),
-    ok = send_message(Config, #{key => Val}),
+    {ok, {ok, _}} =
+        ?wait_async_action(
+            send_message(Config, #{key => Val}),
+            #{?snk_kind := mongo_ee_connector_on_query_return},
+            5_000
+        ),
     ?assertMatch(
         {ok, [#{<<"key">> := Val}]},
         find_all(Config)
@@ -286,7 +323,12 @@ t_setup_via_http_api_and_publish(Config) ->
         create_bridge_http(MongoConfig)
     ),
     Val = erlang:unique_integer(),
-    ok = send_message(Config, #{key => Val}),
+    {ok, {ok, _}} =
+        ?wait_async_action(
+            send_message(Config, #{key => Val}),
+            #{?snk_kind := mongo_ee_connector_on_query_return},
+            5_000
+        ),
     ?assertMatch(
         {ok, [#{<<"key">> := Val}]},
         find_all(Config)
@@ -297,7 +339,12 @@ t_payload_template(Config) ->
     {ok, _} = create_bridge(Config, #{<<"payload_template">> => <<"{\"foo\": \"${clientid}\"}">>}),
     Val = erlang:unique_integer(),
     ClientId = emqx_guid:to_hexstr(emqx_guid:gen()),
-    ok = send_message(Config, #{key => Val, clientid => ClientId}),
+    {ok, {ok, _}} =
+        ?wait_async_action(
+            send_message(Config, #{key => Val, clientid => ClientId}),
+            #{?snk_kind := mongo_ee_connector_on_query_return},
+            5_000
+        ),
     ?assertMatch(
         {ok, [#{<<"foo">> := ClientId}]},
         find_all(Config)
@@ -314,11 +361,16 @@ t_collection_template(Config) ->
     ),
     Val = erlang:unique_integer(),
     ClientId = emqx_guid:to_hexstr(emqx_guid:gen()),
-    ok = send_message(Config, #{
-        key => Val,
-        clientid => ClientId,
-        mycollectionvar => <<"mycol">>
-    }),
+    {ok, {ok, _}} =
+        ?wait_async_action(
+            send_message(Config, #{
+                key => Val,
+                clientid => ClientId,
+                mycollectionvar => <<"mycol">>
+            }),
+            #{?snk_kind := mongo_ee_connector_on_query_return},
+            5_000
+        ),
     ?assertMatch(
         {ok, [#{<<"foo">> := ClientId}]},
         find_all(Config)

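The `{ok, {ok, _}} = ?wait_async_action(...)' pattern introduced throughout this suite couples an action to a snabbkaffe trace event: the macro runs the action, then blocks until an event matching the given ?snk_kind pattern fires (or the timeout elapses), and yields {ActionResult, {ok, Event}}. A sketch, reusing the event kind emitted by the connector change further down in this commit:

%% Sketch: run the send and wait for the connector's trace event.
%% `Config' and `send_message/2' are the suite helpers above.
{_Res, {ok, Event}} =
    ?wait_async_action(
        send_message(Config, #{key => 1}),
        #{?snk_kind := mongo_ee_connector_on_query_return},
        5_000
    ),
%% `Event' carries whatever the tracepoint attached, e.g. #{result := _}:
?assertMatch(#{result := _}, Event).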
View File

@@ -45,15 +45,16 @@ all() ->
 groups() ->
     TCs = emqx_common_test_helpers:all(?MODULE),
     NonBatchCases = [t_write_timeout, t_uninitialized_prepared_statement],
+    BatchingGroups = [
+        {group, with_batch},
+        {group, without_batch}
+    ],
+    QueryModeGroups = [{group, async}, {group, sync}],
     [
-        {tcp, [
-            {group, with_batch},
-            {group, without_batch}
-        ]},
-        {tls, [
-            {group, with_batch},
-            {group, without_batch}
-        ]},
+        {tcp, QueryModeGroups},
+        {tls, QueryModeGroups},
+        {async, BatchingGroups},
+        {sync, BatchingGroups},
         {with_batch, TCs -- NonBatchCases},
         {without_batch, TCs}
     ].
@@ -65,7 +66,6 @@ init_per_group(tcp, Config) ->
         {mysql_host, MysqlHost},
         {mysql_port, MysqlPort},
         {enable_tls, false},
-        {query_mode, sync},
         {proxy_name, "mysql_tcp"}
         | Config
     ];
@@ -76,10 +76,13 @@ init_per_group(tls, Config) ->
         {mysql_host, MysqlHost},
         {mysql_port, MysqlPort},
         {enable_tls, true},
-        {query_mode, sync},
         {proxy_name, "mysql_tls"}
         | Config
     ];
+init_per_group(async, Config) ->
+    [{query_mode, async} | Config];
+init_per_group(sync, Config) ->
+    [{query_mode, sync} | Config];
 init_per_group(with_batch, Config0) ->
     Config = [{batch_size, 100} | Config0],
     common_init(Config);
@@ -99,6 +102,7 @@ end_per_group(_Group, _Config) ->
     ok.
 
 init_per_suite(Config) ->
+    emqx_common_test_helpers:clear_screen(),
     Config.
 
 end_per_suite(_Config) ->
@@ -109,6 +113,7 @@ end_per_suite(_Config) ->
 init_per_testcase(_Testcase, Config) ->
     connect_and_clear_table(Config),
     delete_bridge(Config),
+    snabbkaffe:start_trace(),
     Config.
 
 end_per_testcase(_Testcase, Config) ->
@@ -237,6 +242,25 @@ query_resource(Config, Request) ->
     ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
     emqx_resource:query(ResourceID, Request, #{timeout => 500}).
 
+query_resource_async(Config, Request) ->
+    Name = ?config(mysql_name, Config),
+    BridgeType = ?config(mysql_bridge_type, Config),
+    Ref = alias([reply]),
+    AsyncReplyFun = fun(Result) -> Ref ! {result, Ref, Result} end,
+    ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
+    Return = emqx_resource:query(ResourceID, Request, #{
+        timeout => 500, async_reply_fun => {AsyncReplyFun, []}
+    }),
+    {Return, Ref}.
+
+receive_result(Ref, Timeout) ->
+    receive
+        {result, Ref, Result} ->
+            {ok, Result}
+    after Timeout ->
+        timeout
+    end.
+
 unprepare(Config, Key) ->
     Name = ?config(mysql_name, Config),
     BridgeType = ?config(mysql_bridge_type, Config),
@@ -409,17 +433,29 @@ t_write_failure(Config) ->
     Val = integer_to_binary(erlang:unique_integer()),
     SentData = #{payload => Val, timestamp => 1668602148000},
     ?check_trace(
-        emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
-            case QueryMode of
-                sync ->
-                    ?assertMatch(
-                        {error, {resource_error, #{reason := timeout}}},
-                        send_message(Config, SentData)
-                    );
-                async ->
-                    send_message(Config, SentData)
-            end
-        end),
+        begin
+            %% for some unknown reason, `?wait_async_action' and `subscribe'
+            %% hang and timeout if called inside `with_failure', but the event
+            %% happens and is emitted after the test pid dies!?
+            {ok, SRef} = snabbkaffe:subscribe(
+                ?match_event(#{?snk_kind := buffer_worker_flush_nack}),
+                2_000
+            ),
+            emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
+                case QueryMode of
+                    sync ->
+                        ?assertMatch(
+                            {error, {resource_error, #{reason := timeout}}},
+                            send_message(Config, SentData)
+                        );
+                    async ->
+                        send_message(Config, SentData)
+                end,
+                ?assertMatch({ok, [#{result := {error, _}}]}, snabbkaffe:receive_events(SRef)),
+                ok
+            end),
+            ok
+        end,
         fun(Trace0) ->
             ct:pal("trace: ~p", [Trace0]),
             Trace = ?of_kind(buffer_worker_flush_nack, Trace0),
@@ -443,27 +479,52 @@ t_write_timeout(Config) ->
     ProxyName = ?config(proxy_name, Config),
     ProxyPort = ?config(proxy_port, Config),
     ProxyHost = ?config(proxy_host, Config),
+    QueryMode = ?config(query_mode, Config),
     {ok, _} = create_bridge(Config),
     Val = integer_to_binary(erlang:unique_integer()),
     SentData = #{payload => Val, timestamp => 1668602148000},
     Timeout = 1000,
+    %% for some unknown reason, `?wait_async_action' and `subscribe'
+    %% hang and timeout if called inside `with_failure', but the event
+    %% happens and is emitted after the test pid dies!?
+    {ok, SRef} = snabbkaffe:subscribe(
+        ?match_event(#{?snk_kind := buffer_worker_flush_nack}),
+        2 * Timeout
+    ),
     emqx_common_test_helpers:with_failure(timeout, ProxyName, ProxyHost, ProxyPort, fun() ->
-        ?assertMatch(
-            {error, {resource_error, #{reason := timeout}}},
-            query_resource(Config, {send_message, SentData, [], Timeout})
-        )
+        case QueryMode of
+            sync ->
+                ?assertMatch(
+                    {error, {resource_error, #{reason := timeout}}},
+                    query_resource(Config, {send_message, SentData, [], Timeout})
+                );
+            async ->
+                query_resource(Config, {send_message, SentData, [], Timeout}),
+                ok
+        end,
+        ok
     end),
+    ?assertMatch({ok, [#{result := {error, _}}]}, snabbkaffe:receive_events(SRef)),
     ok.
 
 t_simple_sql_query(Config) ->
+    QueryMode = ?config(query_mode, Config),
+    BatchSize = ?config(batch_size, Config),
+    IsBatch = BatchSize > 1,
     ?assertMatch(
         {ok, _},
         create_bridge(Config)
     ),
     Request = {sql, <<"SELECT count(1) AS T">>},
-    Result = query_resource(Config, Request),
-    BatchSize = ?config(batch_size, Config),
-    IsBatch = BatchSize > 1,
+    Result =
+        case QueryMode of
+            sync ->
+                query_resource(Config, Request);
+            async ->
+                {_, Ref} = query_resource_async(Config, Request),
+                {ok, Res} = receive_result(Ref, 2_000),
+                Res
+        end,
     case IsBatch of
         true -> ?assertEqual({error, {unrecoverable_error, batch_select_not_implemented}}, Result);
         false -> ?assertEqual({ok, [<<"T">>], [[1]]}, Result)
@@ -471,25 +532,37 @@ t_simple_sql_query(Config) ->
     ok.
 
 t_missing_data(Config) ->
+    BatchSize = ?config(batch_size, Config),
+    IsBatch = BatchSize > 1,
     ?assertMatch(
         {ok, _},
         create_bridge(Config)
     ),
-    Result = send_message(Config, #{}),
-    BatchSize = ?config(batch_size, Config),
-    IsBatch = BatchSize > 1,
+    {ok, SRef} = snabbkaffe:subscribe(
+        ?match_event(#{?snk_kind := buffer_worker_flush_ack}),
+        2_000
+    ),
+    send_message(Config, #{}),
+    {ok, [Event]} = snabbkaffe:receive_events(SRef),
     case IsBatch of
         true ->
             ?assertMatch(
-                {error,
-                    {unrecoverable_error,
-                        {1292, _, <<"Truncated incorrect DOUBLE value: 'undefined'">>}}},
-                Result
+                #{
+                    result :=
+                        {error,
+                            {unrecoverable_error,
+                                {1292, _, <<"Truncated incorrect DOUBLE value: 'undefined'">>}}}
+                },
+                Event
             );
         false ->
             ?assertMatch(
-                {error, {unrecoverable_error, {1048, _, <<"Column 'arrived' cannot be null">>}}},
-                Result
+                #{
                    result :=
+                        {error,
+                            {unrecoverable_error, {1048, _, <<"Column 'arrived' cannot be null">>}}}
+                },
+                Event
             )
     end,
     ok.
@@ -500,14 +573,22 @@ t_bad_sql_parameter(Config) ->
         create_bridge(Config)
     ),
     Request = {sql, <<"">>, [bad_parameter]},
-    Result = query_resource(Config, Request),
+    {_, {ok, Event}} =
+        ?wait_async_action(
+            query_resource(Config, Request),
+            #{?snk_kind := buffer_worker_flush_ack},
+            2_000
+        ),
     BatchSize = ?config(batch_size, Config),
     IsBatch = BatchSize > 1,
     case IsBatch of
         true ->
-            ?assertEqual({error, {unrecoverable_error, invalid_request}}, Result);
+            ?assertMatch(#{result := {error, {unrecoverable_error, invalid_request}}}, Event);
         false ->
-            ?assertEqual({error, {unrecoverable_error, {invalid_params, [bad_parameter]}}}, Result)
+            ?assertMatch(
+                #{result := {error, {unrecoverable_error, {invalid_params, [bad_parameter]}}}},
+                Event
+            )
     end,
     ok.
@@ -515,7 +596,12 @@ t_nasty_sql_string(Config) ->
     ?assertMatch({ok, _}, create_bridge(Config)),
     Payload = list_to_binary(lists:seq(0, 255)),
     Message = #{payload => Payload, timestamp => erlang:system_time(millisecond)},
-    Result = send_message(Config, Message),
+    {Result, {ok, _}} =
+        ?wait_async_action(
+            send_message(Config, Message),
+            #{?snk_kind := mysql_connector_query_return},
+            1_000
+        ),
     ?assertEqual(ok, Result),
     ?assertMatch(
         {ok, [<<"payload">>], [[Payload]]},
@@ -561,12 +647,22 @@ t_unprepared_statement_query(Config) ->
         create_bridge(Config)
     ),
     Request = {prepared_query, unprepared_query, []},
-    Result = query_resource(Config, Request),
+    {_, {ok, Event}} =
+        ?wait_async_action(
+            query_resource(Config, Request),
+            #{?snk_kind := buffer_worker_flush_ack},
+            2_000
+        ),
     BatchSize = ?config(batch_size, Config),
     IsBatch = BatchSize > 1,
     case IsBatch of
-        true -> ?assertEqual({error, {unrecoverable_error, invalid_request}}, Result);
-        false -> ?assertEqual({error, {unrecoverable_error, prepared_statement_invalid}}, Result)
+        true ->
+            ?assertMatch(#{result := {error, {unrecoverable_error, invalid_request}}}, Event);
+        false ->
+            ?assertMatch(
+                #{result := {error, {unrecoverable_error, prepared_statement_invalid}}},
+                Event
+            )
     end,
     ok.
@@ -582,7 +678,13 @@ t_uninitialized_prepared_statement(Config) ->
     unprepare(Config, send_message),
     ?check_trace(
         begin
-            ?assertEqual(ok, send_message(Config, SentData)),
+            {Res, {ok, _}} =
+                ?wait_async_action(
+                    send_message(Config, SentData),
+                    #{?snk_kind := mysql_connector_query_return},
+                    2_000
+                ),
+            ?assertEqual(ok, Res),
             ok
         end,
         fun(Trace) ->

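Both rewritten failure tests above use a subscribe-first, assert-later idiom instead of ?wait_async_action, for the reason given in the inline comment about with_failure. Reduced to its essentials (using only the calls that appear in the hunks above):

%% Sketch: register interest before entering the failure window,
%% collect afterwards. subscribe/2 takes a match spec and a timeout.
{ok, SRef} = snabbkaffe:subscribe(
    ?match_event(#{?snk_kind := buffer_worker_flush_nack}),
    2_000
),
%% ... trigger the failing write here (e.g. inside with_failure/5) ...
%% receive_events/1 then returns the events collected for that subscription:
?assertMatch({ok, [#{result := {error, _}}]}, snabbkaffe:receive_events(SRef)).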
View File

@@ -42,19 +42,18 @@ all() ->
 groups() ->
     TCs = emqx_common_test_helpers:all(?MODULE),
     NonBatchCases = [t_write_timeout],
+    BatchVariantGroups = [
+        {group, with_batch},
+        {group, without_batch},
+        {group, matrix},
+        {group, timescale}
+    ],
+    QueryModeGroups = [{async, BatchVariantGroups}, {sync, BatchVariantGroups}],
     [
-        {tcp, [
-            {group, with_batch},
-            {group, without_batch},
-            {group, matrix},
-            {group, timescale}
-        ]},
-        {tls, [
-            {group, with_batch},
-            {group, without_batch},
-            {group, matrix},
-            {group, timescale}
-        ]},
+        {tcp, QueryModeGroups},
+        {tls, QueryModeGroups},
+        {async, BatchVariantGroups},
+        {sync, BatchVariantGroups},
         {with_batch, TCs -- NonBatchCases},
         {without_batch, TCs},
         {matrix, [t_setup_via_config_and_publish, t_setup_via_http_api_and_publish]},
@@ -68,7 +67,6 @@ init_per_group(tcp, Config) ->
         {pgsql_host, Host},
         {pgsql_port, Port},
         {enable_tls, false},
-        {query_mode, sync},
         {proxy_name, "pgsql_tcp"}
         | Config
     ];
@@ -79,10 +77,13 @@ init_per_group(tls, Config) ->
         {pgsql_host, Host},
         {pgsql_port, Port},
         {enable_tls, true},
-        {query_mode, sync},
         {proxy_name, "pgsql_tls"}
         | Config
     ];
+init_per_group(async, Config) ->
+    [{query_mode, async} | Config];
+init_per_group(sync, Config) ->
+    [{query_mode, sync} | Config];
 init_per_group(with_batch, Config0) ->
     Config = [{enable_batch, true} | Config0],
     common_init(Config);
@@ -118,6 +119,7 @@ end_per_suite(_Config) ->
 init_per_testcase(_Testcase, Config) ->
     connect_and_clear_table(Config),
     delete_bridge(Config),
+    snabbkaffe:start_trace(),
     Config.
 
 end_per_testcase(_Testcase, Config) ->
@@ -221,9 +223,13 @@ parse_and_check(ConfigString, BridgeType, Name) ->
     Config.
 
 create_bridge(Config) ->
+    create_bridge(Config, _Overrides = #{}).
+
+create_bridge(Config, Overrides) ->
     BridgeType = ?config(pgsql_bridge_type, Config),
     Name = ?config(pgsql_name, Config),
-    PGConfig = ?config(pgsql_config, Config),
+    PGConfig0 = ?config(pgsql_config, Config),
+    PGConfig = emqx_map_lib:deep_merge(PGConfig0, Overrides),
     emqx_bridge:create(BridgeType, Name, PGConfig).
 
 delete_bridge(Config) ->
@@ -251,6 +257,27 @@ query_resource(Config, Request) ->
     ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
     emqx_resource:query(ResourceID, Request, #{timeout => 1_000}).
 
+query_resource_async(Config, Request) ->
+    Name = ?config(pgsql_name, Config),
+    BridgeType = ?config(pgsql_bridge_type, Config),
+    Ref = alias([reply]),
+    AsyncReplyFun = fun(Result) -> Ref ! {result, Ref, Result} end,
+    ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
+    Return = emqx_resource:query(ResourceID, Request, #{
+        timeout => 500, async_reply_fun => {AsyncReplyFun, []}
+    }),
+    {Return, Ref}.
+
+receive_result(Ref, Timeout) ->
+    receive
+        {result, Ref, Result} ->
+            {ok, Result};
+        {Ref, Result} ->
+            {ok, Result}
+    after Timeout ->
+        timeout
+    end.
+
 connect_direct_pgsql(Config) ->
     Opts = #{
         host => ?config(pgsql_host, Config),
@@ -308,11 +335,12 @@ t_setup_via_config_and_publish(Config) ->
     SentData = #{payload => Val, timestamp => 1668602148000},
     ?check_trace(
         begin
-            ?wait_async_action(
-                ?assertEqual({ok, 1}, send_message(Config, SentData)),
-                #{?snk_kind := pgsql_connector_query_return},
-                10_000
-            ),
+            {_, {ok, _}} =
+                ?wait_async_action(
+                    send_message(Config, SentData),
+                    #{?snk_kind := pgsql_connector_query_return},
+                    10_000
+                ),
             ?assertMatch(
                 Val,
                 connect_and_get_payload(Config)
@@ -336,6 +364,7 @@ t_setup_via_http_api_and_publish(Config) ->
     BridgeType = ?config(pgsql_bridge_type, Config),
     Name = ?config(pgsql_name, Config),
     PgsqlConfig0 = ?config(pgsql_config, Config),
+    QueryMode = ?config(query_mode, Config),
     PgsqlConfig = PgsqlConfig0#{
         <<"name">> => Name,
         <<"type">> => BridgeType
@@ -348,11 +377,18 @@ t_setup_via_http_api_and_publish(Config) ->
     SentData = #{payload => Val, timestamp => 1668602148000},
     ?check_trace(
         begin
-            ?wait_async_action(
-                ?assertEqual({ok, 1}, send_message(Config, SentData)),
-                #{?snk_kind := pgsql_connector_query_return},
-                10_000
-            ),
+            {Res, {ok, _}} =
+                ?wait_async_action(
+                    send_message(Config, SentData),
+                    #{?snk_kind := pgsql_connector_query_return},
+                    10_000
+                ),
+            case QueryMode of
+                async ->
+                    ok;
+                sync ->
+                    ?assertEqual({ok, 1}, Res)
+            end,
             ?assertMatch(
                 Val,
                 connect_and_get_payload(Config)
@@ -457,28 +493,71 @@ t_write_timeout(Config) ->
     ProxyName = ?config(proxy_name, Config),
     ProxyPort = ?config(proxy_port, Config),
     ProxyHost = ?config(proxy_host, Config),
-    {ok, _} = create_bridge(Config),
+    QueryMode = ?config(query_mode, Config),
+    {ok, _} = create_bridge(
+        Config,
+        #{
+            <<"resource_opts">> => #{
+                <<"request_timeout">> => 500,
+                <<"resume_interval">> => 100,
+                <<"health_check_interval">> => 100
+            }
+        }
+    ),
     Val = integer_to_binary(erlang:unique_integer()),
     SentData = #{payload => Val, timestamp => 1668602148000},
-    Timeout = 1000,
-    emqx_common_test_helpers:with_failure(timeout, ProxyName, ProxyHost, ProxyPort, fun() ->
-        ?assertMatch(
-            {error, {resource_error, #{reason := timeout}}},
-            query_resource(Config, {send_message, SentData, [], Timeout})
-        )
-    end),
+    {ok, SRef} = snabbkaffe:subscribe(
+        ?match_event(#{?snk_kind := call_query_enter}),
+        2_000
+    ),
+    Res0 =
+        emqx_common_test_helpers:with_failure(timeout, ProxyName, ProxyHost, ProxyPort, fun() ->
+            Res1 =
+                case QueryMode of
+                    async ->
+                        query_resource_async(Config, {send_message, SentData});
+                    sync ->
+                        query_resource(Config, {send_message, SentData})
+                end,
+            ?assertMatch({ok, [_]}, snabbkaffe:receive_events(SRef)),
+            Res1
+        end),
+    case Res0 of
+        {_, Ref} when is_reference(Ref) ->
+            case receive_result(Ref, 15_000) of
+                {ok, Res} ->
+                    ?assertMatch({error, {unrecoverable_error, _}}, Res);
+                timeout ->
+                    ct:pal("mailbox:\n  ~p", [process_info(self(), messages)]),
+                    ct:fail("no response received")
+            end;
+        _ ->
+            ?assertMatch({error, {resource_error, #{reason := timeout}}}, Res0)
+    end,
     ok.
 
 t_simple_sql_query(Config) ->
+    EnableBatch = ?config(enable_batch, Config),
+    QueryMode = ?config(query_mode, Config),
     ?assertMatch(
         {ok, _},
         create_bridge(Config)
     ),
     Request = {sql, <<"SELECT count(1) AS T">>},
-    Result = query_resource(Config, Request),
-    case ?config(enable_batch, Config) of
-        true -> ?assertEqual({error, {unrecoverable_error, batch_prepare_not_implemented}}, Result);
-        false -> ?assertMatch({ok, _, [{1}]}, Result)
+    Result =
+        case QueryMode of
+            sync ->
+                query_resource(Config, Request);
+            async ->
+                {_, Ref} = query_resource_async(Config, Request),
+                {ok, Res} = receive_result(Ref, 2_000),
+                Res
+        end,
+    case EnableBatch of
+        true ->
+            ?assertEqual({error, {unrecoverable_error, batch_prepare_not_implemented}}, Result);
+        false ->
+            ?assertMatch({ok, _, [{1}]}, Result)
     end,
     ok.
@@ -487,21 +566,40 @@ t_missing_data(Config) ->
         {ok, _},
         create_bridge(Config)
     ),
-    Result = send_message(Config, #{}),
+    {_, {ok, Event}} =
+        ?wait_async_action(
+            send_message(Config, #{}),
+            #{?snk_kind := buffer_worker_flush_ack},
+            2_000
+        ),
     ?assertMatch(
-        {error, {unrecoverable_error, {error, error, <<"23502">>, not_null_violation, _, _}}},
-        Result
+        #{
            result :=
+                {error,
+                    {unrecoverable_error, {error, error, <<"23502">>, not_null_violation, _, _}}}
+        },
+        Event
     ),
     ok.
 
 t_bad_sql_parameter(Config) ->
+    QueryMode = ?config(query_mode, Config),
+    EnableBatch = ?config(enable_batch, Config),
     ?assertMatch(
         {ok, _},
         create_bridge(Config)
     ),
     Request = {sql, <<"">>, [bad_parameter]},
-    Result = query_resource(Config, Request),
-    case ?config(enable_batch, Config) of
+    Result =
+        case QueryMode of
+            sync ->
+                query_resource(Config, Request);
+            async ->
+                {_, Ref} = query_resource_async(Config, Request),
+                {ok, Res} = receive_result(Ref, 2_000),
+                Res
+        end,
+    case EnableBatch of
         true ->
             ?assertEqual({error, {unrecoverable_error, invalid_request}}, Result);
         false ->
@@ -515,5 +613,10 @@ t_nasty_sql_string(Config) ->
     ?assertMatch({ok, _}, create_bridge(Config)),
     Payload = list_to_binary(lists:seq(1, 127)),
     Message = #{payload => Payload, timestamp => erlang:system_time(millisecond)},
-    ?assertEqual({ok, 1}, send_message(Config, Message)),
+    {_, {ok, _}} =
+        ?wait_async_action(
+            send_message(Config, Message),
+            #{?snk_kind := pgsql_connector_query_return},
+            1_000
+        ),
     ?assertEqual(Payload, connect_and_get_payload(Config)).

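The new create_bridge/2 added to this suite (and to the Cassandra and TDengine suites in this commit) deep-merges per-test overrides into the bridge config built in common_init, so individual cases can tighten resource options without duplicating the whole config; emqx_map_lib:deep_merge/2 merges nested maps recursively with the override winning on conflicts. Typical call, lifted from the timeout test above:

%% Sketch: shrink the timeouts for a timeout-focused test case.
{ok, _} = create_bridge(
    Config,
    #{
        <<"resource_opts">> => #{
            <<"request_timeout">> => 500,
            <<"health_check_interval">> => 100
        }
    }
),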
View File

@@ -64,14 +64,17 @@
         {group, batch_on},
         {group, batch_off}
     ],
+    QueryModeGroups = [{group, async}, {group, sync}],
     [
         {rest, TCs},
         {transports, [
             {group, tcp},
             {group, tls}
         ]},
-        {tcp, TypeGroups},
-        {tls, TypeGroups},
+        {tcp, QueryModeGroups},
+        {tls, QueryModeGroups},
+        {async, TypeGroups},
+        {sync, TypeGroups},
         {redis_single, BatchGroups},
         {redis_sentinel, BatchGroups},
         {redis_cluster, BatchGroups},
@@ -79,6 +82,10 @@
         {batch_off, ResourceSpecificTCs}
     ].
 
+init_per_group(async, Config) ->
+    [{query_mode, async} | Config];
+init_per_group(sync, Config) ->
+    [{query_mode, sync} | Config];
 init_per_group(Group, Config) when
     Group =:= redis_single; Group =:= redis_sentinel; Group =:= redis_cluster
 ->
@@ -149,8 +156,9 @@ init_per_testcase(_Testcase, Config) ->
             {skip, "Batching is not supported by 'redis_cluster' bridge type"};
         {RedisType, BatchMode} ->
             Transport = ?config(transport, Config),
+            QueryMode = ?config(query_mode, Config),
             #{RedisType := #{Transport := RedisConnConfig}} = redis_connect_configs(),
-            #{BatchMode := ResourceConfig} = resource_configs(),
+            #{BatchMode := ResourceConfig} = resource_configs(#{query_mode => QueryMode}),
             IsBatch = (BatchMode =:= batch_on),
             BridgeConfig0 = maps:merge(RedisConnConfig, ?COMMON_REDIS_OPTS),
             BridgeConfig1 = BridgeConfig0#{<<"resource_opts">> => ResourceConfig},
@@ -301,7 +309,7 @@ t_permanent_error(_Config) ->
             ?wait_async_action(
                 publish_message(Topic, Payload),
                 #{?snk_kind := redis_ee_connector_send_done},
-                10000
+                10_000
             )
         end,
         fun(Trace) ->
@@ -529,14 +537,14 @@ invalid_command_bridge_config() ->
         <<"command_template">> => [<<"BAD">>, <<"COMMAND">>, <<"${payload}">>]
     }.
 
-resource_configs() ->
+resource_configs(#{query_mode := QueryMode}) ->
     #{
         batch_off => #{
-            <<"query_mode">> => <<"sync">>,
+            <<"query_mode">> => atom_to_binary(QueryMode),
             <<"start_timeout">> => <<"15s">>
         },
         batch_on => #{
-            <<"query_mode">> => <<"sync">>,
+            <<"query_mode">> => atom_to_binary(QueryMode),
             <<"worker_pool_size">> => <<"1">>,
             <<"batch_size">> => integer_to_binary(?BATCH_SIZE),
             <<"start_timeout">> => <<"15s">>,

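resource_configs/1 now threads the group's query mode into the generated resource options; atom_to_binary/1 turns the async | sync atom into the binary the config schema expects. For instance:

%% Sketch of what the new call site produces:
#{batch_off := Opts} = resource_configs(#{query_mode => async}),
%% Opts is roughly #{<<"query_mode">> => <<"async">>, <<"start_timeout">> => <<"15s">>}
?assertEqual(<<"async">>, maps:get(<<"query_mode">>, Opts)).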
View File

@@ -24,17 +24,24 @@
 all() ->
     [
-        {group, with_batch},
-        {group, without_batch}
+        {group, async},
+        {group, sync}
     ].
 
 groups() ->
     TCs = emqx_common_test_helpers:all(?MODULE),
+    BatchingGroups = [{group, with_batch}, {group, without_batch}],
     [
+        {async, BatchingGroups},
+        {sync, BatchingGroups},
         {with_batch, TCs},
         {without_batch, TCs}
     ].
 
+init_per_group(async, Config) ->
+    [{query_mode, async} | Config];
+init_per_group(sync, Config) ->
+    [{query_mode, sync} | Config];
 init_per_group(with_batch, Config0) ->
     Config = [{batch_size, ?BATCH_SIZE} | Config0],
     common_init(Config);
@@ -84,7 +91,6 @@ common_init(ConfigT) ->
     Config0 = [
         {host, Host},
         {port, Port},
-        {query_mode, sync},
         {proxy_name, "rocketmq"}
         | ConfigT
     ],

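The same regrouping recurs here and in the TDengine suite below: nesting the batching groups under async and sync makes common_test run every case once per combination, and because each init_per_group/2 clause prepends to Config, a case sees both its query mode and its batching settings. A minimal skeleton of the mechanism (our own toy module, not part of the commit):

%% Sketch: CT runs t_example four times here:
%% async/with_batch, async/without_batch, sync/with_batch, sync/without_batch.
-module(group_matrix_sketch_SUITE).
-compile([export_all, nowarn_export_all]).

all() -> [{group, async}, {group, sync}].

groups() ->
    BatchingGroups = [{group, with_batch}, {group, without_batch}],
    [
        {async, BatchingGroups},
        {sync, BatchingGroups},
        {with_batch, [t_example]},
        {without_batch, [t_example]}
    ].

init_per_group(async, Config) -> [{query_mode, async} | Config];
init_per_group(sync, Config) -> [{query_mode, sync} | Config];
init_per_group(_Group, Config) -> Config.

end_per_group(_Group, _Config) -> ok.

t_example(Config) ->
    %% the enclosing group's entry is visible in the stacked proplist
    _QueryMode = proplists:get_value(query_mode, Config),
    ok.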
View File

@@ -46,18 +46,25 @@
 all() ->
     [
-        {group, with_batch},
-        {group, without_batch}
+        {group, async},
+        {group, sync}
     ].
 
 groups() ->
     TCs = emqx_common_test_helpers:all(?MODULE),
     NonBatchCases = [t_write_timeout],
+    BatchingGroups = [{group, with_batch}, {group, without_batch}],
     [
+        {async, BatchingGroups},
+        {sync, BatchingGroups},
         {with_batch, TCs -- NonBatchCases},
         {without_batch, TCs}
     ].
 
+init_per_group(async, Config) ->
+    [{query_mode, async} | Config];
+init_per_group(sync, Config) ->
+    [{query_mode, sync} | Config];
 init_per_group(with_batch, Config0) ->
     Config = [{enable_batch, true} | Config0],
     common_init(Config);
@@ -87,6 +94,7 @@ end_per_suite(_Config) ->
 init_per_testcase(_Testcase, Config) ->
     connect_and_clear_table(Config),
     delete_bridge(Config),
+    snabbkaffe:start_trace(),
     Config.
 
 end_per_testcase(_Testcase, Config) ->
@@ -109,7 +117,6 @@ common_init(ConfigT) ->
     Config0 = [
         {td_host, Host},
         {td_port, Port},
-        {query_mode, sync},
         {proxy_name, "tdengine_restful"}
         | ConfigT
     ],
@@ -194,9 +201,13 @@ parse_and_check(ConfigString, BridgeType, Name) ->
     Config.
 
 create_bridge(Config) ->
+    create_bridge(Config, _Overrides = #{}).
+
+create_bridge(Config, Overrides) ->
     BridgeType = ?config(tdengine_bridge_type, Config),
     Name = ?config(tdengine_name, Config),
-    TDConfig = ?config(tdengine_config, Config),
+    TDConfig0 = ?config(tdengine_config, Config),
+    TDConfig = emqx_map_lib:deep_merge(TDConfig0, Overrides),
     emqx_bridge:create(BridgeType, Name, TDConfig).
 
 delete_bridge(Config) ->
@@ -224,6 +235,27 @@ query_resource(Config, Request) ->
     ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
     emqx_resource:query(ResourceID, Request, #{timeout => 1_000}).
 
+query_resource_async(Config, Request) ->
+    Name = ?config(tdengine_name, Config),
+    BridgeType = ?config(tdengine_bridge_type, Config),
+    Ref = alias([reply]),
+    AsyncReplyFun = fun(Result) -> Ref ! {result, Ref, Result} end,
+    ResourceID = emqx_bridge_resource:resource_id(BridgeType, Name),
+    Return = emqx_resource:query(ResourceID, Request, #{
+        timeout => 500, async_reply_fun => {AsyncReplyFun, []}
+    }),
+    {Return, Ref}.
+
+receive_result(Ref, Timeout) ->
+    receive
+        {result, Ref, Result} ->
+            {ok, Result};
+        {Ref, Result} ->
+            {ok, Result}
+    after Timeout ->
+        timeout
+    end.
+
 connect_direct_tdengine(Config) ->
     Opts = [
         {host, to_bin(?config(td_host, Config))},
@@ -273,12 +305,14 @@ t_setup_via_config_and_publish(Config) ->
     SentData = #{payload => ?PAYLOAD, timestamp => 1668602148000},
     ?check_trace(
         begin
-            ?wait_async_action(
-                ?assertMatch(
-                    {ok, #{<<"code">> := 0, <<"rows">> := 1}}, send_message(Config, SentData)
-                ),
-                #{?snk_kind := tdengine_connector_query_return},
-                10_000
-            ),
+            {_, {ok, #{result := Result}}} =
+                ?wait_async_action(
+                    send_message(Config, SentData),
+                    #{?snk_kind := buffer_worker_flush_ack},
+                    2_000
+                ),
+            ?assertMatch(
+                {ok, #{<<"code">> := 0, <<"rows">> := 1}}, Result
+            ),
             ?assertMatch(
                 ?PAYLOAD,
@@ -297,24 +331,32 @@ t_setup_via_config_and_publish(Config) ->
 t_setup_via_http_api_and_publish(Config) ->
     BridgeType = ?config(tdengine_bridge_type, Config),
     Name = ?config(tdengine_name, Config),
-    PgsqlConfig0 = ?config(tdengine_config, Config),
-    PgsqlConfig = PgsqlConfig0#{
+    QueryMode = ?config(query_mode, Config),
+    TDengineConfig0 = ?config(tdengine_config, Config),
+    TDengineConfig = TDengineConfig0#{
         <<"name">> => Name,
         <<"type">> => BridgeType
     },
     ?assertMatch(
         {ok, _},
-        create_bridge_http(PgsqlConfig)
+        create_bridge_http(TDengineConfig)
     ),
     SentData = #{payload => ?PAYLOAD, timestamp => 1668602148000},
     ?check_trace(
         begin
-            ?wait_async_action(
-                ?assertMatch(
-                    {ok, #{<<"code">> := 0, <<"rows">> := 1}}, send_message(Config, SentData)
-                ),
-                #{?snk_kind := tdengine_connector_query_return},
-                10_000
-            ),
+            Request = {send_message, SentData},
+            Res0 =
+                case QueryMode of
+                    sync ->
+                        query_resource(Config, Request);
+                    async ->
+                        {_, Ref} = query_resource_async(Config, Request),
+                        {ok, Res} = receive_result(Ref, 2_000),
+                        Res
+                end,
+            ?assertMatch(
+                {ok, #{<<"code">> := 0, <<"rows">> := 1}}, Res0
+            ),
             ?assertMatch(
                 ?PAYLOAD,
@@ -359,7 +401,14 @@ t_write_failure(Config) ->
     {ok, _} = create_bridge(Config),
     SentData = #{payload => ?PAYLOAD, timestamp => 1668602148000},
     emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
-        ?assertMatch({error, econnrefused}, send_message(Config, SentData))
+        {_, {ok, #{result := Result}}} =
+            ?wait_async_action(
+                send_message(Config, SentData),
+                #{?snk_kind := buffer_worker_flush_ack},
+                2_000
+            ),
+        ?assertMatch({error, econnrefused}, Result),
+        ok
     end),
     ok.
@@ -369,24 +418,50 @@ t_write_timeout(Config) ->
     ProxyName = ?config(proxy_name, Config),
     ProxyPort = ?config(proxy_port, Config),
     ProxyHost = ?config(proxy_host, Config),
-    {ok, _} = create_bridge(Config),
+    QueryMode = ?config(query_mode, Config),
+    {ok, _} = create_bridge(
+        Config,
+        #{
+            <<"resource_opts">> => #{
+                <<"request_timeout">> => 500,
+                <<"resume_interval">> => 100,
+                <<"health_check_interval">> => 100
+            }
+        }
+    ),
     SentData = #{payload => ?PAYLOAD, timestamp => 1668602148000},
-    emqx_common_test_helpers:with_failure(timeout, ProxyName, ProxyHost, ProxyPort, fun() ->
-        ?assertMatch(
-            {error, {resource_error, #{reason := timeout}}},
-            query_resource(Config, {send_message, SentData})
-        )
-    end),
+    %% FIXME: TDengine connector hangs indefinetily during
+    %% `call_query' while the connection is unresponsive. Should add
+    %% a timeout to `APPLY_RESOURCE' in buffer worker??
+    case QueryMode of
+        sync ->
+            emqx_common_test_helpers:with_failure(
+                timeout, ProxyName, ProxyHost, ProxyPort, fun() ->
+                    ?assertMatch(
+                        {error, {resource_error, #{reason := timeout}}},
+                        query_resource(Config, {send_message, SentData})
+                    )
+                end
+            );
+        async ->
+            ct:comment("tdengine connector hangs the buffer worker forever")
+    end,
     ok.
 
 t_simple_sql_query(Config) ->
+    EnableBatch = ?config(enable_batch, Config),
     ?assertMatch(
         {ok, _},
         create_bridge(Config)
     ),
     Request = {query, <<"SELECT count(1) AS T">>},
-    Result = query_resource(Config, Request),
-    case ?config(enable_batch, Config) of
+    {_, {ok, #{result := Result}}} =
+        ?wait_async_action(
+            query_resource(Config, Request),
+            #{?snk_kind := buffer_worker_flush_ack},
+            2_000
+        ),
+    case EnableBatch of
         true ->
             ?assertEqual({error, {unrecoverable_error, batch_prepare_not_implemented}}, Result);
         false ->
@@ -399,7 +474,12 @@ t_missing_data(Config) ->
         {ok, _},
         create_bridge(Config)
     ),
-    Result = send_message(Config, #{}),
+    {_, {ok, #{result := Result}}} =
+        ?wait_async_action(
+            send_message(Config, #{}),
+            #{?snk_kind := buffer_worker_flush_ack},
+            2_000
+        ),
     ?assertMatch(
         {error, #{
             <<"code">> := 534,
@@ -410,13 +490,19 @@ t_missing_data(Config) ->
     ok.
 
 t_bad_sql_parameter(Config) ->
+    EnableBatch = ?config(enable_batch, Config),
     ?assertMatch(
         {ok, _},
         create_bridge(Config)
     ),
     Request = {sql, <<"">>, [bad_parameter]},
-    Result = query_resource(Config, Request),
-    case ?config(enable_batch, Config) of
+    {_, {ok, #{result := Result}}} =
+        ?wait_async_action(
+            query_resource(Config, Request),
+            #{?snk_kind := buffer_worker_flush_ack},
+            2_000
+        ),
+    case EnableBatch of
         true ->
             ?assertEqual({error, {unrecoverable_error, invalid_request}}, Result);
         false ->
@@ -443,9 +529,15 @@ t_nasty_sql_string(Config) ->
     % [1]: https://github.com/taosdata/TDengine/blob/066cb34a/source/libs/parser/src/parUtil.c#L279-L301
     Payload = list_to_binary(lists:seq(1, 127)),
     Message = #{payload => Payload, timestamp => erlang:system_time(millisecond)},
+    {_, {ok, #{result := Result}}} =
+        ?wait_async_action(
+            send_message(Config, Message),
+            #{?snk_kind := buffer_worker_flush_ack},
+            2_000
+        ),
     ?assertMatch(
         {ok, #{<<"code">> := 0, <<"rows">> := 1}},
-        send_message(Config, Message)
+        Result
     ),
     ?assertEqual(
         Payload,

View File

@@ -2,7 +2,7 @@
 {deps, [
     {hstreamdb_erl, {git, "https://github.com/hstreamdb/hstreamdb_erl.git", {tag, "0.2.5"}}},
     {influxdb, {git, "https://github.com/emqx/influxdb-client-erl", {tag, "1.1.9"}}},
-    {tdengine, {git, "https://github.com/emqx/tdengine-client-erl", {tag, "0.1.5"}}},
+    {tdengine, {git, "https://github.com/emqx/tdengine-client-erl", {tag, "0.1.6"}}},
     {clickhouse, {git, "https://github.com/emqx/clickhouse-client-erl", {tag, "0.3"}}},
     {erlcloud, {git, "https://github.com/emqx/erlcloud.git", {tag,"3.5.16-emqx-1"}}},
     {rocketmq, {git, "https://github.com/emqx/rocketmq-client-erl.git", {tag, "v0.5.1"}}},

View File

@@ -60,7 +60,9 @@ on_query(InstanceId, {send_message, Message0}, State) ->
         collection => emqx_plugin_libs_rule:proc_tmpl(CollectionTemplate, Message0)
     },
     Message = render_message(PayloadTemplate, Message0),
-    emqx_connector_mongo:on_query(InstanceId, {send_message, Message}, NewConnectorState);
+    Res = emqx_connector_mongo:on_query(InstanceId, {send_message, Message}, NewConnectorState),
+    ?tp(mongo_ee_connector_on_query_return, #{result => Res}),
+    Res;
 on_query(InstanceId, Request, _State = #{connector_state := ConnectorState}) ->
     emqx_connector_mongo:on_query(InstanceId, Request, ConnectorState).

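This hunk is the connector-side counterpart of the MongoDB test changes: ?tp/2 drops a structured trace event that snabbkaffe records during test runs (outside tests it typically degrades to a debug-level log entry, depending on snabbkaffe's build flags), which is what lets the suite's ?wait_async_action calls observe mongo_ee_connector_on_query_return. The general shape, as our own minimal example:

%% Sketch: emitting and matching a tracepoint (names are ours).
-include_lib("snabbkaffe/include/snabbkaffe.hrl").

do_work(Arg) ->
    Res = {ok, Arg},
    ?tp(my_component_done, #{result => Res}),  %% event kind + payload
    Res.

%% In a test, after snabbkaffe:start_trace():
%%   {_, {ok, Event}} =
%%       ?wait_async_action(do_work(1), #{?snk_kind := my_component_done}, 1_000),
%%   #{result := {ok, 1}} = Event.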
View File

@@ -93,7 +93,7 @@ defmodule EMQXUmbrella.MixProject do
          github: "ninenines/ranch", ref: "a692f44567034dacf5efcaa24a24183788594eb7", override: true},
       # in conflict by grpc and eetcd
       {:gpb, "4.19.5", override: true, runtime: false},
-      {:hackney, github: "benoitc/hackney", tag: "1.18.1", override: true}
+      {:hackney, github: "emqx/hackney", tag: "1.18.1-1", override: true}
     ] ++
       emqx_apps(profile_info, version) ++
       enterprise_deps(profile_info) ++ bcrypt_dep() ++ jq_dep() ++ quicer_dep()

View File

@@ -80,7 +80,7 @@
    , {esasl, {git, "https://github.com/emqx/esasl", {tag, "0.2.0"}}}
    , {jose, {git, "https://github.com/potatosalad/erlang-jose", {tag, "1.11.2"}}}
    , {telemetry, "1.1.0"}
-   , {hackney, {git, "https://github.com/benoitc/hackney", {tag, "1.18.1"}}}
+   , {hackney, {git, "https://github.com/emqx/hackney.git", {tag, "1.18.1-1"}}}
    ]}.
 
 {xref_ignores,

Some files were not shown because too many files have changed in this diff.