Commit Graph

94 Commits

Author SHA1 Message Date
Thales Macedo Garitezi b4a5c141ad fix(actions): use action query mode instead of connector's query mode 2023-12-11 14:10:51 -03:00
Thales Macedo Garitezi 2495f59c91 fix(actions): increment rule statistics even if channel is not installed
Fixes new bug posted after https://emqx.atlassian.net/browse/EMQX-11494 was already fixed.

Also reduces the usage of error throwing for flow control a bit.
2023-12-11 14:01:30 -03:00
Zaiming (Stone) Shi 423b586c56 fix(dialyzer): fix some dialyzer issues found on otp 26 2023-12-06 20:32:49 +01:00
Thales Macedo Garitezi dc5e3b939c refactor(resource_manager): use macros and better differentiate status from state
Internally in `emqx_resource_manager`, there seems to be many points where the
`gen_statem` states are conflated with resource status, since their names coincide.  While
that works for now, introducing a new `gen_statem` state, an internal state, shouldn't
necessarily imply a new, externally facing resource status.

Here we also introduce the usage of some macros to avoid the pitfalls of making a typo in
a state/status name.
2023-12-01 18:23:05 -03:00
Thales Macedo Garitezi 0388e1c1c4 fix(kafka_producer): add `resource_opts` to connector schema, and check for client connectivity
Fixes https://emqx.atlassian.net/browse/EMQX-11494
2023-11-30 17:51:32 -03:00
Thales Macedo Garitezi 2b8cf50a1d chore: rename `bridges_v2` -> `actions` in the public facing APIs
Fixes https://emqx.atlassian.net/browse/EMQX-11330

After feedback from Product team, we should rename `bridges_v2` to `actions` everywhere.
We'll start with the public facing APIs.

- HTTP API
- Hocon schema root key
2023-11-06 15:37:07 -03:00
Kjell Winblad 95f3b94ac3 fix(bridge_v2): channels should not be removed when status is connecting
This fixes so that channels are not removed from the resource state when
their status is connecting. This is needed for Kafka since Kafka's message
buffer is stored in the resource state.

Fixes:
https://emqx.atlassian.net/browse/EMQX-11270
2023-11-01 15:27:53 +01:00
Kjell Winblad 9dc3a169b3 feat: split bridges into a connector part and a bridge part
Co-authored-by: Thales Macedo Garitezi <thalesmg@gmail.com>
Co-authored-by: Stefan Strigler <stefan.strigler@emqx.io>
Co-authored-by: Zaiming (Stone) Shi <zmstone@gmail.com>

Several bridges should be able to share a connector pool defined by a
single connector. The connectors should be possible to enable and
disable similar to how one can disable and enable bridges. There should
also be an API for checking the status of a connector and for
add/edit/delete connectors similar to the current bridge API.

Issues:
https://emqx.atlassian.net/browse/EMQX-10805
2023-10-30 14:48:47 +01:00
Thales Macedo Garitezi eebfb44f72 fix(resource): create `simple_async_internal_buffer` query mode for bridges with internal buffering
Since authn/authz backends also use simple async/sync queries, we may want to avoid them
calling the connector when it's not connected.
2023-10-09 15:02:25 -03:00
Thales Macedo Garitezi 79cf0a2ced fix(kafka_producer): correctly handle metrics for connector that have internal buffers
Fixes https://emqx.atlassian.net/browse/EMQX-11086

There’s currently a metric inconsistency due to the internal buffering nature of Kafka
Producer (wolff).

We use simple_sync_query to call the Kafka Producer bridge.  If that times out, the call
is accounted as failed, even though the message is buffered in wolff and later sent
successfully.
2023-10-09 15:02:25 -03:00
Thales Macedo Garitezi 34186fcc74 fix(kafka_producer): send messages to wolff producer to buffer even when connector is in `connecting` state
Fixes https://emqx.atlassian.net/browse/EMQX-11085

