feat: integrate OpenTelemetry log handler

This commit is contained in:
Serge Tupchii 2023-11-09 20:48:50 +02:00
parent becaa0f3f5
commit d9f95cdc56
13 changed files with 363 additions and 72 deletions

View File

@ -77,7 +77,8 @@
%% Callback to upgrade config after loaded from config file but before validation.
upgrade_raw_conf(RawConf) ->
emqx_connector_schema:transform_bridges_v1_to_connectors_and_bridges_v2(RawConf).
RawConf1 = emqx_connector_schema:transform_bridges_v1_to_connectors_and_bridges_v2(RawConf),
emqx_otel_schema:upgrade_legacy_metrics(RawConf1).
%% root config should not have a namespace
namespace() -> undefined.

View File

@ -69,9 +69,9 @@ stop_apps() ->
?SLOG(notice, #{msg => "stopping_emqx_apps"}),
_ = emqx_alarm_handler:unload(),
ok = emqx_conf_app:unset_config_loaded(),
lists:foreach(fun stop_one_app/1, lists:reverse(sorted_reboot_apps())),
%% Mute otel deps application.
_ = emqx_otel:stop_otel(),
lists:foreach(fun stop_one_app/1, lists:reverse(sorted_reboot_apps())).
ok = emqx_otel_app:stop_deps().
%% Those port apps are terminated after the main apps
%% Don't need to stop when reboot.

View File

@ -1,9 +1,15 @@
{application, emqx_opentelemetry, [
{description, "OpenTelemetry for EMQX Broker"},
{vsn, "0.1.3"},
{vsn, "0.2.0"},
{registered, []},
{mod, {emqx_otel_app, []}},
{applications, [kernel, stdlib, emqx]},
{applications, [
kernel,
stdlib,
emqx,
%% otel metrics depend on emqx_mgmt_cache
emqx_management
]},
{env, []},
{modules, []},
{licenses, ["Apache 2.0"]},

View File

@ -103,10 +103,24 @@ otel_config_schema() ->
otel_config_example() ->
#{
logs => #{
enable => true,
exporter =>
#{
exporter => #{
endpoint => "http://localhost:4317",
interval => "10s"
ssl_options => #{
enable => false
}
},
level => warning
},
metrics => #{
enable => true,
exporter => #{
endpoint => "http://localhost:4317",
interval => "10s",
ssl_options => #{
enable => false
}
}
}
}.

View File

@ -19,11 +19,17 @@
-behaviour(application).
-export([start/2, stop/1]).
-export([stop_deps/0]).
start(_StartType, _StartArgs) ->
emqx_otel_config:add_handler(),
ok = emqx_otel_config:add_otel_log_handler(),
emqx_otel_sup:start_link().
stop(_State) ->
emqx_otel_config:remove_handler(),
_ = emqx_otel_config:remove_otel_log_handler(),
ok.
stop_deps() ->
emqx_otel_config:stop_all_otel_apps().

View File

