Commit Graph

217 Commits

Author SHA1 Message Date
Thales Macedo Garitezi ed2be4d086 feat(mongo): expose buffer worker opts to the bridge frontend (5.0) 2023-01-12 15:23:41 -03:00
Shawn 1711823487 refactor: remove default value of timestamp field of influxdb 2023-01-12 22:25:44 +08:00
Thales Macedo Garitezi 9d99d180f9
Merge pull request #9709 from thalesmg/mongodb-bridge-payload-template-v50
feat(mongodb): add `payload_template` field for bridge (e5.0)
2023-01-11 11:14:46 -03:00
Zaiming (Stone) Shi 67f2159a27
Merge pull request #9653 from zmstone/0101-authz-schema-union-member-selection
0101 authz schema union member selection
2023-01-09 22:17:51 +01:00
Thales Macedo Garitezi b9f258b737 feat(mongodb): add `payload_template` field for bridge (e5.0)
https://emqx.atlassian.net/browse/EMQX-8705

Adds a `payload_template` fields that allows users to customize the
payload to publish to MongoDB.
2023-01-09 13:38:11 -03:00
lafirest 20f49b903f
Merge pull request #9702 from lafirest/feat/timescale_matrix
feat(bridges): add timescale && matrix bridges
2023-01-10 00:06:17 +08:00
firest 062f14bd65 test(bridges): add timescale && matrix test cases 2023-01-09 23:04:36 +08:00
Zaiming (Stone) Shi c3635f537a ci: wait for redis in emqx_ee_bridge_redis_SUITE 2023-01-09 14:39:56 +01:00
Zaiming (Stone) Shi 0697c692ed refactor: mongo_type and redis_type are not mandatory
the connector schemas are shared between authn, auth and bridges,
the difference is that authn and authz configs are unions like:

[
    {mongo_type = rs,
     ...
    }
    {another backend config}
]

however the brdige types are maps like, for example:

mongodb_rs.$name {
    mongo_type = rs
    ...
}

in which case, the mongo_type is not required.

in order to keep the schema static as much as possible,
this field is chanegd to 'required => false' with a default value.

However, for authn and authz, the union selector will still raise
exception if the there is no type provided.
2023-01-09 14:26:16 +01:00
Zaiming (Stone) Shi ffb09f0c4d test: rename a test option name to avoid clashing with prod config name 2023-01-09 14:26:16 +01:00
Zaiming (Stone) Shi e52f9d5920 refactor: use union member type selector for authz sources 2023-01-09 14:26:16 +01:00
Zaiming (Stone) Shi 3859878985
Merge pull request #9684 from zmstone/0105-do-not-use-testcase-skip
test: do not use tc_user_skip for test cases
2023-01-09 09:02:06 +01:00
firest ea405fe55d test(bridges): add test case for the PostgreSQL backend 2023-01-06 11:56:28 +08:00
Zaiming (Stone) Shi 5f12cdff6c ci: should not skip tests in github actions 2023-01-05 22:47:08 +01:00
Zaiming (Stone) Shi fc12a8c4c8 test: do not use tc_user_skip for test cases 2023-01-05 22:47:06 +01:00
Thales Macedo Garitezi fd360ac6c0 feat(buffer_worker): refactor buffer/resource workers to always use queue
This makes the buffer/resource workers always use `replayq` for
queuing, along with collecting multiple requests in a single call.
This is done to avoid long message queues for the buffer workers and
rely on `replayq`'s capabilities of offloading to disk and detecting
overflow.

Also, this deprecates the `enable_batch` and `enable_queue` resource
creation options, as: i) queuing is now always enables; ii) batch_size
> 1 <=> batch_enabled.  The corresponding metric
`dropped.queue_not_enabled` is dropped, along with `batching`.  The
batching is too ephemeral, especially considering a default batch time
of 20 ms, and is not shown in the dashboard, so it was removed.
2023-01-05 10:15:09 -03:00
Thales Macedo Garitezi 5bd9f110d6 test: attempt to reduce flakiness 2023-01-05 10:11:59 -03:00
Thales Macedo Garitezi 5df485df17 test: attempt to fix flaky test 2023-01-04 10:38:21 -03:00
Thales Macedo Garitezi d6c8a106da test(gcp_pubsub): fix flaky test 2023-01-03 08:52:37 -03:00
lafirest 7985cd3536
Merge pull request #9638 from lafirest/fix/mysql_dup
fix(mysql): fix the problem of data loss and bad match when mysql is disconnected
2023-01-02 23:15:44 +08:00
Thales Macedo Garitezi 7e02eac3bc
Merge pull request #9619 from thalesmg/refactor-gauges-v50
refactor(metrics): use absolute gauge values rather than deltas (v5.0)
2023-01-02 10:56:47 -03:00
firest c77717b1f1 test(mysql): fix test case error 2023-01-02 21:55:22 +08:00
Zaiming (Stone) Shi dbc10c2eed chore: update copyright year 2023 2023-01-02 09:22:27 +01:00
Thales Macedo Garitezi 61246c43c4 fix(kakfa_producer): prevent multiple producers from multiplying each other's metrics 2022-12-30 16:51:24 -03:00
Thales Macedo Garitezi 8b060a75f1 refactor(metrics): use absolute gauge values rather than deltas
https://emqx.atlassian.net/browse/EMQX-8548

