From 67ec6e0e66fe7e41f2176b3ae8a5c967fe10d44e Mon Sep 17 00:00:00 2001 From: JimMoen Date: Tue, 16 Aug 2022 10:24:05 +0800 Subject: [PATCH 1/5] fix: log RuleId for take action failed --- .../emqx_rule_engine/include/rule_actions.hrl | 19 ++++++++ .../src/emqx_rule_actions.erl | 18 +++++--- .../src/emqx_rule_runtime.erl | 45 +++++++++---------- apps/emqx_rule_engine/src/emqx_rule_utils.erl | 34 ++++++++++++++ .../src/emqx_web_hook_actions.erl | 10 +++-- 5 files changed, 93 insertions(+), 33 deletions(-) diff --git a/apps/emqx_rule_engine/include/rule_actions.hrl b/apps/emqx_rule_engine/include/rule_actions.hrl index e432c4399..4571849d3 100644 --- a/apps/emqx_rule_engine/include/rule_actions.hrl +++ b/apps/emqx_rule_engine/include/rule_actions.hrl @@ -1,3 +1,19 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + -compile({parse_transform, emqx_rule_actions_trans}). -type selected_data() :: map(). @@ -6,6 +22,9 @@ -define(BINDING_KEYS, '__bindings__'). +-define(LOG_RULE_ACTION(Level, Metadata, Fmt, Args), + emqx_rule_utils:log_action(Level, Metadata, Fmt, Args)). + -define(bound_v(Key, ENVS0), maps:get(Key, maps:get(?BINDING_KEYS, ENVS0, #{}))). diff --git a/apps/emqx_rule_engine/src/emqx_rule_actions.erl b/apps/emqx_rule_engine/src/emqx_rule_actions.erl index d665a0c96..557ddf423 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_actions.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_actions.erl @@ -171,9 +171,13 @@ on_action_create_republish(Id, Params = #{ on_action_republish(_Selected, Envs = #{ topic := Topic, headers := #{republish_by := ActId}, - ?BINDING_KEYS := #{'Id' := ActId} + ?BINDING_KEYS := #{'Id' := ActId}, + metadata := Metadata }) -> - ?LOG(error, "[republish] recursively republish detected, msg topic: ~p, target topic: ~p", + ?LOG_RULE_ACTION( + error, + Metadata, + "[republish] recursively republish detected, msg topic: ~p, target topic: ~p", [Topic, ?bound_v('TargetTopic', Envs)]), emqx_rule_metrics:inc_actions_error(?bound_v('Id', Envs)), {badact, recursively_republish}; @@ -186,8 +190,9 @@ on_action_republish(Selected, _Envs = #{ 'TargetQoS' := TargetQoS, 'TopicTks' := TopicTks, 'PayloadTks' := PayloadTks - } = Bindings}) -> - ?LOG(debug, "[republish] republish to: ~p, Selected: ~p", [TargetTopic, Selected]), + } = Bindings, + metadata := Metadata}) -> + ?LOG_RULE_ACTION(debug, Metadata, "[republish] republish to: ~p, Selected: ~p", [TargetTopic, Selected]), TargetRetain = maps:get('TargetRetain', Bindings, false), Message = #message{ @@ -210,8 +215,9 @@ on_action_republish(Selected, _Envs = #{ 'TargetQoS' := TargetQoS, 'TopicTks' := TopicTks, 'PayloadTks' := PayloadTks - } = Bindings}) -> - ?LOG(debug, "[republish] republish to: ~p, Selected: ~p", [TargetTopic, Selected]), + } = Bindings, + metadata := Metadata}) -> + ?LOG_RULE_ACTION(debug, Metadata, "[republish] republish to: ~p, Selected: ~p", [TargetTopic, Selected]), TargetRetain = maps:get('TargetRetain', Bindings, false), Message = #message{ diff --git a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl index 88b2c9143..22fac23c0 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_runtime.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_runtime.erl @@ -17,6 +17,7 @@ -module(emqx_rule_runtime). -include("rule_engine.hrl"). +-include("rule_actions.hrl"). -include_lib("emqx/include/emqx.hrl"). -include_lib("emqx/include/logger.hrl"). @@ -54,36 +55,37 @@ apply_rules([Rule|More], Input) -> apply_rule(Rule, Input), apply_rules(More, Input). -apply_rule(Rule = #rule{id = RuleID}, Input) -> +apply_rule(Rule = #rule{id = RuleId}, Input) -> clear_rule_payload(), - ok = emqx_rule_metrics:inc_rules_matched(RuleID), - try do_apply_rule(Rule, add_metadata(Input, #{rule_id => RuleID})) + ok = emqx_rule_metrics:inc_rules_matched(RuleId), + %% Add metadata here caused we need support `metadata` and `rule_id` in SQL + try do_apply_rule(Rule, emqx_rule_utils:add_metadata(Input, #{rule_id => RuleId})) catch %% ignore the errors if select or match failed _:Reason = {select_and_transform_error, Error} -> - emqx_rule_metrics:inc_rules_exception(RuleID), + emqx_rule_metrics:inc_rules_exception(RuleId), ?LOG(warning, "SELECT clause exception for ~s failed: ~p", - [RuleID, Error]), + [RuleId, Error]), {error, Reason}; _:Reason = {match_conditions_error, Error} -> - emqx_rule_metrics:inc_rules_exception(RuleID), + emqx_rule_metrics:inc_rules_exception(RuleId), ?LOG(warning, "WHERE clause exception for ~s failed: ~p", - [RuleID, Error]), + [RuleId, Error]), {error, Reason}; _:Reason = {select_and_collect_error, Error} -> - emqx_rule_metrics:inc_rules_exception(RuleID), + emqx_rule_metrics:inc_rules_exception(RuleId), ?LOG(warning, "FOREACH clause exception for ~s failed: ~p", - [RuleID, Error]), + [RuleId, Error]), {error, Reason}; _:Reason = {match_incase_error, Error} -> - emqx_rule_metrics:inc_rules_exception(RuleID), + emqx_rule_metrics:inc_rules_exception(RuleId), ?LOG(warning, "INCASE clause exception for ~s failed: ~p", - [RuleID, Error]), + [RuleId, Error]), {error, Reason}; _:Error:StkTrace -> - emqx_rule_metrics:inc_rules_exception(RuleID), + emqx_rule_metrics:inc_rules_exception(RuleId), ?LOG(error, "Apply rule ~s failed: ~p. Stacktrace:~n~p", - [RuleID, Error, StkTrace]), + [RuleId, Error, StkTrace]), {error, {Error, StkTrace}} end. @@ -245,9 +247,10 @@ number(Bin) -> catch error:badarg -> binary_to_float(Bin) end. -%% Step3 -> Take actions +%% %% Step3 -> Take actions +%% fallback actions already have `rule_id` in `metadata` take_actions(Actions, Selected, Envs, OnFailed) -> - [take_action(ActInst, Selected, Envs, OnFailed, ?ActionMaxRetry) + [take_action(ActInst, Selected, emqx_rule_utils:add_metadata(Envs, ActInst), OnFailed, ?ActionMaxRetry) || ActInst <- Actions]. take_action(#action_instance{id = Id, name = ActName, fallbacks = Fallbacks} = ActInst, @@ -312,12 +315,12 @@ wait_action_on(Id, RetryN) -> end end. -handle_action_failure(continue, Id, Fallbacks, Selected, Envs, Reason) -> - ?LOG(error, "Take action ~p failed, continue next action, reason: ~0p", [Id, Reason]), +handle_action_failure(continue, _Id, Fallbacks, Selected, Envs = #{metadata := Metadata}, Reason) -> + ?LOG_RULE_ACTION(error, Metadata, "Continue next action, reason: ~0p", [Reason]), _ = take_actions(Fallbacks, Selected, Envs, continue), failed; -handle_action_failure(stop, Id, Fallbacks, Selected, Envs, Reason) -> - ?LOG(error, "Take action ~p failed, skip all actions, reason: ~0p", [Id, Reason]), +handle_action_failure(stop, Id, Fallbacks, Selected, Envs = #{metadata := Metadata}, Reason) -> + ?LOG_RULE_ACTION(error, Metadata, "Skip all actions, reason: ~0p", [Reason]), _ = take_actions(Fallbacks, Selected, Envs, continue), error({take_action_failed, {Id, Reason}}). @@ -429,10 +432,6 @@ do_apply_func(Name, Args, Input) -> Result -> Result end. -add_metadata(Input, Metadata) when is_map(Input), is_map(Metadata) -> - NewMetadata = maps:merge(maps:get(metadata, Input, #{}), Metadata), - Input#{metadata => NewMetadata}. - %%------------------------------------------------------------------------------ %% Internal Functions %%------------------------------------------------------------------------------ diff --git a/apps/emqx_rule_engine/src/emqx_rule_utils.erl b/apps/emqx_rule_engine/src/emqx_rule_utils.erl index 137e22128..8bd46a958 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_utils.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_utils.erl @@ -16,6 +16,9 @@ -module(emqx_rule_utils). +-include("rule_engine.hrl"). +-include_lib("emqx/include/logger.hrl"). + -export([ replace_var/2 ]). @@ -59,6 +62,10 @@ , can_topic_match_oneof/2 ]). +-export([ add_metadata/2 + , log_action/4 + ]). + -compile({no_auto_import, [ float/1 ]}). @@ -371,3 +378,30 @@ can_topic_match_oneof(Topic, Filters) -> lists:any(fun(Fltr) -> emqx_topic:match(Topic, Fltr) end, Filters). + +add_metadata(Envs, Metadata) when is_map(Envs), is_map(Metadata) -> + NMetadata = maps:merge(maps:get(metadata, Envs, #{}), Metadata), + Envs#{metadata => NMetadata}; +add_metadata(Envs, Action) when is_map(Envs), is_record(Action, action_instance)-> + Metadata = gen_metadata_from_action(Action), + NMetadata = maps:merge(maps:get(metadata, Envs, #{}), Metadata), + Envs#{metadata => NMetadata}. + +gen_metadata_from_action(#action_instance{name = Name, args = undefined}) -> + #{action_name => Name, resource_id => undefined}; +gen_metadata_from_action(#action_instance{name = Name, args = Args}) + when is_map(Args) -> + #{action_name => Name, resource_id => maps:get(<<"$resource">>, Args, undefined)}; +gen_metadata_from_action(#action_instance{name = Name}) -> + #{action_name => Name, resource_id => undefined}. + +log_action(Level, Metadata, Fmt, Args) -> + ?LOG(Level, + "Rule: ~p; Action: ~p; Rusource: ~p. " ++ Fmt, + metadata_values(Metadata) ++ Args). + +metadata_values(Metadata) -> + RuleId = maps:get(rule_id, Metadata, undefined), + ActionName = maps:get(action_name, Metadata, undefined), + ResourceName = maps:get(resource_id, Metadata, undefined), + [RuleId, ActionName, ResourceName]. diff --git a/apps/emqx_web_hook/src/emqx_web_hook_actions.erl b/apps/emqx_web_hook/src/emqx_web_hook_actions.erl index 00094d46d..29550e1e9 100644 --- a/apps/emqx_web_hook/src/emqx_web_hook_actions.erl +++ b/apps/emqx_web_hook/src/emqx_web_hook_actions.erl @@ -259,25 +259,27 @@ on_action_data_to_webserver(Selected, _Envs = 'BodyTokens' := BodyTokens, 'Pool' := Pool, 'RequestTimeout' := RequestTimeout}, - clientid := ClientID}) -> + clientid := ClientID, + metadata := Metadata}) -> NBody = format_msg(BodyTokens, clear_user_property_header(Selected)), NPath = emqx_rule_utils:proc_tmpl(PathTokens, Selected), Req = create_req(Method, NPath, Headers, NBody), case ehttpc:request({Pool, ClientID}, Method, Req, RequestTimeout) of {ok, StatusCode, _} when StatusCode >= 200 andalso StatusCode < 300 -> + ?LOG_RULE_ACTION(debug, Metadata, "HTTP Request succeeded with path: ~p status code ~p", [NPath, StatusCode]), emqx_rule_metrics:inc_actions_success(Id); {ok, StatusCode, _, _} when StatusCode >= 200 andalso StatusCode < 300 -> emqx_rule_metrics:inc_actions_success(Id); {ok, StatusCode, _} -> - ?LOG(warning, "HTTP request failed with path: ~p status code: ~p", [NPath, StatusCode]), + ?LOG_RULE_ACTION(warning, Metadata, "HTTP request failed with path: ~p status code: ~p", [NPath, StatusCode]), emqx_rule_metrics:inc_actions_error(Id), {badact, StatusCode}; {ok, StatusCode, _, _} -> - ?LOG(warning, "HTTP request failed with path: ~p status code: ~p", [NPath, StatusCode]), + ?LOG_RULE_ACTION(warning, Metadata, "HTTP request failed with path: ~p status code: ~p", [NPath, StatusCode]), emqx_rule_metrics:inc_actions_error(Id), {badact, StatusCode}; {error, Reason} -> - ?LOG(error, "HTTP request failed path: ~p error: ~p", [NPath, Reason]), + ?LOG_RULE_ACTION(error, Metadata, "HTTP request failed path: ~p error: ~p", [NPath, Reason]), emqx_rule_metrics:inc_actions_error(Id), {badact, Reason} end. From 4a89dfe3622060ebfe31365c8b6681e62806693e Mon Sep 17 00:00:00 2001 From: JimMoen Date: Tue, 16 Aug 2022 10:53:39 +0800 Subject: [PATCH 2/5] chore: update CHANGES.md and appup.src This reverts commit 7af25a82e70845a631be0c8b83ba7f1838d68389. --- CHANGES-4.3.md | 13 +++---- .../src/emqx_rule_engine.appup.src | 34 +++++++++++++++---- apps/emqx_web_hook/src/emqx_web_hook.app.src | 2 +- .../emqx_web_hook/src/emqx_web_hook.appup.src | 18 +++------- 4 files changed, 37 insertions(+), 30 deletions(-) diff --git a/CHANGES-4.3.md b/CHANGES-4.3.md index ab74b877a..babc8cb17 100644 --- a/CHANGES-4.3.md +++ b/CHANGES-4.3.md @@ -12,22 +12,17 @@ File format: ## v4.3.19 -### Bug fixes -- Fix GET `/auth_clientid` and `/auth_username` counts. [#8655](https://github.com/emqx/emqx/pull/8655) - -## v4.3.19 - ### Enhancements - Improve error message for LwM2M plugin when object ID is not valid [#8654](https://github.com/emqx/emqx/pull/8654). - Add tzdata apk package to alpine docker image. [#8671](https://github.com/emqx/emqx/pull/8671) -- Add node evacuation and cluster rebalancing features [#8597] - -## v4.3.19 +- Add node evacuation and cluster rebalancing features [#8597](https://github.com/emqx/emqx/pull/8597) +- Refine Rule Engine error log. RuleId will be logged when take action failed. [#8737](https://github.com/emqx/emqx/pull/8737) ### Bug fixes -- Add a idle timer for ExProto UDP client to avoid client leaking [#8628] +- Fix GET `/auth_clientid` and `/auth_username` counts. [#8655](https://github.com/emqx/emqx/pull/8655) +- Add a idle timer for ExProto UDP client to avoid client leaking [#8628](https://github.com/emqx/emqx/pull/8628) ## v4.3.18 diff --git a/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src b/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src index 1c5fb0e7b..b113b2e22 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src +++ b/apps/emqx_rule_engine/src/emqx_rule_engine.appup.src @@ -1,10 +1,21 @@ %% -*- mode: erlang -*- %% Unless you know what you are doing, DO NOT edit manually!! {VSN, - [{"4.3.13",[{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, - {"4.3.12",[{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, + [{"4.3.13", + [{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, + {"4.3.12", + [{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {"4.3.11", - [{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.10", @@ -166,10 +177,21 @@ {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {<<".*">>,[]}], - [{"4.3.13",[{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, - {"4.3.12",[{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, + [{"4.3.13", + [{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, + {"4.3.12", + [{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}]}, {"4.3.11", - [{load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, + [{load_module,emqx_rule_actions,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_utils,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_runtime,brutal_purge,soft_purge,[]}, + {load_module,emqx_rule_registry,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_validator,brutal_purge,soft_purge,[]}, {load_module,emqx_rule_engine_api,brutal_purge,soft_purge,[]}]}, {"4.3.10", diff --git a/apps/emqx_web_hook/src/emqx_web_hook.app.src b/apps/emqx_web_hook/src/emqx_web_hook.app.src index 43e28fe1a..efe41b3bb 100644 --- a/apps/emqx_web_hook/src/emqx_web_hook.app.src +++ b/apps/emqx_web_hook/src/emqx_web_hook.app.src @@ -1,6 +1,6 @@ {application, emqx_web_hook, [{description, "EMQ X WebHook Plugin"}, - {vsn, "4.3.13"}, % strict semver, bump manually! + {vsn, "4.3.14"}, % strict semver, bump manually! {modules, []}, {registered, [emqx_web_hook_sup]}, {applications, [kernel,stdlib,ehttpc]}, diff --git a/apps/emqx_web_hook/src/emqx_web_hook.appup.src b/apps/emqx_web_hook/src/emqx_web_hook.appup.src index fff4ec08c..a2f72f0e2 100644 --- a/apps/emqx_web_hook/src/emqx_web_hook.appup.src +++ b/apps/emqx_web_hook/src/emqx_web_hook.appup.src @@ -1,12 +1,7 @@ %% -*- mode: erlang -*- %% Unless you know what you are doing, DO NOT edit manually!! {VSN, - [{<<"4\\.3\\.[0-2]">>, - [{apply,{application,stop,[emqx_web_hook]}}, - {load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]}, - {load_module,emqx_web_hook,brutal_purge,soft_purge,[]}, - {load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]}, - {<<"4\\.3\\.[3-7]">>, + [{<<"4\\.3\\.[0-7]">>, [{apply,{application,stop,[emqx_web_hook]}}, {load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]}, {load_module,emqx_web_hook,brutal_purge,soft_purge,[]}, @@ -26,15 +21,10 @@ {"4.3.11", [{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_web_hook,brutal_purge,soft_purge,[]}]}, - {"4.3.12", + {<<"4\\.3\\.1[2-3]">>, [{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]}, {<<".*">>,[]}], - [{<<"4\\.3\\.[0-2]">>, - [{apply,{application,stop,[emqx_web_hook]}}, - {load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]}, - {load_module,emqx_web_hook,brutal_purge,soft_purge,[]}, - {load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]}, - {<<"4\\.3\\.[3-7]">>, + [{<<"4\\.3\\.[0-7]">>, [{apply,{application,stop,[emqx_web_hook]}}, {load_module,emqx_web_hook_app,brutal_purge,soft_purge,[]}, {load_module,emqx_web_hook,brutal_purge,soft_purge,[]}, @@ -54,6 +44,6 @@ {"4.3.11", [{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}, {load_module,emqx_web_hook,brutal_purge,soft_purge,[]}]}, - {"4.3.12", + {<<"4\\.3\\.1[2-3]">>, [{load_module,emqx_web_hook_actions,brutal_purge,soft_purge,[]}]}, {<<".*">>,[]}]}. From 0502be6055f8841b5d91734110e945051b6314bc Mon Sep 17 00:00:00 2001 From: JimMoen Date: Wed, 17 Aug 2022 15:36:04 +0800 Subject: [PATCH 3/5] chore(typo): fix typo --- apps/emqx_rule_engine/src/emqx_rule_utils.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx_rule_engine/src/emqx_rule_utils.erl b/apps/emqx_rule_engine/src/emqx_rule_utils.erl index 8bd46a958..5c9472367 100644 --- a/apps/emqx_rule_engine/src/emqx_rule_utils.erl +++ b/apps/emqx_rule_engine/src/emqx_rule_utils.erl @@ -397,7 +397,7 @@ gen_metadata_from_action(#action_instance{name = Name}) -> log_action(Level, Metadata, Fmt, Args) -> ?LOG(Level, - "Rule: ~p; Action: ~p; Rusource: ~p. " ++ Fmt, + "Rule: ~p; Action: ~p; Resource: ~p. " ++ Fmt, metadata_values(Metadata) ++ Args). metadata_values(Metadata) -> From 768ab4eacdd60d2edb1e1c6ec1958ec2a91c54fb Mon Sep 17 00:00:00 2001 From: JimMoen Date: Wed, 17 Aug 2022 16:03:42 +0800 Subject: [PATCH 4/5] fix(bridge): mqtt bridge worker status idle --- .../src/emqx_bridge_mqtt.app.src | 2 +- .../src/emqx_bridge_mqtt.appup.src | 4 ++-- .../src/emqx_bridge_mqtt_actions.erl | 23 +++++++++++++++---- 3 files changed, 22 insertions(+), 7 deletions(-) diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src index 31c795ff5..578671f4e 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_mqtt, [{description, "EMQ X Bridge to MQTT Broker"}, - {vsn, "4.3.5"}, % strict semver, bump manually! + {vsn, "4.3.6"}, % strict semver, bump manually! {modules, []}, {registered, []}, {applications, [kernel,stdlib,replayq,emqtt]}, diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.appup.src b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.appup.src index 9e1eda7f4..a72a18658 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.appup.src +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt.appup.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- %% Unless you know what you are doing, DO NOT edit manually!! {VSN, - [{"4.3.4", + [{<<"4\\.3\\.[4-5]">>, [{load_module,emqx_bridge_mqtt_actions,brutal_purge,soft_purge,[]}]}, {"4.3.3", [{load_module,emqx_bridge_mqtt_actions,brutal_purge,soft_purge,[]}, @@ -14,7 +14,7 @@ {load_module,emqx_bridge_worker,brutal_purge,soft_purge,[]}, {load_module,emqx_bridge_mqtt_actions,brutal_purge,soft_purge,[]}]}, {<<".*">>,[]}], - [{"4.3.4", + [{<<"4\\.3\\.[4-5]">>, [{load_module,emqx_bridge_mqtt_actions,brutal_purge,soft_purge,[]}]}, {"4.3.3", [{load_module,emqx_bridge_mqtt_actions,brutal_purge,soft_purge,[]}, diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_actions.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_actions.erl index 941ad51d0..a2b88352e 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_actions.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_mqtt_actions.erl @@ -433,7 +433,7 @@ test_resource_status(PoolName) -> try Status = [ receive {Pid, R} -> R - after 1000 -> %% get_worker_status/1 should be a quick operation + after 10000 -> %% get_worker_status/1 should be a quick operation throw({timeout, Pid}) end || Pid <- Pids], lists:any(fun(St) -> St =:= true end, Status) @@ -444,13 +444,28 @@ test_resource_status(PoolName) -> false end. +-define(RETRY_TIMES, 4). + get_worker_status(Worker) -> + get_worker_status(Worker, ?RETRY_TIMES). + +get_worker_status(_Worker, 0) -> + false; +get_worker_status(Worker, Times) -> case ecpool_worker:client(Worker) of {ok, Bridge} -> try emqx_bridge_worker:status(Bridge) of - connected -> true; - _ -> false - catch _Error:_Reason -> + connected -> + true; + idle -> + ?LOG(info, "MQTT Bridge get status idle. Should not ignore this."), + timer:sleep(100), + get_worker_status(Worker, Times - 1); + ErrorStatus -> + ?LOG(error, "MQTT Bridge get status ~p", [ErrorStatus]), + false + catch Error:Reason:ST -> + ?LOG(error, "MQTT Bridge get status error: ~p reason: ~p stacktrace: ~p", [Error, Reason, ST]), false end; {error, _} -> From 58e24c2fde0f08bdeadeb23e5533e5a3a2a8cb58 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Wed, 17 Aug 2022 16:27:52 +0800 Subject: [PATCH 5/5] chore: bump eredis to 1.2.8 to fix reconnect port leak --- rebar.config | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rebar.config b/rebar.config index daddc8b97..799631dc7 100644 --- a/rebar.config +++ b/rebar.config @@ -41,7 +41,7 @@ [ {gpb, "4.11.2"} %% gpb only used to build, but not for release, pin it here to avoid fetching a wrong version due to rebar plugins scattered in all the deps , {redbug, "2.0.7"} , {ehttpc, {git, "https://github.com/emqx/ehttpc", {tag, "0.2.0"}}} - , {eredis_cluster, {git, "https://github.com/emqx/eredis_cluster", {tag, "0.7.2"}}} + , {eredis_cluster, {git, "https://github.com/emqx/eredis_cluster", {tag, "0.7.3"}}} , {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}} , {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}} , {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.8.2"}}}