Messages would not be sent to wolff if the connection was down, so they were effectively lost.
2023-10-06 11:43:29 -03:00
Zaiming (Stone) Shi c2d750aa09 fix(resource): redact query args in exception log 2023-09-29 09:20:42 +02:00
zhongwencool 2f1fa2e961 chore: unified slog message formatting to improve logging consistency 2023-09-20 18:13:00 +08:00
Thales Macedo Garitezi 3b1e436d3f refactor: use `emqx_pool:async_submit` to avoid excessive spawning 2023-07-20 10:17:43 -03:00
Thales Macedo Garitezi eb41b77de4 fix(rule_metrics): notify rule metrics of late replies and expired requests
Fixes https://emqx.atlassian.net/browse/EMQX-10600
2023-07-19 11:39:28 -03:00
Stefan Strigler 321fd53132 fix: use ReplyTo in QUERY for async 2023-06-29 16:09:45 +02:00
Stefan Strigler 40dd34a704 fix: use reply_to instead of async_reply_fun 2023-06-29 16:09:45 +02:00
Stefan Strigler 1363108678 fix: fix simple_sync_query 2023-06-29 16:09:45 +02:00
Stefan Strigler 2274a192cc fix(emqx_resource): call async reply fun in simple_aysnc_query 2023-06-29 16:09:45 +02:00
Stefan Strigler ae636a52d7 fix(emqx_rule_engine): set inc_action_metrics as async_reply_fun 2023-06-29 16:09:45 +02:00
Paulo Zulato 62d3766726
Merge pull request #10645 from paulozulato/data-bridge-target-unavailable
Data bridge target unavailable
2023-06-21 18:19:23 -03:00
Thales Macedo Garitezi decfd6df2b feat(buffer_worker): log expired message count
Fixes https://emqx.atlassian.net/browse/EMQX-10165

```
iex(emqx@127.0.0.1)38> 2023-06-21T11:09:35.569404-03:00 [info] msg: buffer_worker_dropped_expired_messages, mfa: emqx_resource_buffer_worker:log_expired_messge_count/1, line: 982, expired_count: 900, resource_id: <<"bridge:webhook:webhook">>, worker_index: 3
```
2023-06-21 14:38:51 -03:00
Paulo Zulato 9454af9a8b feat(postgresql): check whether target table exists
Fixes https://emqx.atlassian.net/browse/EMQX-9026
2023-06-19 11:12:10 -03:00
Thales Macedo Garitezi 99796224d8 refactor(resource): rename `request_timeout` -> `request_ttl`
See
https://emqx.atlassian.net/wiki/spaces/P/pages/612368639/open+e5.1+remove+auto+restart+interval+from+buffer+worker+resource+options
2023-06-01 13:01:53 -03:00
Thales Macedo Garitezi 8c565abc84 test(cassandra): fix flaky test 2023-05-30 15:42:53 -03:00
Thales Macedo Garitezi 6be8ff378e fix(buffer_worker): make buffer worker enter `blocked` state when async worker dies
Fixes https://emqx.atlassian.net/browse/EMQX-10074

Otherwise, requests from those async workers, now retriable, might not
be retried until the buffer worker blocks for other reasons, which
might take a long time.
2023-05-30 15:34:22 -03:00
Thales Macedo Garitezi db60dcbada test(buffer_worker): add assertion for inflight count after batch expiration
Fixes https://emqx.atlassian.net/browse/EMQX-9829
2023-05-25 16:11:37 -03:00
Thales Macedo Garitezi 0559d6f639 refactor(buffer_worker): use static fn for bumping counters 2023-05-22 09:12:08 -03:00
Thales Macedo Garitezi c74c93388e refactor: rename some variables and sum type constructors for clarity 2023-05-22 09:11:23 -03:00
Thales Macedo Garitezi 7d798c10e9 perf(buffer_worker): flush metrics periodically inside buffer worker process
Fixes https://emqx.atlassian.net/browse/EMQX-9905