Currently, we face several issues trying to keep resource metrics
reasonable.  For example, when a resource is re-created and has its
metrics reset, but then its durable queue resumes its previous work
and leads to strange (often negative) metrics.

Instead using `counters` that are shared by more than one worker to
manage gauges, we introduce an ETS table whose key is not only scoped
by the Resource ID as before, but also by the worker ID.  This way,
when a worker starts/terminates, they should set their own gauges to
their values (often 0 or `replayq:count` when resuming off a queue).
With this scoping and initialization procedure, we'll hopefully avoid
hitting those strange metrics scenarios and have better control over
the gauges.
2022-12-30 16:51:24 -03:00
Zaiming (Stone) Shi 0ce1ca89b7 refactor: use string type for server and servers 2022-12-30 14:20:23 +01:00
Thales Macedo Garitezi 446a4c74d0 fix(gcp_pubsub): fix potential jwt accumulation and lack of refresh (v5.0)
https://emqx.atlassian.net/browse/EMQX-8653
Related:
- https://emqx.atlassian.net/browse/EEC-737
- https://emqx.atlassian.net/browse/EMQX-8652

Since the rule resource testing mechanism creates a new resource to
test the configuration, a new JWT associated with an unique temporary
resource was being created and left in the JWT table, leaking it.

Also, a wrong case clause when setting the new refresh timer for the
JWT worker was preventing it from refreshing from the 2nd refresh
onward.
2022-12-29 16:30:36 -03:00
Thales Macedo Garitezi 4819794401 test(refactor): decrease test teardown noise 2022-12-27 10:30:14 -03:00
Thales Macedo Garitezi a78ecc4bb6
Merge pull request #9579 from thalesmg/bugfix-kafka-producer-stop-client-ee50
fix(kafka_producer): cleanup client after failing to start producers
2022-12-26 08:59:36 -03:00
Erik Timan 13942f5c49 refactor: rename error return in mysql connector 2022-12-22 10:29:12 +01:00
Erik Timan aab914d65a test: refactor EE mysql bridge test case
Co-authored-by: Thales Macedo Garitezi <thalesmg@gmail.com>
2022-12-22 10:27:38 +01:00
Erik Timan 7df24000a0 test: add more EE mysql bridge test cases 2022-12-22 10:27:38 +01:00
Thales Macedo Garitezi 7242ce426b fix(kafka_producer): cleanup client after failing to start producers
https://emqx.atlassian.net/browse/EMQX-8547

If a Kafka Producer bridge is given bad configuration (e.g.: bad authn
credentials), the Wolff client process is started successfully, as it
does not attempt to connect, but when the producer process is
attempted to be started, it fails (only then the client tries to
connect to Kafka).  At this point, an error was thrown, but the
supervised client process remained running.

If the configuration was later fixed and the bridge updated, which
prompted its removal and recreation, the Wolff client would report to
be "already started", so it would never pick up the new (fixed)
configuration, and the producers would perpetually fail to start until
the node would be restarted.

We simply ensure the client is stopped before throwing the error,
unrolling the start-up procedure.
2022-12-19 17:48:40 -03:00
Thales Macedo Garitezi c0b208dd9e fix(influxdb): check if fields are empty before sending
Related Issue: https://emqx.atlassian.net/browse/EMQX-8461

Currently, the InfluxDB client raises an error if an empty `fields`
map is passed to it for pushing data.

