From 34e9056779e41cf04398c60b990e3b6e52ae28ce Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 24 Nov 2022 15:16:24 -0300 Subject: [PATCH] refactor: fix typo in variable name Might confuse people to think it's related to `replayq`. --- apps/emqx_connector/src/emqx_connector_mqtt.erl | 4 ++-- apps/emqx_resource/src/emqx_resource_worker.erl | 10 +++++----- .../src/emqx_ee_connector_influxdb.erl | 12 ++++++------ 3 files changed, 13 insertions(+), 13 deletions(-) diff --git a/apps/emqx_connector/src/emqx_connector_mqtt.erl b/apps/emqx_connector/src/emqx_connector_mqtt.erl index b063d7436..438c7b87e 100644 --- a/apps/emqx_connector/src/emqx_connector_mqtt.erl +++ b/apps/emqx_connector/src/emqx_connector_mqtt.erl @@ -192,11 +192,11 @@ on_query(_InstId, {send_message, Msg}, #{name := InstanceId}) -> on_query_async( _InstId, {send_message, Msg}, - {ReplayFun, Args}, + {ReplyFun, Args}, #{name := InstanceId} ) -> ?TRACE("QUERY", "async_send_msg_to_remote_node", #{message => Msg, connector => InstanceId}), - emqx_connector_mqtt_worker:send_to_remote_async(InstanceId, Msg, {ReplayFun, Args}). + emqx_connector_mqtt_worker:send_to_remote_async(InstanceId, Msg, {ReplyFun, Args}). on_get_status(_InstId, #{name := InstanceId, bridge_conf := Conf}) -> AutoReconn = maps:get(auto_reconnect, Conf, true), diff --git a/apps/emqx_resource/src/emqx_resource_worker.erl b/apps/emqx_resource/src/emqx_resource_worker.erl index a36cb15b7..6722e1b43 100644 --- a/apps/emqx_resource/src/emqx_resource_worker.erl +++ b/apps/emqx_resource/src/emqx_resource_worker.erl @@ -163,8 +163,8 @@ running(cast, {block, [?QUERY(_, _, _) | _] = Batch}, #{id := Id, queue := Q} = running({call, From}, {query, Request, _Opts}, St) -> query_or_acc(From, Request, St); running(cast, {query, Request, Opts}, St) -> - ReplayFun = maps:get(async_reply_fun, Opts, undefined), - query_or_acc(ReplayFun, Request, St); + ReplyFun = maps:get(async_reply_fun, Opts, undefined), + query_or_acc(ReplyFun, Request, St); running(info, {flush, Ref}, St = #{tref := {_TRef, Ref}}) -> flush(St#{tref := undefined}); running(info, {flush, _Ref}, _St) -> @@ -191,11 +191,11 @@ blocked({call, From}, {query, Request, _Opts}, #{id := Id, queue := Q} = St) -> _ = reply_caller(Id, ?REPLY(From, Request, false, Error)), {keep_state, St#{queue := maybe_append_queue(Id, Q, [?Q_ITEM(?QUERY(From, Request, false))])}}; blocked(cast, {query, Request, Opts}, #{id := Id, queue := Q} = St) -> - ReplayFun = maps:get(async_reply_fun, Opts, undefined), + ReplyFun = maps:get(async_reply_fun, Opts, undefined), Error = ?RESOURCE_ERROR(blocked, "resource is blocked"), - _ = reply_caller(Id, ?REPLY(ReplayFun, Request, false, Error)), + _ = reply_caller(Id, ?REPLY(ReplyFun, Request, false, Error)), {keep_state, St#{ - queue := maybe_append_queue(Id, Q, [?Q_ITEM(?QUERY(ReplayFun, Request, false))]) + queue := maybe_append_queue(Id, Q, [?Q_ITEM(?QUERY(ReplyFun, Request, false))]) }}. terminate(_Reason, #{id := Id, index := Index}) -> diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl index 36b2ec44d..2c5b8a8fc 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl @@ -87,7 +87,7 @@ on_batch_query(InstId, BatchData, _State = #{write_syntax := SyntaxLines, client on_query_async( InstId, {send_message, Data}, - {ReplayFun, Args}, + {ReplyFun, Args}, _State = #{write_syntax := SyntaxLines, client := Client} ) -> case data_to_points(Data, SyntaxLines) of @@ -96,7 +96,7 @@ on_query_async( influxdb_connector_send_query, #{points => Points, batch => false, mode => async} ), - do_async_query(InstId, Client, Points, {ReplayFun, Args}); + do_async_query(InstId, Client, Points, {ReplyFun, Args}); {error, ErrorPoints} = Err -> ?tp( influxdb_connector_send_query_error, @@ -109,7 +109,7 @@ on_query_async( on_batch_query_async( InstId, BatchData, - {ReplayFun, Args}, + {ReplyFun, Args}, #{write_syntax := SyntaxLines, client := Client} ) -> case parse_batch_data(InstId, BatchData, SyntaxLines) of @@ -118,7 +118,7 @@ on_batch_query_async( influxdb_connector_send_query, #{points => Points, batch => true, mode => async} ), - do_async_query(InstId, Client, Points, {ReplayFun, Args}); + do_async_query(InstId, Client, Points, {ReplyFun, Args}); {error, Reason} -> ?tp( influxdb_connector_send_query_error, @@ -336,13 +336,13 @@ do_query(InstId, Client, Points) -> Err end. -do_async_query(InstId, Client, Points, ReplayFunAndArgs) -> +do_async_query(InstId, Client, Points, ReplyFunAndArgs) -> ?SLOG(info, #{ msg => "influxdb write point async", connector => InstId, points => Points }), - ok = influxdb:write_async(Client, Points, ReplayFunAndArgs). + ok = influxdb:write_async(Client, Points, ReplyFunAndArgs). %% ------------------------------------------------------------------------------------------------- %% Tags & Fields Config Trans