The MongoDB connector currently does not support batching, so the
`batch_size` option has no effect. However, the field cannot simply be
removed (existing configurations may still set it), so we hide it from
the schema instead.
This commit refactors the query_mode resource detection code according to
a suggestion from @zmstone. It should contain no functional change apart
from a change to the Kafka producer bridge config.
This commit makes it possible to configure whether a Kafka bridge operates
in sync or async query mode by setting the `resource_opts.query_mode`
configuration option.
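For illustration, a producer bridge could be switched to synchronous
queries with a config along these lines (a sketch; the bridge name and
surrounding fields are illustrative, and the default mode may differ per
bridge type):

    bridges.kafka.my_producer {
      # ... connection and topic settings elided ...
      resource_opts {
        query_mode = sync   # or: async
      }
    }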
Fixes:
https://emqx.atlassian.net/browse/EMQX-8631
See:
https://emqx.atlassian.net/wiki/spaces/P/pages/612368639/open+e5.1+remove+auto+restart+interval+from+buffer+worker+resource+options
Current problem:
In 5.0.x, we have two timer options that control the state changing of buffer worker
resources: auto_restart_interval and health_check_interval.
- auto_restart_interval controls how often the resource attempts to transition from
disconnected to connected.
- health_check_interval controls how often the resource is checked and potentially moved
from connected to disconnected or connecting.
The existence of two independent timers for very similar purposes is confusing to users,
QA and even developers. Also, an intimately related configuration is request_timeout,
which can interact badly with auto_restart_interval if the latter is poorly configured:
requests may always expire if request_timeout < auto_restart_interval and if the resource
enters the disconnected state. For health_check_interval, we attempt to derive a sane
default that gives requests a chance to retry: if the request timeout is finite, the
resource retries requests with a period of min(health_check_interval, request_timeout / 3).
Another problem with the separate auto_restart_interval is that its default value (60 s)
is too high when compared to the default request timeout and health check, leading to the
problems described above if not tuned.
Proposed solution:
We propose to drop auto_restart_interval in favor of health_check_interval, which will be
used for both disconnected -> connected and connected -> {disconnected, connecting}
transition checks. With that, the resource will attempt to reconnect at the same interval
as the health check, which currently is 15 s.
Also, as two smaller changes to accompany this one:
- Increase the default request_timeout from 15 s to 45 s.
- Rename request_timeout to request_ttl.
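As a sketch, a bridge's buffer worker options after this change would look
roughly like the following (values are the defaults discussed above; exact
field sets vary by bridge type):

    resource_opts {
      health_check_interval = 15s  # now also drives disconnected -> connected retries
      request_ttl = 45s            # renamed from request_timeout; default raised from 15s
    }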
Fixes https://emqx.atlassian.net/browse/EMQX-10074
Otherwise, requests from those async workers, now retriable, might not
be retried until the buffer worker blocks for other reasons, which
might take a long time.
Using `simple_one_for_one` has a potential race condition: we read the
PID of the resource manager before trying to remove a resource, but that
PID may be stale by the time we use it, because the process was either
already dead or had crashed and been restarted under a new PID. Under
such circumstances, the restarting child might linger in the supervisor,
leaking resources. By using the resource ID itself as the child ID (and
using the `one_for_one` restart strategy), we ensure the child is truly
removed.
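A minimal sketch of the new arrangement (module and child names are
illustrative, not the actual emqx_resource code):

    -module(resource_sup_sketch).
    -behaviour(supervisor).
    -export([start_link/0, init/1, ensure_child/2, delete_child/1]).

    start_link() ->
        supervisor:start_link({local, ?MODULE}, ?MODULE, []).

    %% `one_for_one`: children are addressed by a stable, caller-chosen ID.
    init([]) ->
        {ok, {#{strategy => one_for_one, intensity => 10, period => 10}, []}}.

    %% The resource ID doubles as the child ID, so removal never depends on
    %% a PID that may be dead or restarted by the time we use it.
    ensure_child(ResId, Opts) ->
        supervisor:start_child(?MODULE,
                               #{id => ResId,
                                 start => {resource_manager, start_link, [ResId, Opts]},
                                 restart => transient}).

    %% Addressing the child by ID also stops a child that is currently
    %% restarting, so nothing lingers in the supervisor.
    delete_child(ResId) ->
        _ = supervisor:terminate_child(?MODULE, ResId),
        supervisor:delete_child(?MODULE, ResId).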
Depending on timing, `t_write_timeout` was getting stuck while checking
the resource health, and the previous request timeout options meant that
no response was ever sent if that check took too long.
Fixes https://emqx.atlassian.net/browse/EMQX-9905
Since calling `telemetry` is costly in a hot path, we instead collect
metrics in the buffer worker state and flush them periodically, rather
than emitting them immediately as events happen.
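A sketch of the pattern (function and event names are illustrative):

    -module(buffer_metrics_sketch).
    -export([bump/3, flush_metrics/1]).

    %% Hot path: only update a map in the worker state; no telemetry call.
    bump(Key, N, #{counters := Cs} = State) ->
        State#{counters := maps:update_with(Key, fun(V) -> V + N end, N, Cs)}.

    %% Cold path, run on a periodic timer: emit the accumulated deltas as
    %% one telemetry event per key, then reset the accumulator.
    flush_metrics(#{counters := Cs} = State) ->
        maps:foreach(
          fun(Key, N) ->
                  telemetry:execute([buffer_worker, flush], #{Key => N}, #{})
          end,
          Cs),
        State#{counters := #{}}.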
Otherwise, depending on the test execution order, tests might
sometimes fail.
Moreover, ensure that applications describe their dependencies
correctly and avoid starting irrelevant apps in tests.
Update is implemented as remove + create.
If a delete call is made while the create is in progress,
the remove call is likely to time out too.
This causes the following creation to falsely succeed,
because there is already a running child under the supervisor.
As a result, the resource is permanently removed after
resource_manager eventually handles the remove call.
The timeout shutdown in the child spec may significantly slow down the
deletion of a resource, so this commit changes the shutdown to brutal
kill. Also, the pool worker removal code has been deleted, because it is
unnecessary: the entire pool is going to be force-deleted later anyway.
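The child-spec change, sketched with illustrative names:

    -module(pool_shutdown_sketch).
    -export([child_spec/1]).

    %% Before: `shutdown => 5000` waited up to 5 s per worker for a clean
    %% exit. After: `brutal_kill` terminates the worker immediately, which
    %% is safe here because the whole pool is force-deleted anyway.
    child_spec(WorkerId) ->
        #{id => WorkerId,
          start => {pool_worker, start_link, [WorkerId]},
          shutdown => brutal_kill}.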
Fixes https://emqx.atlassian.net/browse/EMQX-9864
Setting a very large interval can cause `erlang:start_timer` to crash.
Also, setting auto_restart_interval or health_check_interval to "0s"
makes the state machine busy-loop, since a relative time of 0 is handled
specially:
| state_timeout() = timeout() | integer()
| (...)
| If Time is relative and 0 no timer is actually started, instead the
| time-out event is enqueued to ensure that it gets processed before any
| not yet received external event.
from "https://www.erlang.org/doc/man/gen_statem.html#type-state_timeout"
Therefore, both fields are now validated against the range [1ms, 1h],
which avoids both issues.
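A minimal validator expressing the new bounds (names are illustrative):

    -module(interval_validate_sketch).
    -export([validate/1]).

    %% Accept only intervals in [1 ms, 1 h]: 0 makes gen_statem enqueue the
    %% timeout event immediately (busy loop), and very large values can
    %% crash `erlang:start_timer`.
    validate(Ms) when is_integer(Ms), Ms >= 1, Ms =< 3600000 ->
        ok;
    validate(_) ->
        {error, <<"must be between 1ms and 1h">>}.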
The previous commit uncovered another bug that had been hidden by it:
`maybe_flush_after_async_reply` was sending a message to the wrong PID.
It was sending a message to `self()`, meaning to target a buffer worker,
but `self()` in that context is never the buffer worker; it is the
connector's worker.
This change also revealed a race condition where the buffer workers
could stop flushing messages. So we piggy-backed on the atomic update
of the table size count to check if the buffer worker should be poked
to continue flushing. This allows us to get rid of
`maybe_flush_after_async_reply` altogether.
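A sketch of the piggy-backed check (names are illustrative; the real
code differs):

    -module(inflight_ack_sketch).
    -export([ack/3]).

    %% On an async reply, atomically decrement the shared inflight size.
    %% If the window was full and just gained room, poke the buffer worker
    %% once so it resumes flushing; no separate "flush after async reply"
    %% message is needed.
    ack(SizeTab, BufferWorker, MaxInflight) ->
        NewSize = ets:update_counter(SizeTab, size, {2, -1}),
        case NewSize =:= MaxInflight - 1 of
            true  -> BufferWorker ! flush_worker;
            false -> ok
        end.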
Fixes https://emqx.atlassian.net/browse/EMQX-9902
When the buffer worker inflight window is full, we don't need to set a
timer to flush the messages again: there is no more room, and the reply
to one of the inflight requests will flush the buffer worker by calling
`flush_worker`.
Currently, we do set the timer in that situation, and, combined with the
default batch time of 0, this yields a busy loop where the CPU spins a
lot while inflight messages do not return.
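Sketched as a guard on the timer (illustrative names):

    -module(flush_timer_sketch).
    -export([maybe_set_flush_timer/2]).

    %% Re-arm the flush timer only while the inflight window has room;
    %% when it is full, the ack of an inflight request pokes the worker
    %% instead, so an armed timer with batch_time = 0 would only spin.
    maybe_set_flush_timer(#{tref := undefined, batch_time := T} = St, InflightFull) ->
        case InflightFull of
            true  -> St;
            false -> St#{tref := erlang:start_timer(T, self(), flush)}
        end;
    maybe_set_flush_timer(St, _InflightFull) ->
        St.   %% timer already armed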
Some performance tests indicate that calling `telemetry` is costly in
hot paths. Since increasing a counter by 0 is a no-op, we should
avoid calling `telemetry` if the amount to increase is 0.
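The guard itself is a one-liner (event name illustrative):

    -module(telemetry_guard_sketch).
    -export([inc/2]).

    %% Increasing a counter by 0 is a no-op, so skip the telemetry call
    %% entirely on the hot path.
    inc(_Event, 0) ->
        ok;
    inc(Event, N) when is_integer(N), N > 0 ->
        telemetry:execute(Event, #{counter_inc => N}, #{}).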
(Almost?) fixes https://emqx.atlassian.net/browse/EMQX-9637
During the course of performance tests comparing the performance of
e5.0.3 and e4.4.16 regarding the webhook bridge in sync mode, we
observed that the throughput in e5.0.3 (sync) was much lower than in
e4.4.16: ~ 9 k msgs / s vs. ~ 50 k msgs / s, respectively.
Analyzing `observer_cli` output, we noticed that both buffer workers and
ehttpc processes were spending a lot of their time in `ets:info/2`.
That function was called to check the size of the inflight table when
updating metrics and checking if the inflight table was full. Other
uses of `ets:info/2` were contained inside the arguments to some
`?tp/2` macro usages (https://github.com/kafka4beam/snabbkaffe/pull/60).
By using a specific record to track the size of the table, we managed
to improve the bridge performance to ~ 45 k msgs / s in sync mode.
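A sketch of the record-based tracking (record and function names are
illustrative):

    -module(inflight_size_sketch).
    -export([new/2, insert/2, is_full/1]).

    -record(inflight, {tab :: ets:tid(),
                       size = 0 :: non_neg_integer(),
                       max :: pos_integer()}).

    new(Tab, Max) ->
        #inflight{tab = Tab, max = Max}.

    %% Keep the size alongside the table handle so fullness checks and
    %% metrics updates never call `ets:info(Tab, size)` on the hot path.
    insert(Req, #inflight{tab = Tab, size = N} = Inf) ->
        true = ets:insert(Tab, Req),
        Inf#inflight{size = N + 1}.

    is_full(#inflight{size = N, max = Max}) ->
        N >= Max.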
Also use it instead of a custom ETS table, for simplicity and better
consistency. This has drawbacks, though: expect a slightly increased
load on the gproc gen_server due to how `gproc:set_value/2` works.
Before this change, a separate `manager_id` / `instance_id` was used as
the resource manager ID, which made the connector interface somewhat
inconsistent: some calls into the connector implementation took the
instance ID as their first argument, while the rest took the resource ID
itself.
Fixes https://emqx.atlassian.net/browse/EMQX-9635
During a sync call from process `A` to a buffer worker `B`, its call
to the underlying resource `C` can be very slow. In those cases, `A`
will receive a timeout response and expect no more messages from `B`
nor `C`. However, prior to this fix, if `B` is stuck in a long sync
call to `C` and then gets its response after `A` timed out, `B` would
still send the late response to `A`, polluting its mailbox.
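One standard way to guarantee this on OTP 24+ is to use a process alias
as the reply address; a minimal sketch (not the actual emqx_resource
code):

    -module(late_reply_sketch).
    -export([sync_call/3]).

    %% `B` replies with `Alias ! {reply, Alias, Result}`. Once the alias
    %% is deactivated, the runtime drops messages sent to it, so a late
    %% reply never reaches `A`'s mailbox.
    sync_call(Worker, Req, Timeout) ->
        Alias = alias(),
        Worker ! {request, Alias, Req},
        receive
            {reply, Alias, Result} ->
                unalias(Alias),
                {ok, Result}
        after Timeout ->
                unalias(Alias),
                %% a reply may have slipped in between timing out and unalias/1
                receive {reply, Alias, _} -> ok after 0 -> ok end,
                {error, timeout}
        end.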