```
  14:03:35.563 [error] [InfluxDB] Encode [
    %{
      fields: %{},
      measurement: "t/topic",
      tags: %{},
      timestamp: 1670864615563
    }
  ] failed: :error :missing_field [
    {:influxdb_line, :encode_fields, 1,
     [
       file: '/home/thales/dev/emqx/emqx/deps/influxdb/src/influxdb_line.erl',
       line: 60
     ]},
    {:influxdb_line, :encode_, 1,
     [
       file: '/home/thales/dev/emqx/emqx/deps/influxdb/src/influxdb_line.erl',
       line: 44
     ]},
    {:influxdb_line, :"-encode_/1-fun-0-", 2,
     [
       file: '/home/thales/dev/emqx/emqx/deps/influxdb/src/influxdb_line.erl',
       line: 38
     ]},
    {:influxdb, :write, 2,
     [file: '/home/thales/dev/emqx/emqx/deps/influxdb/src/influxdb.erl', line: 79]},
    {:emqx_ee_connector_influxdb, :do_query, 3,
     [
       file: '/home/thales/dev/emqx/emqx/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl',
       line: 322
     ]},
    {:emqx_resource_worker, :apply_query_fun, 6,
     [
       file: '/home/thales/dev/emqx/emqx/apps/emqx_resource/src/emqx_resource_worker.erl',
       line: 454
     ]},
    {:emqx_resource_worker, :query_or_acc, 3,
     [
       file: '/home/thales/dev/emqx/emqx/apps/emqx_resource/src/emqx_resource_worker.erl',
       line: 306
     ]},
    {:gen_statem, :loop_state_callback, 11, [file: 'gen_statem.erl', line: 1203]},
    {:proc_lib, :init_p_do_apply, 3, [file: 'proc_lib.erl', line: 226]}
  ]
  2022-12-12T14:03:35.563607-03:00 [error] [InfluxDB] Encode [#{fields => #{},measurement => <<"t/topic">>,tags => #{},timestamp => 1670864615563}] failed: error missing_field [{influxdb_line,encode_fields,1,[{file,"/home/thales/dev/emqx/emqx/deps/influxdb/src/influxdb_line.erl"},{line,60}]},{influxdb_line,encode_,1,[{file,"/home/thales/dev/emqx/emqx/deps/influxdb/src/influxdb_line.erl"},{line,44}]},{influxdb_line,'-encode_/1-fun-0-',2,[{file,"/home/thales/dev/emqx/emqx/deps/influxdb/src/influxdb_line.erl"},{line,38}]},{influxdb,write,2,[{file,"/home/thales/dev/emqx/emqx/deps/influxdb/src/influxdb.erl"},{line,79}]},{emqx_ee_connector_influxdb,do_query,3,[{file,"/home/thales/dev/emqx/emqx/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl"},{line,322}]},{emqx_resource_worker,apply_query_fun,6,[{file,"/home/thales/dev/emqx/emqx/apps/emqx_resource/src/emqx_resource_worker.erl"},{line,454}]},{emqx_resource_worker,query_or_acc,3,[{file,"/home/thales/dev/emqx/emqx/apps/emqx_resource/src/emqx_resource_worker.erl"},{line,306}]},{gen_statem,loop_state_callback,11,[{file,"gen_statem.erl"},{line,1203}]},{proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,226}]}]

  14:03:35.565 [error] [connector: "bridge:influxdb_api_v2:aaa", msg: 'influxdb write point failed', reason: :missing_field]
  2022-12-12T14:03:35.565345-03:00 [error] connector: <<"bridge:influxdb_api_v2:aaa">>, line: 335, mfa: emqx_ee_connector_influxdb:do_query/3, msg: influxdb write point failed, reason: missing_field
  []

  14:03:35.565 [error] [id: "bridge:influxdb_api_v2:aaa", msg: :send_error, reason: :missing_field]
  iex(emqx@127.0.0.1)2> 2022-12-12T14:03:35.565913-03:00 [error] id: <<"bridge:influxdb_api_v2:aaa">>, line: 396, mfa: emqx_resource_worker:handle_query_result/4, msg: send_error, reason: missing_field
```

Instead of raising, we check if the interpolation resulted in an empty
map due to lack of context and just fail the push more gracefully.

Related to this, the original issue _appears_ to be related to a
frontend issue (to be confirmed and fixed separately), where the
it is not encoding the field types:
https://emqx.atlassian.net/browse/EMQX-8461?focusedCommentId=24805
2022-12-16 09:52:34 -03:00
Zaiming (Stone) Shi 42c58e2a91 Merge remote-tracking branch 'origin/release-50' into 1214-sync-master-upstreams 2022-12-14 15:29:13 +01:00
Thales Macedo Garitezi 464d0a5057 refactor(test): use a linked janitor for test teardown 2022-12-12 17:18:19 -03:00
Thales Macedo Garitezi b9bc82f87a feat(gcp_pubsub): add `local_topic` config
Given the implicit convention that an egress bridge containing the
`local_topic` config will forward messages without the need for a rule
action, this was added to avoid needing a rule action.
2022-12-12 17:18:19 -03:00
Thales Macedo Garitezi a095867358 test(refactor): add ids to namespace `on_exit` callbacks 2022-12-12 17:18:19 -03:00
Thales Macedo Garitezi 1cd91a24e9 feat(gcp_pubsub): implement GCP PubSub bridge (ee5.0) 2022-12-12 17:18:19 -03:00
Thales Macedo Garitezi 82be9d878d test(flaky): avoid inter-suite flakiness 2022-12-12 17:18:19 -03:00
Thales Macedo Garitezi c69022f3c4 test(ci): clean config after test
Trying to fix this error for profile `emqx`:

```
=CRASH REPORT==== 30-Nov-2022::13:25:46.763989 ===
  crasher:
    initial call: application_master:init/4
    pid: <0.9682.1>
    registered_name: []
    exception exit: {bad_return,
                        {{emqx_conf_app,start,[normal,[]]},
                         {emqx_conf_schema,
                             [#{kind => validation_error,path => "bridges",
                                reason => unknown_fields,
                                unknown => <<"influxdb_api_v1">>,
                                unmatched => <<"mqtt,webhook">>}]}}}
      in function  application_master:init/4 (application_master.erl, line 142)
    ancestors: [<0.9681.1>]
    message_queue_len: 1
    messages: [{'EXIT',<0.9683.1>,normal}]
    links: [<0.9681.1>,<0.44.0>]
    dictionary: []
    trap_exit: true
    status: running
    heap_size: 610
    stack_size: 29
    reductions: 195
  neighbours:

=INFO REPORT==== 30-Nov-2022::13:25:46.777895 ===
    application: emqx_conf
    exited: {bad_return,
                {{emqx_conf_app,start,[normal,[]]},
                 {emqx_conf_schema,
                     [#{kind => validation_error,path => "bridges",
                        reason => unknown_fields,
                        unknown => <<"influxdb_api_v1">>,
                        unmatched => <<"mqtt,webhook">>}]}}}
    type: temporary

%%% emqx_plugins_SUITE ==> init_per_suite: FAILED
%%% emqx_plugins_SUITE ==> {{failed_to_start_app,emqx_conf,
     {emqx_conf,
         {bad_return,
             {{emqx_conf_app,start,[normal,[]]},
              {emqx_conf_schema,
                  [#{kind => validation_error,path => "bridges",
                     reason => unknown_fields,
                     unknown => <<"influxdb_api_v1">>,
                     unmatched => <<"mqtt,webhook">>}]}}}}},
 [{emqx_common_test_helpers,start_app,4,
      [{file,
           "/__w/emqx/emqx/source/apps/emqx/test/emqx_common_test_helpers.erl"},
       {line,227}]},
  {lists,foreach,2,[{file,"lists.erl"},{line,1342}]},
  {emqx_plugins_SUITE,init_per_suite,1,
      [{file,
           "/__w/emqx/emqx/source/apps/emqx_plugins/test/emqx_plugins_SUITE.erl"},
       {line,34}]},
  {test_server,ts_tc,3,[{file,"test_server.erl"},{line,1783}]},
  {test_server,run_test_case_eval1,6,[{file,"test_server.erl"},{line,1380}]},
  {test_server,run_test_case_eval,9,[{file,"test_server.erl"},{line,1224}]}]}

%%% emqx_plugins_SUITE ==> t_bad_info_json: SKIPPED
%%% emqx_plugins_SUITE ==> {tc_auto_skip,
    {failed,
        {emqx_plugins_SUITE,init_per_suite,
            {'EXIT',
                {{failed_to_start_app,emqx_conf,
                     {emqx_conf,
                         {bad_return,
                             {{emqx_conf_app,start,[normal,[]]},
                              {emqx_conf_schema,
                                  [#{kind => validation_error,
                                     path => "bridges",
                                     reason => unknown_fields,
                                     unknown => <<"influxdb_api_v1">>,
                                     unmatched => <<"mqtt,webhook">>}]}}}}},
                 [{emqx_common_test_helpers,start_app,4,
                      [{file,
                           "/__w/emqx/emqx/source/apps/emqx/test/emqx_common_test_helpers.erl"},
                       {line,227}]},
                  {lists,foreach,2,[{file,"lists.erl"},{line,1342}]},
                  {emqx_plugins_SUITE,init_per_suite,1,
                      [{file,
                           "/__w/emqx/emqx/source/apps/emqx_plugins/test/emqx_plugins_SUITE.erl"},
                       {line,34}]},
                  {test_server,ts_tc,3,[{file,"test_server.erl"},{line,1783}]},
                  {test_server,run_test_case_eval1,6,
                      [{file,"test_server.erl"},{line,1380}]},
                  {test_server,run_test_case_eval,9,
                      [{file,"test_server.erl"},{line,1224}]}]}}}}}
```
2022-12-12 17:18:13 -03:00
Zaiming (Stone) Shi 40809b2ad0 Merge remote-tracking branch 'origin/dev/ee5.0' into release-50 2022-12-09 11:45:52 +01:00
Zaiming (Stone) Shi dc14cd450d test: render config for emqx_conf only for ee bridge tests 2022-12-07 18:27:06 +01:00
Ilya Averyanov 6692b0c895 feat(bridge): add Redis bridge 2022-12-06 23:15:42 +03:00
Erik Timan 499a32ce36 test: remove unnecessary async tests in EE mysql bridge
The async test cases is not needed since the mysql connector uses the
always_sync callback mode.
2022-12-01 16:38:03 +01:00
Erik Timan 5461311edc test: more EE mysql bridge tests 2022-12-01 10:27:51 +01:00
Erik Timan eb62192838 test: expand EE mysql bridge test with toxiproxy 2022-12-01 10:27:51 +01:00
Erik Timan 96bff1d32e test: improve basic tests for EE mysql bridge 2022-12-01 10:27:51 +01:00
Erik Timan 3e679ceb57 test: add basic tests for EE mysql bridge 2022-12-01 10:27:51 +01:00
Thales Macedo Garitezi 0f9cc0d93f test(refactor): stop snabbkaffe on every test 2022-11-11 10:03:30 -03:00
Thales Macedo Garitezi 04588148b7 test(influxdb): increase influxdb bridge/connector coverage (ee5.0) 2022-11-07 15:15:49 -03:00
Thales Macedo Garitezi 98500313eb fix(kafka): some fixes for kafka producer
- MQTT topic should be a binary
- use correct gauge functions from `wolff_metrics`.
- don't double increment success counter for kafka action
- adds a few more metrics assertions
2022-10-10 17:11:29 -03:00
Kjell Winblad 57270fb8fc feat: add support for counters and gauges to the Kafka Bridge
This commit adds support for counters and gauges to the Kafka Brige.
The Kafka bridge uses [Wolff](https://github.com/kafka4beam/wolff) for
the  Kafka connection. Wolff does its own batching and does not use the
batching functionality in `emqx_resource_worker` that is used by other
bridge types. Therefore, the counter events have to be generated by
Wolff. We have added
[telemetry](https://github.com/beam-telemetry/telemetry) events to Wolff
that we hook into to change counters and gauges for the Kafka bridge. The
counter called `matched` does not depend on specific functionality of
any bridge type so the updates of this counter is moved higher up in the
call chain then previously so that it also gets updated for Kafka
bridges.
2022-10-10 14:40:57 -03:00
Kjell Winblad a3c88b40a0 test: changes to make Kafka container run in GitHub action 2022-09-23 14:33:41 +02:00
Kjell Winblad 8e514680d8 test: fix bad binary pattern 2022-09-23 13:54:11 +02:00
Zaiming (Stone) Shi 7b601bf970 chore: delete bad parse_bridge function clause 2022-09-23 11:53:02 +02:00
Kjell Winblad adc67b165b test: test cases for Kafka bridge REST API 2022-09-23 10:09:07 +02:00
Kjell Winblad ac2922fc4c test: Kafka bridge cases for all combinations of SASL and SSL 2022-09-16 16:44:12 +02:00
Kjell Winblad be7a8c11a8 test: make bridge name unique in tests 2022-09-15 16:21:32 +02:00
Kjell Winblad 4dc26eeba7 fix: use different instance id in Kafka auth test 2022-09-15 07:33:07 +02:00
Kjell Winblad 5820b028cb feat: add test case for Kerberos Kafka authentication 2022-09-14 17:03:37 +02:00
Kjell Winblad f0e03086a6 test: add test cases for Kafka SASL auth mechanisms plain and scram 2022-09-13 19:46:56 +02:00
Zaiming (Stone) Shi 0c1595be02 feat: Add Kafka connector 2022-09-13 19:46:56 +02:00
Thales Macedo Garitezi f1048babd8 test: refactor to use hocon and schema 2022-09-02 12:04:37 -03:00
Thales Macedo Garitezi 3d4afd65df feat: add mongodb bridge (e5.0) 2022-09-01 14:47:14 -03:00
JimMoen 4d0516c6e9 chore: use HStreamDB for module name 2022-08-07 19:37:44 +08:00
DDDHuang 9ae7c62656 fix: exs deps & bad suites 2022-07-27 16:24:13 +08:00