From 2ac2d4c037bedb835ce43105ca15015c7ccc03f2 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Tue, 20 Jun 2023 11:15:13 -0300 Subject: [PATCH] refactor: addressing review comments --- apps/emqx_bridge_gcp_pubsub/rebar.config | 9 ++-- .../src/emqx_bridge_gcp_pubsub.erl | 2 +- .../src/emqx_bridge_gcp_pubsub_connector.erl | 49 +++++++++---------- .../emqx_bridge_gcp_pubsub_impl_consumer.erl | 4 +- .../emqx_bridge_gcp_pubsub_impl_producer.erl | 4 +- .../emqx_bridge_gcp_pubsub_consumer_SUITE.erl | 6 +-- 6 files changed, 37 insertions(+), 37 deletions(-) diff --git a/apps/emqx_bridge_gcp_pubsub/rebar.config b/apps/emqx_bridge_gcp_pubsub/rebar.config index be59db264..10b89a449 100644 --- a/apps/emqx_bridge_gcp_pubsub/rebar.config +++ b/apps/emqx_bridge_gcp_pubsub/rebar.config @@ -7,10 +7,11 @@ warnings_as_errors, debug_info ]}. -{deps, [ {emqx_connector, {path, "../../apps/emqx_connector"}} - , {emqx_resource, {path, "../../apps/emqx_resource"}} - , {emqx_bridge, {path, "../../apps/emqx_bridge"}} - ]}. +{deps, [ + {emqx_connector, {path, "../../apps/emqx_connector"}}, + {emqx_resource, {path, "../../apps/emqx_resource"}}, + {emqx_bridge, {path, "../../apps/emqx_bridge"}} +]}. {xref_checks, [ undefined_function_calls, diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.erl index d9d18ec48..81aa729e4 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.erl +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub.erl @@ -152,7 +152,7 @@ fields(consumer) -> pos_integer(), #{default => 100, desc => ?DESC("consumer_pull_max_messages")} )}, - {pull_worker_multiplier, + {consumer_workers_per_topic, mk( pos_integer(), #{ diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_connector.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_connector.erl index 80bd50113..35a4a7797 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_connector.erl +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_connector.erl @@ -86,17 +86,7 @@ on_start( connector => ResourceId, config => Config }), - %% emulating the emulator behavior - %% https://cloud.google.com/pubsub/docs/emulator - {Transport, HostPort} = - case os:getenv("PUBSUB_EMULATOR_HOST") of - false -> - {tls, "pubsub.googleapis.com:443"}; - HostPort0 -> - %% The emulator is plain HTTP... - Transport0 = persistent_term:get({?MODULE, transport}, tcp), - {Transport0, HostPort0} - end, + {Transport, HostPort} = get_transport(), #{hostname := Host, port := Port} = emqx_schema:parse_server(HostPort, #{default_port => 443}), PoolType = random, TransportOpts = @@ -302,7 +292,6 @@ get_jwt_authorization_header(JWTConfig) -> {ok, map()} | {error, {recoverable_error, term()} | term()}. do_send_requests_sync(State, {prepared_request, {Method, Path, Body}}, ResourceId) -> #{ - jwt_config := JWTConfig, pool_name := PoolName, max_retries := MaxRetries, request_ttl := RequestTTL @@ -315,12 +304,7 @@ do_send_requests_sync(State, {prepared_request, {Method, Path, Body}}, ResourceI resource_id => ResourceId } ), - Headers = get_jwt_authorization_header(JWTConfig), - Request = - case {Method, Body} of - {get, <<>>} -> {Path, Headers}; - _ -> {Path, Headers, Body} - end, + Request = to_ehttpc_request(State, Method, Path, Body), Response = ehttpc:request( PoolName, Method, @@ -340,7 +324,6 @@ do_send_requests_async( State, {prepared_request, {Method, Path, Body}}, ReplyFunAndArgs, ResourceId ) -> #{ - jwt_config := JWTConfig, pool_name := PoolName, request_ttl := RequestTTL } = State, @@ -352,12 +335,7 @@ do_send_requests_async( resource_id => ResourceId } ), - Headers = get_jwt_authorization_header(JWTConfig), - Request = - case {Method, Body} of - {get, <<>>} -> {Path, Headers}; - _ -> {Path, Headers, Body} - end, + Request = to_ehttpc_request(State, Method, Path, Body), Worker = ehttpc_pool:pick_worker(PoolName), ok = ehttpc:request_async( Worker, @@ -368,6 +346,14 @@ do_send_requests_async( ), {ok, Worker}. +to_ehttpc_request(State, Method, Path, Body) -> + #{jwt_config := JWTConfig} = State, + Headers = get_jwt_authorization_header(JWTConfig), + case {Method, Body} of + {get, <<>>} -> {Path, Headers}; + _ -> {Path, Headers, Body} + end. + -spec handle_response(term(), resource_id(), sync | async) -> {ok, map()} | {error, term()}. handle_response(Result, ResourceId, QueryMode) -> case Result of @@ -461,3 +447,16 @@ do_get_status(ResourceId, Timeout) -> exit:timeout -> false end. + +-spec get_transport() -> {tls | tcp, string()}. +get_transport() -> + %% emulating the emulator behavior + %% https://cloud.google.com/pubsub/docs/emulator + case os:getenv("PUBSUB_EMULATOR_HOST") of + false -> + {tls, "pubsub.googleapis.com:443"}; + HostPort0 -> + %% The emulator is plain HTTP... + Transport0 = persistent_term:get({?MODULE, transport}, tcp), + {Transport0, HostPort0} + end. diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_consumer.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_consumer.erl index 73220a807..901e38dc7 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_consumer.erl +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_consumer.erl @@ -91,8 +91,8 @@ start_consumers(InstanceId, ConnectorState, Config) -> } = Config, ConsumerConfig1 = maps:update_with(topic_mapping, fun convert_topic_mapping/1, ConsumerConfig0), TopicMapping = maps:get(topic_mapping, ConsumerConfig1), - PullWorkerMultiplier = maps:get(pull_worker_multiplier, ConsumerConfig1), - PoolSize = map_size(TopicMapping) * PullWorkerMultiplier, + ConsumerWorkersPerTopic = maps:get(consumer_workers_per_topic, ConsumerConfig1), + PoolSize = map_size(TopicMapping) * ConsumerWorkersPerTopic, ConsumerConfig = ConsumerConfig1#{ auto_reconnect => ?AUTO_RECONNECT_S, bridge_name => BridgeName, diff --git a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl index 6504ffb59..1f198af23 100644 --- a/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl +++ b/apps/emqx_bridge_gcp_pubsub/src/emqx_bridge_gcp_pubsub_impl_producer.erl @@ -278,8 +278,8 @@ reply_delegator(ReplyFunAndArgs, Response) -> Reason =:= {closed, "The connection was lost."}; Reason =:= timeout -> - Result1 = {error, {recoverable_error, Reason}}, - emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result1); + Result = {error, {recoverable_error, Reason}}, + emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result); _ -> emqx_resource:apply_reply_fun(ReplyFunAndArgs, Response) end. diff --git a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl index 41b2a11b2..6d81b88d2 100644 --- a/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl +++ b/apps/emqx_bridge_gcp_pubsub/test/emqx_bridge_gcp_pubsub_consumer_SUITE.erl @@ -145,7 +145,7 @@ consumer_config(TestCase, Config) -> ServiceAccountJSONStr = emqx_utils_json:encode(ServiceAccountJSON), MQTTTopic = proplists:get_value(mqtt_topic, Config, <<"mqtt/topic/", UniqueNum/binary>>), MQTTQoS = proplists:get_value(mqtt_qos, Config, 0), - PullWorkerMultiplier = proplists:get_value(pull_worker_multiplier, Config, 1), + ConsumerWorkersPerTopic = proplists:get_value(consumer_workers_per_topic, Config, 1), DefaultTopicMapping = [ #{ pubsub_topic => ConsumerTopic, @@ -167,7 +167,7 @@ consumer_config(TestCase, Config) -> " consumer {\n" " ack_retry_interval = \"5s\"\n" " pull_max_messages = 10\n" - " pull_worker_multiplier = ~b\n" + " consumer_workers_per_topic = ~b\n" %% topic mapping "~s" " }\n" @@ -182,7 +182,7 @@ consumer_config(TestCase, Config) -> [ Name, ServiceAccountJSONStr, - PullWorkerMultiplier, + ConsumerWorkersPerTopic, TopicMappingStr ] ),