Since calling `telemetry` is costly in a hot path, we instead collect
metrics inside the buffer workers state and periodically flush them,
rather than immediately as events happen.
2023-05-22 09:11:23 -03:00
Thales Macedo Garitezi 85089a3210 fix(buffer_worker): correctly flush the buffer workers when inflight table room is made
The previous commit uncovered another bug that was hidden by it:
`maybe_flush_after_async_reply` was sending a message to the wrong
PID.  It was sending a message to `self()` meaning to target a buffer
worker, but `self()` in that context is never the buffer worker, it's
the connector's worker.

This change also revealed a race condition where the buffer workers
could stop flushing messages.  So we piggy-backed on the atomic update
of the table size count to check if the buffer worker should be poked
to continue flushing.  This allows us to get rid of
`maybe_flush_after_async_reply` altogether.
2023-05-16 17:15:42 -03:00
Thales Macedo Garitezi 657df05ad9 fix(buffer_worker): avoid setting flush timer when inflight is full
Fixes https://emqx.atlassian.net/browse/EMQX-9902

When the buffer worker inflight window is full, we don’t need to set a
timer to flush the messages again because there’s no more room, and
one of the inflight windows will flush the buffer worker by calling
`flush_worker`.

Currently, we do set the timer on such situation, and this fact
combined with the default batch time of 0 yields a busy loop situation
where the CPU spins a lot while inflight messages do not return.
2023-05-16 11:28:58 -03:00
Zaiming (Stone) Shi 13dcb5732f Merge remote-tracking branch 'origin/release-50' into 0508-prepare-for-e5.0.4 2023-05-08 21:29:35 +02:00
Thales Macedo Garitezi eba627b365 fix(buffer_worker): fix inflight count when updating inflight item 2023-05-08 09:27:51 -03:00
Zhongwen Deng 4f396a36a9 Merge remote-tracking branch 'upstream/master' into release-50 2023-05-08 14:58:03 +08:00
Thales Macedo Garitezi 8aa7c014e7 perf(buffer_worker): avoid calling `ets:info/2`
(Almost?) fixes https://emqx.atlassian.net/browse/EMQX-9637

During the course of performance tests comparing the performance of
e5.0.3 and e4.4.16 regarding the webhook bridge in sync mode, we
observed that the throughput in e5.0.3 (sync) was much lower than in
e4.4.16: ~ 9 k msgs / s vs. ~ 50 k msgs / s, respectively.

