Merge pull request #13415 from thalesmg/20240703-m-couchbase-action

feat: implement couchbase connector and action
This commit is contained in:
Thales Macedo Garitezi 2024-07-09 15:53:11 -03:00 committed by GitHub
commit 6d94809950
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
28 changed files with 1672 additions and 43 deletions

View File

@ -0,0 +1,30 @@
version: '3.9'
services:
couchbase:
container_name: couchbase
hostname: couchbase
image: ghcr.io/emqx/couchbase:1.0.0
restart: always
expose:
- 8091-8093
# ports:
# - "8091-8093:8091-8093"
networks:
- emqx_bridge
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8093/admin/ping"]
interval: 30s
timeout: 5s
retries: 4
environment:
- CLUSTER=localhost
- USER=admin
- PASS=public
- PORT=8091
- RAMSIZEMB=2048
- RAMSIZEINDEXMB=512
- RAMSIZEFTSMB=512
- BUCKETS=mqtt
- BUCKETSIZES=100
- AUTOREBALANCE=true

View File

@ -221,5 +221,11 @@
"listen": "0.0.0.0:10000", "listen": "0.0.0.0:10000",
"upstream": "azurite:10000", "upstream": "azurite:10000",
"enabled": true "enabled": true
},
{
"name": "couchbase",
"listen": "0.0.0.0:8093",
"upstream": "couchbase:8093",
"enabled": true
} }
] ]

View File

@ -89,37 +89,38 @@
-if(?EMQX_RELEASE_EDITION == ee). -if(?EMQX_RELEASE_EDITION == ee).
hard_coded_action_info_modules_ee() -> hard_coded_action_info_modules_ee() ->
[ [
emqx_bridge_azure_event_hub_action_info,
emqx_bridge_azure_blob_storage_action_info, emqx_bridge_azure_blob_storage_action_info,
emqx_bridge_confluent_producer_action_info, emqx_bridge_azure_event_hub_action_info,
emqx_bridge_dynamo_action_info,
emqx_bridge_gcp_pubsub_consumer_action_info,
emqx_bridge_gcp_pubsub_producer_action_info,
emqx_bridge_kafka_producer_action_info,
emqx_bridge_kafka_consumer_action_info,
emqx_bridge_kinesis_action_info,
emqx_bridge_hstreamdb_action_info,
emqx_bridge_matrix_action_info,
emqx_bridge_mongodb_action_info,
emqx_bridge_oracle_action_info,
emqx_bridge_rocketmq_action_info,
emqx_bridge_influxdb_action_info,
emqx_bridge_cassandra_action_info, emqx_bridge_cassandra_action_info,
emqx_bridge_clickhouse_action_info, emqx_bridge_clickhouse_action_info,
emqx_bridge_mysql_action_info, emqx_bridge_confluent_producer_action_info,
emqx_bridge_pgsql_action_info, emqx_bridge_couchbase_action_info,
emqx_bridge_syskeeper_action_info, emqx_bridge_dynamo_action_info,
emqx_bridge_sqlserver_action_info,
emqx_bridge_timescale_action_info,
emqx_bridge_redis_action_info,
emqx_bridge_iotdb_action_info,
emqx_bridge_es_action_info, emqx_bridge_es_action_info,
emqx_bridge_opents_action_info, emqx_bridge_gcp_pubsub_consumer_action_info,
emqx_bridge_rabbitmq_action_info, emqx_bridge_gcp_pubsub_producer_action_info,
emqx_bridge_pulsar_action_info,
emqx_bridge_greptimedb_action_info, emqx_bridge_greptimedb_action_info,
emqx_bridge_hstreamdb_action_info,
emqx_bridge_influxdb_action_info,
emqx_bridge_iotdb_action_info,
emqx_bridge_kafka_consumer_action_info,
emqx_bridge_kafka_producer_action_info,
emqx_bridge_kinesis_action_info,
emqx_bridge_matrix_action_info,
emqx_bridge_mongodb_action_info,
emqx_bridge_mysql_action_info,
emqx_bridge_opents_action_info,
emqx_bridge_oracle_action_info,
emqx_bridge_pgsql_action_info,
emqx_bridge_pulsar_action_info,
emqx_bridge_rabbitmq_action_info,
emqx_bridge_redis_action_info,
emqx_bridge_rocketmq_action_info,
emqx_bridge_s3_upload_action_info,
emqx_bridge_sqlserver_action_info,
emqx_bridge_syskeeper_action_info,
emqx_bridge_tdengine_action_info, emqx_bridge_tdengine_action_info,
emqx_bridge_s3_upload_action_info emqx_bridge_timescale_action_info
]. ].
-else. -else.
hard_coded_action_info_modules_ee() -> hard_coded_action_info_modules_ee() ->

View File

@ -807,6 +807,50 @@ t_async_query(Config, MakeMessageFun, IsSuccessCheck, TracePoint) ->
end, end,
ok. ok.
t_rule_action(TCConfig) ->
t_rule_action(TCConfig, _Opts = #{}).
%% Similar to `t_sync_query', but using only API functions and rule actions to trigger the
%% bridge, instead of lower level functions.
t_rule_action(TCConfig, Opts) ->
TraceCheckers = maps:get(trace_checkers, Opts, []),
PostPublishFn = maps:get(post_publish_fn, Opts, fun(Context) -> Context end),
PrePublishFn = maps:get(pre_publish_fn, Opts, fun(Context) -> Context end),
PayloadFn = maps:get(payload_fn, Opts, fun() -> emqx_guid:to_hexstr(emqx_guid:gen()) end),
PublishFn = maps:get(
publish_fn,
Opts,
fun(#{rule_topic := RuleTopic, payload_fn := PayloadFnIn} = Context) ->
Payload = PayloadFnIn(),
{ok, C} = emqtt:start_link(#{clean_start => true}),
{ok, _} = emqtt:connect(C),
?assertMatch({ok, _}, emqtt:publish(C, RuleTopic, Payload, [{qos, 2}])),
ok = emqtt:stop(C),
Context#{payload => Payload}
end
),
?check_trace(
begin
#{type := Type} = get_common_values(TCConfig),
?assertMatch({ok, _}, create_bridge_api(TCConfig)),
RuleTopic = emqx_topic:join([<<"test">>, emqx_utils_conv:bin(Type)]),
{ok, _} = create_rule_and_action_http(Type, RuleTopic, TCConfig),
ResourceId = resource_id(TCConfig),
?retry(
_Sleep = 1_000,
_Attempts = 20,
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
),
Context0 = #{rule_topic => RuleTopic, payload_fn => PayloadFn},
Context1 = PrePublishFn(Context0),
Context2 = PublishFn(Context1),
PostPublishFn(Context2),
ok
end,
TraceCheckers
),
ok.
%% Like `t_sync_query', but we send the message while the connector is %% Like `t_sync_query', but we send the message while the connector is
%% `?status_disconnected' and test that, after recovery, the buffered message eventually %% `?status_disconnected' and test that, after recovery, the buffered message eventually
%% is sent. %% is sent.

View File

@ -92,7 +92,7 @@ init_per_testcase(TestCase, Config0) ->
ConnectorConfig = connector_config(Name, Endpoint), ConnectorConfig = connector_config(Name, Endpoint),
ContainerName = container_name(Name), ContainerName = container_name(Name),
%% TODO: switch based on test %% TODO: switch based on test
ActionConfig = ActionConfig0 =
case lists:member(TestCase, direct_action_cases()) of case lists:member(TestCase, direct_action_cases()) of
true -> true ->
direct_action_config(#{ direct_action_config(#{
@ -105,6 +105,7 @@ init_per_testcase(TestCase, Config0) ->
parameters => #{container => ContainerName} parameters => #{container => ContainerName}
}) })
end, end,
ActionConfig = emqx_bridge_v2_testlib:parse_and_check(?ACTION_TYPE_BIN, Name, ActionConfig0),
Client = new_control_driver(Endpoint), Client = new_control_driver(Endpoint),
ct:pal("container name: ~s", [ContainerName]), ct:pal("container name: ~s", [ContainerName]),
ok = ensure_new_container(ContainerName, Client), ok = ensure_new_container(ContainerName, Client),
@ -185,7 +186,7 @@ connector_config(Name, Endpoint) ->
<<"start_timeout">> => <<"5s">> <<"start_timeout">> => <<"5s">>
} }
}, },
emqx_bridge_v2_testlib:parse_and_check_connector(?ACTION_TYPE_BIN, Name, InnerConfigMap0). emqx_bridge_v2_testlib:parse_and_check_connector(?CONNECTOR_TYPE_BIN, Name, InnerConfigMap0).
direct_action_config(Overrides0) -> direct_action_config(Overrides0) ->
Overrides = emqx_utils_maps:binary_key_map(Overrides0), Overrides = emqx_utils_maps:binary_key_map(Overrides0),