@ -19,9 +19,16 @@
-define(OPTL, [opentelemetry]).
-define(OTEL_EXPORTER, opentelemetry_exporter).
-define(OTEL_LOG_HANDLER, otel_log_handler).
-define(OTEL_LOG_HANDLER_ID, opentelemetry_handler).
-export([add_handler/0, remove_handler/0]).
-export([post_config_update/5]).
-export([update/1]).
-export([add_otel_log_handler/0, remove_otel_log_handler/0]).
-export([stop_all_otel_apps/0]).
-export([otel_exporter/1]).
update(Config) ->
case
@ -45,14 +52,109 @@ remove_handler() ->
ok = emqx_config_handler:remove_handler(?OPTL),
ok.
post_config_update(?OPTL, _Req, Old, Old, _AppEnvs) ->
ok;
post_config_update(?OPTL, _Req, New, _Old, AppEnvs) ->
application:set_env(AppEnvs),
ensure_otel(New);
MetricsRes = ensure_otel_metrics(New),
LogsRes = ensure_otel_logs(New),
_ = maybe_stop_all_otel_apps(New),
case {MetricsRes, LogsRes} of
{ok, ok} -> ok;
Other -> {error, Other}
end;
post_config_update(_ConfPath, _Req, _NewConf, _OldConf, _AppEnvs) ->
ok.
ensure_otel(#{enable := true} = Conf) ->
_ = emqx_otel:stop_otel(),
emqx_otel:start_otel(Conf);
ensure_otel(#{enable := false}) ->
emqx_otel:stop_otel().
stop_all_otel_apps() ->
_ = application:stop(opentelemetry),
_ = application:stop(opentelemetry_experimental),
_ = application:stop(opentelemetry_experimental_api),
_ = application:stop(opentelemetry_exporter),
ok.
add_otel_log_handler() ->
ensure_otel_logs(emqx:get_config(?OPTL)).
remove_otel_log_handler() ->
remove_handler_if_present(?OTEL_LOG_HANDLER_ID).
otel_exporter(ExporterConf) ->
#{
endpoint := Endpoint,
protocol := Proto,
ssl_options := SSLOpts
} = ExporterConf,
{?OTEL_EXPORTER, #{
endpoint => Endpoint,
protocol => Proto,
ssl_options => ssl_opts(Endpoint, SSLOpts)
}}.
%% Internal functions
ensure_otel_metrics(#{metrics := #{enable := true} = MetricsConf}) ->
_ = emqx_otel_metrics:stop_otel(),
emqx_otel_metrics:start_otel(MetricsConf);
ensure_otel_metrics(#{metrics := #{enable := false}}) ->
emqx_otel_metrics:stop_otel();
ensure_otel_metrics(_) ->
ok.
ensure_otel_logs(#{logs := #{enable := true} = LogsConf}) ->
ok = remove_handler_if_present(?OTEL_LOG_HANDLER_ID),
ok = ensure_log_apps(),
HandlerConf = tr_handler_conf(LogsConf),
%% NOTE: should primary logger level be updated if it's higher than otel log level?
logger:add_handler(?OTEL_LOG_HANDLER_ID, ?OTEL_LOG_HANDLER, HandlerConf);
ensure_otel_logs(#{logs := #{enable := false}}) ->
remove_handler_if_present(?OTEL_LOG_HANDLER_ID).
remove_handler_if_present(HandlerId) ->
case logger:get_handler_config(HandlerId) of
{ok, _} ->
ok = logger:remove_handler(HandlerId);
_ ->
ok
end.
ensure_log_apps() ->
{ok, _} = application:ensure_all_started(opentelemetry_exporter),
{ok, _} = application:ensure_all_started(opentelemetry_experimental),
ok.
maybe_stop_all_otel_apps(#{metrics := #{enable := false}, logs := #{enable := false}}) ->
stop_all_otel_apps();
maybe_stop_all_otel_apps(_) ->
ok.
tr_handler_conf(Conf) ->
#{
level := Level,
max_queue_size := MaxQueueSize,
exporting_timeout := ExportingTimeout,
scheduled_delay := ScheduledDelay,
exporter := ExporterConf
} = Conf,
#{
level => Level,
config => #{
max_queue_size => MaxQueueSize,
exporting_timeout_ms => ExportingTimeout,
scheduled_delay_ms => ScheduledDelay,
exporter => otel_exporter(ExporterConf)
}
}.
ssl_opts(Endpoint, SSLOpts) ->
case is_ssl(Endpoint) of
true ->
emqx_tls_lib:to_client_opts(SSLOpts#{enable => true});
false ->
[]
end.
is_ssl(<<"https://", _/binary>> = _Endpoint) ->
true;
is_ssl(_Endpoint) ->
false.

View File

@ -14,7 +14,7 @@
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_otel).
-module(emqx_otel_metrics).
-include_lib("emqx/include/logger.hrl").
-export([start_otel/1, stop_otel/0]).
@ -29,7 +29,7 @@ start_otel(Conf) ->
assert_started(supervisor:start_child(?SUPERVISOR, Spec)).
stop_otel() ->
ok = cleanup(),
Res =
case erlang:whereis(?SUPERVISOR) of
undefined ->
ok;
@ -39,7 +39,9 @@ stop_otel() ->
{error, not_found} -> ok;
Error -> Error
end
end.
end,
ok = cleanup(),
Res.
start_link(Conf) ->
gen_server:start_link({local, ?MODULE}, ?MODULE, Conf, []).
@ -71,32 +73,40 @@ setup(_Conf) ->
ok.
ensure_apps(Conf) ->
#{exporter := #{interval := ExporterInterval}} = Conf,
#{exporter := #{interval := ExporterInterval} = Exporter} = Conf,
{ok, _} = application:ensure_all_started(opentelemetry_exporter),
{ok, _} = application:ensure_all_started(opentelemetry),
_ = application:stop(opentelemetry_experimental),
{ok, _} = application:ensure_all_started(opentelemetry_experimental),
{ok, _} = application:ensure_all_started(opentelemetry_api_experimental),
_ = opentelemetry_experimental:stop_default_metrics(),
ok = application:set_env(
opentelemetry_experimental,
readers,
[
#{
id => emqx_otel_metric_reader,
module => otel_metric_reader,
config => #{
exporter => {opentelemetry_exporter, #{}},
exporter => emqx_otel_config:otel_exporter(Exporter),
export_interval_ms => ExporterInterval
}
}
]
),
{ok, _} = application:ensure_all_started(opentelemetry_experimental),
{ok, _} = application:ensure_all_started(opentelemetry_api_experimental),
{ok, _} = opentelemetry_experimental:start_default_metrics(),
ok.
cleanup() ->
_ = application:stop(opentelemetry),
_ = application:stop(opentelemetry_experimental),
_ = application:stop(opentelemetry_experimental_api),
_ = application:stop(opentelemetry_exporter),
safe_stop_default_metrics().
safe_stop_default_metrics() ->
try
_ = opentelemetry_experimental:stop_default_metrics()
catch
%% noramal scenario, metrics supervisor is not started
exit:{noproc, _} -> ok
end,
ok.
create_metric_views() ->

View File

@ -24,16 +24,48 @@
desc/1
]).
-export([upgrade_legacy_metrics/1]).
%% Compatibility with the previous schema that defined only metric fields
upgrade_legacy_metrics(RawConf) ->
case RawConf of
#{<<"opentelemetry">> := Otel} ->
LegacyMetricsFields = [<<"enable">>, <<"exporter">>],
Otel1 = maps:without(LegacyMetricsFields, Otel),
Metrics = maps:with(LegacyMetricsFields, Otel),
case Metrics =:= #{} of
true ->
RawConf;
false ->
RawConf#{<<"opentelemetry">> => Otel1#{<<"metrics">> => Metrics}}
end;
_ ->
RawConf
end.
namespace() -> opentelemetry.
roots() -> ["opentelemetry"].
fields("opentelemetry") ->
[
{exporter,
{metrics,
?HOCON(
?R_REF("exporter"),
#{desc => ?DESC(exporter)}
?R_REF("otel_metrics"),
#{
desc => ?DESC(otel_metrics)
}
)},
{logs,
?HOCON(
?R_REF("otel_logs"),
#{
desc => ?DESC(otel_logs)
}
)}
];
fields("otel_metrics") ->
[
{enable,
?HOCON(
boolean(),
@ -42,41 +74,130 @@ fields("opentelemetry") ->
required => true,
desc => ?DESC(enable)
}
)},
{exporter,
?HOCON(
?R_REF("otel_metrics_exporter"),
#{desc => ?DESC(exporter)}
)}
];
fields("exporter") ->
fields("otel_logs") ->
[
{"protocol",
{level,
?HOCON(
%% http_protobuf is not support for metrics yet.
?ENUM([grpc]),
emqx_conf_schema:log_level(),
#{
mapping => "opentelemetry_exporter.otlp_protocol",
desc => ?DESC(protocol),
default => grpc,
default => warning,
desc => ?DESC(otel_log_handler_level),
importance => ?IMPORTANCE_HIGH
}
)},
{enable,
?HOCON(
boolean(),
#{
default => false,
desc => ?DESC(enable),
importance => ?IMPORTANCE_HIGH
}
)},
{max_queue_size,
?HOCON(
pos_integer(),
#{
default => 2048,
desc => ?DESC(max_queue_size),
importance => ?IMPORTANCE_HIDDEN
}
)},
{"endpoint",
{exporting_timeout,
?HOCON(
emqx_schema:timeout_duration_ms(),
#{
default => <<"30s">>,
desc => ?DESC(exporting_timeout),
importance => ?IMPORTANCE_HIDDEN
}
)},
{scheduled_delay,
?HOCON(
emqx_schema:timeout_duration_ms(),
#{
default => <<"1s">>,
desc => ?DESC(scheduled_delay),
importance => ?IMPORTANCE_HIDDEN
}
)},
{exporter,
?HOCON(
?R_REF("otel_logs_exporter"),
#{
desc => ?DESC(exporter),
importance => ?IMPORTANCE_HIGH
}
)}
];
fields("otel_metrics_exporter") ->
exporter_fields(metrics);
fields("otel_logs_exporter") ->
exporter_fields(logs);
fields("ssl_opts") ->
Schema = emqx_schema:client_ssl_opts_schema(#{}),
lists:keydelete("enable", 1, Schema).
desc("opentelemetry") -> ?DESC(opentelemetry);
desc("exporter") -> ?DESC(exporter);
desc("otel_logs_exporter") -> ?DESC(exporter);
desc("otel_metrics_exporter") -> ?DESC(exporter);
desc("otel_logs") -> ?DESC(otel_logs);
desc("otel_metrics") -> ?DESC(otel_metrics);
desc("ssl_opts") -> ?DESC(exporter_ssl);
desc(_) -> undefined.
exporter_fields(OtelSignal) ->
[
{endpoint,
?HOCON(
emqx_schema:url(),
#{
mapping => "opentelemetry_exporter.otlp_endpoint",
default => <<"http://localhost:4317">>,
desc => ?DESC(endpoint)
default => "http://localhost:4317",
desc => ?DESC(exporter_endpoint),
importance => ?IMPORTANCE_HIGH
}
)},
{"interval",
{protocol,
?HOCON(
%% http protobuf/json may be added in future
?ENUM([grpc]),
#{
default => grpc,
desc => ?DESC(exporter_protocol),
importance => ?IMPORTANCE_HIDDEN
}
)},
{ssl_options,
?HOCON(
?R_REF("ssl_opts"),
#{
desc => ?DESC(exporter_ssl),
importance => ?IMPORTANCE_LOW
}
)}
] ++ exporter_extra_fields(OtelSignal).
%% Let's keep it in exporter config for metrics, as it is different from
%% scheduled_delay_ms opt used for otel traces and logs
exporter_extra_fields(metrics) ->
[
{interval,
?HOCON(
emqx_schema:timeout_duration_ms(),
#{
default => <<"10s">>,
required => true,
desc => ?DESC(interval)
desc => ?DESC(scheduled_delay)
}
)}
].
desc("opentelemetry") -> ?DESC(opentelemetry);
desc("exporter") -> ?DESC(exporter);
desc(_) -> undefined.
];
exporter_extra_fields(_OtelSignal) ->
[].

