diff --git a/apps/emqx_bridge/src/emqx_action_info.erl b/apps/emqx_bridge/src/emqx_action_info.erl index 57ede7c2f..34d624af4 100644 --- a/apps/emqx_bridge/src/emqx_action_info.erl +++ b/apps/emqx_bridge/src/emqx_action_info.erl @@ -75,6 +75,7 @@ hard_coded_action_info_modules_ee() -> [ emqx_bridge_azure_event_hub_action_info, emqx_bridge_confluent_producer_action_info, + emqx_bridge_gcp_pubsub_producer_action_info, emqx_bridge_kafka_action_info, emqx_bridge_syskeeper_action_info ]. diff --git a/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl b/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl index d2fa85f92..7346ae6c7 100644 --- a/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl +++ b/apps/emqx_bridge/src/schema/emqx_bridge_v2_schema.erl @@ -40,7 +40,11 @@ -export([types/0, types_sc/0]). --export([make_producer_action_schema/1, make_consumer_action_schema/1]). +-export([ + make_producer_action_schema/1, + make_consumer_action_schema/1, + top_level_common_action_keys/0 +]). -export_type([action_type/0]). @@ -130,6 +134,8 @@ registered_schema_fields() -> desc(actions) -> ?DESC("desc_bridges_v2"); +desc(resource_opts) -> + ?DESC(emqx_resource_schema, "resource_opts"); desc(_) -> undefined. @@ -154,6 +160,16 @@ examples(Method) -> SchemaModules = [Mod || {_, Mod} <- emqx_action_info:registered_schema_modules()], lists:foldl(Fun, #{}, SchemaModules). +top_level_common_action_keys() -> + [ + <<"connector">>, + <<"description">>, + <<"enable">>, + <<"local_topic">>, + <<"parameters">>, + <<"resource_opts">> + ]. + %%====================================================================================== %% Helper functions for making HOCON Schema %%====================================================================================== @@ -174,7 +190,10 @@ make_consumer_action_schema(ActionParametersRef) -> {description, emqx_schema:description_schema()}, {parameters, ActionParametersRef}, {resource_opts, - mk(ref(?MODULE, resource_opts), #{default => #{}, desc => ?DESC(resource_opts)})} + mk(ref(?MODULE, resource_opts), #{ + default => #{}, + desc => ?DESC(emqx_resource_schema, "resource_opts") + })} ]. -ifdef(TEST). @@ -196,7 +215,7 @@ schema_homogeneous_test() -> is_bad_schema(#{type := ?MAP(_, ?R_REF(Module, TypeName))}) -> Fields = Module:fields(TypeName), - ExpectedFieldNames = common_field_names(), + ExpectedFieldNames = lists:map(fun binary_to_atom/1, top_level_common_action_keys()), MissingFileds = lists:filter( fun(Name) -> lists:keyfind(Name, 1, Fields) =:= false end, ExpectedFieldNames ), @@ -211,9 +230,4 @@ is_bad_schema(#{type := ?MAP(_, ?R_REF(Module, TypeName))}) -> }} end. -common_field_names() -> - [ - enable, description, local_topic, connector, resource_opts, parameters - ]. - -endif. diff --git a/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.erl b/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.erl index eb364bdff..e196aac30 100644 --- a/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.erl +++ b/apps/emqx_bridge_azure_event_hub/src/emqx_bridge_azure_event_hub.erl @@ -17,7 +17,7 @@ desc/1 ]). -%% emqx_bridge_enterprise "unofficial" API +%% `emqx_bridge_v2_schema' "unofficial" API -export([ bridge_v2_examples/1, conn_bridge_examples/1, diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_client.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_client.erl index eeceb0c43..454c0d7ea 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_client.erl +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_client.erl @@ -134,7 +134,7 @@ start( -spec stop(resource_id()) -> ok | {error, term()}. stop(ResourceId) -> - ?tp(gcp_pubsub_stop, #{resource_id => ResourceId}), + ?tp(gcp_pubsub_stop, #{instance_id => ResourceId, resource_id => ResourceId}), ?SLOG(info, #{ msg => "stopping_gcp_pubsub_bridge", connector => ResourceId diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl index cd7568001..487118b3e 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl @@ -8,23 +8,30 @@ -include_lib("emqx_resource/include/emqx_resource.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl"). --type config() :: #{ - attributes_template := [#{key := binary(), value := binary()}], +-type connector_config() :: #{ connect_timeout := emqx_schema:duration_ms(), max_retries := non_neg_integer(), - ordering_key_template := binary(), - payload_template := binary(), - pubsub_topic := binary(), resource_opts := #{request_ttl := infinity | emqx_schema:duration_ms(), any() => term()}, - service_account_json := emqx_bridge_gcp_pubsub_client:service_account_json(), - any() => term() + service_account_json := emqx_bridge_gcp_pubsub_client:service_account_json() }. --type state() :: #{ - attributes_template := #{emqx_placeholder:tmpl_token() => emqx_placeholder:tmpl_token()}, +-type action_config() :: #{ + parameters := #{ + attributes_template := [#{key := binary(), value := binary()}], + ordering_key_template := binary(), + payload_template := binary(), + pubsub_topic := binary() + }, + resource_opts := #{request_ttl := infinity | emqx_schema:duration_ms(), any() => term()} +}. +-type connector_state() :: #{ client := emqx_bridge_gcp_pubsub_client:state(), + installed_actions := #{action_resource_id() => action_state()}, + project_id := emqx_bridge_gcp_pubsub_client:project_id() +}. +-type action_state() :: #{ + attributes_template := #{emqx_placeholder:tmpl_token() => emqx_placeholder:tmpl_token()}, ordering_key_template := emqx_placeholder:tmpl_token(), payload_template := emqx_placeholder:tmpl_token(), - project_id := emqx_bridge_gcp_pubsub_client:project_id(), pubsub_topic := binary() }. -type headers() :: emqx_bridge_gcp_pubsub_client:headers(). @@ -41,7 +48,11 @@ on_query_async/4, on_batch_query/3, on_batch_query_async/4, - on_get_status/2 + on_get_status/2, + on_add_channel/4, + on_remove_channel/3, + on_get_channels/1, + on_get_channel_status/3 ]). -export([reply_delegator/2]). @@ -54,53 +65,45 @@ callback_mode() -> async_if_possible. query_mode(_Config) -> async. --spec on_start(resource_id(), config()) -> {ok, state()} | {error, term()}. +-spec on_start(connector_resource_id(), connector_config()) -> + {ok, connector_state()} | {error, term()}. on_start(InstanceId, Config0) -> ?SLOG(info, #{ msg => "starting_gcp_pubsub_bridge", config => Config0 }), Config = maps:update_with(service_account_json, fun emqx_utils_maps:binary_key_map/1, Config0), - #{ - attributes_template := AttributesTemplate, - ordering_key_template := OrderingKeyTemplate, - payload_template := PayloadTemplate, - pubsub_topic := PubSubTopic, - service_account_json := #{<<"project_id">> := ProjectId} - } = Config, + #{service_account_json := #{<<"project_id">> := ProjectId}} = Config, case emqx_bridge_gcp_pubsub_client:start(InstanceId, Config) of {ok, Client} -> State = #{ client => Client, - attributes_template => preproc_attributes(AttributesTemplate), - ordering_key_template => emqx_placeholder:preproc_tmpl(OrderingKeyTemplate), - payload_template => emqx_placeholder:preproc_tmpl(PayloadTemplate), - project_id => ProjectId, - pubsub_topic => PubSubTopic + installed_actions => #{}, + project_id => ProjectId }, {ok, State}; Error -> Error end. --spec on_stop(resource_id(), state()) -> ok | {error, term()}. +-spec on_stop(connector_resource_id(), connector_state()) -> ok | {error, term()}. on_stop(InstanceId, _State) -> emqx_bridge_gcp_pubsub_client:stop(InstanceId). --spec on_get_status(resource_id(), state()) -> connected | disconnected. +-spec on_get_status(connector_resource_id(), connector_state()) -> connected | disconnected. on_get_status(_InstanceId, #{client := Client} = _State) -> emqx_bridge_gcp_pubsub_client:get_status(Client). -spec on_query( - resource_id(), - {send_message, map()}, - state() + connector_resource_id(), + {message_tag(), map()}, + connector_state() ) -> {ok, map()} | {error, {recoverable_error, term()}} | {error, term()}. -on_query(ResourceId, {send_message, Selected}, State) -> - Requests = [{send_message, Selected}], +on_query(ResourceId, {MessageTag, Selected}, State) -> + Requests = [{MessageTag, Selected}], ?TRACE( "QUERY_SYNC", "gcp_pubsub_received", @@ -109,24 +112,25 @@ on_query(ResourceId, {send_message, Selected}, State) -> do_send_requests_sync(State, Requests, ResourceId). -spec on_query_async( - resource_id(), - {send_message, map()}, + connector_resource_id(), + {message_tag(), map()}, {ReplyFun :: function(), Args :: list()}, - state() + connector_state() ) -> {ok, pid()} | {error, no_pool_worker_available}. -on_query_async(ResourceId, {send_message, Selected}, ReplyFunAndArgs, State) -> - Requests = [{send_message, Selected}], +on_query_async(ResourceId, {MessageTag, Selected}, ReplyFunAndArgs, State) -> + Requests = [{MessageTag, Selected}], ?TRACE( "QUERY_ASYNC", "gcp_pubsub_received", #{requests => Requests, connector => ResourceId, state => State} ), + ?tp(gcp_pubsub_producer_async, #{instance_id => ResourceId, requests => Requests}), do_send_requests_async(State, Requests, ReplyFunAndArgs). -spec on_batch_query( - resource_id(), - [{send_message, map()}], - state() + connector_resource_id(), + [{message_tag(), map()}], + connector_state() ) -> {ok, map()} | {error, {recoverable_error, term()}} @@ -140,10 +144,10 @@ on_batch_query(ResourceId, Requests, State) -> do_send_requests_sync(State, Requests, ResourceId). -spec on_batch_query_async( - resource_id(), - [{send_message, map()}], + connector_resource_id(), + [{message_tag(), map()}], {ReplyFun :: function(), Args :: list()}, - state() + connector_state() ) -> {ok, pid()} | {error, no_pool_worker_available}. on_batch_query_async(ResourceId, Requests, ReplyFunAndArgs, State) -> ?TRACE( @@ -151,32 +155,92 @@ on_batch_query_async(ResourceId, Requests, ReplyFunAndArgs, State) -> "gcp_pubsub_received", #{requests => Requests, connector => ResourceId, state => State} ), + ?tp(gcp_pubsub_producer_async, #{instance_id => ResourceId, requests => Requests}), do_send_requests_async(State, Requests, ReplyFunAndArgs). +-spec on_add_channel( + connector_resource_id(), + connector_state(), + action_resource_id(), + action_config() +) -> + {ok, connector_state()}. +on_add_channel(_ConnectorResId, ConnectorState0, ActionId, ActionConfig) -> + #{installed_actions := InstalledActions0} = ConnectorState0, + ChannelState = install_channel(ActionConfig), + InstalledActions = InstalledActions0#{ActionId => ChannelState}, + ConnectorState = ConnectorState0#{installed_actions := InstalledActions}, + {ok, ConnectorState}. + +-spec on_remove_channel( + connector_resource_id(), + connector_state(), + action_resource_id() +) -> + {ok, connector_state()}. +on_remove_channel(_ConnectorResId, ConnectorState0, ActionId) -> + #{installed_actions := InstalledActions0} = ConnectorState0, + InstalledActions = maps:remove(ActionId, InstalledActions0), + ConnectorState = ConnectorState0#{installed_actions := InstalledActions}, + {ok, ConnectorState}. + +-spec on_get_channels(connector_resource_id()) -> + [{action_resource_id(), action_config()}]. +on_get_channels(ConnectorResId) -> + emqx_bridge_v2:get_channels_for_connector(ConnectorResId). + +-spec on_get_channel_status(connector_resource_id(), action_resource_id(), connector_state()) -> + health_check_status(). +on_get_channel_status(_ConnectorResId, _ChannelId, _ConnectorState) -> + %% Should we check the underlying client? Same as on_get_status? + ?status_connected. + %%------------------------------------------------------------------------------------------------- %% Helper fns %%------------------------------------------------------------------------------------------------- +%% TODO: check if topic exists ("unhealthy target") +install_channel(ActionConfig) -> + #{ + parameters := #{ + attributes_template := AttributesTemplate, + ordering_key_template := OrderingKeyTemplate, + payload_template := PayloadTemplate, + pubsub_topic := PubSubTopic + } + } = ActionConfig, + #{ + attributes_template => preproc_attributes(AttributesTemplate), + ordering_key_template => emqx_placeholder:preproc_tmpl(OrderingKeyTemplate), + payload_template => emqx_placeholder:preproc_tmpl(PayloadTemplate), + pubsub_topic => PubSubTopic + }. + -spec do_send_requests_sync( - state(), - [{send_message, map()}], + connector_state(), + [{message_tag(), map()}], resource_id() ) -> {ok, status_code(), headers()} | {ok, status_code(), headers(), body()} | {error, {recoverable_error, term()}} | {error, term()}. -do_send_requests_sync(State, Requests, InstanceId) -> - #{client := Client} = State, +do_send_requests_sync(ConnectorState, Requests, InstanceId) -> + ?tp(gcp_pubsub_producer_sync, #{instance_id => InstanceId, requests => Requests}), + #{client := Client} = ConnectorState, + %% is it safe to assume the tag is the same??? And not empty??? + [{MessageTag, _} | _] = Requests, + #{installed_actions := InstalledActions} = ConnectorState, + ChannelState = maps:get(MessageTag, InstalledActions), Payloads = lists:map( - fun({send_message, Selected}) -> - encode_payload(State, Selected) + fun({_MessageTag, Selected}) -> + encode_payload(ChannelState, Selected) end, Requests ), Body = to_pubsub_request(Payloads), - Path = publish_path(State), + Path = publish_path(ConnectorState, ChannelState), Method = post, Request = {prepared_request, {Method, Path, Body}}, Result = emqx_bridge_gcp_pubsub_client:query_sync(Request, Client), @@ -184,21 +248,25 @@ do_send_requests_sync(State, Requests, InstanceId) -> handle_result(Result, Request, QueryMode, InstanceId). -spec do_send_requests_async( - state(), - [{send_message, map()}], + connector_state(), + [{message_tag(), map()}], {ReplyFun :: function(), Args :: list()} ) -> {ok, pid()} | {error, no_pool_worker_available}. -do_send_requests_async(State, Requests, ReplyFunAndArgs0) -> - #{client := Client} = State, +do_send_requests_async(ConnectorState, Requests, ReplyFunAndArgs0) -> + #{client := Client} = ConnectorState, + %% is it safe to assume the tag is the same??? And not empty??? + [{MessageTag, _} | _] = Requests, + #{installed_actions := InstalledActions} = ConnectorState, + ChannelState = maps:get(MessageTag, InstalledActions), Payloads = lists:map( - fun({send_message, Selected}) -> - encode_payload(State, Selected) + fun({_MessageTag, Selected}) -> + encode_payload(ChannelState, Selected) end, Requests ), Body = to_pubsub_request(Payloads), - Path = publish_path(State), + Path = publish_path(ConnectorState, ChannelState), Method = post, Request = {prepared_request, {Method, Path, Body}}, ReplyFunAndArgs = {fun ?MODULE:reply_delegator/2, [ReplyFunAndArgs0]}, @@ -206,18 +274,18 @@ do_send_requests_async(State, Requests, ReplyFunAndArgs0) -> Request, ReplyFunAndArgs, Client ). --spec encode_payload(state(), Selected :: map()) -> +-spec encode_payload(action_state(), Selected :: map()) -> #{ data := binary(), attributes => #{binary() => binary()}, 'orderingKey' => binary() }. -encode_payload(State, Selected) -> +encode_payload(ActionState, Selected) -> #{ attributes_template := AttributesTemplate, ordering_key_template := OrderingKeyTemplate, payload_template := PayloadTemplate - } = State, + } = ActionState, Data = render_payload(PayloadTemplate, Selected), OrderingKey = render_key(OrderingKeyTemplate, Selected), Attributes = proc_attributes(AttributesTemplate, Selected), @@ -307,13 +375,8 @@ proc_attributes(AttributesTemplate, Selected) -> to_pubsub_request(Payloads) -> emqx_utils_json:encode(#{messages => Payloads}). --spec publish_path(state()) -> binary(). -publish_path( - _State = #{ - project_id := ProjectId, - pubsub_topic := PubSubTopic - } -) -> +-spec publish_path(connector_state(), action_state()) -> binary(). +publish_path(#{project_id := ProjectId}, #{pubsub_topic := PubSubTopic}) -> <<"/v1/projects/", ProjectId/binary, "/topics/", PubSubTopic/binary, ":publish">>. handle_result({error, Reason}, _Request, QueryMode, ResourceId) when diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_producer_action_info.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_producer_action_info.erl new file mode 100644 index 000000000..6b5391b09 --- /dev/null +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_producer_action_info.erl @@ -0,0 +1,46 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_gcp_pubsub_producer_action_info). + +-behaviour(emqx_action_info). + +-export([ + bridge_v1_type_name/0, + action_type_name/0, + connector_type_name/0, + schema_module/0, + bridge_v1_config_to_action_config/2 +]). + +bridge_v1_type_name() -> gcp_pubsub. + +action_type_name() -> gcp_pubsub_producer. + +connector_type_name() -> gcp_pubsub_producer. + +schema_module() -> emqx_bridge_gcp_pubsub_producer_schema. + +bridge_v1_config_to_action_config(BridgeV1Config, ConnectorName) -> + CommonActionKeys = emqx_bridge_v2_schema:top_level_common_action_keys(), + ParamsKeys = producer_action_parameters_field_keys(), + Config1 = maps:with(CommonActionKeys, BridgeV1Config), + Params = maps:with(ParamsKeys, BridgeV1Config), + Config1#{ + <<"connector">> => ConnectorName, + <<"parameters">> => Params + }. + +%%------------------------------------------------------------------------------------------ +%% Internal helper fns +%%------------------------------------------------------------------------------------------ + +producer_action_parameters_field_keys() -> + [ + to_bin(K) + || {K, _} <- emqx_bridge_gcp_pubsub_producer_schema:fields(action_parameters) + ]. + +to_bin(L) when is_list(L) -> list_to_binary(L); +to_bin(A) when is_atom(A) -> atom_to_binary(A, utf8). diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_producer_schema.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_producer_schema.erl new file mode 100644 index 000000000..11ca16e0b --- /dev/null +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_producer_schema.erl @@ -0,0 +1,223 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_gcp_pubsub_producer_schema). + +-import(hoconsc, [mk/2, ref/2]). + +-include_lib("typerefl/include/types.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). + +%% `hocon_schema' API +-export([ + namespace/0, + roots/0, + fields/1, + desc/1 +]). + +%% `emqx_bridge_v2_schema' "unofficial" API +-export([ + bridge_v2_examples/1, + conn_bridge_examples/1, + connector_examples/1 +]). + +%%------------------------------------------------------------------------------------------------- +%% `hocon_schema' API +%%------------------------------------------------------------------------------------------------- + +namespace() -> + "gcp_pubsub_producer". + +roots() -> + []. + +%%========================================= +%% Action fields +%%========================================= +fields(action) -> + {gcp_pubsub_producer, + mk( + hoconsc:map(name, ref(?MODULE, producer_action)), + #{ + desc => <<"GCP PubSub Producer Action Config">>, + required => false + } + )}; +fields(producer_action) -> + emqx_bridge_v2_schema:make_producer_action_schema( + mk( + ref(?MODULE, action_parameters), + #{ + required => true, + desc => ?DESC(producer_action) + } + ) + ); +fields(action_parameters) -> + UnsupportedFields = [local_topic], + lists:filter( + fun({Key, _Schema}) -> not lists:member(Key, UnsupportedFields) end, + emqx_bridge_gcp_pubsub:fields(producer) + ); +%%========================================= +%% Connector fields +%%========================================= +fields("config_connector") -> + %% FIXME + emqx_connector_schema:common_fields() ++ + emqx_bridge_gcp_pubsub:fields(connector_config) ++ + emqx_resource_schema:fields("resource_opts"); +%%========================================= +%% HTTP API fields +%%========================================= +fields("get_bridge_v2") -> + emqx_bridge_schema:status_fields() ++ fields("post_bridge_v2"); +fields("post_bridge_v2") -> + [type_field(), name_field() | fields("put_bridge_v2")]; +fields("put_bridge_v2") -> + fields(producer_action). + +desc("config_connector") -> + ?DESC("config_connector"); +desc(action_parameters) -> + ?DESC(action_parameters); +desc(producer_action) -> + ?DESC(producer_action); +desc(_Name) -> + undefined. + +type_field() -> + {type, mk(gcp_pubsub_producer, #{required => true, desc => ?DESC("desc_type")})}. + +name_field() -> + {name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}. + +%%------------------------------------------------------------------------------------------------- +%% `emqx_bridge_v2_schema' "unofficial" API +%%------------------------------------------------------------------------------------------------- + +bridge_v2_examples(Method) -> + [ + #{ + <<"gcp_pubsub_producer">> => #{ + summary => <<"GCP PubSub Producer Action">>, + value => action_example(Method) + } + } + ]. + +connector_examples(Method) -> + [ + #{ + <<"gcp_pubsub_producer">> => #{ + summary => <<"GCP PubSub Producer Connector">>, + value => connector_example(Method) + } + } + ]. + +conn_bridge_examples(Method) -> + emqx_bridge_gcp_pubsub:conn_bridge_examples(Method). + +action_example(post) -> + maps:merge( + action_example(put), + #{ + type => <<"gcp_pubsub_producer">>, + name => <<"my_action">> + } + ); +action_example(get) -> + maps:merge( + action_example(put), + #{ + status => <<"connected">>, + node_status => [ + #{ + node => <<"emqx@localhost">>, + status => <<"connected">> + } + ] + } + ); +action_example(put) -> + #{ + enable => true, + connector => <<"my_connector_name">>, + description => <<"My action">>, + local_topic => <<"local/topic">>, + resource_opts => + #{batch_size => 5}, + parameters => + #{ + pubsub_topic => <<"mytopic">>, + ordering_key_template => <<"${payload.ok}">>, + payload_template => <<"${payload}">>, + attributes_template => + [ + #{ + key => <<"${payload.attrs.k}">>, + value => <<"${payload.attrs.v}">> + } + ] + } + }. + +connector_example(get) -> + maps:merge( + connector_example(put), + #{ + status => <<"connected">>, + node_status => [ + #{ + node => <<"emqx@localhost">>, + status => <<"connected">> + } + ] + } + ); +connector_example(post) -> + maps:merge( + connector_example(put), + #{ + type => <<"gcp_pubsub_producer">>, + name => <<"my_connector">> + } + ); +connector_example(put) -> + #{ + enable => true, + connect_timeout => <<"10s">>, + pool_size => 8, + pipelining => 100, + max_retries => 2, + resource_opts => #{request_ttl => <<"60s">>}, + service_account_json => + #{ + auth_provider_x509_cert_url => + <<"https://www.googleapis.com/oauth2/v1/certs">>, + auth_uri => + <<"https://accounts.google.com/o/oauth2/auth">>, + client_email => + <<"test@myproject.iam.gserviceaccount.com">>, + client_id => <<"123812831923812319190">>, + client_x509_cert_url => + << + "https://www.googleapis.com/robot/v1/" + "metadata/x509/test%40myproject.iam.gserviceaccount.com" + >>, + private_key => + << + "-----BEGIN PRIVATE KEY-----\n" + "MIIEvQI..." + >>, + private_key_id => <<"kid">>, + project_id => <<"myproject">>, + token_uri => + <<"https://oauth2.googleapis.com/token">>, + type => <<"service_account">> + } + }. diff --git a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_producer_SUITE.erl b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_producer_SUITE.erl index acfe3df8b..f65b80f90 100644 --- a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_producer_SUITE.erl +++ b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_producer_SUITE.erl @@ -13,8 +13,12 @@ -include_lib("jose/include/jose_jwt.hrl"). -include_lib("jose/include/jose_jws.hrl"). --define(BRIDGE_TYPE, gcp_pubsub). --define(BRIDGE_TYPE_BIN, <<"gcp_pubsub">>). +-define(ACTION_TYPE, gcp_pubsub_producer). +-define(ACTION_TYPE_BIN, <<"gcp_pubsub_producer">>). +-define(CONNECTOR_TYPE, gcp_pubsub_producer). +-define(CONNECTOR_TYPE_BIN, <<"gcp_pubsub_producer">>). +-define(BRIDGE_V1_TYPE, gcp_pubsub). +-define(BRIDGE_V1_TYPE_BIN, <<"gcp_pubsub">>). -import(emqx_common_test_helpers, [on_exit/1]). @@ -141,19 +145,24 @@ end_per_testcase(_TestCase, _Config) -> generate_config(Config0) -> #{ - name := Name, + name := ActionName, config_string := ConfigString, pubsub_config := PubSubConfig, service_account_json := ServiceAccountJSON } = gcp_pubsub_config(Config0), - ResourceId = emqx_bridge_resource:resource_id(?BRIDGE_TYPE_BIN, Name), - BridgeId = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_BIN, Name), + %% FIXME + %% `emqx_bridge_resource:resource_id' requires an existing connector in the config..... + ConnectorName = <<"connector_", ActionName/binary>>, + ConnectorResourceId = <<"connector:", ?CONNECTOR_TYPE_BIN/binary, ":", ConnectorName/binary>>, + ActionResourceId = emqx_bridge_v2:id(?ACTION_TYPE_BIN, ActionName, ConnectorName), + BridgeId = emqx_bridge_resource:bridge_id(?BRIDGE_V1_TYPE_BIN, ActionName), [ - {gcp_pubsub_name, Name}, + {gcp_pubsub_name, ActionName}, {gcp_pubsub_config, PubSubConfig}, {gcp_pubsub_config_string, ConfigString}, {service_account_json, ServiceAccountJSON}, - {resource_id, ResourceId}, + {connector_resource_id, ConnectorResourceId}, + {action_resource_id, ActionResourceId}, {bridge_id, BridgeId} | Config0 ]. @@ -168,7 +177,7 @@ delete_all_bridges() -> ). delete_bridge(Config) -> - Type = ?BRIDGE_TYPE, + Type = ?BRIDGE_V1_TYPE, Name = ?config(gcp_pubsub_name, Config), ct:pal("deleting bridge ~p", [{Type, Name}]), emqx_bridge:remove(Type, Name). @@ -177,7 +186,7 @@ create_bridge(Config) -> create_bridge(Config, _GCPPubSubConfigOverrides = #{}). create_bridge(Config, GCPPubSubConfigOverrides) -> - TypeBin = ?BRIDGE_TYPE_BIN, + TypeBin = ?BRIDGE_V1_TYPE_BIN, Name = ?config(gcp_pubsub_name, Config), GCPPubSubConfig0 = ?config(gcp_pubsub_config, Config), GCPPubSubConfig = emqx_utils_maps:deep_merge(GCPPubSubConfig0, GCPPubSubConfigOverrides), @@ -190,7 +199,7 @@ create_bridge_http(Config) -> create_bridge_http(Config, _GCPPubSubConfigOverrides = #{}). create_bridge_http(Config, GCPPubSubConfigOverrides) -> - TypeBin = ?BRIDGE_TYPE_BIN, + TypeBin = ?BRIDGE_V1_TYPE_BIN, Name = ?config(gcp_pubsub_name, Config), GCPPubSubConfig0 = ?config(gcp_pubsub_config, Config), GCPPubSubConfig = emqx_utils_maps:deep_merge(GCPPubSubConfig0, GCPPubSubConfigOverrides), @@ -225,7 +234,7 @@ create_bridge_http(Config, GCPPubSubConfigOverrides) -> create_rule_and_action_http(Config) -> GCPPubSubName = ?config(gcp_pubsub_name, Config), - BridgeId = emqx_bridge_resource:bridge_id(?BRIDGE_TYPE_BIN, GCPPubSubName), + BridgeId = emqx_bridge_resource:bridge_id(?BRIDGE_V1_TYPE_BIN, GCPPubSubName), Params = #{ enable => true, sql => <<"SELECT * FROM \"t/topic\"">>, @@ -382,9 +391,14 @@ assert_metrics(ExpectedMetrics, ResourceId) -> CurrentMetrics = current_metrics(ResourceId), TelemetryTable = get(telemetry_table), RecordedEvents = ets:tab2list(TelemetryTable), - ?assertEqual(ExpectedMetrics, Metrics, #{ - current_metrics => CurrentMetrics, recorded_events => RecordedEvents - }), + ?retry( + _Sleep0 = 300, + _Attempts = 20, + ?assertEqual(ExpectedMetrics, Metrics, #{ + current_metrics => CurrentMetrics, + recorded_events => RecordedEvents + }) + ), ok. assert_empty_metrics(ResourceId) -> @@ -535,8 +549,30 @@ install_telemetry_handler(TestCase) -> end), Tid. +mk_res_id_filter(ResourceId) -> + fun(Event) -> + case Event of + #{metadata := #{resource_id := ResId}} when ResId =:= ResourceId -> + true; + _ -> + false + end + end. + wait_until_gauge_is(GaugeName, ExpectedValue, Timeout) -> - Events = receive_all_events(GaugeName, Timeout), + wait_until_gauge_is(#{ + gauge_name => GaugeName, + expected => ExpectedValue, + timeout => Timeout + }). + +wait_until_gauge_is(#{} = Opts) -> + GaugeName = maps:get(gauge_name, Opts), + ExpectedValue = maps:get(expected, Opts), + Timeout = maps:get(timeout, Opts), + MaxEvents = maps:get(max_events, Opts, 10), + FilterFn = maps:get(filter_fn, Opts, fun(_Event) -> true end), + Events = receive_all_events(GaugeName, Timeout, MaxEvents, FilterFn), case length(Events) > 0 andalso lists:last(Events) of #{measurements := #{gauge_set := ExpectedValue}} -> ok; @@ -550,15 +586,36 @@ wait_until_gauge_is(GaugeName, ExpectedValue, Timeout) -> ct:pal("no ~p gauge events received!", [GaugeName]) end. -receive_all_events(EventName, Timeout) -> - receive_all_events(EventName, Timeout, _MaxEvents = 10, _Count = 0, _Acc = []). +receive_all_events(EventName, Timeout, MaxEvents, FilterFn) -> + receive_all_events(EventName, Timeout, MaxEvents, FilterFn, _Count = 0, _Acc = []). -receive_all_events(_EventName, _Timeout, MaxEvents, Count, Acc) when Count >= MaxEvents -> +receive_all_events(_EventName, _Timeout, MaxEvents, _FilterFn, Count, Acc) when + Count >= MaxEvents +-> lists:reverse(Acc); -receive_all_events(EventName, Timeout, MaxEvents, Count, Acc) -> +receive_all_events(EventName, Timeout, MaxEvents, FilterFn, Count, Acc) -> receive {telemetry, #{name := [_, _, EventName]} = Event} -> - receive_all_events(EventName, Timeout, MaxEvents, Count + 1, [Event | Acc]) + case FilterFn(Event) of + true -> + receive_all_events( + EventName, + Timeout, + MaxEvents, + FilterFn, + Count + 1, + [Event | Acc] + ); + false -> + receive_all_events( + EventName, + Timeout, + MaxEvents, + FilterFn, + Count, + Acc + ) + end after Timeout -> lists:reverse(Acc) end. @@ -597,14 +654,14 @@ wait_n_events(TelemetryTable, ResourceId, NEvents, Timeout, EventName) -> %%------------------------------------------------------------------------------ t_publish_success(Config) -> - ResourceId = ?config(resource_id, Config), + ActionResourceId = ?config(action_resource_id, Config), ServiceAccountJSON = ?config(service_account_json, Config), TelemetryTable = ?config(telemetry_table, Config), Topic = <<"t/topic">>, ?assertMatch({ok, _}, create_bridge(Config)), {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config), on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end), - assert_empty_metrics(ResourceId), + assert_empty_metrics(ActionResourceId), Payload = <<"payload">>, Message = emqx_message:make(Topic, Payload), emqx:publish(Message), @@ -620,7 +677,7 @@ t_publish_success(Config) -> DecodedMessages ), %% to avoid test flakiness - wait_telemetry_event(TelemetryTable, success, ResourceId), + wait_telemetry_event(TelemetryTable, success, ActionResourceId), wait_until_gauge_is(queuing, 0, 500), wait_until_gauge_is(inflight, 0, 500), assert_metrics( @@ -633,7 +690,7 @@ t_publish_success(Config) -> retried => 0, success => 1 }, - ResourceId + ActionResourceId ), ok. @@ -662,12 +719,12 @@ t_publish_success_infinity_timeout(Config) -> ok. t_publish_success_local_topic(Config) -> - ResourceId = ?config(resource_id, Config), + ActionResourceId = ?config(action_resource_id, Config), ServiceAccountJSON = ?config(service_account_json, Config), TelemetryTable = ?config(telemetry_table, Config), LocalTopic = <<"local/topic">>, {ok, _} = create_bridge(Config, #{<<"local_topic">> => LocalTopic}), - assert_empty_metrics(ResourceId), + assert_empty_metrics(ActionResourceId), Payload = <<"payload">>, Message = emqx_message:make(LocalTopic, Payload), emqx:publish(Message), @@ -682,7 +739,7 @@ t_publish_success_local_topic(Config) -> DecodedMessages ), %% to avoid test flakiness - wait_telemetry_event(TelemetryTable, success, ResourceId), + wait_telemetry_event(TelemetryTable, success, ActionResourceId), wait_until_gauge_is(queuing, 0, 500), wait_until_gauge_is(inflight, 0, 500), assert_metrics( @@ -695,7 +752,7 @@ t_publish_success_local_topic(Config) -> retried => 0, success => 1 }, - ResourceId + ActionResourceId ), ok. @@ -704,7 +761,7 @@ t_create_via_http(Config) -> ok. t_publish_templated(Config) -> - ResourceId = ?config(resource_id, Config), + ActionResourceId = ?config(action_resource_id, Config), ServiceAccountJSON = ?config(service_account_json, Config), TelemetryTable = ?config(telemetry_table, Config), Topic = <<"t/topic">>, @@ -721,7 +778,7 @@ t_publish_templated(Config) -> ), {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config), on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end), - assert_empty_metrics(ResourceId), + assert_empty_metrics(ActionResourceId), Payload = <<"payload">>, Message = emqx_message:set_header( @@ -747,7 +804,7 @@ t_publish_templated(Config) -> DecodedMessages ), %% to avoid test flakiness - wait_telemetry_event(TelemetryTable, success, ResourceId), + wait_telemetry_event(TelemetryTable, success, ActionResourceId), wait_until_gauge_is(queuing, 0, 500), wait_until_gauge_is(inflight, 0, 500), assert_metrics( @@ -760,7 +817,7 @@ t_publish_templated(Config) -> retried => 0, success => 1 }, - ResourceId + ActionResourceId ), ok. @@ -774,7 +831,7 @@ t_publish_success_batch(Config) -> end. test_publish_success_batch(Config) -> - ResourceId = ?config(resource_id, Config), + ActionResourceId = ?config(action_resource_id, Config), ServiceAccountJSON = ?config(service_account_json, Config), TelemetryTable = ?config(telemetry_table, Config), Topic = <<"t/topic">>, @@ -796,7 +853,7 @@ test_publish_success_batch(Config) -> ), {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config), on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end), - assert_empty_metrics(ResourceId), + assert_empty_metrics(ActionResourceId), NumMessages = BatchSize * 2, Messages = [emqx_message:make(Topic, integer_to_binary(N)) || N <- lists:seq(1, NumMessages)], %% publish in parallel to avoid each client blocking and then @@ -822,7 +879,7 @@ test_publish_success_batch(Config) -> wait_telemetry_event( TelemetryTable, success, - ResourceId, + ActionResourceId, #{timeout => 15_000, n_events => NumMessages} ), wait_until_gauge_is(queuing, 0, _Timeout = 400), @@ -837,7 +894,7 @@ test_publish_success_batch(Config) -> retried => 0, success => NumMessages }, - ResourceId + ActionResourceId ), ok. @@ -1045,7 +1102,7 @@ t_jose_other_error(Config) -> fun(Res, Trace) -> ?assertMatch({ok, _}, Res), ?assertMatch( - [#{error := {invalid_private_key, {unknown, error}}}], + [#{error := {invalid_private_key, {unknown, error}}} | _], ?of_kind(gcp_pubsub_connector_startup_error, Trace) ), ok @@ -1054,7 +1111,7 @@ t_jose_other_error(Config) -> ok. t_publish_econnrefused(Config) -> - ResourceId = ?config(resource_id, Config), + ResourceId = ?config(connector_resource_id, Config), %% set pipelining to 1 so that one of the 2 requests is `pending' %% in ehttpc. {ok, _} = create_bridge( @@ -1071,7 +1128,7 @@ t_publish_econnrefused(Config) -> do_econnrefused_or_timeout_test(Config, econnrefused). t_publish_timeout(Config) -> - ResourceId = ?config(resource_id, Config), + ActionResourceId = ?config(action_resource_id, Config), %% set pipelining to 1 so that one of the 2 requests is `pending' %% in ehttpc. also, we set the batch size to 1 to also ensure the %% requests are done separately. @@ -1079,12 +1136,13 @@ t_publish_timeout(Config) -> <<"pipelining">> => 1, <<"resource_opts">> => #{ <<"batch_size">> => 1, - <<"resume_interval">> => <<"1s">> + <<"resume_interval">> => <<"1s">>, + <<"metrics_flush_interval">> => <<"700ms">> } }), {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config), on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end), - assert_empty_metrics(ResourceId), + assert_empty_metrics(ActionResourceId), TestPid = self(), TimeoutHandler = fun(Req0, State) -> @@ -1107,7 +1165,8 @@ t_publish_timeout(Config) -> do_econnrefused_or_timeout_test(Config, timeout). do_econnrefused_or_timeout_test(Config, Error) -> - ResourceId = ?config(resource_id, Config), + ActionResourceId = ?config(action_resource_id, Config), + ConnectorResourceId = ?config(connector_resource_id, Config), TelemetryTable = ?config(telemetry_table, Config), Topic = <<"t/topic">>, Payload = <<"payload">>, @@ -1156,9 +1215,9 @@ do_econnrefused_or_timeout_test(Config, Error) -> case Error of econnrefused -> case ?of_kind(gcp_pubsub_request_failed, Trace) of - [#{reason := Error, connector := ResourceId} | _] -> + [#{reason := Error, connector := ConnectorResourceId} | _] -> ok; - [#{reason := {closed, _Msg}, connector := ResourceId} | _] -> + [#{reason := {closed, _Msg}, connector := ConnectorResourceId} | _] -> %% _Msg = "The connection was lost." ok; Trace0 -> @@ -1182,7 +1241,7 @@ do_econnrefused_or_timeout_test(Config, Error) -> %% even waiting, hard to avoid flakiness... simpler to just sleep %% a bit until stabilization. ct:sleep(200), - CurrentMetrics = current_metrics(ResourceId), + CurrentMetrics = current_metrics(ActionResourceId), RecordedEvents = ets:tab2list(TelemetryTable), ct:pal("telemetry events: ~p", [RecordedEvents]), ?assertMatch( @@ -1198,7 +1257,19 @@ do_econnrefused_or_timeout_test(Config, Error) -> CurrentMetrics ); timeout -> - wait_until_gauge_is(inflight, 0, _Timeout = 1_000), + wait_telemetry_event( + TelemetryTable, + late_reply, + ActionResourceId, + #{timeout => 5_000, n_events => 2} + ), + wait_until_gauge_is(#{ + gauge_name => inflight, + expected => 0, + filter_fn => mk_res_id_filter(ActionResourceId), + timeout => 1_000, + max_events => 20 + }), wait_until_gauge_is(queuing, 0, _Timeout = 1_000), assert_metrics( #{ @@ -1211,7 +1282,7 @@ do_econnrefused_or_timeout_test(Config, Error) -> success => 0, late_reply => 2 }, - ResourceId + ActionResourceId ) end, @@ -1334,7 +1405,8 @@ t_failure_no_body(Config) -> ok. t_unrecoverable_error(Config) -> - ResourceId = ?config(resource_id, Config), + ActionResourceId = ?config(action_resource_id, Config), + TelemetryTable = ?config(telemetry_table, Config), TestPid = self(), FailureNoBodyHandler = fun(Req0, State) -> @@ -1358,7 +1430,7 @@ t_unrecoverable_error(Config) -> ok = emqx_bridge_http_connector_test_server:set_handler(FailureNoBodyHandler), Topic = <<"t/topic">>, {ok, _} = create_bridge(Config), - assert_empty_metrics(ResourceId), + assert_empty_metrics(ActionResourceId), {ok, #{<<"id">> := RuleId}} = create_rule_and_action_http(Config), on_exit(fun() -> ok = emqx_rule_engine:delete_rule(RuleId) end), Payload = <<"payload">>, @@ -1386,6 +1458,7 @@ t_unrecoverable_error(Config) -> %% removed, this inflight should be 1, because we retry if %% the worker is killed. wait_until_gauge_is(inflight, 0, _Timeout = 400), + wait_telemetry_event(TelemetryTable, failed, ActionResourceId), assert_metrics( #{ dropped => 0, @@ -1398,7 +1471,7 @@ t_unrecoverable_error(Config) -> retried => 0, success => 0 }, - ResourceId + ActionResourceId ), ok. @@ -1407,7 +1480,7 @@ t_stop(Config) -> {ok, _} = create_bridge(Config), ?check_trace( ?wait_async_action( - emqx_bridge_resource:stop(?BRIDGE_TYPE, Name), + emqx_bridge_resource:stop(?BRIDGE_V1_TYPE, Name), #{?snk_kind := gcp_pubsub_stop}, 5_000 ), @@ -1421,13 +1494,13 @@ t_stop(Config) -> ok. t_get_status_ok(Config) -> - ResourceId = ?config(resource_id, Config), + ResourceId = ?config(connector_resource_id, Config), {ok, _} = create_bridge(Config), ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)), ok. t_get_status_no_worker(Config) -> - ResourceId = ?config(resource_id, Config), + ResourceId = ?config(connector_resource_id, Config), {ok, _} = create_bridge(Config), emqx_common_test_helpers:with_mock( ehttpc, @@ -1441,7 +1514,7 @@ t_get_status_no_worker(Config) -> ok. t_get_status_down(Config) -> - ResourceId = ?config(resource_id, Config), + ResourceId = ?config(connector_resource_id, Config), {ok, _} = create_bridge(Config), emqx_common_test_helpers:with_mock( ehttpc, @@ -1457,7 +1530,7 @@ t_get_status_down(Config) -> ok. t_get_status_timeout_calling_workers(Config) -> - ResourceId = ?config(resource_id, Config), + ResourceId = ?config(connector_resource_id, Config), {ok, _} = create_bridge(Config), emqx_common_test_helpers:with_mock( ehttpc, @@ -1520,7 +1593,7 @@ t_on_start_ehttpc_pool_start_failure(Config) -> ), fun(Trace) -> ?assertMatch( - [#{reason := some_error}], + [#{reason := some_error} | _], ?of_kind(gcp_pubsub_ehttpc_pool_start_failure, Trace) ), ok @@ -1668,7 +1741,7 @@ t_attributes(Config) -> ), %% ensure loading cluster override file doesn't mangle the attribute %% placeholders... - #{<<"bridges">> := #{?BRIDGE_TYPE_BIN := #{Name := RawConf}}} = + #{<<"actions">> := #{?ACTION_TYPE_BIN := #{Name := RawConf}}} = emqx_config:read_override_conf(#{override_to => cluster}), ?assertEqual( [ @@ -1689,7 +1762,7 @@ t_attributes(Config) -> <<"value">> => <<"${.payload.value}">> } ], - maps:get(<<"attributes_template">>, RawConf) + emqx_utils_maps:deep_get([<<"parameters">>, <<"attributes_template">>], RawConf) ), ok end, diff --git a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl index 1be7cc6ed..27b068461 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl @@ -25,6 +25,8 @@ resource_type(azure_event_hub_producer) -> emqx_bridge_kafka_impl_producer; resource_type(confluent_producer) -> emqx_bridge_kafka_impl_producer; +resource_type(gcp_pubsub_producer) -> + emqx_bridge_gcp_pubsub_impl_producer; resource_type(kafka_producer) -> emqx_bridge_kafka_impl_producer; resource_type(syskeeper_forwarder) -> @@ -65,6 +67,14 @@ connector_structs() -> required => false } )}, + {gcp_pubsub_producer, + mk( + hoconsc:map(name, ref(emqx_bridge_gcp_pubsub_producer_schema, "config_connector")), + #{ + desc => <<"GCP PubSub Producer Connector Config">>, + required => false + } + )}, {kafka_producer, mk( hoconsc:map(name, ref(emqx_bridge_kafka, "config_connector")), diff --git a/apps/emqx_connector/src/schema/emqx_connector_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_schema.erl index 8397f1bba..6382d2bcd 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_schema.erl @@ -24,7 +24,8 @@ -export([ transform_bridges_v1_to_connectors_and_bridges_v2/1, - transform_bridge_v1_config_to_action_config/4 + transform_bridge_v1_config_to_action_config/4, + top_level_common_connector_keys/0 ]). -export([roots/0, fields/1, desc/1, namespace/0, tags/0]). @@ -32,6 +33,7 @@ -export([get_response/0, put_request/0, post_request/0]). -export([connector_type_to_bridge_types/1]). +-export([common_fields/0]). -if(?EMQX_RELEASE_EDITION == ee). enterprise_api_schemas(Method) -> @@ -64,6 +66,7 @@ enterprise_fields_connectors() -> []. connector_type_to_bridge_types(azure_event_hub_producer) -> [azure_event_hub_producer]; connector_type_to_bridge_types(confluent_producer) -> [confluent_producer]; +connector_type_to_bridge_types(gcp_pubsub_producer) -> [gcp_pubsub_producer]; connector_type_to_bridge_types(kafka_producer) -> [kafka, kafka_producer]; connector_type_to_bridge_types(syskeeper_forwarder) -> [syskeeper_forwarder]; connector_type_to_bridge_types(syskeeper_proxy) -> []. @@ -159,17 +162,20 @@ transform_bridge_v1_config_to_action_config( BridgeV1Conf, ConnectorName, ConnectorFields ). -transform_bridge_v1_config_to_action_config( - BridgeV1Conf, ConnectorName, ConnectorFields -) -> - TopKeys = [ +top_level_common_connector_keys() -> + [ <<"enable">>, <<"connector">>, <<"local_topic">>, <<"resource_opts">>, <<"description">>, <<"parameters">> - ], + ]. + +transform_bridge_v1_config_to_action_config( + BridgeV1Conf, ConnectorName, ConnectorFields +) -> + TopKeys = top_level_common_connector_keys(), TopKeysMap = maps:from_keys(TopKeys, true), %% Remove connector fields ActionMap0 = lists:foldl( @@ -352,6 +358,12 @@ desc(connectors) -> desc(_) -> undefined. +common_fields() -> + [ + {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, + {description, emqx_schema:description_schema()} + ]. + %%====================================================================================== %% Helper Functions %%====================================================================================== diff --git a/apps/emqx_resource/include/emqx_resource.hrl b/apps/emqx_resource/include/emqx_resource.hrl index b34da9a63..7a8fdedef 100644 --- a/apps/emqx_resource/include/emqx_resource.hrl +++ b/apps/emqx_resource/include/emqx_resource.hrl @@ -111,6 +111,10 @@ | {error, {recoverable_error, term()}} | {error, term()}. +-type action_resource_id() :: resource_id(). +-type connector_resource_id() :: resource_id(). +-type message_tag() :: action_resource_id(). + -define(WORKER_POOL_SIZE, 16). -define(DEFAULT_BUFFER_BYTES, 256 * 1024 * 1024). diff --git a/rel/i18n/emqx_bridge_gcp_pubsub_producer_schema.hocon b/rel/i18n/emqx_bridge_gcp_pubsub_producer_schema.hocon new file mode 100644 index 000000000..a4ac2afaf --- /dev/null +++ b/rel/i18n/emqx_bridge_gcp_pubsub_producer_schema.hocon @@ -0,0 +1,18 @@ +emqx_bridge_gcp_pubsub_producer_schema { + + action_parameters.desc: + """Action specific configs.""" + action_parameters.label: + """Action""" + + producer_action.desc: + """Action configs.""" + producer_action.label: + """Action""" + + config_connector.desc: + """Configuration for a GCP PubSub Producer Client.""" + config_connector.label: + """GCP PubSub Producer Client Configuration""" + +} diff --git a/rel/i18n/emqx_bridge_v2_schema.hocon b/rel/i18n/emqx_bridge_v2_schema.hocon index 4543b8eb6..69f8a9109 100644 --- a/rel/i18n/emqx_bridge_v2_schema.hocon +++ b/rel/i18n/emqx_bridge_v2_schema.hocon @@ -6,4 +6,14 @@ desc_bridges_v2.desc: desc_bridges_v2.label: """Bridge Configuration""" +mqtt_topic.desc: +"""MQTT topic or topic filter as data source (action input). If rule action is used as data source, this config should be left empty, otherwise messages will be duplicated in the remote system.""" +mqtt_topic.label: +"""Source MQTT Topic""" + +config_enable.desc: +"""Enable (true) or disable (false) this action.""" +config_enable.label: +"""Enable or Disable""" + } diff --git a/rel/i18n/emqx_connector_schema.hocon b/rel/i18n/emqx_connector_schema.hocon index 8b9b2ccac..d3aa1c82b 100644 --- a/rel/i18n/emqx_connector_schema.hocon +++ b/rel/i18n/emqx_connector_schema.hocon @@ -2,15 +2,17 @@ emqx_connector_schema { desc_connectors.desc: """Connectors that are used to connect to external systems""" - desc_connectors.label: """Connectors""" - connector_field.desc: """Name of connector used to connect to the resource where the action is to be performed.""" - connector_field.label: """Connector""" +config_enable.desc: +"""Enable (true) or disable (false) this connector.""" +config_enable.label: +"""Enable or Disable""" + }