diff --git a/apps/emqx_bridge_mqtt/src/emqx_bridge_worker.erl b/apps/emqx_bridge_mqtt/src/emqx_bridge_worker.erl index 29f7d31ef..5e1374eca 100644 --- a/apps/emqx_bridge_mqtt/src/emqx_bridge_worker.erl +++ b/apps/emqx_bridge_mqtt/src/emqx_bridge_worker.erl @@ -377,9 +377,14 @@ common(_StateName, {call, From}, {ensure_present, What, Topic}, State) -> common(_StateName, {call, From}, {ensure_absent, What, Topic}, State) -> {Result, NewState} = ensure_absent(What, Topic, State), {keep_state, NewState, [{reply, From, Result}]}; -common(_StateName, info, {deliver, _, Msg}, #{replayq := Q, if_record_metrics := IfRecordMetric} = State) -> - bridges_metrics_inc(IfRecordMetric, 'bridge.mqtt.message_received'), - NewQ = replayq:append(Q, collect([Msg])), +common(_StateName, info, {deliver, _, Msg}, + State = #{replayq := Q, if_record_metrics := IfRecordMetric}) -> + Msgs = collect([Msg]), + bridges_metrics_inc(IfRecordMetric, + 'bridge.mqtt.message_received', + length(Msgs) + ), + NewQ = replayq:append(Q, Msgs), {keep_state, State#{replayq => NewQ}, {next_event, internal, maybe_send}}; common(_StateName, info, {'EXIT', _, _}, State) -> {keep_state, State}; @@ -586,3 +591,8 @@ bridges_metrics_inc(true, Metric) -> emqx_metrics:inc(Metric); bridges_metrics_inc(_IsRecordMetric, _Metric) -> ok. + +bridges_metrics_inc(true, Metric, Value) -> + emqx_metrics:inc(Metric, Value); +bridges_metrics_inc(_IsRecordMetric, _Metric, _Value) -> + ok.