View File

@ -0,0 +1,94 @@
Business Source License 1.1
Licensor: Hangzhou EMQ Technologies Co., Ltd.
Licensed Work: EMQX Enterprise Edition
The Licensed Work is (c) 2024
Hangzhou EMQ Technologies Co., Ltd.
Additional Use Grant: Students and educators are granted right to copy,
modify, and create derivative work for research
or education.
Change Date: 2028-05-17
Change License: Apache License, Version 2.0
For information about alternative licensing arrangements for the Software,
please contact Licensor: https://www.emqx.com/en/contact
Notice
The Business Source License (this document, or the “License”) is not an Open
Source license. However, the Licensed Work will eventually be made available
under an Open Source License, as stated in this License.
License text copyright (c) 2017 MariaDB Corporation Ab, All Rights Reserved.
“Business Source License” is a trademark of MariaDB Corporation Ab.
-----------------------------------------------------------------------------
Business Source License 1.1
Terms
The Licensor hereby grants you the right to copy, modify, create derivative
works, redistribute, and make non-production use of the Licensed Work. The
Licensor may make an Additional Use Grant, above, permitting limited
production use.
Effective on the Change Date, or the fourth anniversary of the first publicly
available distribution of a specific version of the Licensed Work under this
License, whichever comes first, the Licensor hereby grants you rights under
the terms of the Change License, and the rights granted in the paragraph
above terminate.
If your use of the Licensed Work does not comply with the requirements
currently in effect as described in this License, you must purchase a
commercial license from the Licensor, its affiliated entities, or authorized
resellers, or you must refrain from using the Licensed Work.
All copies of the original and modified Licensed Work, and derivative works
of the Licensed Work, are subject to this License. This License applies
separately for each version of the Licensed Work and the Change Date may vary
for each version of the Licensed Work released by Licensor.
You must conspicuously display this License on each original or modified copy
of the Licensed Work. If you receive the Licensed Work in original or
modified form from a third party, the terms and conditions set forth in this
License apply to your use of that work.
Any use of the Licensed Work in violation of this License will automatically
terminate your rights under this License for the current and all other
versions of the Licensed Work.
This License does not grant you any right in any trademark or logo of
Licensor or its affiliates (provided that you may use a trademark or logo of
Licensor as expressly required by this License).
TO THE EXTENT PERMITTED BY APPLICABLE LAW, THE LICENSED WORK IS PROVIDED ON
AN “AS IS” BASIS. LICENSOR HEREBY DISCLAIMS ALL WARRANTIES AND CONDITIONS,
EXPRESS OR IMPLIED, INCLUDING (WITHOUT LIMITATION) WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT, AND
TITLE.
MariaDB hereby grants you permission to use this Licenses text to license
your works, and to refer to it using the trademark “Business Source License”,
as long as you comply with the Covenants of Licensor below.
Covenants of Licensor
In consideration of the right to use this Licenses text and the “Business
Source License” name and trademark, Licensor covenants to MariaDB, and to all
other recipients of the licensed work to be provided by Licensor:
1. To specify as the Change License the GPL Version 2.0 or any later version,
or a license that is compatible with GPL Version 2.0 or a later version,
where “compatible” means that software provided under the Change License can
be included in a program with software provided under GPL Version 2.0 or a
later version. Licensor may specify additional Change Licenses without
limitation.
2. To either: (a) specify an additional grant of rights to use that does not
impose any additional restriction on the right granted in this License, as
the Additional Use Grant; or (b) insert the text “None”.
3. To specify a Change Date.
4. Not to modify this License in any other way.

View File

@ -0,0 +1,16 @@
# EMQX Couchbase Bridge
This application provides connector and action implementations for the EMQX to integrate with Couchbase as part of the EMQX data integration pipelines.
Users can leverage [EMQX Rule Engine](https://docs.emqx.com/en/enterprise/latest/data-integration/rules.html) to create rules that publish message data to Couchbase.
## Documentation
Refer to [Rules engine](https://docs.emqx.com/en/enterprise/latest/data-integration/rules.html) for the EMQX rules engine introduction.
## Contributing
Please see our [contributing.md](../../CONTRIBUTING.md).
## License
EMQ Business Source License 1.1, refer to [LICENSE](BSL.txt).

View File

@ -0,0 +1,2 @@
toxiproxy
couchbase

View File

@ -0,0 +1,30 @@
defmodule EMQXBridgeCouchbase.MixProject do
use Mix.Project
alias EMQXUmbrella.MixProject, as: UMP
def project do
[
app: :emqx_bridge_couchbase,
version: "0.1.0",
build_path: "../../_build",
erlc_options: UMP.erlc_options(),
erlc_paths: UMP.erlc_paths(),
deps_path: "../../deps",
lockfile: "../../mix.lock",
elixir: "~> 1.14",
start_permanent: Mix.env() == :prod,
deps: deps()
]
end
def application do
[extra_applications: UMP.extra_applications()]
end
def deps() do
[
{:emqx_resource, in_umbrella: true},
UMP.common_dep(:ehttpc)
]
end
end

View File

@ -0,0 +1,14 @@
%% -*- mode: erlang; -*-
{erl_opts, [
warn_unused_vars,
warn_shadow_vars,
warn_unused_import,
warn_obsolete_guard,
warnings_as_errors,
debug_info
]}.
{deps, [
{emqx_resource, {path, "../../apps/emqx_resource"}}
]}.

View File

@ -0,0 +1,21 @@
{application, emqx_bridge_couchbase, [
{description, "EMQX Enterprise Couchbase Bridge"},
{vsn, "0.1.0"},
{registered, []},
{applications, [
kernel,
stdlib,
ehttpc,
emqx_resource
]},
{env, [
{emqx_action_info_modules, [
emqx_bridge_couchbase_action_info
]},
{emqx_connector_info_modules, [
emqx_bridge_couchbase_connector_info
]}
]},
{modules, []},
{links, []}
]}.

View File

@ -0,0 +1,19 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-ifndef(__EMQX_BRIDGE_COUCHBASE_HRL__).
-define(__EMQX_BRIDGE_COUCHBASE_HRL__, true).
-define(CONNECTOR_TYPE, couchbase).
-define(CONNECTOR_TYPE_BIN, <<"couchbase">>).
-define(ACTION_TYPE, couchbase).
-define(ACTION_TYPE_BIN, <<"couchbase">>).
-define(SERVER_OPTIONS, #{
default_port => 8093
}).
%% END ifndef(__EMQX_BRIDGE_COUCHBASE_HRL__)
-endif.

View File

@ -0,0 +1,34 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_couchbase_action_info).
-behaviour(emqx_action_info).
-include("emqx_bridge_couchbase.hrl").
%% `emqx_action_info' API
-export([
action_type_name/0,
connector_type_name/0,
schema_module/0
]).
%%------------------------------------------------------------------------------
%% Type declarations
%%------------------------------------------------------------------------------
%%------------------------------------------------------------------------------
%% `emqx_action_info' API
%%------------------------------------------------------------------------------
action_type_name() -> ?ACTION_TYPE.
connector_type_name() -> ?CONNECTOR_TYPE.
schema_module() -> emqx_bridge_couchbase_action_schema.
%%------------------------------------------------------------------------------
%% Internal fns
%%------------------------------------------------------------------------------