Analyzing `observer_cli` output, we noticed that a lot of the time
both buffer workers and ehttpc processes was spent in `ets:info/2`.
That function was called to check the size of the inflight table when
updating metrics and checking if the inflight table was full.  Other
uses of `ets:info/2` were contained inside the arguments to some
`?tp/2` macro usages (https://github.com/kafka4beam/snabbkaffe/pull/60).

By using a specific record to track the size of the table, we managed
to improve the bridge performance to ~ 45 k msgs / s in sync mode.
2023-05-02 17:05:32 -03:00
Thales Macedo Garitezi c53741a08c fix(buffer_worker): avoid sending late reply messages to callers
Fixes https://emqx.atlassian.net/browse/EMQX-9635

During a sync call from process `A` to a buffer worker `B`, its call
to the underlying resource `C` can be very slow.  In those cases, `A`
will receive a timeout response and expect no more messages from `B`
nor `C`.  However, prior to this fix, if `B` is stuck in a long sync
call to `C` and then gets its response after `A` timed out, `B` would
still send the late response to `A`, polluting its mailbox.
2023-04-26 13:18:28 -03:00
Thales Macedo Garitezi d78312e10e test(resource): fix flaky test 2023-04-26 09:25:33 -03:00
Thales Macedo Garitezi cb995e2033 fix(buffer_worker): avoid sending late reply messages to callers
Fixes https://emqx.atlassian.net/browse/EMQX-9635

During a sync call from process `A` to a buffer worker `B`, its call
to the underlying resource `C` can be very slow.  In those cases, `A`
will receive a timeout response and expect no more messages from `B`
nor `C`.  However, prior to this fix, if `B` is stuck in a long sync
call to `C` and then gets its response after `A` timed out, `B` would
still send the late response to `A`, polluting its mailbox.
2023-04-19 18:27:10 -03:00
Stefan Strigler 7df0493312
Merge pull request #10390 from sstrigler/EMQX-9549-new-emqx-utils-app-to-collect-utility-modules
New emqx_utils app to collect utility modules
2023-04-14 20:33:11 +02:00
Thales Macedo Garitezi e073bc90bc refactor(buffer_worker): rename `s/queue/buffer/g` 2023-04-14 11:37:19 -03:00
Thales Macedo Garitezi 14ed4a7ada feat(buffer_worker): set default queue mode to `memory_only`
Fixes https://emqx.atlassian.net/browse/EMQX-9367

For better user experience and performance for the average bridge, we
should change the default queue mode to `memory_only`, as was the
behavior of most bridges in e4.x.  This leads to better performance
when message rate is high enough and the remote resource is not
keeping up with EMQX.

Also, we set the default segment size to equal max queue bytes.
2023-04-14 11:37:19 -03:00
Stefan Strigler 9c11bfce80 refactor: rename emqx_misc to emqx_utils 2023-04-14 13:41:27 +02:00
Kjell Winblad 8e0d315b7b
Merge pull request #10197 from kjellwinblad/0321-fix-inflight-window-hand-over-to-kjell
fix: add inflight window setting to the clickhouse bridge
2023-03-29 09:38:24 +02:00
Thales Macedo Garitezi ff272a2071
Merge pull request #10206 from thalesmg/decouple-buffer-worker-query-call-mode-v50
feat(buffer_worker): decouple query mode from underlying connector call mode
2023-03-24 13:49:00 -03:00
Thales Macedo Garitezi f8d5d53908 feat(buffer_worker): decouple query mode from underlying connector call mode
Fixes https://emqx.atlassian.net/browse/EMQX-9129

Currently, if an user configures a bridge with query mode sync, then
all calls to the underlying driver/connector ("inner calls") will
always be synchronous, regardless of its support for async calls.

Since buffer workers always support async queries ("outer calls"), we
should decouple those two call modes (inner and outer), and avoid
exposing the inner call configuration to user to avoid complexity.

There are two situations when we want to force synchronous calls to
the underlying connector even if it supports async:

1) When using `simple_sync_query`, since we are bypassing the buffer
workers;
2) When retrying the inflight window, to avoid overwhelming the
driver.
2023-03-23 13:40:31 -03:00
Kjell Winblad 35474578ca refactor: rename async_inflight_window to inflight_window everywhere 2023-03-23 14:21:57 +01:00
Thales Macedo Garitezi 61cb03b45a fix(buffer_worker): change the default `resume_interval` value and expose it as hidden config
Also removes the previously added alarm for request timeout.

There are situations where having a short request timeout and a long
health check interval make sense, so we don't want to alarm the user
for those situations.  Instead, we automatically attempt to set a
reasonable `resume_interval` value.
2023-03-22 11:47:36 -03:00
Thales Macedo Garitezi 20414d7373 fix(buffer_worker): check request timeout and health check interval
Fixes https://emqx.atlassian.net/browse/EMQX-9099

The default value for `request_timeout` is 15 seconds, and the default
resume interval is also 15 seconds (the health check timeout, if
`resume_interval` is not explicitly given).  This means that, in
practice, if a buffer worker ever gets into the blocked state, then
almost all requests will timeout.

Proposed improvement:

- `request_timeout` should by default be twice as much as
health_check_interval.
- Emit a alarm if `request_timeout` is not greater than
`health_check_interval`.
2023-03-16 13:46:45 -03:00
Andrew Mayorov a9bc8a4464
refactor(resman): rename `ets_lookup` → `lookup_cached`
That way we hide the impementation details + the interface becomes
cleaner and more obvious.
2023-03-15 19:17:30 +03:00