View File

@ -41,8 +41,8 @@ init([]) ->
period => 512
},
Children =
case emqx_conf:get([opentelemetry]) of
case emqx_conf:get([opentelemetry, metrics]) of
#{enable := false} -> [];
#{enable := true} = Conf -> [worker_spec(emqx_otel, Conf)]
#{enable := true} = Conf -> [worker_spec(emqx_otel_metrics, Conf)]
end,
{ok, {SupFlags, Children}}.

View File

@ -0,0 +1,2 @@
Introduced Open Telemetry Logs Handler that allows to format log events according to Open Telemetry log data model and
export them to the configured Open Telemetry collector or back-end.

10
mix.exs
View File

@ -102,31 +102,31 @@ defmodule EMQXUmbrella.MixProject do
{:opentelemetry_api,
github: "emqx/opentelemetry-erlang",
sparse: "apps/opentelemetry_api",
tag: "v1.3.2-emqx",
tag: "v1.4.2-emqx",
override: true,
runtime: false},
{:opentelemetry,
github: "emqx/opentelemetry-erlang",
sparse: "apps/opentelemetry",
tag: "v1.3.2-emqx",
tag: "v1.4.2-emqx",
override: true,
runtime: false},
{:opentelemetry_api_experimental,
github: "emqx/opentelemetry-erlang",
sparse: "apps/opentelemetry_api_experimental",
tag: "v1.3.2-emqx",
tag: "v1.4.2-emqx",
override: true,
runtime: false},
{:opentelemetry_experimental,
github: "emqx/opentelemetry-erlang",
sparse: "apps/opentelemetry_experimental",
tag: "v1.3.2-emqx",
tag: "v1.4.2-emqx",
override: true,
runtime: false},
{:opentelemetry_exporter,
github: "emqx/opentelemetry-erlang",
sparse: "apps/opentelemetry_exporter",
tag: "v1.3.2-emqx",
tag: "v1.4.2-emqx",
override: true,
runtime: false}
] ++

