[server] Add support for multiple processes mode

This commit is contained in:
Ciprian Dorin Craciun 2018-11-13 21:04:42 +02:00
parent 02d05c2aa6
commit 79fd34d30e
2 changed files with 128 additions and 6 deletions

View file

@ -5,13 +5,16 @@ package main
import "bytes"
import "flag"
import "fmt"
import "io"
import "log"
import "net"
import "net/http"
import "os"
import "os/signal"
import "runtime"
import "runtime/pprof"
import "sync"
import "syscall"
import "time"
@ -19,6 +22,7 @@ import "time"
import cdb "github.com/cipriancraciun/go-cdb-lib"
import "github.com/valyala/fasthttp"
import "github.com/valyala/fasthttp/reuseport"
import . "github.com/cipriancraciun/go-cdb-http/lib/common"
import . "github.com/cipriancraciun/go-cdb-http/lib/server"
@ -247,9 +251,11 @@ func main_0 () (error) {
var _bind string
var _archive string
var _archiveMmap bool
var _archiveInmem bool
var _archiveMmap bool
var _archivePreload bool
var _processes uint
var _threads uint
var _debug bool
var _profileCpu string
@ -260,9 +266,11 @@ func main_0 () (error) {
_bind_0 := _flags.String ("bind", "", "<ip>:<port>")
_archive_0 := _flags.String ("archive", "", "<path>")
_archiveMmap_0 := _flags.Bool ("archive-mmap", false, "(memory-mapped archive file)")
_archiveInmem_0 := _flags.Bool ("archive-inmem", false, "(memory-loaded archive file)")
_archiveMmap_0 := _flags.Bool ("archive-mmap", false, "(memory-mapped archive file)")
_archivePreload_0 := _flags.Bool ("archive-preload", false, "(preload archive file)")
_processes_0 := _flags.Uint ("processes", 0, "")
_threads_0 := _flags.Uint ("threads", 0, "")
_debug_0 := _flags.Bool ("debug", false, "")
_profileCpu_0 := _flags.String ("profile-cpu", "", "<path>")
@ -272,9 +280,11 @@ func main_0 () (error) {
_bind = *_bind_0
_archive = *_archive_0
_archiveMmap = *_archiveMmap_0
_archiveInmem = *_archiveInmem_0
_archiveMmap = *_archiveMmap_0
_archivePreload = *_archivePreload_0
_processes = *_processes_0
_threads = *_threads_0
_debug = *_debug_0
_profileCpu = *_profileCpu_0
@ -294,11 +304,115 @@ func main_0 () (error) {
log.Printf ("[ww] [3e8a40e4] archive 'memory-loaded' implies preloading!\n")
_archivePreload = false
}
if (_processes > 1) && ((_profileCpu != "") || (_profileMem != "")) {
AbortError (nil, "[cd18d250] multi-process and profiling are mutually exclusive!")
}
if _processes < 1 {
_processes = 1
}
if _threads < 1 {
_threads = 1
}
}
runtime.GOMAXPROCS (int (_threads))
if _processes > 1 {
log.Printf ("[ii] [06f8c944] sub-processes starting (`%d` processes with `%d` threads each)...\n", _processes, _threads)
_processesJoin := & sync.WaitGroup {}
_processesPid := make ([]*os.Process, _processes)
_processName := os.Args[0]
_processArguments := make ([]string, 0, len (os.Args))
_processArguments = append (_processArguments,
_processName,
"--bind", _bind,
"--archive", _archive,
)
if _archiveInmem {
_processArguments = append (_processArguments, "--archive-inmem")
}
if _archiveMmap {
_processArguments = append (_processArguments, "--archive-mmap")
}
if _archivePreload {
_processArguments = append (_processArguments, "--archive-preload")
}
if _debug {
_processArguments = append (_processArguments, "--debug")
}
_processArguments = append (_processArguments, "--threads", fmt.Sprintf ("%d", _threads))
_processAttributes := & os.ProcAttr {
Env : []string {},
Files : []*os.File {
os.Stdin,
os.Stdout,
os.Stderr,
},
Sys : nil,
}
for _processIndex, _ := range _processesPid {
if _processPid, _error := os.StartProcess (_processName, _processArguments, _processAttributes); _error == nil {
_processesJoin.Add (1)
_processesPid[_processIndex] = _processPid
log.Printf ("[ii] [63cb22f8] sub-process `%d` started;\n", _processPid.Pid)
go func (_index int, _processPid *os.Process) () {
if _processStatus, _error := _processPid.Wait (); _error == nil {
if _processStatus.Success () {
log.Printf ("[ii] [66b60b81] sub-process `%d` succeeded;\n", _processPid.Pid)
} else {
log.Printf ("[ww] [5d25046b] sub-process `%d` failed: `%s`; ignoring!\n", _processPid.Pid, _processStatus)
}
} else {
LogError (_error, fmt.Sprintf ("[f1bfc927] failed waiting for sub-process `%d`; ignoring!", _processPid.Pid))
}
_processesPid[_processIndex] = nil
_processesJoin.Done ()
} (_processIndex, _processPid)
} else {
LogError (_error, "[8892b34d] failed starting sub-process; ignoring!")
}
}
{
_signals := make (chan os.Signal, 32)
signal.Notify (_signals, syscall.SIGINT, syscall.SIGTERM)
go func () () {
for {
_signal := <- _signals
log.Printf ("[ii] [a9243ecb] signaling sub-processes...\n")
for _, _processPid := range _processesPid {
if _processPid != nil {
if _error := _processPid.Signal (_signal); _error != nil {
LogError (_error, fmt.Sprintf ("[ab681164] failed signaling sub-process `%d`; ignoring!", _processPid.Pid))
}
}
}
}
} ()
}
_processesJoin.Wait ()
log.Printf ("[ii] [b949bafc] sub-processes terminated;\n")
return nil
}
var _cdbReader *cdb.CDB
{
log.Printf ("[ii] [3b788396] opening archive `%s`...\n", _archive)
var _cdbFile *os.File
if _cdbFile_0, _error := os.Open (_archive); _error == nil {
_cdbFile = _cdbFile_0
@ -411,7 +525,7 @@ func main_0 () (error) {
AbortError (_error, "[907d08b5] failed opening MEM profile!")
}
_profile := pprof.Lookup ("heap")
defer func () {
defer func () () {
runtime.GC ()
if _profile != nil {
if _error := _profile.WriteTo (_stream, 0); _error != nil {
@ -467,8 +581,15 @@ func main_0 () (error) {
log.Printf ("[ii] [f11e4e37] listening on `http://%s/`;\n", _bind)
if _error := _httpServer.ListenAndServe (_bind); _error != nil {
AbortError (_error, "[44f45c67] failed starting server!")
var _httpListener net.Listener
if _httpListener_0, _error := reuseport.Listen ("tcp4", _bind); _error == nil {
_httpListener = _httpListener_0
} else {
AbortError (_error, "[d5f51e9f] failed starting listener!")
}
if _error := _httpServer.Serve (_httpListener); _error != nil {
AbortError (_error, "[44f45c67] failed executing server!")
}

View file

@ -14,6 +14,7 @@ import "os"
func Main (_main func () (error)) () {
log.SetFlags (0)
log.SetPrefix (fmt.Sprintf ("[%8d] ", os.Getpid ()))
if _error := _main (); _error == nil {
os.Exit (0)