fix(gw): throw the exproto start grpc server starting failure error
This commit is contained in:
parent
eb4be03012
commit
3645cb244b
|
@ -83,7 +83,7 @@ on_gateway_update(Config, Gateway, GwState = #{ctx := Ctx}) ->
|
||||||
logger:error("Failed to update ~ts; "
|
logger:error("Failed to update ~ts; "
|
||||||
"reason: {~0p, ~0p} stacktrace: ~0p",
|
"reason: {~0p, ~0p} stacktrace: ~0p",
|
||||||
[GwName, Class, Reason, Stk]),
|
[GwName, Class, Reason, Stk]),
|
||||||
{error, {Class, Reason}}
|
{error, Reason}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
on_gateway_unload(_Gateway = #{ name := GwName,
|
on_gateway_unload(_Gateway = #{ name := GwName,
|
||||||
|
|
|
@ -97,7 +97,7 @@ gateway_insta(delete, #{bindings := #{name := Name0}}) ->
|
||||||
ok ->
|
ok ->
|
||||||
{204};
|
{204};
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
return_http_error(400, Reason)
|
emqx_gateway_http:reason2resp(Reason)
|
||||||
end
|
end
|
||||||
end);
|
end);
|
||||||
gateway_insta(get, #{bindings := #{name := Name0}}) ->
|
gateway_insta(get, #{bindings := #{name := Name0}}) ->
|
||||||
|
@ -134,7 +134,7 @@ gateway_insta(put, #{body := GwConf0,
|
||||||
{ok, Gateway} ->
|
{ok, Gateway} ->
|
||||||
{200, Gateway};
|
{200, Gateway};
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
return_http_error(500, Reason)
|
emqx_gateway_http:reason2resp(Reason)
|
||||||
end
|
end
|
||||||
end).
|
end).
|
||||||
|
|
||||||
|
|
|
@ -311,7 +311,12 @@ return_http_error(Code, Msg) ->
|
||||||
|
|
||||||
-spec reason2msg({atom(), map()} | any()) -> error | string().
|
-spec reason2msg({atom(), map()} | any()) -> error | string().
|
||||||
reason2msg({badconf, #{key := Key, value := Value, reason := Reason}}) ->
|
reason2msg({badconf, #{key := Key, value := Value, reason := Reason}}) ->
|
||||||
fmtstr("Bad config value '~s' for '~s', reason: ~s", [Value, Key, Reason]);
|
NValue = case emqx_json:safe_encode(Value) of
|
||||||
|
{ok, Str} -> Str;
|
||||||
|
{error, _} -> emqx_gateway_utils:stringfy(Value)
|
||||||
|
end,
|
||||||
|
fmtstr("Bad config value '~s' for '~s', reason: ~s",
|
||||||
|
[NValue, Key, Reason]);
|
||||||
reason2msg({badres, #{resource := gateway,
|
reason2msg({badres, #{resource := gateway,
|
||||||
gateway := GwName,
|
gateway := GwName,
|
||||||
reason := not_found}}) ->
|
reason := not_found}}) ->
|
||||||
|
|
|
@ -57,18 +57,19 @@ on_gateway_load(_Gateway = #{ name := GwName,
|
||||||
config := Config
|
config := Config
|
||||||
}, Ctx) ->
|
}, Ctx) ->
|
||||||
%% XXX: How to monitor it ?
|
%% XXX: How to monitor it ?
|
||||||
%% Start grpc client pool & client channel
|
|
||||||
PoolName = pool_name(GwName),
|
|
||||||
PoolSize = emqx_vm:schedulers() * 2,
|
|
||||||
{ok, PoolSup} = emqx_pool_sup:start_link(
|
|
||||||
PoolName, hash, PoolSize,
|
|
||||||
{emqx_exproto_gcli, start_link, []}),
|
|
||||||
_ = start_grpc_client_channel(GwName,
|
_ = start_grpc_client_channel(GwName,
|
||||||
maps:get(handler, Config, undefined)
|
maps:get(handler, Config, undefined)
|
||||||
),
|
),
|
||||||
%% XXX: How to monitor it ?
|
%% XXX: How to monitor it ?
|
||||||
_ = start_grpc_server(GwName, maps:get(server, Config, undefined)),
|
_ = start_grpc_server(GwName, maps:get(server, Config, undefined)),
|
||||||
|
|
||||||
|
%% XXX: How to monitor it ?
|
||||||
|
PoolName = pool_name(GwName),
|
||||||
|
PoolSize = emqx_vm:schedulers() * 2,
|
||||||
|
{ok, PoolSup} = emqx_pool_sup:start_link(
|
||||||
|
PoolName, hash, PoolSize,
|
||||||
|
{emqx_exproto_gcli, start_link, []}),
|
||||||
|
|
||||||
NConfig = maps:without(
|
NConfig = maps:without(
|
||||||
[server, handler],
|
[server, handler],
|
||||||
Config#{pool_name => PoolName}
|
Config#{pool_name => PoolName}
|
||||||
|
@ -103,7 +104,7 @@ on_gateway_update(Config, Gateway, GwState = #{ctx := Ctx}) ->
|
||||||
logger:error("Failed to update ~ts; "
|
logger:error("Failed to update ~ts; "
|
||||||
"reason: {~0p, ~0p} stacktrace: ~0p",
|
"reason: {~0p, ~0p} stacktrace: ~0p",
|
||||||
[GwName, Class, Reason, Stk]),
|
[GwName, Class, Reason, Stk]),
|
||||||
{error, {Class, Reason}}
|
{error, Reason}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
on_gateway_unload(_Gateway = #{ name := GwName,
|
on_gateway_unload(_Gateway = #{ name := GwName,
|
||||||
|
@ -141,8 +142,11 @@ start_grpc_server(GwName, Options = #{bind := ListenOn}) ->
|
||||||
console_print("Start ~ts gRPC server on ~p successfully.~n",
|
console_print("Start ~ts gRPC server on ~p successfully.~n",
|
||||||
[GwName, ListenOn]);
|
[GwName, ListenOn]);
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
?ELOG("Failed to start ~ts gRPC server on ~p, reason: ~p",
|
?ELOG("Failed to start ~ts gRPC server on ~p, reason: ~0p",
|
||||||
[GwName, ListenOn, Reason])
|
[GwName, ListenOn, Reason]),
|
||||||
|
throw({badconf, #{key => server,
|
||||||
|
value => Options,
|
||||||
|
reason => illegal_grpc_server_confs}})
|
||||||
end.
|
end.
|
||||||
|
|
||||||
stop_grpc_server(GwName) ->
|
stop_grpc_server(GwName) ->
|
||||||
|
|
|
@ -87,7 +87,7 @@ on_gateway_update(Config, Gateway, GwState = #{ctx := Ctx}) ->
|
||||||
logger:error("Failed to update ~ts; "
|
logger:error("Failed to update ~ts; "
|
||||||
"reason: {~0p, ~0p} stacktrace: ~0p",
|
"reason: {~0p, ~0p} stacktrace: ~0p",
|
||||||
[GwName, Class, Reason, Stk]),
|
[GwName, Class, Reason, Stk]),
|
||||||
{error, {Class, Reason}}
|
{error, Reason}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
on_gateway_unload(_Gateway = #{ name := GwName,
|
on_gateway_unload(_Gateway = #{ name := GwName,
|
||||||
|
|
|
@ -106,7 +106,7 @@ on_gateway_update(Config, Gateway, GwState = #{ctx := Ctx}) ->
|
||||||
logger:error("Failed to update ~ts; "
|
logger:error("Failed to update ~ts; "
|
||||||
"reason: {~0p, ~0p} stacktrace: ~0p",
|
"reason: {~0p, ~0p} stacktrace: ~0p",
|
||||||
[GwName, Class, Reason, Stk]),
|
[GwName, Class, Reason, Stk]),
|
||||||
{error, {Class, Reason}}
|
{error, Reason}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
on_gateway_unload(_Gateway = #{ name := GwName,
|
on_gateway_unload(_Gateway = #{ name := GwName,
|
||||||
|
|
|
@ -87,7 +87,7 @@ on_gateway_update(Config, Gateway, GwState = #{ctx := Ctx}) ->
|
||||||
logger:error("Failed to update ~ts; "
|
logger:error("Failed to update ~ts; "
|
||||||
"reason: {~0p, ~0p} stacktrace: ~0p",
|
"reason: {~0p, ~0p} stacktrace: ~0p",
|
||||||
[GwName, Class, Reason, Stk]),
|
[GwName, Class, Reason, Stk]),
|
||||||
{error, {Class, Reason}}
|
{error, Reason}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
on_gateway_unload(_Gateway = #{ name := GwName,
|
on_gateway_unload(_Gateway = #{ name := GwName,
|
||||||
|
|
Loading…
Reference in New Issue