Process Management¶
Deep dive into how Databricks DevBox manages code-server processes throughout their lifecycle.
Process Lifecycle¶
stateDiagram-v2
[*] --> Created: CreateServer()
Created --> ExtensionInstallation: installExtensions()
ExtensionInstallation --> Stopped: Installation complete
Stopped --> Starting: StartServer()
Starting --> Running: Process spawned
Running --> Monitoring: Health checks begin
Monitoring --> Running: Healthy
Monitoring --> Stopped: Health check failed
Running --> Stopping: StopServer()
Stopping --> Stopped: Process terminated
Stopped --> Starting: RestartServer()
Running --> [*]: DeleteServer()
Stopped --> [*]: DeleteServer()
Server Creation Flow¶
1. Metadata Creation¶
// process_manager.go:131
// CreateServer registers a new code-server instance without starting it.
//
// It generates an ID, takes the next available port, creates the workspace
// directory and the per-server data directory (<dataDir>/<id>), stores the
// instance in pm.servers / pm.portMap and persists the registry.
//
// name is the display name. workspacePath may be empty, in which case
// "workspace/<id>" is used. extensions is stored on the instance unchanged.
// zipFile and githubURL are accepted but not consumed in this excerpt.
//
// It returns the new instance, or an error when the workspace path cannot be
// resolved or a directory cannot be created; on error nothing is registered.
func (pm *ProcessManager) CreateServer(name, workspacePath string, extensions []string, zipFile, githubURL string) (*ServerInstance, error) {
	// Generate unique ID
	id := uuid.New().String()

	// Assign available port
	port := pm.getNextAvailablePort() // Starts from 8500

	// Create workspace directory
	if workspacePath == "" {
		workspacePath = filepath.Join("workspace", id)
	}
	absWorkspacePath, err := filepath.Abs(workspacePath)
	if err != nil {
		return nil, fmt.Errorf("resolving workspace path %q: %w", workspacePath, err)
	}
	if err := os.MkdirAll(absWorkspacePath, 0755); err != nil {
		return nil, fmt.Errorf("creating workspace %q: %w", absWorkspacePath, err)
	}

	// Create server data directory
	serverDataDir := filepath.Join(pm.dataDir, id) // data/<server-id>
	if err := os.MkdirAll(serverDataDir, 0755); err != nil {
		return nil, fmt.Errorf("creating data directory %q: %w", serverDataDir, err)
	}

	// Create server instance
	server := &ServerInstance{
		ID:            id,
		Name:          name,
		Port:          port,
		WorkspacePath: absWorkspacePath,
		Extensions:    extensions,
		Status:        StatusStopped, // Not started yet
	}

	// Store in memory and persist. Registration happens only after both
	// directories exist, so a failed creation never leaves a dangling entry.
	pm.servers[id] = server
	pm.portMap[port] = id
	pm.saveServers()

	return server, nil
}
2. Workspace Initialization¶
From ZIP file:
// process_manager.go:693
// extractZipFile unpacks the archive at zipPath into targetPath.
//
// Entries whose resolved destination would fall outside targetPath are
// skipped (directory-traversal protection). The check is done on the joined,
// cleaned path rather than by searching the name for "..", so ordinary names
// such as "notes..v2.txt" are still extracted.
//
// It returns an error if the archive cannot be opened or any entry cannot be
// written; extraction stops at the first failure.
func (pm *ProcessManager) extractZipFile(zipPath, targetPath string) error {
	reader, err := zip.OpenReader(zipPath)
	if err != nil {
		return fmt.Errorf("opening zip %q: %w", zipPath, err)
	}
	defer reader.Close()

	root := filepath.Clean(targetPath)
	for _, file := range reader.File {
		dest := filepath.Join(root, file.Name)

		// Sanitize path (prevent directory traversal): the destination,
		// expressed relative to root, must not climb out of it.
		rel, err := filepath.Rel(root, dest)
		if err != nil || rel == ".." || strings.HasPrefix(rel, ".."+string(os.PathSeparator)) {
			continue
		}

		if file.FileInfo().IsDir() {
			// Always grant the owner rwx: archives may record directory
			// modes without the execute bit, which would make it
			// impossible to create the directory's children afterwards.
			if err := os.MkdirAll(dest, file.Mode().Perm()|0700); err != nil {
				return fmt.Errorf("creating directory %q: %w", dest, err)
			}
			continue
		}

		if err := extractZipEntry(file, dest); err != nil {
			return err
		}
	}
	return nil
}

