refactor: fix typo in variable name

Might confuse people to think it's related to `replayq`.
This commit is contained in:
Thales Macedo Garitezi 2022-11-24 15:16:24 -03:00
parent 061fe144b8
commit 34e9056779
3 changed files with 13 additions and 13 deletions

View File

@ -192,11 +192,11 @@ on_query(_InstId, {send_message, Msg}, #{name := InstanceId}) ->
on_query_async( on_query_async(
_InstId, _InstId,
{send_message, Msg}, {send_message, Msg},
{ReplayFun, Args}, {ReplyFun, Args},
#{name := InstanceId} #{name := InstanceId}
) -> ) ->
?TRACE("QUERY", "async_send_msg_to_remote_node", #{message => Msg, connector => InstanceId}), ?TRACE("QUERY", "async_send_msg_to_remote_node", #{message => Msg, connector => InstanceId}),
emqx_connector_mqtt_worker:send_to_remote_async(InstanceId, Msg, {ReplayFun, Args}). emqx_connector_mqtt_worker:send_to_remote_async(InstanceId, Msg, {ReplyFun, Args}).
on_get_status(_InstId, #{name := InstanceId, bridge_conf := Conf}) -> on_get_status(_InstId, #{name := InstanceId, bridge_conf := Conf}) ->
AutoReconn = maps:get(auto_reconnect, Conf, true), AutoReconn = maps:get(auto_reconnect, Conf, true),

View File

@ -163,8 +163,8 @@ running(cast, {block, [?QUERY(_, _, _) | _] = Batch}, #{id := Id, queue := Q} =
running({call, From}, {query, Request, _Opts}, St) -> running({call, From}, {query, Request, _Opts}, St) ->
query_or_acc(From, Request, St); query_or_acc(From, Request, St);
running(cast, {query, Request, Opts}, St) -> running(cast, {query, Request, Opts}, St) ->
ReplayFun = maps:get(async_reply_fun, Opts, undefined), ReplyFun = maps:get(async_reply_fun, Opts, undefined),
query_or_acc(ReplayFun, Request, St); query_or_acc(ReplyFun, Request, St);
running(info, {flush, Ref}, St = #{tref := {_TRef, Ref}}) -> running(info, {flush, Ref}, St = #{tref := {_TRef, Ref}}) ->
flush(St#{tref := undefined}); flush(St#{tref := undefined});
running(info, {flush, _Ref}, _St) -> running(info, {flush, _Ref}, _St) ->
@ -191,11 +191,11 @@ blocked({call, From}, {query, Request, _Opts}, #{id := Id, queue := Q} = St) ->
_ = reply_caller(Id, ?REPLY(From, Request, false, Error)), _ = reply_caller(Id, ?REPLY(From, Request, false, Error)),
{keep_state, St#{queue := maybe_append_queue(Id, Q, [?Q_ITEM(?QUERY(From, Request, false))])}}; {keep_state, St#{queue := maybe_append_queue(Id, Q, [?Q_ITEM(?QUERY(From, Request, false))])}};
blocked(cast, {query, Request, Opts}, #{id := Id, queue := Q} = St) -> blocked(cast, {query, Request, Opts}, #{id := Id, queue := Q} = St) ->
ReplayFun = maps:get(async_reply_fun, Opts, undefined), ReplyFun = maps:get(async_reply_fun, Opts, undefined),
Error = ?RESOURCE_ERROR(blocked, "resource is blocked"), Error = ?RESOURCE_ERROR(blocked, "resource is blocked"),
_ = reply_caller(Id, ?REPLY(ReplayFun, Request, false, Error)), _ = reply_caller(Id, ?REPLY(ReplyFun, Request, false, Error)),
{keep_state, St#{ {keep_state, St#{
queue := maybe_append_queue(Id, Q, [?Q_ITEM(?QUERY(ReplayFun, Request, false))]) queue := maybe_append_queue(Id, Q, [?Q_ITEM(?QUERY(ReplyFun, Request, false))])
}}. }}.
terminate(_Reason, #{id := Id, index := Index}) -> terminate(_Reason, #{id := Id, index := Index}) ->

View File

@ -87,7 +87,7 @@ on_batch_query(InstId, BatchData, _State = #{write_syntax := SyntaxLines, client
on_query_async( on_query_async(
InstId, InstId,
{send_message, Data}, {send_message, Data},
{ReplayFun, Args}, {ReplyFun, Args},
_State = #{write_syntax := SyntaxLines, client := Client} _State = #{write_syntax := SyntaxLines, client := Client}
) -> ) ->
case data_to_points(Data, SyntaxLines) of case data_to_points(Data, SyntaxLines) of
@ -96,7 +96,7 @@ on_query_async(
influxdb_connector_send_query, influxdb_connector_send_query,
#{points => Points, batch => false, mode => async} #{points => Points, batch => false, mode => async}
), ),
do_async_query(InstId, Client, Points, {ReplayFun, Args}); do_async_query(InstId, Client, Points, {ReplyFun, Args});
{error, ErrorPoints} = Err -> {error, ErrorPoints} = Err ->
?tp( ?tp(
influxdb_connector_send_query_error, influxdb_connector_send_query_error,
@ -109,7 +109,7 @@ on_query_async(
on_batch_query_async( on_batch_query_async(
InstId, InstId,
BatchData, BatchData,
{ReplayFun, Args}, {ReplyFun, Args},
#{write_syntax := SyntaxLines, client := Client} #{write_syntax := SyntaxLines, client := Client}
) -> ) ->
case parse_batch_data(InstId, BatchData, SyntaxLines) of case parse_batch_data(InstId, BatchData, SyntaxLines) of
@ -118,7 +118,7 @@ on_batch_query_async(
influxdb_connector_send_query, influxdb_connector_send_query,
#{points => Points, batch => true, mode => async} #{points => Points, batch => true, mode => async}
), ),
do_async_query(InstId, Client, Points, {ReplayFun, Args}); do_async_query(InstId, Client, Points, {ReplyFun, Args});
{error, Reason} -> {error, Reason} ->
?tp( ?tp(
influxdb_connector_send_query_error, influxdb_connector_send_query_error,
@ -336,13 +336,13 @@ do_query(InstId, Client, Points) ->
Err Err
end. end.
do_async_query(InstId, Client, Points, ReplayFunAndArgs) -> do_async_query(InstId, Client, Points, ReplyFunAndArgs) ->
?SLOG(info, #{ ?SLOG(info, #{
msg => "influxdb write point async", msg => "influxdb write point async",
connector => InstId, connector => InstId,
points => Points points => Points
}), }),
ok = influxdb:write_async(Client, Points, ReplayFunAndArgs). ok = influxdb:write_async(Client, Points, ReplyFunAndArgs).
%% ------------------------------------------------------------------------------------------------- %% -------------------------------------------------------------------------------------------------
%% Tags & Fields Config Trans %% Tags & Fields Config Trans