emqx/apps/emqx_bridge_couchbase/test/emqx_bridge_couchbase_SUITE...

485 lines
16 KiB
Erlang

%%--------------------------------------------------------------------
%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_couchbase_SUITE).
-feature(maybe_expr, enable).
-compile(nowarn_export_all).
-compile(export_all).
-elvis([{elvis_text_style, line_length, #{skip_comments => whole_line}}]).
%% -import(emqx_common_test_helpers, [on_exit/1]).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include("../src/emqx_bridge_couchbase.hrl").
-define(USERNAME, <<"admin">>).
-define(PASSWORD, <<"public">>).
-define(BUCKET, <<"mqtt">>).
%%------------------------------------------------------------------------------
%% CT boilerplate
%%------------------------------------------------------------------------------
all() ->
emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
Host = os:getenv("COUCHBASE_HOST", "toxiproxy"),
DirectHost = os:getenv("COUCHBASE_DIRECT_HOST", "couchbase"),
Port = list_to_integer(os:getenv("COUCHBASE_PORT", "8093")),
AdminPort = list_to_integer(os:getenv("COUCHBASE_ADMIN_PORT", "8091")),
Server = iolist_to_binary([Host, ":", integer_to_binary(Port)]),
ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"),
ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")),
ProxyName = "couchbase",
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
case emqx_common_test_helpers:is_tcp_server_available(Host, Port) of
true ->
Apps = emqx_cth_suite:start(
[
emqx,
emqx_conf,
emqx_bridge_couchbase,
emqx_bridge,
emqx_rule_engine,
emqx_management,
emqx_mgmt_api_test_util:emqx_dashboard()
],
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
[
{apps, Apps},
{proxy_name, ProxyName},
{proxy_host, ProxyHost},
{proxy_port, ProxyPort},
{server, Server},
{host, Host},
{direct_host, DirectHost},
{admin_port, AdminPort}
| Config
];
false ->
case os:getenv("IS_CI") of
"yes" ->
throw(no_couchbase);
_ ->
{skip, no_couchbase}
end
end.
end_per_suite(Config) ->
Apps = ?config(apps, Config),
emqx_cth_suite:stop(Apps),
ok.
init_per_testcase(TestCase, Config0) ->
ct:timetrap(timer:seconds(16)),
Server = ?config(server, Config0),
UniqueNum = integer_to_binary(erlang:unique_integer()),
Name = <<(atom_to_binary(TestCase))/binary, UniqueNum/binary>>,
ConnectorConfig = connector_config(Name, Server),
ActionConfig0 = action_config(Name, #{connector => Name}),
ActionConfig = emqx_bridge_v2_testlib:parse_and_check(?ACTION_TYPE_BIN, Name, ActionConfig0),
Config = [
{bridge_kind, action},
{action_type, ?ACTION_TYPE_BIN},
{action_name, Name},
{action_config, ActionConfig},
{connector_name, Name},
{connector_type, ?CONNECTOR_TYPE_BIN},
{connector_config, ConnectorConfig}
| Config0
],
start_admin_client(Config).
end_per_testcase(_Testcase, Config) ->
ProxyHost = ?config(proxy_host, Config),
ProxyPort = ?config(proxy_port, Config),
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(),
delete_scope(scope(), Config),
stop_admin_client(Config),
emqx_common_test_helpers:call_janitor(),
ok = snabbkaffe:stop(),
ok.
%%------------------------------------------------------------------------------
%% Helper fns
%%------------------------------------------------------------------------------
start_admin_client(Config) ->
DirectHost = ?config(direct_host, Config),
AdminPort = ?config(admin_port, Config),
AdminPool = <<"couchbase_SUITE_admin">>,
PoolOpts = [
{host, DirectHost},
{port, AdminPort},
{transport, tcp}
],
{ok, _} = ehttpc_sup:start_pool(AdminPool, PoolOpts),
[{admin_pool, AdminPool} | Config].
stop_admin_client(Config) ->
AdminPool = ?config(admin_pool, Config),
ok = ehttpc_sup:stop_pool(AdminPool),
ok.
connector_config(Name, Server) ->
InnerConfigMap0 =
#{
<<"enable">> => true,
<<"tags">> => [<<"bridge">>],
<<"description">> => <<"my cool bridge">>,
<<"server">> => Server,
<<"username">> => ?USERNAME,
<<"password">> => ?PASSWORD,
<<"ssl">> => #{<<"enable">> => false},
<<"resource_opts">> =>
#{
<<"health_check_interval">> => <<"1s">>,
<<"start_after_created">> => true,
<<"start_timeout">> => <<"5s">>
}
},
emqx_bridge_v2_testlib:parse_and_check_connector(?CONNECTOR_TYPE_BIN, Name, InnerConfigMap0).
action_config(Name, Overrides0) ->
Overrides = emqx_utils_maps:binary_key_map(Overrides0),
CommonConfig =
#{
<<"enable">> => true,
<<"connector">> => <<"please override">>,
<<"parameters">> =>
#{
<<"sql">> => sql(Name),
<<"max_retries">> => 3
},
<<"resource_opts">> => #{
%% Batch is not yet supported
%% <<"batch_size">> => 1,
%% <<"batch_time">> => <<"0ms">>,
<<"buffer_mode">> => <<"memory_only">>,
<<"buffer_seg_bytes">> => <<"10MB">>,
<<"health_check_interval">> => <<"1s">>,
<<"inflight_window">> => 100,
<<"max_buffer_bytes">> => <<"256MB">>,
<<"metrics_flush_interval">> => <<"1s">>,
<<"query_mode">> => <<"sync">>,
<<"request_ttl">> => <<"15s">>,
<<"resume_interval">> => <<"1s">>,
<<"worker_pool_size">> => <<"1">>
}
},
emqx_utils_maps:deep_merge(CommonConfig, Overrides).
auth_header() ->
BasicAuth = base64:encode(<<?USERNAME/binary, ":", ?PASSWORD/binary>>),
{<<"Authorization">>, [<<"Basic ">>, BasicAuth]}.
ensure_scope(Scope, Config) ->
case get_scope(Scope, Config) of
{ok, _} ->
ct:pal("scope ~s already exists", [Scope]),
ok;
undefined ->
ct:pal("creating scope ~s", [Scope]),
{200, _} = create_scope(Scope, Config),
ok
end.
ensure_collection(Scope, Collection, Opts, Config) ->
case get_collection(Scope, Collection, Config) of
{ok, _} ->
ct:pal("collection ~s.~s already exists", [Scope, Collection]),
ok;
undefined ->
ct:pal("creating collection ~s.~s", [Scope, Collection]),
{200, _} = create_collection(Scope, Collection, Opts, Config),
ok
end.
create_scope(Scope, Config) ->
AdminPool = ?config(admin_pool, Config),
ReqBody = [<<"name=">>, Scope],
Request = {
[<<"/pools/default/buckets/">>, ?BUCKET, <<"/scopes">>],
[
auth_header(),
{<<"Content-Type">>, <<"application/x-www-form-urlencoded">>}
],
ReqBody
},
RequestTTL = timer:seconds(5),
MaxRetries = 3,
{ok, StatusCode, _Headers, Body0} = ehttpc:request(
AdminPool, post, Request, RequestTTL, MaxRetries
),
Body = maybe_decode_json(Body0),
ct:pal("create scope response:\n ~p", [{StatusCode, Body}]),
{StatusCode, Body}.
create_collection(Scope, Collection, _Opts, Config) ->
AdminPool = ?config(admin_pool, Config),
ReqBody = [<<"name=">>, Collection],
Request = {
[<<"/pools/default/buckets/">>, ?BUCKET, <<"/scopes/">>, Scope, <<"/collections">>],
[
auth_header(),
{<<"Content-Type">>, <<"application/x-www-form-urlencoded">>}
],
ReqBody
},
RequestTTL = timer:seconds(5),
MaxRetries = 3,
{ok, StatusCode, _Headers, Body0} = ehttpc:request(
AdminPool, post, Request, RequestTTL, MaxRetries
),
Body = maybe_decode_json(Body0),
ct:pal("create collection response:\n ~p", [{StatusCode, Body}]),
{StatusCode, Body}.
delete_scope(Scope, Config) ->
AdminPool = ?config(admin_pool, Config),
Request = {
[<<"/pools/default/buckets/">>, ?BUCKET, <<"/scopes/">>, Scope],
[auth_header()]
},
RequestTTL = timer:seconds(5),
MaxRetries = 3,
{ok, StatusCode, _Headers, Body0} = ehttpc:request(
AdminPool, delete, Request, RequestTTL, MaxRetries
),
Body = maybe_decode_json(Body0),
ct:pal("delete scope response:\n ~p", [{StatusCode, Body}]),
{StatusCode, Body}.
get_scopes(Config) ->
AdminPool = ?config(admin_pool, Config),
Request = {
[<<"/pools/default/buckets/">>, ?BUCKET, <<"/scopes">>],
[auth_header()]
},
RequestTTL = timer:seconds(5),
MaxRetries = 3,
{ok, 200, _Headers, Body0} = ehttpc:request(AdminPool, get, Request, RequestTTL, MaxRetries),
Body = maybe_decode_json(Body0),
ct:pal("get scopes response:\n ~p", [Body]),
Body.
get_scope(Scope, Config) ->
#{<<"scopes">> := Scopes} = get_scopes(Config),
fetch_with_name(Scopes, Scope).
get_collections(Scope, Config) ->
maybe
{ok, #{<<"collections">> := Cs}} = get_scope(Scope, Config),
{ok, Cs}
end.
get_collection(Scope, Collection, Config) ->
maybe
{ok, Cs} = get_collections(Scope, Config),
fetch_with_name(Cs, Collection)
end.
fetch_with_name(Xs, Name) ->
case [X || X = #{<<"name">> := N} <- Xs, N =:= Name] of
[] ->
undefined;
[X] ->
{ok, X}
end.
maybe_decode_json(Body) ->
case emqx_utils_json:safe_decode(Body, [return_maps]) of
{ok, JSON} ->
JSON;
{error, _} ->
Body
end.
%% Collection creation is async... Trying to insert or select from a recently created
%% collection might result in error 12003, "Keyspace not found in CB datastore".
%% https://www.couchbase.com/forums/t/error-creating-primary-index-immediately-after-collection-creation-keyspace-not-found-in-cb-datastore/32479
wait_until_collection_is_ready(Scope, Collection, Config) ->
wait_until_collection_is_ready(Scope, Collection, Config, _N = 5, _SleepMS = 200).
wait_until_collection_is_ready(Scope, Collection, _Config, N, _SleepMS) when N < 0 ->
error({collection_not_ready_timeout, Scope, Collection});
wait_until_collection_is_ready(Scope, Collection, Config, N, SleepMS) ->
case get_all_rows(Scope, Collection, Config) of
{ok, _} ->
ct:pal("collection ~s.~s ready", [Scope, Collection]),
ok;
Resp ->
ct:pal("waiting for collection ~s.~s error response:\n ~p", [Scope, Collection, Resp]),
ct:sleep(SleepMS),
wait_until_collection_is_ready(Scope, Collection, Config, N - 1, SleepMS)
end.
scope() ->
<<"some_scope">>.
sql(Name) ->
Scope = scope(),
iolist_to_binary([
<<"insert into default:mqtt.">>,
Scope,
<<".">>,
<<"`">>,
Name,
<<"`">>,
<<" (key, value) values (${.id}, ${.})">>
]).
get_all_rows(Scope, Collection, Config) ->
ConnResId = emqx_bridge_v2_testlib:connector_resource_id(Config),
SQL = iolist_to_binary([
<<"select * from default:mqtt.">>,
Scope,
<<".">>,
<<"`">>,
Collection,
<<"`">>
]),
Opts = #{},
Resp = emqx_bridge_couchbase_connector:query(ConnResId, SQL, Opts),
ct:pal("get rows response:\n ~p", [Resp]),
case Resp of
{ok, #{
status_code := 200, body := #{<<"status">> := <<"success">>, <<"results">> := Rows0}
}} ->
Rows = lists:map(
fun(#{Collection := Value}) ->
maybe_decode_json(Value)
end,
Rows0
),
{ok, Rows};
{error, _} ->
Resp
end.
proplist_update(Proplist, K, Fn) ->
{K, OldV} = lists:keyfind(K, 1, Proplist),
NewV = Fn(OldV),
lists:keystore(K, 1, Proplist, {K, NewV}).
pre_publish_fn(Scope, Collection, Config) ->
fun(Context) ->
ensure_scope(Scope, Config),
ensure_collection(Scope, Collection, _Opts = #{}, Config),
wait_until_collection_is_ready(Scope, Collection, Config),
Context
end.
%%------------------------------------------------------------------------------
%% Testcases
%%------------------------------------------------------------------------------
t_start_stop(Config) ->
ok = emqx_bridge_v2_testlib:t_start_stop(Config, "couchbase_connector_stop"),
ok.
t_create_via_http(Config) ->
ok = emqx_bridge_v2_testlib:t_create_via_http(Config),
ok.
t_on_get_status(Config) ->
ok = emqx_bridge_v2_testlib:t_on_get_status(Config),
ok.
t_rule_action(Config) ->
Scope = scope(),
Collection = ?config(action_name, Config),
PrePublishFn = pre_publish_fn(Scope, Collection, Config),
PostPublishFn = fun(#{payload := Payload} = Context) ->
%% need to retry because things are async in couchbase
?retry(
100,
10,
?assertMatch(
{ok, [#{<<"payload">> := Payload}]},
get_all_rows(Scope, Collection, Config)
)
),
Context
end,
Opts = #{pre_publish_fn => PrePublishFn, post_publish_fn => PostPublishFn},
ok = emqx_bridge_v2_testlib:t_rule_action(Config, Opts),
ok.
%% batch is not yet supported
skip_t_rule_action_batch(Config0) ->
Config = proplist_update(Config0, action_config, fun(ActionConfig) ->
emqx_utils_maps:deep_merge(
ActionConfig,
#{
<<"resource_opts">> => #{
<<"batch_size">> => 10,
<<"batch_time">> => <<"100ms">>
}
}
)
end),
Scope = scope(),
Collection = ?config(action_name, Config),
PrePublishFn = pre_publish_fn(Scope, Collection, Config),
PublishFn = fun(#{rule_topic := RuleTopic, payload_fn := PayloadFn} = Context) ->
Payloads = emqx_utils:pmap(
fun(_) ->
Payload = PayloadFn(),
ClientId = emqx_guid:to_hexstr(emqx_guid:gen()),
{ok, C} = emqtt:start_link(#{clean_start => true, clientid => ClientId}),
{ok, _} = emqtt:connect(C),
?assertMatch({ok, _}, emqtt:publish(C, RuleTopic, Payload, [{qos, 2}])),
Payload
end,
lists:seq(1, 10)
),
Context#{payloads => Payloads}
end,
PostPublishFn = fun(#{payloads := _Payloads} = Context) ->
%% need to retry because things are async in couchbase
?retry(
200,
10,
?assertMatch(
{ok, [#{<<"payload">> := todo}]},
get_all_rows(Scope, Collection, Config)
)
),
Context
end,
Opts = #{
pre_publish_fn => PrePublishFn,
publish_fn => PublishFn,
post_publish_fn => PostPublishFn
},
ok = emqx_bridge_v2_testlib:t_rule_action(Config, Opts),
ok.
t_sync_query_down(Config) ->
Scope = scope(),
Collection = ?config(action_name, Config),
MakeMsgFn = fun(RuleTopic) ->
ensure_scope(Scope, Config),
ensure_collection(Scope, Collection, _Opts = #{}, Config),
wait_until_collection_is_ready(Scope, Collection, Config),
emqx_message:make(RuleTopic, <<"hi">>)
end,
Opts = #{
make_message_fn => MakeMsgFn,
enter_tp_filter => ?match_event(#{?snk_kind := "couchbase_on_query_enter"}),
error_tp_filter => ?match_event(#{?snk_kind := "couchbase_query_error"}),
success_tp_filter => ?match_event(#{?snk_kind := "couchbase_query_success"})
},
emqx_bridge_v2_testlib:t_sync_query_down(Config, Opts),
ok.