refactor: addressing review comments

This commit is contained in:
Thales Macedo Garitezi 2023-06-20 11:15:13 -03:00
parent 01d758e4c0
commit 2ac2d4c037
6 changed files with 37 additions and 37 deletions

View File

@ -7,10 +7,11 @@
warnings_as_errors, warnings_as_errors,
debug_info debug_info
]}. ]}.
{deps, [ {emqx_connector, {path, "../../apps/emqx_connector"}} {deps, [
, {emqx_resource, {path, "../../apps/emqx_resource"}} {emqx_connector, {path, "../../apps/emqx_connector"}},
, {emqx_bridge, {path, "../../apps/emqx_bridge"}} {emqx_resource, {path, "../../apps/emqx_resource"}},
]}. {emqx_bridge, {path, "../../apps/emqx_bridge"}}
]}.
{xref_checks, [ {xref_checks, [
undefined_function_calls, undefined_function_calls,

View File

@ -152,7 +152,7 @@ fields(consumer) ->
pos_integer(), pos_integer(),
#{default => 100, desc => ?DESC("consumer_pull_max_messages")} #{default => 100, desc => ?DESC("consumer_pull_max_messages")}
)}, )},
{pull_worker_multiplier, {consumer_workers_per_topic,
mk( mk(
pos_integer(), pos_integer(),
#{ #{

View File

@ -86,17 +86,7 @@ on_start(
connector => ResourceId, connector => ResourceId,
config => Config config => Config
}), }),
%% emulating the emulator behavior {Transport, HostPort} = get_transport(),
%% https://cloud.google.com/pubsub/docs/emulator
{Transport, HostPort} =
case os:getenv("PUBSUB_EMULATOR_HOST") of
false ->
{tls, "pubsub.googleapis.com:443"};
HostPort0 ->
%% The emulator is plain HTTP...
Transport0 = persistent_term:get({?MODULE, transport}, tcp),
{Transport0, HostPort0}
end,
#{hostname := Host, port := Port} = emqx_schema:parse_server(HostPort, #{default_port => 443}), #{hostname := Host, port := Port} = emqx_schema:parse_server(HostPort, #{default_port => 443}),
PoolType = random, PoolType = random,
TransportOpts = TransportOpts =
@ -302,7 +292,6 @@ get_jwt_authorization_header(JWTConfig) ->
{ok, map()} | {error, {recoverable_error, term()} | term()}. {ok, map()} | {error, {recoverable_error, term()} | term()}.
do_send_requests_sync(State, {prepared_request, {Method, Path, Body}}, ResourceId) -> do_send_requests_sync(State, {prepared_request, {Method, Path, Body}}, ResourceId) ->
#{ #{
jwt_config := JWTConfig,
pool_name := PoolName, pool_name := PoolName,
max_retries := MaxRetries, max_retries := MaxRetries,
request_ttl := RequestTTL request_ttl := RequestTTL
@ -315,12 +304,7 @@ do_send_requests_sync(State, {prepared_request, {Method, Path, Body}}, ResourceI
resource_id => ResourceId resource_id => ResourceId
} }
), ),
Headers = get_jwt_authorization_header(JWTConfig), Request = to_ehttpc_request(State, Method, Path, Body),
Request =
case {Method, Body} of
{get, <<>>} -> {Path, Headers};
_ -> {Path, Headers, Body}
end,
Response = ehttpc:request( Response = ehttpc:request(
PoolName, PoolName,
Method, Method,
@ -340,7 +324,6 @@ do_send_requests_async(
State, {prepared_request, {Method, Path, Body}}, ReplyFunAndArgs, ResourceId State, {prepared_request, {Method, Path, Body}}, ReplyFunAndArgs, ResourceId
) -> ) ->
#{ #{
jwt_config := JWTConfig,
pool_name := PoolName, pool_name := PoolName,
request_ttl := RequestTTL request_ttl := RequestTTL
} = State, } = State,
@ -352,12 +335,7 @@ do_send_requests_async(
resource_id => ResourceId resource_id => ResourceId
} }
), ),
Headers = get_jwt_authorization_header(JWTConfig), Request = to_ehttpc_request(State, Method, Path, Body),
Request =
case {Method, Body} of
{get, <<>>} -> {Path, Headers};
_ -> {Path, Headers, Body}
end,
Worker = ehttpc_pool:pick_worker(PoolName), Worker = ehttpc_pool:pick_worker(PoolName),
ok = ehttpc:request_async( ok = ehttpc:request_async(
Worker, Worker,
@ -368,6 +346,14 @@ do_send_requests_async(
), ),
{ok, Worker}. {ok, Worker}.
to_ehttpc_request(State, Method, Path, Body) ->
#{jwt_config := JWTConfig} = State,
Headers = get_jwt_authorization_header(JWTConfig),
case {Method, Body} of
{get, <<>>} -> {Path, Headers};
_ -> {Path, Headers, Body}
end.
-spec handle_response(term(), resource_id(), sync | async) -> {ok, map()} | {error, term()}. -spec handle_response(term(), resource_id(), sync | async) -> {ok, map()} | {error, term()}.
handle_response(Result, ResourceId, QueryMode) -> handle_response(Result, ResourceId, QueryMode) ->
case Result of case Result of
@ -461,3 +447,16 @@ do_get_status(ResourceId, Timeout) ->
exit:timeout -> exit:timeout ->
false false
end. end.
-spec get_transport() -> {tls | tcp, string()}.
get_transport() ->
%% emulating the emulator behavior
%% https://cloud.google.com/pubsub/docs/emulator
case os:getenv("PUBSUB_EMULATOR_HOST") of
false ->
{tls, "pubsub.googleapis.com:443"};
HostPort0 ->
%% The emulator is plain HTTP...
Transport0 = persistent_term:get({?MODULE, transport}, tcp),
{Transport0, HostPort0}
end.

View File

@ -91,8 +91,8 @@ start_consumers(InstanceId, ConnectorState, Config) ->
} = Config, } = Config,
ConsumerConfig1 = maps:update_with(topic_mapping, fun convert_topic_mapping/1, ConsumerConfig0), ConsumerConfig1 = maps:update_with(topic_mapping, fun convert_topic_mapping/1, ConsumerConfig0),
TopicMapping = maps:get(topic_mapping, ConsumerConfig1), TopicMapping = maps:get(topic_mapping, ConsumerConfig1),
PullWorkerMultiplier = maps:get(pull_worker_multiplier, ConsumerConfig1), ConsumerWorkersPerTopic = maps:get(consumer_workers_per_topic, ConsumerConfig1),
PoolSize = map_size(TopicMapping) * PullWorkerMultiplier, PoolSize = map_size(TopicMapping) * ConsumerWorkersPerTopic,
ConsumerConfig = ConsumerConfig1#{ ConsumerConfig = ConsumerConfig1#{
auto_reconnect => ?AUTO_RECONNECT_S, auto_reconnect => ?AUTO_RECONNECT_S,
bridge_name => BridgeName, bridge_name => BridgeName,