// extractZipEntry writes the contents of a single regular zip entry to dest,
// creating missing parent directories first (archives are not required to
// contain explicit directory entries).
func extractZipEntry(file *zip.File, dest string) error {
	if err := os.MkdirAll(filepath.Dir(dest), 0755); err != nil {
		return fmt.Errorf("creating parent of %q: %w", dest, err)
	}

	src, err := file.Open()
	if err != nil {
		return fmt.Errorf("opening zip entry %q: %w", file.Name, err)
	}
	defer src.Close()

	dst, err := os.Create(dest)
	if err != nil {
		return fmt.Errorf("creating %q: %w", dest, err)
	}
	if _, err := io.Copy(dst, src); err != nil {
		dst.Close()
		return fmt.Errorf("extracting %q: %w", file.Name, err)
	}
	// Close errors matter for written files: buffered data may fail to flush.
	if err := dst.Close(); err != nil {
		return fmt.Errorf("closing %q: %w", dest, err)
	}
	return nil
}
From GitHub:
// process_manager.go:739
// cloneGithubRepo clones repoURL into targetPath by running `git clone`.
//
// It returns nil on success. On failure the returned error wraps the exec
// error and includes git's combined stdout/stderr so the cause is visible.
func (pm *ProcessManager) cloneGithubRepo(repoURL, targetPath string) error {
	// "--" ends option parsing, so a repoURL that begins with "-" is treated
	// as a repository argument and cannot be interpreted as a git option.
	cmd := exec.Command("git", "clone", "--", repoURL, targetPath)
	if out, err := cmd.CombinedOutput(); err != nil {
		return fmt.Errorf("git clone %q: %w: %s", repoURL, err, strings.TrimSpace(string(out)))
	}
	return nil
}
3. Extension Installation¶
Synchronous installation (blocks server creation until complete):
// process_manager.go:777
// installExtension runs `code-server --install-extension <extensionID>` with
// the supplied environment and reports whether it succeeded.
//
// env is used as the full process environment; it must include XDG_DATA_HOME.
// serverID and serverName are only used to tag the logged process event.
//
// It returns true after logging EXTENSION_INSTALLED, or false after logging
// EXTENSION_INSTALL_FAILED with the extension ID, the exec error and any
// output the command produced.
func (pm *ProcessManager) installExtension(env []string, extensionID, serverID, serverName string) bool {
	cmd := exec.Command("code-server", "--install-extension", extensionID)
	cmd.Env = env // Must include XDG_DATA_HOME

	// Capture stdout and stderr together: the exec error alone is usually
	// just "exit status N", the real reason is in the tool's output.
	out, err := cmd.CombinedOutput()
	if err != nil {
		detail := extensionID + ": " + err.Error()
		if msg := strings.TrimSpace(string(out)); msg != "" {
			detail += ": " + msg
		}
		pm.logger.LogProcessEvent(serverID, serverName, "EXTENSION_INSTALL_FAILED", detail)
		return false
	}

	pm.logger.LogProcessEvent(serverID, serverName, "EXTENSION_INSTALLED", extensionID)
	return true
}
Environment for extension installation:
// Build the environment used for extension installation: start from the
// current process environment and append XDG_DATA_HOME pointing at the
// absolute form of the per-server data directory.
env := os.Environ()
userDataDir := filepath.Join(pm.dataDir, serverID) // data/<server-id>
// The error from filepath.Abs is discarded here; on failure absDataDir is
// the empty string and XDG_DATA_HOME would be set to "".
absDataDir, _ := filepath.Abs(userDataDir)
env = append(env,
fmt.Sprintf("XDG_DATA_HOME=%s", absDataDir), // Tells code-server where to install
)
User settings application:
After extensions are installed, apply user settings from devbox.yaml:
// process_manager.go:1287
// applyUserSettings merges the UserSettings of every configured extension
// group and writes the result to
// <dataDir>/<serverID>/code-server/User/settings.json, replacing any
// existing file.
//
// installedExtensions is accepted but not consulted in this excerpt: settings
// from all groups are applied regardless of which extensions were installed.
//
// When two groups define the same key the later-visited group wins.
// NOTE(review): if ExtensionGroups is a map, visit order is randomized, so
// the winner of such a conflict is not deterministic — confirm the type.
//
// It returns an error if the settings directory cannot be created, the
// settings cannot be encoded, or the file cannot be written.
func (pm *ProcessManager) applyUserSettings(serverID string, installedExtensions []string) error {
	config := GetConfig()

	// Collect user settings from extension groups. Ranging over a nil
	// UserSettings map is a no-op, so no explicit nil check is needed.
	userSettings := make(map[string]interface{})
	for _, group := range config.ExtensionGroups {
		for key, value := range group.UserSettings {
			userSettings[key] = value
		}
	}

	// Write to settings.json, creating the User directory if code-server has
	// not created it yet.
	settingsFile := filepath.Join(pm.dataDir, serverID, "code-server", "User", "settings.json")
	if err := os.MkdirAll(filepath.Dir(settingsFile), 0755); err != nil {
		return fmt.Errorf("creating settings directory for server %s: %w", serverID, err)
	}
	data, err := json.MarshalIndent(userSettings, "", " ")
	if err != nil {
		return fmt.Errorf("encoding user settings for server %s: %w", serverID, err)
	}
	if err := os.WriteFile(settingsFile, data, 0644); err != nil {
		return fmt.Errorf("writing %q: %w", settingsFile, err)
	}
	return nil
}
Starting a Server¶
0. Port Cleanup (Pre-start)¶
Before starting a server, the system automatically kills any existing process on the target port:
// process_manager.go:131
// killProcessOnPort force-kills (SIGKILL) every process that is LISTENING on
// the given TCP port, as reported by lsof.
//
// Only listeners are targeted: without the -sTCP:LISTEN filter lsof also
// reports processes that merely hold a client connection to the port, which
// would include this process itself after it has probed the server over HTTP.
//
// It returns nil when nothing is listening or after attempting the kills
// (individual kill failures are logged, not returned). It returns an error
// only when lsof itself could not be run.
func (pm *ProcessManager) killProcessOnPort(port int) error {
	// Use lsof to find the process listening on the port
	cmd := exec.Command("lsof", "-t", "-i", fmt.Sprintf("tcp:%d", port), "-sTCP:LISTEN")
	output, err := cmd.Output()
	if err != nil {
		if _, exited := err.(*exec.ExitError); exited {
			// lsof ran and exited non-zero: no process found on port
			// (which is fine)
			return nil
		}
		return fmt.Errorf("running lsof for port %d: %w", port, err)
	}

	self := fmt.Sprint(os.Getpid())

	// lsof -t prints one PID per line; Fields also drops blank lines.
	for _, pid := range strings.Fields(string(output)) {
		if pid == self {
			// Never kill the manager itself.
			continue
		}
		// Kill the process with SIGKILL
		if err := exec.Command("kill", "-9", pid).Run(); err != nil {
			log.Printf("Failed to kill process %s on port %d: %v", pid, port, err)
			continue
		}
		log.Printf("Killed existing process %s on port %d", pid, port)
	}
	return nil
}
When this happens:
- User clicks the play button to start a code-server
- System checks if any process is listening on the assigned port
- If a process exists, it's forcefully terminated with `kill -9`
- If no process exists, the system continues normally
- The code-server then starts on the now-free port
Use cases:
- Stale processes: A previous code-server instance didn't shut down cleanly
- Port conflicts: Another application is using the port
- Development: You manually started a process on the port for testing
1. Build Command and Environment¶
// process_manager.go:239
// StartServer launches the code-server process for the server with the
// given id.
//
// It first clears the server's port of any existing listener (best effort),
// creates the code-server config directory under <dataDir>/<id>, then starts
// `code-server` in the workspace directory with a fixed set of flags and
// environment variables.
//
// It returns an error if the id is unknown, the config directory cannot be
// created, the data directory cannot be resolved, or the process fails to
// start. It does not wait for the process to exit.
func (pm *ProcessManager) StartServer(id string) error {
	server, ok := pm.servers[id]
	if !ok || server == nil {
		return fmt.Errorf("server %s not found", id)
	}

	// Kill any existing process on the port before starting
	if err := pm.killProcessOnPort(server.Port); err != nil {
		log.Printf("Warning: Failed to kill existing process on port %d: %v", server.Port, err)
		// Continue anyway - the port might just be free
	}

	// Create config directory
	userDataDir := filepath.Join(pm.dataDir, id)
	configDir := filepath.Join(userDataDir, "code-server")
	if err := os.MkdirAll(configDir, 0755); err != nil {
		return fmt.Errorf("creating config directory %q: %w", configDir, err)
	}

	// Build command
	// NOTE(review): the server binds to all interfaces with "--auth none",
	// so anything that can reach the port gets an unauthenticated editor —
	// confirm this is only reachable through a trusted proxy.
	args := []string{
		"--bind-addr", fmt.Sprintf("0.0.0.0:%d", server.Port),
		"--user-data-dir", configDir,
		"--auth", "none",
		"--disable-telemetry",
		"--disable-update-check",
		"--disable-file-downloads",
		"--log", "info",
		server.WorkspacePath,
	}
	cmd := exec.Command("code-server", args...)
	cmd.Dir = server.WorkspacePath

	// Set environment
	absDataDir, err := filepath.Abs(userDataDir)
	if err != nil {
		return fmt.Errorf("resolving data directory %q: %w", userDataDir, err)
	}
	env := append(os.Environ(),
		fmt.Sprintf("XDG_DATA_HOME=%s", absDataDir),
		"NODE_OPTIONS=--max-old-space-size=2048",
		"VSCODE_LOGS=info",
		"CODE_SERVER_LOG=info",
		"UV_THREADPOOL_SIZE=128",
		// NOTE(review): this disables TLS certificate verification for
		// every Node.js HTTPS request made inside code-server.
		"NODE_TLS_REJECT_UNAUTHORIZED=0",
		"VSCODE_DISABLE_CRASH_REPORTER=true",
		"ELECTRON_NO_ATTACH_CONSOLE=1",
		"DISABLE_TELEMETRY=true",
	)
	cmd.Env = env

	// Start process
	return cmd.Start()
}
2. Output Capture¶
// Get stdout/stderr pipes
// NOTE(review): os/exec requires StdoutPipe/StderrPipe to be called before
// cmd.Start(); calling them on a started command returns an error. The
// errors are discarded here, so a failure would hand nil readers to the
// capture goroutine — confirm the real function obtains the pipes before
// starting the process.
stdout, _ := cmd.StdoutPipe()
stderr, _ := cmd.StderrPipe()
// Start output capture in goroutine
// The capture object is built from the manager's logger and log manager
// plus the server's id and name; it reads both pipes on its own goroutine.
outputCapture := NewEnhancedProcessOutputCapture(pm.logger, pm.logManager, id, serverName)
go outputCapture.CaptureOutput(stdout, stderr)
3. Update Server State¶
// Update metadata
now := time.Now()
// server.PID aliases the Pid field inside cmd.Process rather than holding
// a copy of the value.
server.PID = &cmd.Process.Pid
server.StartTime = &now
server.Status = StatusRunning
// Record the full command line (binary name followed by its arguments).
server.Command = append([]string{"code-server"}, args...)
// Persist to disk
pm.saveServers()
// Start monitoring
// One goroutine per started process; it is handed the same cmd so it can
// wait on it.
go pm.monitorProcess(id, cmd)
Process Monitoring¶
Per-Process Monitor¶
Each running server has a dedicated goroutine:
// process_manager.go:516
// monitorProcess blocks until the process behind cmd exits, then records the
// outcome on the server identified by id.
//
// A non-nil error from cmd.Wait marks the server StatusFailed; a clean exit
// marks it StatusStopped. In both cases PID and StartTime are cleared and
// the registry is persisted. It is meant to run on its own goroutine and
// takes pm.mutex only after the process has exited.
//
// If the server was removed from pm.servers while the process was still
// running, the exit is logged and no state is touched.
func (pm *ProcessManager) monitorProcess(id string, cmd *exec.Cmd) {
	// Wait for process to exit
	err := cmd.Wait()

	pm.mutex.Lock()
	defer pm.mutex.Unlock()

	server, ok := pm.servers[id]
	if !ok || server == nil {
		// The entry is gone; there is nothing left to update.
		log.Printf("Process for removed server %s exited: %v", id, err)
		return
	}

	if err != nil {
		log.Printf("Server %s exited with error: %v", server.Name, err)
		pm.logger.LogProcessEvent(id, server.Name, "PROCESS_EXITED_ERROR", err.Error())
		server.Status = StatusFailed
	} else {
		log.Printf("Server %s exited normally", server.Name)
		pm.logger.LogProcessEvent(id, server.Name, "PROCESS_EXITED", "Normal exit")
		server.Status = StatusStopped
	}

	server.PID = nil
	server.StartTime = nil
	pm.saveServers()
}
Global Health Monitor¶
Single goroutine monitors all servers every 30 seconds:
// process_manager.go:884
// startHealthMonitor runs a health check over all servers on every tick of
// a 30-second ticker. It never returns on its own, so it is expected to be
// launched on a dedicated goroutine.
func (pm *ProcessManager) startHealthMonitor() {
	const checkInterval = 30 * time.Second

	tick := time.NewTicker(checkInterval)
	defer tick.Stop()

	for {
		<-tick.C
		pm.performHealthCheck()
	}
}
// performHealthCheck probes every server that is marked running and marks
// the ones that fail the probe as stopped (Status, PID and StartTime are
// reset), persisting the registry once if anything changed.
//
// The HTTP probes are made without holding pm.mutex: each probe can take
// several seconds, and holding the write lock for that long would stall
// every other operation on the manager. The lock is re-acquired afterwards
// and a server is only updated if it is still the same running process that
// was probed.
//
// Note that this only updates bookkeeping; the process itself is not
// signalled here.
func (pm *ProcessManager) performHealthCheck() {
	type probe struct {
		id   string
		port int
		pid  int
	}

	// Snapshot the running servers under the read lock.
	pm.mutex.RLock()
	var running []probe
	for id, server := range pm.servers {
		if server.Status == StatusRunning && server.PID != nil {
			running = append(running, probe{id: id, port: server.Port, pid: *server.PID})
		}
	}
	pm.mutex.RUnlock()

	// Check HTTP health endpoints without any lock held.
	var unhealthy []probe
	for _, p := range running {
		if !pm.isServerHealthy(p.port) {
			unhealthy = append(unhealthy, p)
		}
	}
	if len(unhealthy) == 0 {
		return
	}

	pm.mutex.Lock()
	defer pm.mutex.Unlock()

	changed := false
	for _, p := range unhealthy {
		server, ok := pm.servers[p.id]
		// Skip servers that were deleted, stopped or restarted while
		// the probe was in flight.
		if !ok || server.Status != StatusRunning || server.PID == nil || *server.PID != p.pid {
			continue
		}
		log.Printf("Health check failed for server %s", server.Name)
		server.Status = StatusStopped
		server.PID = nil
		server.StartTime = nil
		changed = true
	}
	if changed {
		pm.saveServers()
	}
}
Health Check Implementation¶
// process_manager.go:951
// isServerHealthy reports whether the code-server listening on the given
// local port answers its /healthz endpoint with HTTP 200 and a JSON body
// whose "status" field is "alive".
//
// A request to "/" is sent first purely to wake the server; its outcome is
// ignored. Redirects are never followed and each request is limited to three
// seconds. Any transport error or non-200 status yields false.
func (pm *ProcessManager) isServerHealthy(port int) bool {
	// Returning ErrUseLastResponse makes the client hand back the redirect
	// response itself instead of following it.
	noFollow := func(*http.Request, []*http.Request) error {
		return http.ErrUseLastResponse
	}
	httpClient := &http.Client{
		Timeout:       3 * time.Second,
		CheckRedirect: noFollow,
	}

	// Wake-up request; only the body needs closing.
	if warm, _ := httpClient.Get(fmt.Sprintf("http://localhost:%d/", port)); warm != nil {
		warm.Body.Close()
	}

	health, err := httpClient.Get(fmt.Sprintf("http://localhost:%d/healthz", port))
	if err != nil {
		return false
	}
	defer health.Body.Close()

	if health.StatusCode != http.StatusOK {
		return false
	}

	var payload struct {
		Status        string `json:"status"`
		LastHeartbeat int64  `json:"lastHeartbeat"`
	}
	// A decode failure is deliberately not checked: the verdict depends
	// only on whatever ended up in Status.
	_ = json.NewDecoder(health.Body).Decode(&payload)

	return payload.Status == "alive"
}
Metrics Collection¶
Every second, update metrics for running servers:
// process_manager.go:1000
// updateServerMetrics refreshes Uptime, CPUPercent, MemoryMB and LastUpdate
// for every server that is marked running, and clears those metrics for
// servers that are not.
//
// A running server whose process can no longer be found is marked stopped
// and its PID, StartTime and metrics are cleared. A metric that cannot be
// read on this pass keeps its previous value instead of being overwritten.
//
// NOTE(review): this excerpt mutates pm.servers entries without taking
// pm.mutex — confirm the caller holds the lock.
func (pm *ProcessManager) updateServerMetrics() {
	now := time.Now()

	for _, server := range pm.servers {
		if server.Status != StatusRunning || server.PID == nil {
			// Clear metrics for non-running servers
			server.Uptime = nil
			server.CPUPercent = nil
			server.MemoryMB = nil
			continue
		}

		// Get process handle and check that the process still exists.
		proc, err := process.NewProcess(int32(*server.PID))
		alive := false
		if err == nil {
			alive, _ = proc.IsRunning()
		}
		if !alive {
			// Process doesn't exist: reset everything tied to it so no
			// stale start time or metrics survive.
			server.Status = StatusStopped
			server.PID = nil
			server.StartTime = nil
			server.Uptime = nil
			server.CPUPercent = nil
			server.MemoryMB = nil
			continue
		}

		// Calculate uptime (StartTime may be unset on an entry that was
		// only partially populated).
		if server.StartTime != nil {
			uptime := now.Sub(*server.StartTime).Seconds()
			server.Uptime = &uptime
		}

		// Get CPU usage
		if cpuPercent, err := proc.CPUPercent(); err == nil {
			server.CPUPercent = &cpuPercent
		}

		// Get memory usage; MemoryInfo returns a nil pointer on error.
		if memInfo, err := proc.MemoryInfo(); err == nil && memInfo != nil {
			memoryMB := float64(memInfo.RSS) / 1024 / 1024
			server.MemoryMB = &memoryMB
		}

		// Update timestamp
		server.LastUpdate = &now
	}
}
Stopping a Server¶
Graceful Shutdown¶
// process_manager.go:357
// StopServer asks the server's process to terminate and marks the server as
// stopped.
//
// It sends SIGTERM and, on a background goroutine, sends SIGKILL ten seconds
// later if the process is still alive. If SIGTERM cannot be delivered the
// process is killed immediately. The server's Status, PID and StartTime are
// reset and persisted before the function returns; it does not wait for the
// process to exit.
//
// It returns an error if the id is unknown, the server has no PID, or the
// process handle cannot be obtained.
func (pm *ProcessManager) StopServer(id string) error {
	server, ok := pm.servers[id]
	if !ok || server == nil {
		return fmt.Errorf("server %s not found", id)
	}
	if server.PID == nil {
		return fmt.Errorf("server not running")
	}

	// Find process
	proc, err := os.FindProcess(*server.PID)
	if err != nil {
		return err
	}

	// Send SIGTERM for graceful shutdown
	if err := proc.Signal(syscall.SIGTERM); err == nil {
		// Wait up to 10 seconds, then force kill if still running.
		//
		// The goroutine works on the process handle captured above, not on
		// server.PID: that field is cleared below before the timer fires,
		// and by then the server may even have been restarted with a new
		// process that must not be touched.
		go func() {
			time.Sleep(10 * time.Second)
			// Signal 0 delivers nothing; it only reports whether the
			// process can still be signalled.
			// NOTE(review): if the OS has reused the PID in the meantime
			// this would hit an unrelated process — confirm acceptable.
			if proc.Signal(syscall.Signal(0)) == nil {
				_ = proc.Kill() // best effort; the process may exit concurrently
			}
		}()
	} else {
		// SIGTERM failed, force kill
		_ = proc.Kill() // best effort; the process may already be gone
	}

	// Immediately mark as stopped
	server.Status = StatusStopped
	server.PID = nil
	server.StartTime = nil
	pm.saveServers()

	return nil
}
Deleting a Server¶
Complete Cleanup¶
// process_manager.go:410
// DeleteServer removes a server and everything stored for it.
//
// A running server's process is killed first. Then the per-server data
// directory (<dataDir>/<id>), the workspace directory and logs/<id> are
// removed, the server is dropped from pm.servers and pm.portMap, and the
// registry is persisted. Failures while killing or removing are logged and
// do not abort the deletion.
//
// NOTE(review): WorkspacePath is removed recursively. CreateServer accepts a
// caller-supplied workspace path, so deleting a server that points at a
// pre-existing directory erases that directory — confirm this is intended.
//
// It returns an error only when the id is unknown.
func (pm *ProcessManager) DeleteServer(id string) error {
	server, ok := pm.servers[id]
	if !ok || server == nil {
		return fmt.Errorf("server %s not found", id)
	}

	// Stop if running
	if server.Status == StatusRunning && server.PID != nil {
		if proc, err := os.FindProcess(*server.PID); err != nil {
			log.Printf("Failed to find process %d for server %s: %v", *server.PID, id, err)
		} else if err := proc.Kill(); err != nil {
			log.Printf("Failed to kill process %d for server %s: %v", *server.PID, id, err)
		}
	}

	// Clean up directories
	for _, dir := range []string{
		filepath.Join(pm.dataDir, id), // data/<server-id>
		server.WorkspacePath,          // workspace
		filepath.Join("logs", id),     // logs/<server-id>
	} {
		if err := os.RemoveAll(dir); err != nil {
			log.Printf("Failed to remove %s for server %s: %v", dir, id, err)
		}
	}

	// Remove from maps
	delete(pm.portMap, server.Port)
	delete(pm.servers, id)

	// Persist
	pm.saveServers()

	return nil
}
State Persistence¶
Save State¶
// process_manager.go:679
// saveServers writes pm.servers as indented JSON to pm.serversFile
// (data/servers.json).
//
// The data is written to a sibling "<file>.tmp" and then renamed over the
// target, so a reader never observes a half-written file. If encoding or
// writing fails, the failure is logged and the existing file is left
// untouched.
func (pm *ProcessManager) saveServers() {
	data, err := json.MarshalIndent(pm.servers, "", " ")
	if err != nil {
		log.Printf("Failed to encode server state: %v", err)
		return
	}

	tmp := pm.serversFile + ".tmp"
	if err := os.WriteFile(tmp, data, 0644); err != nil {
		log.Printf("Failed to write server state to %s: %v", tmp, err)
		return
	}
	if err := os.Rename(tmp, pm.serversFile); err != nil {
		log.Printf("Failed to replace %s: %v", pm.serversFile, err)
		_ = os.Remove(tmp) // best-effort cleanup of the orphaned temp file
	}
}
Load State¶
// process_manager.go:652
// loadServersFromFile replaces the in-memory registry with the contents of
// pm.serversFile and rebuilds pm.portMap from it. pm.nextPort is advanced
// past the highest port found so new servers never reuse a stored port.
//
// If the file is missing, unreadable or not valid JSON, the current
// in-memory state is kept (the problem is logged unless the file simply
// does not exist). In every case pm.servers and pm.portMap are non-nil on
// return, so later map writes cannot panic. Null entries in the file are
// dropped.
func (pm *ProcessManager) loadServersFromFile() {
	var servers map[string]*ServerInstance

	data, err := os.ReadFile(pm.serversFile)
	if err == nil {
		err = json.Unmarshal(data, &servers)
	}
	if err != nil {
		if !os.IsNotExist(err) {
			log.Printf("Failed to load server state from %s: %v", pm.serversFile, err)
		}
		if pm.servers == nil {
			pm.servers = make(map[string]*ServerInstance)
		}
		if pm.portMap == nil {
			pm.portMap = make(map[int]string)
		}
		return
	}
	if servers == nil {
		// The file held a JSON null.
		servers = make(map[string]*ServerInstance)
	}

	// Rebuild port map
	portMap := make(map[int]string, len(servers))
	for id, server := range servers {
		if server == nil {
			delete(servers, id)
			continue
		}
		portMap[server.Port] = id
		if server.Port >= pm.nextPort {
			pm.nextPort = server.Port + 1
		}
	}

	// Restore in-memory state
	pm.servers = servers
	pm.portMap = portMap
}
State Refresh¶
Every second, reload state from disk to catch external changes:
// process_manager.go:821
// startStateRefreshRoutine re-reads the persisted state once per tick of a
// one-second ticker. It never returns on its own, so it is expected to be
// launched on a dedicated goroutine.
func (pm *ProcessManager) startStateRefreshRoutine() {
	const refreshInterval = 1 * time.Second

	tick := time.NewTicker(refreshInterval)
	defer tick.Stop()

	for {
		<-tick.C
		pm.refreshStateFromFile()
	}
}
Concurrency & Thread Safety¶
Locking Strategy¶
// Read operations
// The shared (read) lock is held only while the pointer is fetched from the
// map; the *ServerInstance is then used after the lock has been released.
pm.mutex.RLock()
server := pm.servers[id]
pm.mutex.RUnlock()
// Write operations
// The exclusive lock covers both the map update and the call that persists
// it, so the file is written while the lock is still held.
pm.mutex.Lock()
pm.servers[id] = newServer
pm.saveServers()
pm.mutex.Unlock()
Goroutine Management¶
- 1 global health monitor (all servers)
- 1 global state refresher (all servers)
- N process monitors (1 per running server)
- N log capture goroutines (1 per running server)
Next Steps¶
- Frontend architecture
- Configuration reference