//===--- Server.cpp - gRPC-based Remote Index Server ---------------------===// // // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. // See https://llvm.org/LICENSE.txt for license information. // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception // //===----------------------------------------------------------------------===// #include "Index.pb.h" #include "Service.grpc.pb.h" #include "index/Index.h" #include "index/Serialization.h" #include "index/Symbol.h" #include "index/remote/marshalling/Marshalling.h" #include "support/Context.h" #include "support/Logger.h" #include "support/Shutdown.h" #include "support/ThreadsafeFS.h" #include "support/Trace.h" #include "llvm/ADT/IntrusiveRefCntPtr.h" #include "llvm/ADT/None.h" #include "llvm/ADT/StringRef.h" #include "llvm/Support/Chrono.h" #include "llvm/Support/CommandLine.h" #include "llvm/Support/Error.h" #include "llvm/Support/FileSystem.h" #include "llvm/Support/FormatVariadic.h" #include "llvm/Support/Path.h" #include "llvm/Support/Signals.h" #include "llvm/Support/VirtualFileSystem.h" #include #include #include #include #include namespace clang { namespace clangd { namespace remote { namespace { static constexpr char Overview[] = R"( This is an experimental remote index implementation. The server opens Dex and awaits gRPC lookup requests from the client. )"; llvm::cl::opt IndexPath(llvm::cl::desc(""), llvm::cl::Positional, llvm::cl::Required); llvm::cl::opt IndexRoot(llvm::cl::desc(""), llvm::cl::Positional, llvm::cl::Required); llvm::cl::opt LogLevel{ "log", llvm::cl::desc("Verbosity of log messages written to stderr"), values(clEnumValN(Logger::Error, "error", "Error messages only"), clEnumValN(Logger::Info, "info", "High level execution tracing"), clEnumValN(Logger::Debug, "verbose", "Low level details")), llvm::cl::init(Logger::Info), }; llvm::cl::opt LogPublic{ "log-public", llvm::cl::desc("Avoid logging potentially-sensitive request details"), llvm::cl::init(false), }; llvm::cl::opt TraceFile( "trace-file", llvm::cl::desc("Path to the file where tracer logs will be stored")); llvm::cl::opt PrettyPrint{ "pretty", llvm::cl::desc("Pretty-print JSON output in the trace"), llvm::cl::init(false), }; llvm::cl::opt ServerAddress( "server-address", llvm::cl::init("0.0.0.0:50051"), llvm::cl::desc("Address of the invoked server. Defaults to 0.0.0.0:50051")); static Key CurrentRequest; class RemoteIndexServer final : public v1::SymbolIndex::Service { public: RemoteIndexServer(clangd::SymbolIndex &Index, llvm::StringRef IndexRoot) : Index(Index) { llvm::SmallString<256> NativePath = IndexRoot; llvm::sys::path::native(NativePath); ProtobufMarshaller = std::unique_ptr(new Marshaller( /*RemoteIndexRoot=*/llvm::StringRef(NativePath), /*LocalIndexRoot=*/"")); } private: using stopwatch = std::chrono::steady_clock; grpc::Status Lookup(grpc::ServerContext *Context, const LookupRequest *Request, grpc::ServerWriter *Reply) override { auto StartTime = stopwatch::now(); WithContextValue WithRequestContext(CurrentRequest, Context); logRequest(*Request); trace::Span Tracer("LookupRequest"); auto Req = ProtobufMarshaller->fromProtobuf(Request); if (!Req) { elog("Can not parse LookupRequest from protobuf: {0}", Req.takeError()); return grpc::Status::CANCELLED; } unsigned Sent = 0; unsigned FailedToSend = 0; Index.lookup(*Req, [&](const clangd::Symbol &Item) { auto SerializedItem = ProtobufMarshaller->toProtobuf(Item); if (!SerializedItem) { elog("Unable to convert Symbol to protobuf: {0}", SerializedItem.takeError()); ++FailedToSend; return; } LookupReply NextMessage; *NextMessage.mutable_stream_result() = *SerializedItem; logResponse(NextMessage); Reply->Write(NextMessage); ++Sent; }); LookupReply LastMessage; LastMessage.mutable_final_result()->set_has_more(true); logResponse(LastMessage); Reply->Write(LastMessage); SPAN_ATTACH(Tracer, "Sent", Sent); SPAN_ATTACH(Tracer, "Failed to send", FailedToSend); logRequestSummary("v1/Lookup", Sent, StartTime); return grpc::Status::OK; } grpc::Status FuzzyFind(grpc::ServerContext *Context, const FuzzyFindRequest *Request, grpc::ServerWriter *Reply) override { auto StartTime = stopwatch::now(); WithContextValue WithRequestContext(CurrentRequest, Context); logRequest(*Request); trace::Span Tracer("FuzzyFindRequest"); auto Req = ProtobufMarshaller->fromProtobuf(Request); if (!Req) { elog("Can not parse FuzzyFindRequest from protobuf: {0}", Req.takeError()); return grpc::Status::CANCELLED; } unsigned Sent = 0; unsigned FailedToSend = 0; bool HasMore = Index.fuzzyFind(*Req, [&](const clangd::Symbol &Item) { auto SerializedItem = ProtobufMarshaller->toProtobuf(Item); if (!SerializedItem) { elog("Unable to convert Symbol to protobuf: {0}", SerializedItem.takeError()); ++FailedToSend; return; } FuzzyFindReply NextMessage; *NextMessage.mutable_stream_result() = *SerializedItem; logResponse(NextMessage); Reply->Write(NextMessage); ++Sent; }); FuzzyFindReply LastMessage; LastMessage.mutable_final_result()->set_has_more(HasMore); logResponse(LastMessage); Reply->Write(LastMessage); SPAN_ATTACH(Tracer, "Sent", Sent); SPAN_ATTACH(Tracer, "Failed to send", FailedToSend); logRequestSummary("v1/FuzzyFind", Sent, StartTime); return grpc::Status::OK; } grpc::Status Refs(grpc::ServerContext *Context, const RefsRequest *Request, grpc::ServerWriter *Reply) override { auto StartTime = stopwatch::now(); WithContextValue WithRequestContext(CurrentRequest, Context); logRequest(*Request); trace::Span Tracer("RefsRequest"); auto Req = ProtobufMarshaller->fromProtobuf(Request); if (!Req) { elog("Can not parse RefsRequest from protobuf: {0}", Req.takeError()); return grpc::Status::CANCELLED; } unsigned Sent = 0; unsigned FailedToSend = 0; bool HasMore = Index.refs(*Req, [&](const clangd::Ref &Item) { auto SerializedItem = ProtobufMarshaller->toProtobuf(Item); if (!SerializedItem) { elog("Unable to convert Ref to protobuf: {0}", SerializedItem.takeError()); ++FailedToSend; return; } RefsReply NextMessage; *NextMessage.mutable_stream_result() = *SerializedItem; logResponse(NextMessage); Reply->Write(NextMessage); ++Sent; }); RefsReply LastMessage; LastMessage.mutable_final_result()->set_has_more(HasMore); logResponse(LastMessage); Reply->Write(LastMessage); SPAN_ATTACH(Tracer, "Sent", Sent); SPAN_ATTACH(Tracer, "Failed to send", FailedToSend); logRequestSummary("v1/Refs", Sent, StartTime); return grpc::Status::OK; } grpc::Status Relations(grpc::ServerContext *Context, const RelationsRequest *Request, grpc::ServerWriter *Reply) override { auto StartTime = stopwatch::now(); WithContextValue WithRequestContext(CurrentRequest, Context); logRequest(*Request); trace::Span Tracer("RelationsRequest"); auto Req = ProtobufMarshaller->fromProtobuf(Request); if (!Req) { elog("Can not parse RelationsRequest from protobuf: {0}", Req.takeError()); return grpc::Status::CANCELLED; } unsigned Sent = 0; unsigned FailedToSend = 0; Index.relations( *Req, [&](const SymbolID &Subject, const clangd::Symbol &Object) { auto SerializedItem = ProtobufMarshaller->toProtobuf(Subject, Object); if (!SerializedItem) { elog("Unable to convert Relation to protobuf: {0}", SerializedItem.takeError()); ++FailedToSend; return; } RelationsReply NextMessage; *NextMessage.mutable_stream_result() = *SerializedItem; logResponse(NextMessage); Reply->Write(NextMessage); ++Sent; }); RelationsReply LastMessage; LastMessage.mutable_final_result()->set_has_more(true); logResponse(LastMessage); Reply->Write(LastMessage); SPAN_ATTACH(Tracer, "Sent", Sent); SPAN_ATTACH(Tracer, "Failed to send", FailedToSend); logRequestSummary("v1/Relations", Sent, StartTime); return grpc::Status::OK; } // Proxy object to allow proto messages to be lazily serialized as text. struct TextProto { const google::protobuf::Message &M; friend llvm::raw_ostream &operator<<(llvm::raw_ostream &OS, const TextProto &P) { return OS << P.M.DebugString(); } }; void logRequest(const google::protobuf::Message &M) { vlog("<<< {0}\n{1}", M.GetDescriptor()->name(), TextProto{M}); } void logResponse(const google::protobuf::Message &M) { vlog(">>> {0}\n{1}", M.GetDescriptor()->name(), TextProto{M}); } void logRequestSummary(llvm::StringLiteral RequestName, unsigned Sent, stopwatch::time_point StartTime) { auto Duration = stopwatch::now() - StartTime; auto Millis = std::chrono::duration_cast(Duration).count(); log("[public] request {0} => OK: {1} results in {2}ms", RequestName, Sent, Millis); } std::unique_ptr ProtobufMarshaller; clangd::SymbolIndex &Index; }; // Detect changes in \p IndexPath file and load new versions of the index // whenever they become available. void hotReload(clangd::SwapIndex &Index, llvm::StringRef IndexPath, llvm::vfs::Status &LastStatus, llvm::IntrusiveRefCntPtr &FS) { auto Status = FS->status(IndexPath); // Requested file is same as loaded index: no reload is needed. if (!Status || (Status->getLastModificationTime() == LastStatus.getLastModificationTime() && Status->getSize() == LastStatus.getSize())) return; vlog("Found different index version: existing index was modified at " "{0}, new index was modified at {1}. Attempting to reload.", LastStatus.getLastModificationTime(), Status->getLastModificationTime()); LastStatus = *Status; std::unique_ptr NewIndex = loadIndex(IndexPath); if (!NewIndex) { elog("Failed to load new index. Old index will be served."); return; } Index.reset(std::move(NewIndex)); log("New index version loaded. Last modification time: {0}, size: {1} bytes.", Status->getLastModificationTime(), Status->getSize()); } void runServerAndWait(clangd::SymbolIndex &Index, llvm::StringRef ServerAddress, llvm::StringRef IndexPath) { RemoteIndexServer Service(Index, IndexRoot); grpc::EnableDefaultHealthCheckService(true); grpc::ServerBuilder Builder; Builder.AddListeningPort(ServerAddress.str(), grpc::InsecureServerCredentials()); Builder.RegisterService(&Service); std::unique_ptr Server(Builder.BuildAndStart()); log("Server listening on {0}", ServerAddress); std::thread ServerShutdownWatcher([&]() { static constexpr auto WatcherFrequency = std::chrono::seconds(5); while (!clang::clangd::shutdownRequested()) std::this_thread::sleep_for(WatcherFrequency); Server->Shutdown(); }); Server->Wait(); ServerShutdownWatcher.join(); } std::unique_ptr makeLogger(llvm::raw_ostream &OS) { if (!LogPublic) return std::make_unique(OS, LogLevel); // Redacted mode: // - messages outside the scope of a request: log fully // - messages tagged [public]: log fully // - errors: log the format string // - others: drop class RedactedLogger : public StreamLogger { public: using StreamLogger::StreamLogger; void log(Level L, const char *Fmt, const llvm::formatv_object_base &Message) override { if (Context::current().get(CurrentRequest) == nullptr || llvm::StringRef(Fmt).startswith("[public]")) return StreamLogger::log(L, Fmt, Message); if (L >= Error) return StreamLogger::log(L, Fmt, llvm::formatv("[redacted] {0}", Fmt)); } }; return std::make_unique(OS, LogLevel); } } // namespace } // namespace remote } // namespace clangd } // namespace clang using clang::clangd::elog; int main(int argc, char *argv[]) { using namespace clang::clangd::remote; llvm::cl::ParseCommandLineOptions(argc, argv, Overview); llvm::sys::PrintStackTraceOnErrorSignal(argv[0]); llvm::sys::SetInterruptFunction(&clang::clangd::requestShutdown); if (!llvm::sys::path::is_absolute(IndexRoot)) { llvm::errs() << "Index root should be an absolute path.\n"; return -1; } llvm::errs().SetBuffered(); // Don't flush stdout when logging for thread safety. llvm::errs().tie(nullptr); auto Logger = makeLogger(llvm::errs()); clang::clangd::LoggingSession LoggingSession(*Logger); llvm::Optional TracerStream; std::unique_ptr Tracer; if (!TraceFile.empty()) { std::error_code EC; TracerStream.emplace(TraceFile, EC, llvm::sys::fs::FA_Read | llvm::sys::fs::FA_Write); if (EC) { TracerStream.reset(); elog("Error while opening trace file {0}: {1}", TraceFile, EC.message()); } else { // FIXME(kirillbobyrev): Also create metrics tracer to track latency and // accumulate other request statistics. Tracer = clang::clangd::trace::createJSONTracer(*TracerStream, /*PrettyPrint=*/false); clang::clangd::vlog("Successfully created a tracer."); } } llvm::Optional TracingSession; if (Tracer) TracingSession.emplace(*Tracer); clang::clangd::RealThreadsafeFS TFS; auto FS = TFS.view(llvm::None); auto Status = FS->status(IndexPath); if (!Status) { elog("{0} does not exist.", IndexPath); return Status.getError().value(); } auto SymIndex = clang::clangd::loadIndex(IndexPath); if (!SymIndex) { llvm::errs() << "Failed to open the index.\n"; return -1; } clang::clangd::SwapIndex Index(std::move(SymIndex)); std::thread HotReloadThread([&Index, &Status, &FS]() { llvm::vfs::Status LastStatus = *Status; static constexpr auto RefreshFrequency = std::chrono::seconds(30); while (!clang::clangd::shutdownRequested()) { hotReload(Index, llvm::StringRef(IndexPath), LastStatus, FS); std::this_thread::sleep_for(RefreshFrequency); } }); runServerAndWait(Index, ServerAddress, IndexPath); HotReloadThread.join(); }