From fa4bc921ac0876622183258c28e2b868f775d8be Mon Sep 17 00:00:00 2001 From: DDDHuang <44492639+DDDHuang@users.noreply.github.com> Date: Tue, 26 Jul 2022 15:28:56 +0800 Subject: [PATCH] fix: hstream db connector & bridge, TODO: SUITE --- apps/emqx_bridge/src/emqx_bridge_app.erl | 11 ++++ lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl | 2 +- .../src/emqx_ee_bridge_hstream.erl | 4 +- .../src/emqx_ee_connector_hstream.erl | 50 ++++++++++++++++--- 4 files changed, 56 insertions(+), 11 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge_app.erl b/apps/emqx_bridge/src/emqx_bridge_app.erl index 3fc4d57ba..3e67290d6 100644 --- a/apps/emqx_bridge/src/emqx_bridge_app.erl +++ b/apps/emqx_bridge/src/emqx_bridge_app.erl @@ -29,6 +29,7 @@ start(_StartType, _StartArgs) -> {ok, Sup} = emqx_bridge_sup:start_link(), + ok = start_ee_apps(), ok = emqx_bridge:load(), ok = emqx_bridge:load_hook(), ok = emqx_config_handler:add_handler(?LEAF_NODE_HDLR_PATH, ?MODULE), @@ -41,6 +42,16 @@ stop(_State) -> ok = emqx_bridge:unload_hook(), ok. +-ifdef(EMQX_RELEASE_EDITION). +start_ee_apps() -> + {ok, _} = application:ensure_all_started(emqx_ee_bridge), + {ok, _} = application:ensure_all_started(emqx_ee_connector), + ok. +-else. +start_ee_apps() -> + ok. +-endif. + %% NOTE: We depends on the `emqx_bridge:pre_config_update/3` to restart/stop the %% underlying resources. pre_config_update(_, {_Oper, _, _}, undefined) -> diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl index a94089c21..6b2987307 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl @@ -35,6 +35,6 @@ fields(bridges) -> {hstream, mk( hoconsc:map(name, ref(emqx_ee_bridge_hstream, "config")), - #{desc => <<"hstream_webhook">>} + #{desc => <<"emqx enterprise config">>} )} ]. diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_hstream.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_hstream.erl index 42dd86f14..56bac456e 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_hstream.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_hstream.erl @@ -36,10 +36,10 @@ values(get) -> values(post) -> #{ type => hstream, - name => <<"hstream_bridge_demo">>, + name => <<"demo">>, url => <<"http://127.0.0.1:6570">>, stream => <<"stream1">>, - ordering_key => <<"${topic}">>, + ordering_key => <<"some_key">>, pool_size => 8, enable => true, direction => egress, diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_hstream.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_hstream.erl index 7ba92f809..fa8439851 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_hstream.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_hstream.erl @@ -46,9 +46,14 @@ on_stop(InstId, #{client := Client, producer := Producer}) -> stop_producer => StopProducerRes }). -on_query(_InstId, {OrderingKey, Payload, Record}, AfterQuery, #{producer := Producer}) -> - Record = hstreamdb:to_record(OrderingKey, raw, Payload), - do_append(AfterQuery, false, Producer, Record). +on_query( + _InstId, + {send_message, Data}, + AfterQuery, + #{producer := Producer, ordering_key := OrderingKey, payload := Payload} +) -> + Record = to_record(OrderingKey, Payload, Data), + do_append(AfterQuery, Producer, Record). on_get_status(_InstId, #{client := Client}) -> case is_alive(Client) of @@ -88,8 +93,13 @@ start_client(InstId, Config) -> do_start_client(InstId, Config) catch E:R:S -> - io:format("E:R:S ~p:~p ~n~p~n", [E, R, S]), - error(E) + ?SLOG(error, #{ + msg => "start hstream connector error", + connector => InstId, + error => E, + reason => R, + stack => S + }) end. do_start_client(InstId, Config = #{url := Server, pool_size := PoolSize}) -> @@ -167,7 +177,18 @@ start_producer(InstId, Client, Options = #{stream := Stream, pool_size := PoolSi msg => "hstream connector: producer started" }), EnableBatch = maps:get(enable_batch, Options, false), - {ok, #{client => Client, producer => Producer, enable_batch => EnableBatch}}; + PayloadBin = maps:get(payload, Options, <<"">>), + Payload = emqx_plugin_libs_rule:preproc_tmpl(PayloadBin), + OrderingKeyBin = maps:get(ordering_key, Options, <<"">>), + OrderingKey = emqx_plugin_libs_rule:preproc_tmpl(OrderingKeyBin), + State = #{ + client => Client, + producer => Producer, + enable_batch => EnableBatch, + ordering_key => OrderingKey, + payload => Payload + }, + {ok, State}; {error, {already_started, Pid}} -> ?SLOG(info, #{ msg => "starting hstream connector: producer, find old producer. restart producer", @@ -184,6 +205,19 @@ start_producer(InstId, Client, Options = #{stream := Stream, pool_size := PoolSi {error, Reason} end. +to_record(OrderingKeyTmpl, PayloadTmpl, Data) -> + OrderingKey = emqx_plugin_libs_rule:proc_tmpl(OrderingKeyTmpl, Data), + Payload = emqx_plugin_libs_rule:proc_tmpl(PayloadTmpl, Data), + to_record(OrderingKey, Payload). + +to_record(OrderingKey, Payload) when is_binary(OrderingKey) -> + to_record(binary_to_list(OrderingKey), Payload); +to_record(OrderingKey, Payload) -> + hstreamdb:to_record(OrderingKey, raw, Payload). + +do_append(AfterQuery, Producer, Record) -> + do_append(AfterQuery, false, Producer, Record). + %% TODO: this append is async, remove or change it after we have better disk cache. % do_append(AfterQuery, true, Producer, Record) -> % case hstreamdb:append(Producer, Record) of @@ -221,10 +255,10 @@ do_append(AfterQuery, false, Producer, Record) -> end. client_name(InstId) -> - "backend_hstream_client:" ++ to_string(InstId). + "client:" ++ to_string(InstId). produce_name(ActionId) -> - list_to_atom("backend_hstream_producer:" ++ to_string(ActionId)). + list_to_atom("producer:" ++ to_string(ActionId)). to_string(List) when is_list(List) -> List; to_string(Bin) when is_binary(Bin) -> binary_to_list(Bin);