485 lines
16 KiB
Erlang
485 lines
16 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%--------------------------------------------------------------------
|
|
|
|
-module(emqx_bridge_couchbase_SUITE).
|
|
|
|
-feature(maybe_expr, enable).
|
|
|
|
-compile(nowarn_export_all).
|
|
-compile(export_all).
|
|
|
|
-elvis([{elvis_text_style, line_length, #{skip_comments => whole_line}}]).
|
|
|
|
%% -import(emqx_common_test_helpers, [on_exit/1]).
|
|
|
|
-include_lib("eunit/include/eunit.hrl").
|
|
-include_lib("common_test/include/ct.hrl").
|
|
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
|
-include("../src/emqx_bridge_couchbase.hrl").
|
|
|
|
-define(USERNAME, <<"admin">>).
|
|
-define(PASSWORD, <<"public">>).
|
|
-define(BUCKET, <<"mqtt">>).
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% CT boilerplate
|
|
%%------------------------------------------------------------------------------
|
|
|
|
all() ->
|
|
emqx_common_test_helpers:all(?MODULE).
|
|
|
|
init_per_suite(Config) ->
|
|
Host = os:getenv("COUCHBASE_HOST", "toxiproxy"),
|
|
DirectHost = os:getenv("COUCHBASE_DIRECT_HOST", "couchbase"),
|
|
Port = list_to_integer(os:getenv("COUCHBASE_PORT", "8093")),
|
|
AdminPort = list_to_integer(os:getenv("COUCHBASE_ADMIN_PORT", "8091")),
|
|
Server = iolist_to_binary([Host, ":", integer_to_binary(Port)]),
|
|
ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"),
|
|
ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")),
|
|
ProxyName = "couchbase",
|
|
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
|
|
case emqx_common_test_helpers:is_tcp_server_available(Host, Port) of
|
|
true ->
|
|
Apps = emqx_cth_suite:start(
|
|
[
|
|
emqx,
|
|
emqx_conf,
|
|
emqx_bridge_couchbase,
|
|
emqx_bridge,
|
|
emqx_rule_engine,
|
|
emqx_management,
|
|
emqx_mgmt_api_test_util:emqx_dashboard()
|
|
],
|
|
#{work_dir => emqx_cth_suite:work_dir(Config)}
|
|
),
|
|
[
|
|
{apps, Apps},
|
|
{proxy_name, ProxyName},
|
|
{proxy_host, ProxyHost},
|
|
{proxy_port, ProxyPort},
|
|
{server, Server},
|
|
{host, Host},
|
|
{direct_host, DirectHost},
|
|
{admin_port, AdminPort}
|
|
| Config
|
|
];
|
|
false ->
|
|
case os:getenv("IS_CI") of
|
|
"yes" ->
|
|
throw(no_couchbase);
|
|
_ ->
|
|
{skip, no_couchbase}
|
|
end
|
|
end.
|
|
|
|
end_per_suite(Config) ->
|
|
Apps = ?config(apps, Config),
|
|
emqx_cth_suite:stop(Apps),
|
|
ok.
|
|
|
|
init_per_testcase(TestCase, Config0) ->
|
|
ct:timetrap(timer:seconds(16)),
|
|
Server = ?config(server, Config0),
|
|
UniqueNum = integer_to_binary(erlang:unique_integer()),
|
|
Name = <<(atom_to_binary(TestCase))/binary, UniqueNum/binary>>,
|
|
ConnectorConfig = connector_config(Name, Server),
|
|
ActionConfig0 = action_config(Name, #{connector => Name}),
|
|
ActionConfig = emqx_bridge_v2_testlib:parse_and_check(?ACTION_TYPE_BIN, Name, ActionConfig0),
|
|
Config = [
|
|
{bridge_kind, action},
|
|
{action_type, ?ACTION_TYPE_BIN},
|
|
{action_name, Name},
|
|
{action_config, ActionConfig},
|
|
{connector_name, Name},
|
|
{connector_type, ?CONNECTOR_TYPE_BIN},
|
|
{connector_config, ConnectorConfig}
|
|
| Config0
|
|
],
|
|
start_admin_client(Config).
|
|
|
|
end_per_testcase(_Testcase, Config) ->
|
|
ProxyHost = ?config(proxy_host, Config),
|
|
ProxyPort = ?config(proxy_port, Config),
|
|
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
|
|
emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(),
|
|
delete_scope(scope(), Config),
|
|
stop_admin_client(Config),
|
|
emqx_common_test_helpers:call_janitor(),
|
|
ok = snabbkaffe:stop(),
|
|
ok.
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% Helper fns
|
|
%%------------------------------------------------------------------------------
|
|
|
|
start_admin_client(Config) ->
|
|
DirectHost = ?config(direct_host, Config),
|
|
AdminPort = ?config(admin_port, Config),
|
|
AdminPool = <<"couchbase_SUITE_admin">>,
|
|
PoolOpts = [
|
|
{host, DirectHost},
|
|
{port, AdminPort},
|
|
{transport, tcp}
|
|
],
|
|
{ok, _} = ehttpc_sup:start_pool(AdminPool, PoolOpts),
|
|
[{admin_pool, AdminPool} | Config].
|
|
|
|
stop_admin_client(Config) ->
|
|
AdminPool = ?config(admin_pool, Config),
|
|
ok = ehttpc_sup:stop_pool(AdminPool),
|
|
ok.
|
|
|
|
connector_config(Name, Server) ->
|
|
InnerConfigMap0 =
|
|
#{
|
|
<<"enable">> => true,
|
|
<<"tags">> => [<<"bridge">>],
|
|
<<"description">> => <<"my cool bridge">>,
|
|
<<"server">> => Server,
|
|
<<"username">> => ?USERNAME,
|
|
<<"password">> => ?PASSWORD,
|
|
<<"ssl">> => #{<<"enable">> => false},
|
|
<<"resource_opts">> =>
|
|
#{
|
|
<<"health_check_interval">> => <<"1s">>,
|
|
<<"start_after_created">> => true,
|
|
<<"start_timeout">> => <<"5s">>
|
|
}
|
|
},
|
|
emqx_bridge_v2_testlib:parse_and_check_connector(?CONNECTOR_TYPE_BIN, Name, InnerConfigMap0).
|
|
|
|
action_config(Name, Overrides0) ->
|
|
Overrides = emqx_utils_maps:binary_key_map(Overrides0),
|
|
CommonConfig =
|
|
#{
|
|
<<"enable">> => true,
|
|
<<"connector">> => <<"please override">>,
|
|
<<"parameters">> =>
|
|
#{
|
|
<<"sql">> => sql(Name),
|
|
<<"max_retries">> => 3
|
|
},
|
|
<<"resource_opts">> => #{
|
|
%% Batch is not yet supported
|
|
%% <<"batch_size">> => 1,
|
|
%% <<"batch_time">> => <<"0ms">>,
|
|
<<"buffer_mode">> => <<"memory_only">>,
|
|
<<"buffer_seg_bytes">> => <<"10MB">>,
|
|
<<"health_check_interval">> => <<"1s">>,
|
|
<<"inflight_window">> => 100,
|
|
<<"max_buffer_bytes">> => <<"256MB">>,
|
|
<<"metrics_flush_interval">> => <<"1s">>,
|
|
<<"query_mode">> => <<"sync">>,
|
|
<<"request_ttl">> => <<"15s">>,
|
|
<<"resume_interval">> => <<"1s">>,
|
|
<<"worker_pool_size">> => <<"1">>
|
|
}
|
|
},
|
|
emqx_utils_maps:deep_merge(CommonConfig, Overrides).
|
|
|
|
auth_header() ->
|
|
BasicAuth = base64:encode(<<?USERNAME/binary, ":", ?PASSWORD/binary>>),
|
|
{<<"Authorization">>, [<<"Basic ">>, BasicAuth]}.
|
|
|
|
ensure_scope(Scope, Config) ->
|
|
case get_scope(Scope, Config) of
|
|
{ok, _} ->
|
|
ct:pal("scope ~s already exists", [Scope]),
|
|
ok;
|
|
undefined ->
|
|
ct:pal("creating scope ~s", [Scope]),
|
|
{200, _} = create_scope(Scope, Config),
|
|
ok
|
|
end.
|
|
|
|
ensure_collection(Scope, Collection, Opts, Config) ->
|
|
case get_collection(Scope, Collection, Config) of
|
|
{ok, _} ->
|
|
ct:pal("collection ~s.~s already exists", [Scope, Collection]),
|
|
ok;
|
|
undefined ->
|
|
ct:pal("creating collection ~s.~s", [Scope, Collection]),
|
|
{200, _} = create_collection(Scope, Collection, Opts, Config),
|
|
ok
|
|
end.
|
|
|
|
create_scope(Scope, Config) ->
|
|
AdminPool = ?config(admin_pool, Config),
|
|
ReqBody = [<<"name=">>, Scope],
|
|
Request = {
|
|
[<<"/pools/default/buckets/">>, ?BUCKET, <<"/scopes">>],
|
|
[
|
|
auth_header(),
|
|
{<<"Content-Type">>, <<"application/x-www-form-urlencoded">>}
|
|
],
|
|
ReqBody
|
|
},
|
|
RequestTTL = timer:seconds(5),
|
|
MaxRetries = 3,
|
|
{ok, StatusCode, _Headers, Body0} = ehttpc:request(
|
|
AdminPool, post, Request, RequestTTL, MaxRetries
|
|
),
|
|
Body = maybe_decode_json(Body0),
|
|
ct:pal("create scope response:\n ~p", [{StatusCode, Body}]),
|
|
{StatusCode, Body}.
|
|
|
|
create_collection(Scope, Collection, _Opts, Config) ->
|
|
AdminPool = ?config(admin_pool, Config),
|
|
ReqBody = [<<"name=">>, Collection],
|
|
Request = {
|
|
[<<"/pools/default/buckets/">>, ?BUCKET, <<"/scopes/">>, Scope, <<"/collections">>],
|
|
[
|
|
auth_header(),
|
|
{<<"Content-Type">>, <<"application/x-www-form-urlencoded">>}
|
|
],
|
|
ReqBody
|
|
},
|
|
RequestTTL = timer:seconds(5),
|
|
MaxRetries = 3,
|
|
{ok, StatusCode, _Headers, Body0} = ehttpc:request(
|
|
AdminPool, post, Request, RequestTTL, MaxRetries
|
|
),
|
|
Body = maybe_decode_json(Body0),
|
|
ct:pal("create collection response:\n ~p", [{StatusCode, Body}]),
|
|
{StatusCode, Body}.
|
|
|
|
delete_scope(Scope, Config) ->
|
|
AdminPool = ?config(admin_pool, Config),
|
|
Request = {
|
|
[<<"/pools/default/buckets/">>, ?BUCKET, <<"/scopes/">>, Scope],
|
|
[auth_header()]
|
|
},
|
|
RequestTTL = timer:seconds(5),
|
|
MaxRetries = 3,
|
|
{ok, StatusCode, _Headers, Body0} = ehttpc:request(
|
|
AdminPool, delete, Request, RequestTTL, MaxRetries
|
|
),
|
|
Body = maybe_decode_json(Body0),
|
|
ct:pal("delete scope response:\n ~p", [{StatusCode, Body}]),
|
|
{StatusCode, Body}.
|
|
|
|
get_scopes(Config) ->
|
|
AdminPool = ?config(admin_pool, Config),
|
|
Request = {
|
|
[<<"/pools/default/buckets/">>, ?BUCKET, <<"/scopes">>],
|
|
[auth_header()]
|
|
},
|
|
RequestTTL = timer:seconds(5),
|
|
MaxRetries = 3,
|
|
{ok, 200, _Headers, Body0} = ehttpc:request(AdminPool, get, Request, RequestTTL, MaxRetries),
|
|
Body = maybe_decode_json(Body0),
|
|
ct:pal("get scopes response:\n ~p", [Body]),
|
|
Body.
|
|
|
|
get_scope(Scope, Config) ->
|
|
#{<<"scopes">> := Scopes} = get_scopes(Config),
|
|
fetch_with_name(Scopes, Scope).
|
|
|
|
get_collections(Scope, Config) ->
|
|
maybe
|
|
{ok, #{<<"collections">> := Cs}} = get_scope(Scope, Config),
|
|
{ok, Cs}
|
|
end.
|
|
|
|
get_collection(Scope, Collection, Config) ->
|
|
maybe
|
|
{ok, Cs} = get_collections(Scope, Config),
|
|
fetch_with_name(Cs, Collection)
|
|
end.
|
|
|
|
fetch_with_name(Xs, Name) ->
|
|
case [X || X = #{<<"name">> := N} <- Xs, N =:= Name] of
|
|
[] ->
|
|
undefined;
|
|
[X] ->
|
|
{ok, X}
|
|
end.
|
|
|
|
maybe_decode_json(Body) ->
|
|
case emqx_utils_json:safe_decode(Body, [return_maps]) of
|
|
{ok, JSON} ->
|
|
JSON;
|
|
{error, _} ->
|
|
Body
|
|
end.
|
|
|
|
%% Collection creation is async... Trying to insert or select from a recently created
|
|
%% collection might result in error 12003, "Keyspace not found in CB datastore".
|
|
%% https://www.couchbase.com/forums/t/error-creating-primary-index-immediately-after-collection-creation-keyspace-not-found-in-cb-datastore/32479
|
|
wait_until_collection_is_ready(Scope, Collection, Config) ->
|
|
wait_until_collection_is_ready(Scope, Collection, Config, _N = 5, _SleepMS = 200).
|
|
|
|
wait_until_collection_is_ready(Scope, Collection, _Config, N, _SleepMS) when N < 0 ->
|
|
error({collection_not_ready_timeout, Scope, Collection});
|
|
wait_until_collection_is_ready(Scope, Collection, Config, N, SleepMS) ->
|
|
case get_all_rows(Scope, Collection, Config) of
|
|
{ok, _} ->
|
|
ct:pal("collection ~s.~s ready", [Scope, Collection]),
|
|
ok;
|
|
Resp ->
|
|
ct:pal("waiting for collection ~s.~s error response:\n ~p", [Scope, Collection, Resp]),
|
|
ct:sleep(SleepMS),
|
|
wait_until_collection_is_ready(Scope, Collection, Config, N - 1, SleepMS)
|
|
end.
|
|
|
|
scope() ->
|
|
<<"some_scope">>.
|
|
|
|
sql(Name) ->
|
|
Scope = scope(),
|
|
iolist_to_binary([
|
|
<<"insert into default:mqtt.">>,
|
|
Scope,
|
|
<<".">>,
|
|
<<"`">>,
|
|
Name,
|
|
<<"`">>,
|
|
<<" (key, value) values (${.id}, ${.})">>
|
|
]).
|
|
|
|
get_all_rows(Scope, Collection, Config) ->
|
|
ConnResId = emqx_bridge_v2_testlib:connector_resource_id(Config),
|
|
SQL = iolist_to_binary([
|
|
<<"select * from default:mqtt.">>,
|
|
Scope,
|
|
<<".">>,
|
|
<<"`">>,
|
|
Collection,
|
|
<<"`">>
|
|
]),
|
|
Opts = #{},
|
|
Resp = emqx_bridge_couchbase_connector:query(ConnResId, SQL, Opts),
|
|
ct:pal("get rows response:\n ~p", [Resp]),
|
|
case Resp of
|
|
{ok, #{
|
|
status_code := 200, body := #{<<"status">> := <<"success">>, <<"results">> := Rows0}
|
|
}} ->
|
|
Rows = lists:map(
|
|
fun(#{Collection := Value}) ->
|
|
maybe_decode_json(Value)
|
|
end,
|
|
Rows0
|
|
),
|
|
{ok, Rows};
|
|
{error, _} ->
|
|
Resp
|
|
end.
|
|
|
|
proplist_update(Proplist, K, Fn) ->
|
|
{K, OldV} = lists:keyfind(K, 1, Proplist),
|
|
NewV = Fn(OldV),
|
|
lists:keystore(K, 1, Proplist, {K, NewV}).
|
|
|
|
pre_publish_fn(Scope, Collection, Config) ->
|
|
fun(Context) ->
|
|
ensure_scope(Scope, Config),
|
|
ensure_collection(Scope, Collection, _Opts = #{}, Config),
|
|
wait_until_collection_is_ready(Scope, Collection, Config),
|
|
Context
|
|
end.
|
|
|
|
%%------------------------------------------------------------------------------
|
|
%% Testcases
|
|
%%------------------------------------------------------------------------------
|
|
|
|
t_start_stop(Config) ->
|
|
ok = emqx_bridge_v2_testlib:t_start_stop(Config, "couchbase_connector_stop"),
|
|
ok.
|
|
|
|
t_create_via_http(Config) ->
|
|
ok = emqx_bridge_v2_testlib:t_create_via_http(Config),
|
|
ok.
|
|
|
|
t_on_get_status(Config) ->
|
|
ok = emqx_bridge_v2_testlib:t_on_get_status(Config),
|
|
ok.
|
|
|
|
t_rule_action(Config) ->
|
|
Scope = scope(),
|
|
Collection = ?config(action_name, Config),
|
|
PrePublishFn = pre_publish_fn(Scope, Collection, Config),
|
|
PostPublishFn = fun(#{payload := Payload} = Context) ->
|
|
%% need to retry because things are async in couchbase
|
|
?retry(
|
|
100,
|
|
10,
|
|
?assertMatch(
|
|
{ok, [#{<<"payload">> := Payload}]},
|
|
get_all_rows(Scope, Collection, Config)
|
|
)
|
|
),
|
|
Context
|
|
end,
|
|
Opts = #{pre_publish_fn => PrePublishFn, post_publish_fn => PostPublishFn},
|
|
ok = emqx_bridge_v2_testlib:t_rule_action(Config, Opts),
|
|
ok.
|
|
|
|
%% batch is not yet supported
|
|
skip_t_rule_action_batch(Config0) ->
|
|
Config = proplist_update(Config0, action_config, fun(ActionConfig) ->
|
|
emqx_utils_maps:deep_merge(
|
|
ActionConfig,
|
|
#{
|
|
<<"resource_opts">> => #{
|
|
<<"batch_size">> => 10,
|
|
<<"batch_time">> => <<"100ms">>
|
|
}
|
|
}
|
|
)
|
|
end),
|
|
Scope = scope(),
|
|
Collection = ?config(action_name, Config),
|
|
PrePublishFn = pre_publish_fn(Scope, Collection, Config),
|
|
PublishFn = fun(#{rule_topic := RuleTopic, payload_fn := PayloadFn} = Context) ->
|
|
Payloads = emqx_utils:pmap(
|
|
fun(_) ->
|
|
Payload = PayloadFn(),
|
|
ClientId = emqx_guid:to_hexstr(emqx_guid:gen()),
|
|
{ok, C} = emqtt:start_link(#{clean_start => true, clientid => ClientId}),
|
|
{ok, _} = emqtt:connect(C),
|
|
?assertMatch({ok, _}, emqtt:publish(C, RuleTopic, Payload, [{qos, 2}])),
|
|
Payload
|
|
end,
|
|
lists:seq(1, 10)
|
|
),
|
|
Context#{payloads => Payloads}
|
|
end,
|
|
PostPublishFn = fun(#{payloads := _Payloads} = Context) ->
|
|
%% need to retry because things are async in couchbase
|
|
?retry(
|
|
200,
|
|
10,
|
|
?assertMatch(
|
|
{ok, [#{<<"payload">> := todo}]},
|
|
get_all_rows(Scope, Collection, Config)
|
|
)
|
|
),
|
|
Context
|
|
end,
|
|
Opts = #{
|
|
pre_publish_fn => PrePublishFn,
|
|
publish_fn => PublishFn,
|
|
post_publish_fn => PostPublishFn
|
|
},
|
|
ok = emqx_bridge_v2_testlib:t_rule_action(Config, Opts),
|
|
ok.
|
|
|
|
t_sync_query_down(Config) ->
|
|
Scope = scope(),
|
|
Collection = ?config(action_name, Config),
|
|
MakeMsgFn = fun(RuleTopic) ->
|
|
ensure_scope(Scope, Config),
|
|
ensure_collection(Scope, Collection, _Opts = #{}, Config),
|
|
wait_until_collection_is_ready(Scope, Collection, Config),
|
|
emqx_message:make(RuleTopic, <<"hi">>)
|
|
end,
|
|
Opts = #{
|
|
make_message_fn => MakeMsgFn,
|
|
enter_tp_filter => ?match_event(#{?snk_kind := "couchbase_on_query_enter"}),
|
|
error_tp_filter => ?match_event(#{?snk_kind := "couchbase_query_error"}),
|
|
success_tp_filter => ?match_event(#{?snk_kind := "couchbase_query_success"})
|
|
},
|
|
emqx_bridge_v2_testlib:t_sync_query_down(Config, Opts),
|
|
ok.
|