diff --git a/sources/cmd/server.go b/sources/cmd/server.go index b33bd51..79ca36a 100644 --- a/sources/cmd/server.go +++ b/sources/cmd/server.go @@ -5,13 +5,16 @@ package main import "bytes" import "flag" +import "fmt" import "io" import "log" +import "net" import "net/http" import "os" import "os/signal" import "runtime" import "runtime/pprof" +import "sync" import "syscall" import "time" @@ -19,6 +22,7 @@ import "time" import cdb "github.com/cipriancraciun/go-cdb-lib" import "github.com/valyala/fasthttp" +import "github.com/valyala/fasthttp/reuseport" import . "github.com/cipriancraciun/go-cdb-http/lib/common" import . "github.com/cipriancraciun/go-cdb-http/lib/server" @@ -247,9 +251,11 @@ func main_0 () (error) { var _bind string var _archive string - var _archiveMmap bool var _archiveInmem bool + var _archiveMmap bool var _archivePreload bool + var _processes uint + var _threads uint var _debug bool var _profileCpu string @@ -260,9 +266,11 @@ func main_0 () (error) { _bind_0 := _flags.String ("bind", "", ":") _archive_0 := _flags.String ("archive", "", "") - _archiveMmap_0 := _flags.Bool ("archive-mmap", false, "(memory-mapped archive file)") _archiveInmem_0 := _flags.Bool ("archive-inmem", false, "(memory-loaded archive file)") + _archiveMmap_0 := _flags.Bool ("archive-mmap", false, "(memory-mapped archive file)") _archivePreload_0 := _flags.Bool ("archive-preload", false, "(preload archive file)") + _processes_0 := _flags.Uint ("processes", 0, "") + _threads_0 := _flags.Uint ("threads", 0, "") _debug_0 := _flags.Bool ("debug", false, "") _profileCpu_0 := _flags.String ("profile-cpu", "", "") @@ -272,9 +280,11 @@ func main_0 () (error) { _bind = *_bind_0 _archive = *_archive_0 - _archiveMmap = *_archiveMmap_0 _archiveInmem = *_archiveInmem_0 + _archiveMmap = *_archiveMmap_0 _archivePreload = *_archivePreload_0 + _processes = *_processes_0 + _threads = *_threads_0 _debug = *_debug_0 _profileCpu = *_profileCpu_0 @@ -294,11 +304,115 @@ func main_0 () (error) { log.Printf ("[ww] [3e8a40e4] archive 'memory-loaded' implies preloading!\n") _archivePreload = false } + + if (_processes > 1) && ((_profileCpu != "") || (_profileMem != "")) { + AbortError (nil, "[cd18d250] multi-process and profiling are mutually exclusive!") + } + + if _processes < 1 { + _processes = 1 + } + if _threads < 1 { + _threads = 1 + } + } + + + runtime.GOMAXPROCS (int (_threads)) + + + if _processes > 1 { + + log.Printf ("[ii] [06f8c944] sub-processes starting (`%d` processes with `%d` threads each)...\n", _processes, _threads) + + _processesJoin := & sync.WaitGroup {} + + _processesPid := make ([]*os.Process, _processes) + + _processName := os.Args[0] + _processArguments := make ([]string, 0, len (os.Args)) + _processArguments = append (_processArguments, + _processName, + "--bind", _bind, + "--archive", _archive, + ) + if _archiveInmem { + _processArguments = append (_processArguments, "--archive-inmem") + } + if _archiveMmap { + _processArguments = append (_processArguments, "--archive-mmap") + } + if _archivePreload { + _processArguments = append (_processArguments, "--archive-preload") + } + if _debug { + _processArguments = append (_processArguments, "--debug") + } + _processArguments = append (_processArguments, "--threads", fmt.Sprintf ("%d", _threads)) + + _processAttributes := & os.ProcAttr { + Env : []string {}, + Files : []*os.File { + os.Stdin, + os.Stdout, + os.Stderr, + }, + Sys : nil, + } + + for _processIndex, _ := range _processesPid { + if _processPid, _error := os.StartProcess (_processName, _processArguments, _processAttributes); _error == nil { + _processesJoin.Add (1) + _processesPid[_processIndex] = _processPid + log.Printf ("[ii] [63cb22f8] sub-process `%d` started;\n", _processPid.Pid) + go func (_index int, _processPid *os.Process) () { + if _processStatus, _error := _processPid.Wait (); _error == nil { + if _processStatus.Success () { + log.Printf ("[ii] [66b60b81] sub-process `%d` succeeded;\n", _processPid.Pid) + } else { + log.Printf ("[ww] [5d25046b] sub-process `%d` failed: `%s`; ignoring!\n", _processPid.Pid, _processStatus) + } + } else { + LogError (_error, fmt.Sprintf ("[f1bfc927] failed waiting for sub-process `%d`; ignoring!", _processPid.Pid)) + } + _processesPid[_processIndex] = nil + _processesJoin.Done () + } (_processIndex, _processPid) + } else { + LogError (_error, "[8892b34d] failed starting sub-process; ignoring!") + } + } + + { + _signals := make (chan os.Signal, 32) + signal.Notify (_signals, syscall.SIGINT, syscall.SIGTERM) + go func () () { + for { + _signal := <- _signals + log.Printf ("[ii] [a9243ecb] signaling sub-processes...\n") + for _, _processPid := range _processesPid { + if _processPid != nil { + if _error := _processPid.Signal (_signal); _error != nil { + LogError (_error, fmt.Sprintf ("[ab681164] failed signaling sub-process `%d`; ignoring!", _processPid.Pid)) + } + } + } + } + } () + } + + _processesJoin.Wait () + + log.Printf ("[ii] [b949bafc] sub-processes terminated;\n") + + return nil } var _cdbReader *cdb.CDB { + log.Printf ("[ii] [3b788396] opening archive `%s`...\n", _archive) + var _cdbFile *os.File if _cdbFile_0, _error := os.Open (_archive); _error == nil { _cdbFile = _cdbFile_0 @@ -411,7 +525,7 @@ func main_0 () (error) { AbortError (_error, "[907d08b5] failed opening MEM profile!") } _profile := pprof.Lookup ("heap") - defer func () { + defer func () () { runtime.GC () if _profile != nil { if _error := _profile.WriteTo (_stream, 0); _error != nil { @@ -467,8 +581,15 @@ func main_0 () (error) { log.Printf ("[ii] [f11e4e37] listening on `http://%s/`;\n", _bind) - if _error := _httpServer.ListenAndServe (_bind); _error != nil { - AbortError (_error, "[44f45c67] failed starting server!") + var _httpListener net.Listener + if _httpListener_0, _error := reuseport.Listen ("tcp4", _bind); _error == nil { + _httpListener = _httpListener_0 + } else { + AbortError (_error, "[d5f51e9f] failed starting listener!") + } + + if _error := _httpServer.Serve (_httpListener); _error != nil { + AbortError (_error, "[44f45c67] failed executing server!") } diff --git a/sources/lib/common/main.go b/sources/lib/common/main.go index b65b560..c84770d 100644 --- a/sources/lib/common/main.go +++ b/sources/lib/common/main.go @@ -14,6 +14,7 @@ import "os" func Main (_main func () (error)) () { log.SetFlags (0) + log.SetPrefix (fmt.Sprintf ("[%8d] ", os.Getpid ())) if _error := _main (); _error == nil { os.Exit (0)