View File

@ -85,13 +85,13 @@
, {jsone, {git, "https://github.com/emqx/jsone.git", {tag, "1.7.1"}}}
, {uuid, {git, "https://github.com/okeuday/uuid.git", {tag, "v2.0.6"}}}
%% trace
, {opentelemetry_api, {git_subdir, "https://github.com/emqx/opentelemetry-erlang", {tag, "v1.3.2-emqx"}, "apps/opentelemetry_api"}}
, {opentelemetry, {git_subdir, "https://github.com/emqx/opentelemetry-erlang", {tag, "v1.3.2-emqx"}, "apps/opentelemetry"}}
, {opentelemetry_api, {git_subdir, "https://github.com/emqx/opentelemetry-erlang", {tag, "v1.4.2-emqx"}, "apps/opentelemetry_api"}}
, {opentelemetry, {git_subdir, "https://github.com/emqx/opentelemetry-erlang", {tag, "v1.4.2-emqx"}, "apps/opentelemetry"}}
%% log metrics
, {opentelemetry_experimental, {git_subdir, "https://github.com/emqx/opentelemetry-erlang", {tag, "v1.3.2-emqx"}, "apps/opentelemetry_experimental"}}
, {opentelemetry_api_experimental, {git_subdir, "https://github.com/emqx/opentelemetry-erlang", {tag, "v1.3.2-emqx"}, "apps/opentelemetry_api_experimental"}}
, {opentelemetry_experimental, {git_subdir, "https://github.com/emqx/opentelemetry-erlang", {tag, "v1.4.2-emqx"}, "apps/opentelemetry_experimental"}}
, {opentelemetry_api_experimental, {git_subdir, "https://github.com/emqx/opentelemetry-erlang", {tag, "v1.4.2-emqx"}, "apps/opentelemetry_api_experimental"}}
%% export
, {opentelemetry_exporter, {git_subdir, "https://github.com/emqx/opentelemetry-erlang", {tag, "v1.3.2-emqx"}, "apps/opentelemetry_exporter"}}
, {opentelemetry_exporter, {git_subdir, "https://github.com/emqx/opentelemetry-erlang", {tag, "v1.4.2-emqx"}, "apps/opentelemetry_exporter"}}
]}.
{xref_ignores,

View File

@ -1,15 +1,44 @@
emqx_otel_schema {
opentelemetry.desc: "Open Telemetry Toolkit configuration"
opentelemetry.label: "Open Telemetry"
otel_logs.desc:
"""Open Telemetry Logs configuration. If enabled, EMQX installs a log handler that formats events according to Open Telemetry log data model and
exports them to the configured Open Telemetry collector or backend."""
otel_logs.label: "Open Telemetry Logs"
otel_metrics.desc: "Open Telemetry Metrics configuration."
otel_metrics.label: "Open Telemetry Metrics"
enable.desc: "Enable or disable Open Telemetry signal."
enable.label: "Enable."
exporter.desc: "Open Telemetry Exporter"
exporter.label: "Exporter"
enable.desc: "Enable or disable open telemetry metrics"
max_queue_size.desc:
"""The maximum queue size. After the size is reached Open Telemetry signals are dropped."""
max_queue_size.label: "Max Queue Size"
protocol.desc: "Open Telemetry Exporter Protocol"
exporting_timeout.desc: "The time Open Telemetry signal export can run before it is cancelled."
exporting_timeout.label: "Exporting Timeout"
endpoint.desc: "Open Telemetry Exporter Endpoint"
scheduled_delay.desc: "The delay interval between two consecutive exports of Open Telemetry signals."
scheduled_delay.label: "Scheduled Delay Interval"
interval.desc: "The interval of sending metrics to Open Telemetry Endpoint"
exporter_endpoint.desc:
"""The target URL to which the exporter is going to send Open Telemetry signal data."""
exporter_endpoint.label: "Exporter Endpoint"
exporter_protocol.desc: "The transport protocol of Open Telemetry Exporter"
exporter_protocol.label: "Exporter Protocol"
exporter_ssl.desc: "SSL configuration for the Open Telemetry exporter"
exporter_ssl.label: "SSL Options"
otel_log_handler_level.desc:
"""The log level of the Open Telemetry log handler."""
otel_log_handler_level.label: "Log Level"
}