This makes the buffer/resource workers always use `replayq` for
queuing, along with collecting multiple requests in a single call.
This is done to avoid long message queues for the buffer workers and
rely on `replayq`'s capabilities of offloading to disk and detecting
overflow.
Also, this deprecates the `enable_batch` and `enable_queue` resource
creation options, as: i) queuing is now always enables; ii) batch_size
> 1 <=> batch_enabled. The corresponding metric
`dropped.queue_not_enabled` is dropped, along with `batching`. The
batching is too ephemeral, especially considering a default batch time
of 20 ms, and is not shown in the dashboard, so it was removed.
To avoid confusion for the users as to what persistence guarantees we
offer when buffering bridges/resources, we will always enable offload
mode for `replayq`. With this, when the buffer size is above the max
segment size, it'll flush the queue to disk, but on recovery after a
restart it'll clean the existing segments rather than resuming from
them.
https://emqx.atlassian.net/browse/EMQX-8548
Currently, we face several issues trying to keep resource metrics
reasonable. For example, when a resource is re-created and has its
metrics reset, but then its durable queue resumes its previous work
and leads to strange (often negative) metrics.
Instead using `counters` that are shared by more than one worker to
manage gauges, we introduce an ETS table whose key is not only scoped
by the Resource ID as before, but also by the worker ID. This way,
when a worker starts/terminates, they should set their own gauges to
their values (often 0 or `replayq:count` when resuming off a queue).
With this scoping and initialization procedure, we'll hopefully avoid
hitting those strange metrics scenarios and have better control over
the gauges.
This commit adds support for counters and gauges to the Kafka Brige.
The Kafka bridge uses [Wolff](https://github.com/kafka4beam/wolff) for
the Kafka connection. Wolff does its own batching and does not use the
batching functionality in `emqx_resource_worker` that is used by other
bridge types. Therefore, the counter events have to be generated by
Wolff. We have added
[telemetry](https://github.com/beam-telemetry/telemetry) events to Wolff
that we hook into to change counters and gauges for the Kafka bridge. The
counter called `matched` does not depend on specific functionality of
any bridge type so the updates of this counter is moved higher up in the
call chain then previously so that it also gets updated for Kafka
bridges.