View File

@ -0,0 +1,148 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_couchbase_action_schema).
-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-include("emqx_bridge_couchbase.hrl").
%% `hocon_schema' API
-export([
namespace/0,
roots/0,
fields/1,
desc/1
]).
%% `emqx_bridge_v2_schema' "unofficial" API
-export([
bridge_v2_examples/1
]).
%% API
-export([]).
%%------------------------------------------------------------------------------
%% Type declarations
%%------------------------------------------------------------------------------
%%-------------------------------------------------------------------------------------------------
%% `hocon_schema' API
%%-------------------------------------------------------------------------------------------------
namespace() ->
"action_couchbase".
roots() ->
[].
fields(Field) when
Field == "get_bridge_v2";
Field == "put_bridge_v2";
Field == "post_bridge_v2"
->
emqx_bridge_v2_schema:api_fields(Field, ?ACTION_TYPE, fields(?ACTION_TYPE));
fields(action) ->
{?ACTION_TYPE,
mk(
hoconsc:map(name, hoconsc:ref(?MODULE, ?ACTION_TYPE)),
#{
desc => <<"Couchbase Action Config">>,
required => false
}
)};
fields(?ACTION_TYPE) ->
emqx_bridge_v2_schema:make_producer_action_schema(
mk(
ref(parameters),
#{
required => true,
desc => ?DESC("parameters")
}
),
#{resource_opts_ref => ref(action_resource_opts)}
);
fields(parameters) ->
[
{sql, mk(emqx_schema:template(), #{required => true, desc => ?DESC("sql")})},
{max_retries, mk(non_neg_integer(), #{required => false, desc => ?DESC("max_retries")})}
];
fields(action_resource_opts) ->
Fields = emqx_bridge_v2_schema:action_resource_opts_fields(),
lists:foldl(fun proplists:delete/2, Fields, [batch_size, batch_time]).
desc(Name) when
Name =:= ?ACTION_TYPE;
Name =:= parameters
->
?DESC(Name);
desc(action_resource_opts) ->
?DESC(emqx_resource_schema, "creation_opts");
desc(_Name) ->
undefined.
%%-------------------------------------------------------------------------------------------------
%% `emqx_bridge_v2_schema' "unofficial" API
%%-------------------------------------------------------------------------------------------------
bridge_v2_examples(Method) ->
[
#{
?ACTION_TYPE_BIN => #{
summary => <<"Couchbase Action">>,
value => action_example(Method)
}
}
].
action_example(post) ->
maps:merge(
action_example(put),
#{
type => ?ACTION_TYPE_BIN,
name => <<"my_action">>
}
);
action_example(get) ->
maps:merge(
action_example(put),
#{
status => <<"connected">>,
node_status => [
#{
node => <<"emqx@localhost">>,
status => <<"connected">>
}
]
}
);
action_example(put) ->
#{
enable => true,
description => <<"my action">>,
connector => <<"my_connector">>,
parameters =>
#{
sql => <<"insert into mqtt (key, value) values (${.id}, ${.payload})">>
},
resource_opts =>
#{
%% batch is not yet supported
%% batch_time => <<"0ms">>,
%% batch_size => 1,
health_check_interval => <<"30s">>,
inflight_window => 100,
query_mode => <<"sync">>,
request_ttl => <<"45s">>,
worker_pool_size => 16
}
}.
%%------------------------------------------------------------------------------
%% Internal fns
%%------------------------------------------------------------------------------
ref(Name) -> hoconsc:ref(?MODULE, Name).
mk(Type, Meta) -> hoconsc:mk(Type, Meta).

View File