View File

@ -278,8 +278,8 @@ reply_delegator(ReplyFunAndArgs, Response) ->
Reason =:= {closed, "The connection was lost."}; Reason =:= {closed, "The connection was lost."};
Reason =:= timeout Reason =:= timeout
-> ->
Result1 = {error, {recoverable_error, Reason}}, Result = {error, {recoverable_error, Reason}},
emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result1); emqx_resource:apply_reply_fun(ReplyFunAndArgs, Result);
_ -> _ ->
emqx_resource:apply_reply_fun(ReplyFunAndArgs, Response) emqx_resource:apply_reply_fun(ReplyFunAndArgs, Response)
end. end.

View File

@ -145,7 +145,7 @@ consumer_config(TestCase, Config) ->
ServiceAccountJSONStr = emqx_utils_json:encode(ServiceAccountJSON), ServiceAccountJSONStr = emqx_utils_json:encode(ServiceAccountJSON),
MQTTTopic = proplists:get_value(mqtt_topic, Config, <<"mqtt/topic/", UniqueNum/binary>>), MQTTTopic = proplists:get_value(mqtt_topic, Config, <<"mqtt/topic/", UniqueNum/binary>>),
MQTTQoS = proplists:get_value(mqtt_qos, Config, 0), MQTTQoS = proplists:get_value(mqtt_qos, Config, 0),
PullWorkerMultiplier = proplists:get_value(pull_worker_multiplier, Config, 1), ConsumerWorkersPerTopic = proplists:get_value(consumer_workers_per_topic, Config, 1),
DefaultTopicMapping = [ DefaultTopicMapping = [
#{ #{
pubsub_topic => ConsumerTopic, pubsub_topic => ConsumerTopic,
@ -167,7 +167,7 @@ consumer_config(TestCase, Config) ->
" consumer {\n" " consumer {\n"
" ack_retry_interval = \"5s\"\n" " ack_retry_interval = \"5s\"\n"
" pull_max_messages = 10\n" " pull_max_messages = 10\n"
" pull_worker_multiplier = ~b\n" " consumer_workers_per_topic = ~b\n"
%% topic mapping %% topic mapping
"~s" "~s"
" }\n" " }\n"
@ -182,7 +182,7 @@ consumer_config(TestCase, Config) ->
[ [
Name, Name,
ServiceAccountJSONStr, ServiceAccountJSONStr,
PullWorkerMultiplier, ConsumerWorkersPerTopic,
TopicMappingStr TopicMappingStr
] ]
), ),