@ -0,0 +1,393 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_couchbase_connector).
-behaviour(emqx_resource).
-include_lib("emqx/include/logger.hrl").
-include_lib("snabbkaffe/include/trace.hrl").
-include_lib("emqx_resource/include/emqx_resource.hrl").
-include_lib("emqx/include/emqx_trace.hrl").
-include("emqx_bridge_couchbase.hrl").
%% `emqx_resource' API
-export([
callback_mode/0,
on_start/2,
on_stop/2,
on_get_status/2,
on_get_channels/1,
on_add_channel/4,
on_remove_channel/3,
on_get_channel_status/3,
on_query/3
%% on_batch_query/3
]).
%% API
-export([
query/3
]).
%%------------------------------------------------------------------------------
%% Type declarations
%%------------------------------------------------------------------------------
-define(SUCCESS(STATUS_CODE), (STATUS_CODE >= 200 andalso STATUS_CODE < 300)).
%% Ad-hoc requests
-record(sql_query, {sql :: sql(), opts :: ad_hoc_query_opts()}).
-type connector_config() :: #{
connect_timeout := pos_integer(),
password := emqx_secret:t(binary()),
pipelining := pos_integer(),
pool_size := pos_integer(),
server := binary(),
username := binary()
}.
-type connector_state() :: #{
installed_actions := #{action_resource_id() => action_state()}
}.
-type action_config() :: #{
parameters := #{
sql := binary(),
max_retries := non_neg_integer()
},
resource_opts := #{request_ttl := timeout()}
}.
-type action_state() :: #{
args_template := emqx_template_sql:row_template(),
max_retries := non_neg_integer(),
request_ttl := timeout(),
sql := iolist()
}.
-type query() :: action_query() | ad_hoc_query().
-type action_query() :: {_Tag :: channel_id(), _Data :: map()}.
-type ad_hoc_query() :: #sql_query{}.
-type sql() :: iolist().
-type ad_hoc_query_opts() :: map().
%%------------------------------------------------------------------------------
%% `emqx_resource' API
%%------------------------------------------------------------------------------
-spec callback_mode() -> callback_mode().
callback_mode() ->
always_sync.
-spec on_start(connector_resource_id(), connector_config()) ->
{ok, connector_state()} | {error, _Reason}.
on_start(ConnResId, ConnConfig) ->
#{
server := Server,
connect_timeout := ConnectTimeout,
password := Password,
pipelining := Pipelining,
pool_size := PoolSize,
username := Username
} = ConnConfig,
#{hostname := Host, port := Port} = emqx_schema:parse_server(Server, ?SERVER_OPTIONS),
State = #{
installed_actions => #{},
password => Password,
username => Username
},
{Transport, TransportOpts0} =
case maps:get(ssl, ConnConfig) of
#{enable := true} = TLSConfig ->
TLSOpts = emqx_tls_lib:to_client_opts(TLSConfig),
{tls, TLSOpts};
_ ->
{tcp, []}
end,
TransportOpts = emqx_utils:ipv6_probe(TransportOpts0),
PoolOpts = [
{host, Host},
{port, Port},
{connect_timeout, ConnectTimeout},
{keepalive, 30_000},
{pool_type, random},
{pool_size, PoolSize},
{transport, Transport},
{transport_opts, TransportOpts},
{enable_pipelining, Pipelining}
],
case ehttpc_sup:start_pool(ConnResId, PoolOpts) of
{ok, _} ->
{ok, State};
{error, {already_started, _}} ->
{ok, State};
{error, Reason} ->
{error, Reason}
end.
-spec on_stop(connector_resource_id(), connector_state()) -> ok.
on_stop(ConnResId, _ConnState) ->
Res = ehttpc_sup:stop_pool(ConnResId),
?tp("couchbase_connector_stop", #{instance_id => ConnResId}),
Res.
-spec on_get_status(connector_resource_id(), connector_state()) ->
?status_connected | ?status_disconnected.
on_get_status(ConnResId, _ConnState) ->
health_check_pool_workers(ConnResId).
-spec on_add_channel(
connector_resource_id(),
connector_state(),
action_resource_id(),
action_config()
) ->
{ok, connector_state()}.
on_add_channel(_ConnResId, ConnState0, ActionResId, ActionConfig) ->
ActionState = create_action(ActionConfig),
ConnState = emqx_utils_maps:deep_put([installed_actions, ActionResId], ConnState0, ActionState),
{ok, ConnState}.
-spec on_remove_channel(
connector_resource_id(),
connector_state(),
action_resource_id()
) ->
{ok, connector_state()}.
on_remove_channel(
_ConnResId, ConnState0 = #{installed_actions := InstalledActions0}, ActionResId
) when
is_map_key(ActionResId, InstalledActions0)
->
#{installed_actions := InstalledActions0} = ConnState0,
InstalledActions = maps:remove(ActionResId, InstalledActions0),
ConnState = ConnState0#{installed_actions := InstalledActions},
{ok, ConnState};
on_remove_channel(_ConnResId, ConnState, _ActionResId) ->
{ok, ConnState}.
-spec on_get_channels(connector_resource_id()) ->
[{action_resource_id(), action_config()}].
on_get_channels(ConnResId) ->
emqx_bridge_v2:get_channels_for_connector(ConnResId).
-spec on_get_channel_status(
connector_resource_id(),
action_resource_id(),
connector_state()
) ->
?status_connected | ?status_disconnected.
on_get_channel_status(
_ConnResId,
ActionResId,
_ConnState = #{installed_actions := InstalledActions}
) when is_map_key(ActionResId, InstalledActions) ->
%% Is it possible to infer table existence and whatnot from an arbitrary statement?
?status_connected;
on_get_channel_status(_ConnResId, _ActionResId, _ConnState) ->
?status_disconnected.
-spec on_query(connector_resource_id(), query(), connector_state()) ->
{ok, _Result} | {error, _Reason}.
on_query(ConnResId, {Tag, Data}, #{installed_actions := InstalledActions} = ConnState) when
is_map_key(Tag, InstalledActions)
->
?tp("couchbase_on_query_enter", #{}),
ActionState = maps:get(Tag, InstalledActions),
run_action(ConnResId, Data, ActionState, ConnState);
on_query(ConnResId, #sql_query{sql = SQL, opts = Opts}, ConnState) ->
run_query(ConnResId, SQL, Opts, ConnState);
on_query(_ConnResId, Query, _ConnState) ->
{error, {unrecoverable_error, {invalid_query, Query}}}.
%% -spec on_batch_query(connector_resource_id(), [query()], connector_state()) ->
%% {ok, _Result} | {error, _Reason}.
%% on_batch_query(_ConnResId, [{Tag, _} | Rest], #{installed_actions := InstalledActions}) when
%% is_map_key(Tag, InstalledActions)
%% ->
%% ActionState = maps:get(Tag, InstalledActions),
%% todo;
%% on_batch_query(_ConnResId, Batch, _ConnState) ->
%% {error, {unrecoverable_error, {bad_batch, Batch}}}.
%%------------------------------------------------------------------------------
%% API
%%------------------------------------------------------------------------------
-spec query(connector_resource_id(), _SQL :: iolist(), _Opts :: map()) -> _TODO.
query(ConnResId, SQL, Opts) ->
emqx_resource:simple_sync_query(ConnResId, #sql_query{sql = SQL, opts = Opts}).
%%------------------------------------------------------------------------------
%% Internal fns
%%------------------------------------------------------------------------------
-spec create_action(action_config()) -> action_state().
create_action(ActionConfig) ->
#{
parameters := #{
sql := SQL0,
max_retries := MaxRetries
},
resource_opts := #{request_ttl := RequestTTL}
} = ActionConfig,
{SQL, ArgsTemplate} = emqx_template_sql:parse_prepstmt(SQL0, #{parameters => '$n'}),
#{
args_template => ArgsTemplate,
max_retries => MaxRetries,
request_ttl => RequestTTL,
sql => SQL
}.
-spec health_check_pool_workers(connector_resource_id()) ->
?status_connected | ?status_connecting | ?status_disconnected.
health_check_pool_workers(ConnResId) ->
Timeout = emqx_resource_pool:health_check_timeout(),
Workers = [Worker || {_WorkerName, Worker} <- ehttpc:workers(ConnResId)],
try
emqx_utils:pmap(fun(Worker) -> ehttpc:health_check(Worker, Timeout) end, Workers, Timeout)
of
[] ->
?status_connecting;
[_ | _] = Results ->
case [E || {error, _} = E <- Results] of
[] ->
ping(ConnResId, Timeout);
[{error, Reason} | _] ->
?SLOG(info, #{
msg => "couchbase_health_check_failed",
reason => emqx_utils:redact(Reason),
connector => ConnResId
}),
?status_disconnected
end
catch
exit:timeout ->
?SLOG(info, #{
msg => "couchbase_health_check_failed",
reason => timeout,
connector => ConnResId
}),
?status_disconnected
end.
ping(ConnResId, RequestTTL) ->
MaxRetries = 3,
Request = {<<"/admin/ping">>, []},
Response = ehttpc:request(ConnResId, get, Request, RequestTTL, MaxRetries),
case Response of
{ok, 200, _Headers, Body0} ->
case maybe_decode_json(Body0) of
#{<<"status">> := <<"OK">>} ->
?status_connected;
_ ->
?tp("couchbase_bad_ping_response", #{response => Response}),
?status_disconnected
end;
_ ->
?tp("couchbase_bad_ping_response", #{response => Response}),
?status_disconnected
end.
render_args(Context, ArgsTemplate) ->
%% Missing values will become `null'.
{Rendered, _Missing} = emqx_template_sql:render_prepstmt(ArgsTemplate, Context),
Rendered.
auth_header(ConnState) ->
#{
username := Username,
password := Password0
} = ConnState,
Password = emqx_secret:unwrap(Password0),
BasicAuth = base64:encode(<<Username/binary, ":", Password/binary>>),
{<<"Authorization">>, [<<"Basic ">>, BasicAuth]}.
run_action(ConnResId, Data, ActionState, ConnState) ->
#{
args_template := ArgsTemplate,
sql := SQL,
request_ttl := RequestTTL,
max_retries := MaxRetries
} = ActionState,
Args = render_args(Data, ArgsTemplate),
do_query(ConnResId, SQL, Args, RequestTTL, MaxRetries, ConnState).
run_query(ConnResId, SQL, Opts, ConnState) ->
RequestTTL = maps:get(request_ttl, Opts, timer:seconds(15)),
MaxRetries = maps:get(max_retries, Opts, 3),
Args = maps:get(args, Opts, undefined),
do_query(ConnResId, SQL, Args, RequestTTL, MaxRetries, ConnState).
do_query(ConnResId, SQL, Args, RequestTTL, MaxRetries, ConnState) ->
Body0 = #{statement => iolist_to_binary(SQL)},
Body1 = emqx_utils_maps:put_if(Body0, args, Args, Args =/= undefined),
Body = emqx_utils_json:encode(Body1),
Request = {
<<"/query/service">>,
[
auth_header(ConnState),
{<<"Content-Type">>, <<"application/json">>}
],
Body
},
Response0 = ehttpc:request(ConnResId, post, Request, RequestTTL, MaxRetries),
Response = map_response(Response0),
?tp("couchbase_response", #{response => Response, request => Request}),
Response.
maybe_decode_json(Raw) ->
case emqx_utils_json:safe_decode(Raw, [return_maps]) of
{ok, JSON} ->
JSON;
{error, _} ->
Raw
end.
map_response({error, Reason}) when
Reason =:= econnrefused;
Reason =:= timeout;
Reason =:= normal;
Reason =:= {shutdown, normal};
Reason =:= {shutdown, closed}
->
?tp("couchbase_query_error", #{reason => Reason}),
{error, {recoverable_error, Reason}};
map_response({error, {closed, _Message} = Reason}) ->
%% _Message = "The connection was lost."
?tp("couchbase_query_error", #{reason => Reason}),
{error, {recoverable_error, Reason}};
map_response({error, Reason}) ->
?tp("couchbase_query_error", #{reason => Reason}),
{error, {unrecoverable_error, Reason}};
map_response({ok, StatusCode, Headers}) when ?SUCCESS(StatusCode) ->
?tp("couchbase_query_success", #{}),
{ok, #{status_code => StatusCode, headers => Headers}};
map_response({ok, StatusCode, Headers, Body0}) when ?SUCCESS(StatusCode) ->
%% couchbase returns status = 200 with "status: errors" and "errors" key in body... 🫠
case maybe_decode_json(Body0) of
#{<<"status">> := <<"success">>} = Body ->
?tp("couchbase_query_success", #{}),
{ok, #{status_code => StatusCode, headers => Headers, body => Body}};
Body ->
?tp("couchbase_query_error", #{reason => {StatusCode, Headers, Body}}),
{error,
{unrecoverable_error, #{
status_code => StatusCode, headers => Headers, body => Body
}}}
end;
map_response({ok, StatusCode, Headers}) ->
?tp("couchbase_query_error", #{reason => {StatusCode, Headers}}),
{error, {unrecoverable_error, #{status_code => StatusCode, headers => Headers}}};
map_response({ok, StatusCode, Headers, Body0}) ->
Body = maybe_decode_json(Body0),
?tp("couchbase_query_error", #{reason => {StatusCode, Headers, Body}}),
{error,
{unrecoverable_error, #{
status_code => StatusCode, headers => Headers, body => Body
}}}.

View File

@ -0,0 +1,73 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_couchbase_connector_info).
-behaviour(emqx_connector_info).
-include("emqx_bridge_couchbase.hrl").
%% `emqx_connector_info' API
-export([
type_name/0,
bridge_types/0,
resource_callback_module/0,
config_schema/0,
schema_module/0,
api_schema/1
]).
%% API
-export([]).
%%------------------------------------------------------------------------------
%% Type declarations
%%------------------------------------------------------------------------------
-define(SCHEMA_MOD, emqx_bridge_couchbase_connector_schema).
%%------------------------------------------------------------------------------
%% API
%%------------------------------------------------------------------------------
%%------------------------------------------------------------------------------
%% `emqx_connector_info' API
%%------------------------------------------------------------------------------
type_name() ->
?CONNECTOR_TYPE.
bridge_types() ->
[?ACTION_TYPE].
resource_callback_module() ->
emqx_bridge_couchbase_connector.
config_schema() ->
{?CONNECTOR_TYPE,
hoconsc:mk(
hoconsc:map(
name,
hoconsc:ref(
?SCHEMA_MOD,
"config_connector"
)
),
#{
desc => <<"Couchbase Connector Config">>,
required => false
}
)}.
schema_module() ->
?SCHEMA_MOD.
api_schema(Method) ->
emqx_connector_schema:api_ref(
?SCHEMA_MOD, ?CONNECTOR_TYPE_BIN, Method ++ "_connector"
).
%%------------------------------------------------------------------------------
%% Internal fns
%%------------------------------------------------------------------------------

View File

@ -0,0 +1,136 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_couchbase_connector_schema).
-behaviour(hocon_schema).
-behaviour(emqx_connector_examples).
-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-include("emqx_bridge_couchbase.hrl").
%% `hocon_schema' API
-export([
namespace/0,
roots/0,
fields/1,
desc/1
]).
%% `emqx_connector_examples' API
-export([
connector_examples/1
]).
%% API
-export([]).
%%------------------------------------------------------------------------------
%% Type declarations
%%------------------------------------------------------------------------------
%%-------------------------------------------------------------------------------------------------
%% `hocon_schema' API
%%-------------------------------------------------------------------------------------------------
namespace() ->
"connector_couchbase".
roots() ->
[].
fields(Field) when
Field == "get_connector";
Field == "put_connector";
Field == "post_connector"
->
emqx_connector_schema:api_fields(Field, ?CONNECTOR_TYPE, fields(connector_config));
fields("config_connector") ->
emqx_connector_schema:common_fields() ++ fields(connector_config);
fields(connector_config) ->
[
{server,
emqx_schema:servers_sc(
#{required => true, desc => ?DESC("server")},
?SERVER_OPTIONS
)},
{connect_timeout,
mk(emqx_schema:timeout_duration_ms(), #{
default => <<"15s">>, desc => ?DESC("connect_timeout")
})},
{pipelining, mk(pos_integer(), #{default => 100, desc => ?DESC("pipelining")})},
{pool_size, mk(pos_integer(), #{default => 8, desc => ?DESC("pool_size")})},
{username, mk(binary(), #{required => true, desc => ?DESC("username")})},
{password, emqx_schema_secret:mk(#{desc => ?DESC("password")})}
] ++
emqx_connector_schema:resource_opts() ++
emqx_connector_schema_lib:ssl_fields().
desc("config_connector") ->
?DESC("config_connector");
desc(resource_opts) ->
?DESC(emqx_resource_schema, resource_opts);
desc(_Name) ->
undefined.
%%-------------------------------------------------------------------------------------------------
%% `emqx_connector_examples' API
%%-------------------------------------------------------------------------------------------------
connector_examples(Method) ->
[
#{
<<"couchbase">> => #{
summary => <<"Couchbase Connector">>,
value => connector_example(Method)
}
}
].
connector_example(get) ->
maps:merge(
connector_example(put),
#{
status => <<"connected">>,
node_status => [
#{
node => <<"emqx@localhost">>,
status => <<"connected">>
}
]
}
);
connector_example(post) ->
maps:merge(
connector_example(put),
#{
type => atom_to_binary(?CONNECTOR_TYPE),
name => <<"my_connector">>
}
);
connector_example(put) ->
#{
enable => true,
description => <<"My connector">>,
server => <<"couchbase:8093">>,
username => <<"admin">>,
password => <<"******">>,
ssl => #{enable => true},
resource_opts => #{
health_check_interval => <<"45s">>,
start_after_created => true,
start_timeout => <<"5s">>
}
}.
%%------------------------------------------------------------------------------
%% API
%%------------------------------------------------------------------------------
%%------------------------------------------------------------------------------
%% Internal fns
%%------------------------------------------------------------------------------
mk(Type, Meta) -> hoconsc:mk(Type, Meta).

View File

@ -0,0 +1,484 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_couchbase_SUITE).
-feature(maybe_expr, enable).
-compile(nowarn_export_all).
-compile(export_all).
-elvis([{elvis_text_style, line_length, #{skip_comments => whole_line}}]).
%% -import(emqx_common_test_helpers, [on_exit/1]).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include("../src/emqx_bridge_couchbase.hrl").
-define(USERNAME, <<"admin">>).
-define(PASSWORD, <<"public">>).
-define(BUCKET, <<"mqtt">>).
%%------------------------------------------------------------------------------
%% CT boilerplate
%%------------------------------------------------------------------------------
all() ->
emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
Host = os:getenv("COUCHBASE_HOST", "toxiproxy"),
DirectHost = os:getenv("COUCHBASE_DIRECT_HOST", "couchbase"),
Port = list_to_integer(os:getenv("COUCHBASE_PORT", "8093")),
AdminPort = list_to_integer(os:getenv("COUCHBASE_ADMIN_PORT", "8091")),
Server = iolist_to_binary([Host, ":", integer_to_binary(Port)]),
ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"),
ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")),
ProxyName = "couchbase",
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
case emqx_common_test_helpers:is_tcp_server_available(Host, Port) of
true ->
Apps = emqx_cth_suite:start(
[
emqx,
emqx_conf,
emqx_bridge_couchbase,
emqx_bridge,
emqx_rule_engine,
emqx_management,
emqx_mgmt_api_test_util:emqx_dashboard()
],
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
[
{apps, Apps},
{proxy_name, ProxyName},
{proxy_host, ProxyHost},
{proxy_port, ProxyPort},
{server, Server},
{host, Host},
{direct_host, DirectHost},
{admin_port, AdminPort}
| Config
];
false ->
case os:getenv("IS_CI") of
"yes" ->
throw(no_couchbase);
_ ->
{skip, no_couchbase}
end
end.
end_per_suite(Config) ->
Apps = ?config(apps, Config),
emqx_cth_suite:stop(Apps),
ok.
init_per_testcase(TestCase, Config0) ->
ct:timetrap(timer:seconds(16)),
Server = ?config(server, Config0),
UniqueNum = integer_to_binary(erlang:unique_integer()),
Name = <<(atom_to_binary(TestCase))/binary, UniqueNum/binary>>,
ConnectorConfig = connector_config(Name, Server),
ActionConfig0 = action_config(Name, #{connector => Name}),
ActionConfig = emqx_bridge_v2_testlib:parse_and_check(?ACTION_TYPE_BIN, Name, ActionConfig0),
Config = [
{bridge_kind, action},
{action_type, ?ACTION_TYPE_BIN},
{action_name, Name},
{action_config, ActionConfig},
{connector_name, Name},
{connector_type, ?CONNECTOR_TYPE_BIN},
{connector_config, ConnectorConfig}
| Config0
],
start_admin_client(Config).
end_per_testcase(_Testcase, Config) ->
ProxyHost = ?config(proxy_host, Config),
ProxyPort = ?config(proxy_port, Config),
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(),
delete_scope(scope(), Config),
stop_admin_client(Config),
emqx_common_test_helpers:call_janitor(),
ok = snabbkaffe:stop(),
ok.
%%------------------------------------------------------------------------------
%% Helper fns
%%------------------------------------------------------------------------------
start_admin_client(Config) ->
DirectHost = ?config(direct_host, Config),
AdminPort = ?config(admin_port, Config),
AdminPool = <<"couchbase_SUITE_admin">>,
PoolOpts = [
{host, DirectHost},
{port, AdminPort},
{transport, tcp}
],
{ok, _} = ehttpc_sup:start_pool(AdminPool, PoolOpts),
[{admin_pool, AdminPool} | Config].
stop_admin_client(Config) ->
AdminPool = ?config(admin_pool, Config),
ok = ehttpc_sup:stop_pool(AdminPool),
ok.
connector_config(Name, Server) ->
InnerConfigMap0 =
#{
<<"enable">> => true,
<<"tags">> => [<<"bridge">>],
<<"description">> => <<"my cool bridge">>,
<<"server">> => Server,
<<"username">> => ?USERNAME,
<<"password">> => ?PASSWORD,
<<"ssl">> => #{<<"enable">> => false},
<<"resource_opts">> =>
#{
<<"health_check_interval">> => <<"1s">>,
<<"start_after_created">> => true,
<<"start_timeout">> => <<"5s">>
}
},
emqx_bridge_v2_testlib:parse_and_check_connector(?CONNECTOR_TYPE_BIN, Name, InnerConfigMap0).
action_config(Name, Overrides0) ->
Overrides = emqx_utils_maps:binary_key_map(Overrides0),
CommonConfig =
#{
<<"enable">> => true,
<<"connector">> => <<"please override">>,
<<"parameters">> =>
#{
<<"sql">> => sql(Name),
<<"max_retries">> => 3
},
<<"resource_opts">> => #{
%% Batch is not yet supported
%% <<"batch_size">> => 1,
%% <<"batch_time">> => <<"0ms">>,
<<"buffer_mode">> => <<"memory_only">>,
<<"buffer_seg_bytes">> => <<"10MB">>,
<<"health_check_interval">> => <<"1s">>,
<<"inflight_window">> => 100,
<<"max_buffer_bytes">> => <<"256MB">>,
<<"metrics_flush_interval">> => <<"1s">>,
<<"query_mode">> => <<"sync">>,
<<"request_ttl">> => <<"15s">>,
<<"resume_interval">> => <<"1s">>,
<<"worker_pool_size">> => <<"1">>
}
},
emqx_utils_maps:deep_merge(CommonConfig, Overrides).
auth_header() ->
BasicAuth = base64:encode(<<?USERNAME/binary, ":", ?PASSWORD/binary>>),
{<<"Authorization">>, [<<"Basic ">>, BasicAuth]}.
ensure_scope(Scope, Config) ->
case get_scope(Scope, Config) of
{ok, _} ->
ct:pal("scope ~s already exists", [Scope]),
ok;
undefined ->
ct:pal("creating scope ~s", [Scope]),
{200, _} = create_scope(Scope, Config),
ok
end.
ensure_collection(Scope, Collection, Opts, Config) ->
case get_collection(Scope, Collection, Config) of
{ok, _} ->
ct:pal("collection ~s.~s already exists", [Scope, Collection]),
ok;
undefined ->
ct:pal("creating collection ~s.~s", [Scope, Collection]),
{200, _} = create_collection(Scope, Collection, Opts, Config),
ok
end.
create_scope(Scope, Config) ->
AdminPool = ?config(admin_pool, Config),
ReqBody = [<<"name=">>, Scope],
Request = {
[<<"/pools/default/buckets/">>, ?BUCKET, <<"/scopes">>],
[
auth_header(),
{<<"Content-Type">>, <<"application/x-www-form-urlencoded">>}
],
ReqBody
},
RequestTTL = timer:seconds(5),
MaxRetries = 3,
{ok, StatusCode, _Headers, Body0} = ehttpc:request(
AdminPool, post, Request, RequestTTL, MaxRetries
),
Body = maybe_decode_json(Body0),
ct:pal("create scope response:\n ~p", [{StatusCode, Body}]),
{StatusCode, Body}.
create_collection(Scope, Collection, _Opts, Config) ->
AdminPool = ?config(admin_pool, Config),
ReqBody = [<<"name=">>, Collection],
Request = {
[<<"/pools/default/buckets/">>, ?BUCKET, <<"/scopes/">>, Scope, <<"/collections">>],
[
auth_header(),
{<<"Content-Type">>, <<"application/x-www-form-urlencoded">>}
],
ReqBody
},
RequestTTL = timer:seconds(5),
MaxRetries = 3,
{ok, StatusCode, _Headers, Body0} = ehttpc:request(
AdminPool, post, Request, RequestTTL, MaxRetries
),
Body = maybe_decode_json(Body0),
ct:pal("create collection response:\n ~p", [{StatusCode, Body}]),
{StatusCode, Body}.
delete_scope(Scope, Config) ->
AdminPool = ?config(admin_pool, Config),
Request = {
[<<"/pools/default/buckets/">>, ?BUCKET, <<"/scopes/">>, Scope],
[auth_header()]
},
RequestTTL = timer:seconds(5),
MaxRetries = 3,
{ok, StatusCode, _Headers, Body0} = ehttpc:request(
AdminPool, delete, Request, RequestTTL, MaxRetries
),
Body = maybe_decode_json(Body0),
ct:pal("delete scope response:\n ~p", [{StatusCode, Body}]),
{StatusCode, Body}.
get_scopes(Config) ->
AdminPool = ?config(admin_pool, Config),
Request = {
[<<"/pools/default/buckets/">>, ?BUCKET, <<"/scopes">>],
[auth_header()]
},
RequestTTL = timer:seconds(5),
MaxRetries = 3,
{ok, 200, _Headers, Body0} = ehttpc:request(AdminPool, get, Request, RequestTTL, MaxRetries),
Body = maybe_decode_json(Body0),
ct:pal("get scopes response:\n ~p", [Body]),
Body.
get_scope(Scope, Config) ->
#{<<"scopes">> := Scopes} = get_scopes(Config),
fetch_with_name(Scopes, Scope).
get_collections(Scope, Config) ->
maybe
{ok, #{<<"collections">> := Cs}} = get_scope(Scope, Config),
{ok, Cs}
end.
get_collection(Scope, Collection, Config) ->
maybe
{ok, Cs} = get_collections(Scope, Config),
fetch_with_name(Cs, Collection)
end.
fetch_with_name(Xs, Name) ->
case [X || X = #{<<"name">> := N} <- Xs, N =:= Name] of
[] ->
undefined;
[X] ->
{ok, X}
end.
maybe_decode_json(Body) ->
case emqx_utils_json:safe_decode(Body, [return_maps]) of
{ok, JSON} ->
JSON;
{error, _} ->
Body
end.
%% Collection creation is async... Trying to insert or select from a recently created
%% collection might result in error 12003, "Keyspace not found in CB datastore".
%% https://www.couchbase.com/forums/t/error-creating-primary-index-immediately-after-collection-creation-keyspace-not-found-in-cb-datastore/32479
wait_until_collection_is_ready(Scope, Collection, Config) ->
wait_until_collection_is_ready(Scope, Collection, Config, _N = 5, _SleepMS = 200).
wait_until_collection_is_ready(Scope, Collection, _Config, N, _SleepMS) when N < 0 ->
error({collection_not_ready_timeout, Scope, Collection});
wait_until_collection_is_ready(Scope, Collection, Config, N, SleepMS) ->
case get_all_rows(Scope, Collection, Config) of
{ok, _} ->
ct:pal("collection ~s.~s ready", [Scope, Collection]),
ok;
Resp ->
ct:pal("waiting for collection ~s.~s error response:\n ~p", [Scope, Collection, Resp]),
ct:sleep(SleepMS),
wait_until_collection_is_ready(Scope, Collection, Config, N - 1, SleepMS)
end.
scope() ->
<<"some_scope">>.
sql(Name) ->
Scope = scope(),
iolist_to_binary([
<<"insert into default:mqtt.">>,
Scope,
<<".">>,
<<"`">>,
Name,
<<"`">>,
<<" (key, value) values (${.id}, ${.})">>
]).
get_all_rows(Scope, Collection, Config) ->
ConnResId = emqx_bridge_v2_testlib:connector_resource_id(Config),
SQL = iolist_to_binary([
<<"select * from default:mqtt.">>,
Scope,
<<".">>,
<<"`">>,
Collection,
<<"`">>
]),
Opts = #{},
Resp = emqx_bridge_couchbase_connector:query(ConnResId, SQL, Opts),
ct:pal("get rows response:\n ~p", [Resp]),
case Resp of
{ok, #{
status_code := 200, body := #{<<"status">> := <<"success">>, <<"results">> := Rows0}
}} ->
Rows = lists:map(
fun(#{Collection := Value}) ->
maybe_decode_json(Value)
end,
Rows0
),
{ok, Rows};
{error, _} ->
Resp
end.
proplist_update(Proplist, K, Fn) ->
{K, OldV} = lists:keyfind(K, 1, Proplist),
NewV = Fn(OldV),
lists:keystore(K, 1, Proplist, {K, NewV}).
pre_publish_fn(Scope, Collection, Config) ->
fun(Context) ->
ensure_scope(Scope, Config),
ensure_collection(Scope, Collection, _Opts = #{}, Config),
wait_until_collection_is_ready(Scope, Collection, Config),
Context
end.
%%------------------------------------------------------------------------------
%% Testcases
%%------------------------------------------------------------------------------
t_start_stop(Config) ->
ok = emqx_bridge_v2_testlib:t_start_stop(Config, "couchbase_connector_stop"),
ok.
t_create_via_http(Config) ->
ok = emqx_bridge_v2_testlib:t_create_via_http(Config),
ok.
t_on_get_status(Config) ->
ok = emqx_bridge_v2_testlib:t_on_get_status(Config),
ok.
t_rule_action(Config) ->
Scope = scope(),
Collection = ?config(action_name, Config),
PrePublishFn = pre_publish_fn(Scope, Collection, Config),
PostPublishFn = fun(#{payload := Payload} = Context) ->
%% need to retry because things are async in couchbase
?retry(
100,
10,
?assertMatch(
{ok, [#{<<"payload">> := Payload}]},
get_all_rows(Scope, Collection, Config)
)
),
Context
end,
Opts = #{pre_publish_fn => PrePublishFn, post_publish_fn => PostPublishFn},
ok = emqx_bridge_v2_testlib:t_rule_action(Config, Opts),
ok.
%% batch is not yet supported
skip_t_rule_action_batch(Config0) ->
Config = proplist_update(Config0, action_config, fun(ActionConfig) ->
emqx_utils_maps:deep_merge(
ActionConfig,
#{
<<"resource_opts">> => #{
<<"batch_size">> => 10,
<<"batch_time">> => <<"100ms">>
}
}
)
end),
Scope = scope(),
Collection = ?config(action_name, Config),
PrePublishFn = pre_publish_fn(Scope, Collection, Config),
PublishFn = fun(#{rule_topic := RuleTopic, payload_fn := PayloadFn} = Context) ->
Payloads = emqx_utils:pmap(
fun(_) ->
Payload = PayloadFn(),
ClientId = emqx_guid:to_hexstr(emqx_guid:gen()),
{ok, C} = emqtt:start_link(#{clean_start => true, clientid => ClientId}),
{ok, _} = emqtt:connect(C),
?assertMatch({ok, _}, emqtt:publish(C, RuleTopic, Payload, [{qos, 2}])),
Payload
end,
lists:seq(1, 10)
),
Context#{payloads => Payloads}
end,
PostPublishFn = fun(#{payloads := _Payloads} = Context) ->
%% need to retry because things are async in couchbase
?retry(
200,
10,
?assertMatch(
{ok, [#{<<"payload">> := todo}]},
get_all_rows(Scope, Collection, Config)
)
),
Context
end,
Opts = #{
pre_publish_fn => PrePublishFn,
publish_fn => PublishFn,
post_publish_fn => PostPublishFn
},
ok = emqx_bridge_v2_testlib:t_rule_action(Config, Opts),
ok.
t_sync_query_down(Config) ->
Scope = scope(),
Collection = ?config(action_name, Config),
MakeMsgFn = fun(RuleTopic) ->
ensure_scope(Scope, Config),
ensure_collection(Scope, Collection, _Opts = #{}, Config),
wait_until_collection_is_ready(Scope, Collection, Config),
emqx_message:make(RuleTopic, <<"hi">>)
end,
Opts = #{
make_message_fn => MakeMsgFn,
enter_tp_filter => ?match_event(#{?snk_kind := "couchbase_on_query_enter"}),
error_tp_filter => ?match_event(#{?snk_kind := "couchbase_query_error"}),
success_tp_filter => ?match_event(#{?snk_kind := "couchbase_query_success"})
},
emqx_bridge_v2_testlib:t_sync_query_down(Config, Opts),
ok.

View File

@ -77,38 +77,39 @@
-if(?EMQX_RELEASE_EDITION == ee). -if(?EMQX_RELEASE_EDITION == ee).
hard_coded_connector_info_modules_ee() -> hard_coded_connector_info_modules_ee() ->
[ [
emqx_bridge_dynamo_connector_info, emqx_bridge_azure_blob_storage_connector_info,
emqx_bridge_azure_event_hub_connector_info, emqx_bridge_azure_event_hub_connector_info,
emqx_bridge_cassandra_connector_info,
emqx_bridge_clickhouse_connector_info,
emqx_bridge_confluent_producer_connector_info, emqx_bridge_confluent_producer_connector_info,
emqx_bridge_couchbase_connector_info,
emqx_bridge_dynamo_connector_info,
emqx_bridge_es_connector_info,
emqx_bridge_gcp_pubsub_consumer_connector_info, emqx_bridge_gcp_pubsub_consumer_connector_info,
emqx_bridge_gcp_pubsub_producer_connector_info, emqx_bridge_gcp_pubsub_producer_connector_info,
emqx_bridge_greptimedb_connector_info,
emqx_bridge_hstreamdb_connector_info, emqx_bridge_hstreamdb_connector_info,
emqx_bridge_influxdb_connector_info,
emqx_bridge_iotdb_connector_info,
emqx_bridge_kafka_consumer_connector_info, emqx_bridge_kafka_consumer_connector_info,
emqx_bridge_kafka_producer_connector_info, emqx_bridge_kafka_producer_connector_info,
emqx_bridge_kinesis_connector_info, emqx_bridge_kinesis_connector_info,
emqx_bridge_matrix_connector_info, emqx_bridge_matrix_connector_info,
emqx_bridge_pgsql_connector_info,
emqx_bridge_timescale_connector_info,
emqx_bridge_mongodb_connector_info, emqx_bridge_mongodb_connector_info,
emqx_bridge_oracle_connector_info,
emqx_bridge_influxdb_connector_info,
emqx_bridge_cassandra_connector_info,
emqx_bridge_clickhouse_connector_info,
emqx_bridge_mysql_connector_info, emqx_bridge_mysql_connector_info,
emqx_bridge_opents_connector_info,
emqx_bridge_oracle_connector_info,
emqx_bridge_pgsql_connector_info,
emqx_bridge_pulsar_connector_info,
emqx_bridge_rabbitmq_connector_info,
emqx_bridge_redis_connector_info, emqx_bridge_redis_connector_info,
emqx_bridge_rocketmq_connector_info, emqx_bridge_rocketmq_connector_info,
emqx_bridge_s3_connector_info,
emqx_bridge_sqlserver_connector_info,
emqx_bridge_syskeeper_connector_info, emqx_bridge_syskeeper_connector_info,
emqx_bridge_syskeeper_proxy_connector_info, emqx_bridge_syskeeper_proxy_connector_info,
emqx_bridge_sqlserver_connector_info,
emqx_bridge_iotdb_connector_info,
emqx_bridge_es_connector_info,
emqx_bridge_opents_connector_info,
emqx_bridge_greptimedb_connector_info,
emqx_bridge_pulsar_connector_info,
emqx_bridge_tdengine_connector_info, emqx_bridge_tdengine_connector_info,
emqx_bridge_rabbitmq_connector_info, emqx_bridge_timescale_connector_info
emqx_bridge_s3_connector_info,
emqx_bridge_azure_blob_storage_connector_info
]. ].
-else. -else.
hard_coded_connector_info_modules_ee() -> hard_coded_connector_info_modules_ee() ->

View File

@ -50,6 +50,7 @@
-export([ -export([
common_resource_opts_subfields/0, common_resource_opts_subfields/0,
common_resource_opts_subfields_bin/0, common_resource_opts_subfields_bin/0,
resource_opts/0,
resource_opts_fields/0, resource_opts_fields/0,
resource_opts_fields/1, resource_opts_fields/1,
resource_opts_ref/2, resource_opts_ref/2,
@ -431,6 +432,8 @@ roots() ->
fields(connectors) -> fields(connectors) ->
connector_info_fields_connectors(); connector_info_fields_connectors();
fields(resource_opts) ->
resource_opts_fields();
fields("node_status") -> fields("node_status") ->
[ [
node_name(), node_name(),
@ -454,6 +457,8 @@ desc(connectors) ->
?DESC("desc_connectors"); ?DESC("desc_connectors");
desc("node_status") -> desc("node_status") ->
?DESC("desc_node_status"); ?DESC("desc_node_status");
desc(resource_opts) ->
?DESC(emqx_resource_schema, "creation_opts");
desc(_) -> desc(_) ->
undefined. undefined.
@ -548,6 +553,9 @@ common_resource_opts_subfields() ->
common_resource_opts_subfields_bin() -> common_resource_opts_subfields_bin() ->
lists:map(fun atom_to_binary/1, common_resource_opts_subfields()). lists:map(fun atom_to_binary/1, common_resource_opts_subfields()).
resource_opts() ->
resource_opts_ref(?MODULE, resource_opts).
resource_opts_fields() -> resource_opts_fields() ->
resource_opts_fields(_Overrides = []). resource_opts_fields(_Overrides = []).

View File

@ -121,6 +121,7 @@
emqx_s3, emqx_s3,
emqx_bridge_s3, emqx_bridge_s3,
emqx_bridge_azure_blob_storage, emqx_bridge_azure_blob_storage,
emqx_bridge_couchbase,
emqx_schema_registry, emqx_schema_registry,
emqx_eviction_agent, emqx_eviction_agent,
emqx_node_rebalance, emqx_node_rebalance,

View File

@ -0,0 +1 @@
Implemented Couchbase data integration.

View File

@ -341,6 +341,7 @@ defmodule EMQXUmbrella.MixProject do
:emqx_s3, :emqx_s3,
:emqx_bridge_s3, :emqx_bridge_s3,
:emqx_bridge_azure_blob_storage, :emqx_bridge_azure_blob_storage,
:emqx_bridge_couchbase,
:emqx_schema_registry, :emqx_schema_registry,
:emqx_schema_validation, :emqx_schema_validation,
:emqx_message_transformation, :emqx_message_transformation,

View File

@ -105,6 +105,7 @@ is_community_umbrella_app("apps/emqx_ft") -> false;
is_community_umbrella_app("apps/emqx_s3") -> false; is_community_umbrella_app("apps/emqx_s3") -> false;
is_community_umbrella_app("apps/emqx_bridge_s3") -> false; is_community_umbrella_app("apps/emqx_bridge_s3") -> false;
is_community_umbrella_app("apps/emqx_bridge_azure_blob_storage") -> false; is_community_umbrella_app("apps/emqx_bridge_azure_blob_storage") -> false;
is_community_umbrella_app("apps/emqx_bridge_couchbase") -> false;
is_community_umbrella_app("apps/emqx_schema_registry") -> false; is_community_umbrella_app("apps/emqx_schema_registry") -> false;
is_community_umbrella_app("apps/emqx_enterprise") -> false; is_community_umbrella_app("apps/emqx_enterprise") -> false;
is_community_umbrella_app("apps/emqx_bridge_kinesis") -> false; is_community_umbrella_app("apps/emqx_bridge_kinesis") -> false;

View File

@ -0,0 +1,22 @@
emqx_bridge_couchbase_action_schema {
couchbase.label:
"""Upload to Couchbase"""
couchbase.desc:
"""Action that takes incoming events and uploads them to the Couchbase service."""
parameters.label:
"""Couchbase action parameters"""
parameters.desc:
"""Set of parameters for the action."""
sql.label:
"""SQL Template"""
sql.desc:
"""SQL Template"""
max_retries.label:
"""Max Retries"""
max_retries.desc:
"""Max retry times if error on sending request."""
}

View File

@ -0,0 +1,44 @@
emqx_bridge_couchbase_connector_schema {
config_connector.label:
"""Couchbase Connector Configuration"""
config_connector.desc:
"""Configuration for a connector to Couchbase service."""
username.label:
"""Username"""
username.desc:
"""Username for Couchbase service."""
password.label:
"""Password"""
password.desc:
"""Password for Couchbase service."""
password.label:
"""Password"""
password.desc:
"""Password for Couchbase service."""
server.label:
"""Server Host"""
server.desc:
"""The IPv4 or IPv6 address or the hostname to connect to.<br/>
A host entry has the following form: `Host[:Port]`.<br/>
The Couchbase default query service port 8093 is used if `[:Port]` is not specified."""
pipelining.label:
"""HTTP Pipelining"""
pipelining.desc:
"""A positive integer. Whether to send HTTP requests continuously, when set to 1, it means that after each HTTP request is sent, you need to wait for the server to return and then continue to send the next request."""
connect_timeout.label:
"""Connect Timeout"""
connect_timeout.desc:
"""The timeout when connecting to the HTTP server."""
pool_size.label:
"""Pool Size"""
pool_size.desc:
"""The pool size."""
}

View File

@ -253,6 +253,9 @@ for dep in ${CT_DEPS}; do
azurite) azurite)
FILES+=( '.ci/docker-compose-file/docker-compose-azurite.yaml' ) FILES+=( '.ci/docker-compose-file/docker-compose-azurite.yaml' )
;; ;;
couchbase)
FILES+=( '.ci/docker-compose-file/docker-compose-couchbase.yaml' )
;;
*) *)
echo "unknown_ct_dependency $dep" echo "unknown_ct_dependency $dep"
exit 1 exit 1

View File

@ -13,6 +13,7 @@ CMD
CN CN
CONNACK CONNACK
CoAP CoAP
Couchbase
CRLs CRLs
Cygwin Cygwin
DES DES