Commits

Anonymous committed e796baa

start.go: beginning of a program to user forth() and replace the various start*go
common.go, cleanup
main.go, call runSlave
master.go, slave.go: make things a bit more resilient: time out instead of stupidly dying.

We're seeing some hangs now but overall the system is enough better that it's in rc.local

Comments (0)

Files changed (5)

 	SendPrint(funcname, r, arg)
 	err := r.E.Encode(arg)
 	if err != nil {
-		log.Fatal(funcname, ": Send: ", err)
+		log.Print(funcname, ": Send: ", err)
 	}
 }
 
 	}
 }
 
-func newListenProc(jobname string, job func(c *RpcClientServer), srvaddr string) string {
-	/* it is important to return the listen address, if this function was called
-	 * with port 0
-	 */
-	Dprint(2, "newListenProc %v %v\n", *defaultFam, srvaddr)
-	netl, err := net.Listen(*defaultFam, srvaddr)
-	if err != nil {
-		log.Fatal("newListenProc: ", err)
-	}
-	go func() {
-		for {
-			c, err := netl.Accept()
-			if err != nil {
-				log.Fatal(jobname, ": ", err)
-			}
-			Dprint(2, jobname, ": ", c.RemoteAddr())
-			go job(NewRpcClientServer(c, *binRoot))
-		}
-	}()
-	return netl.Addr().String()
-}
-
 /*
  * This name isn't very good any more.
  * This function builds up a list of files that need to go out to the current node's sub-nodes.
 	ioProxyPort      = flag.String("iopp", "0", "io proxy port")
 	cmdPort          = flag.String("cmdport", "6666", "command port")
 	defaultFam = flag.String("fam", "tcp4", "network type")
+	gprocBin	= flag.String("gprocBin", "gproc", "name of gproc binary")
 	/* required in the command line */
 	parent           = flag.String("parent", "hostname", "parent for some configurations")
 	myAddress = flag.String("myAddress", "hostname", "Required set to my address")
 		if len(flag.Args()) != 1 {
 			flag.Usage()
 		}
-		startSlave()
+		runSlave()
 	case "EXEC", "exec", "e":
 		/* Issuing a command to run on the slaves */
 		if len(flag.Args()) < 3 {
 			Dprint(4, s, " failed")
 			si, ok := slaves.Get(s)
 			if ok {
+				log.Print("Remove slave ", s, " ", si)
 				slaves.Remove(si)
 			} else {
-				Dprint(4, "Could not find slave ", s, " to remove")
+				log.Print("Could not find slave ", s, " to remove")
 			}
 		}
 	}
 	"net"
 	"fmt"
 	"gob"
+	"time"
+	"rand"
 )
 
 var id string
 
-/* We will for now assume that addressing is symmetric, that is, if we Dial someone on
- * a certain address, that's the address they should Dial us on. This assumption has held
- * up well for quite some time. And, in fact, it makes no sense to do it any other way ...
- */
-/* note that we're going to be able to merge master and slave fairly soon, now that they do almost the same things. */
-func startSlave() {
+func runSlave(){
+	/* some simple sanity checking */
+	if *DoPrivateMount == true && os.Getuid() != 0 {
+		log.Fatal("Slave: Need to run as root for private mounts")
+	}
 	if *parent == "" {
 		log.Fatal("Slave: must set parent IP with -parent switch")
 	}
 	if *myId == "" {
 		log.Fatal("Slave: must set myId with -myId switch")
 	}
+
+	/* at this point everything is right. So go forever. */
+	for {
+		startSlave()
+		Dprint(2, "Slave returned; try a timeout")
+		/* sleep for a random time that is between 10 and 60 seconds. random is necessary
+		 * because we have seen self-synchronization in earlier work. 
+		 */
+		r := int64(rand.Intn(50) + 10)
+		err := time.Sleep(r * int64(1<<30))
+		/* if we can't sleep we're going to hammer the server. Bad idea. */
+		if err != nil {
+			log.Fatal("Slave: sleep failed for ", r, " nanoseconds")
+		}
+	}
+}
+
+/* We will for now assume that addressing is symmetric, that is, if we Dial someone on
+ * a certain address, that's the address they should Dial us on. This assumption has held
+ * up well for quite some time. And, in fact, it makes no sense to do it any other way ...
+ */
+/* note that we're going to be able to merge master and slave fairly soon, now that they do almost the same things. */
+func startSlave() {
 	/* slight difference from master: we're ready when we start, since we run things */
 	vitalData := &vitalData{HostReady: true, Id: *myId}
 	masterAddr := *parent + ":" + *cmdPort
-	/* some simple sanity checking */
-	if *DoPrivateMount == true && os.Getuid() != 0 {
-		log.Fatal("Need to run as root for private mounts")
-	}
 	Dprint(2, "dialing masterAddr ", masterAddr)
 	master, err := Dial(*defaultFam, "", masterAddr)
 	if err != nil {
-		log.Fatal("startSlave: dialing:", err)
+		log.Print("startSlave: dialing:", err)
+		return
 	}
 
 	/* vitalData -- what we're doing here is assembling information for our parent. 
 	addr := strings.SplitN(master.LocalAddr().String(), ":", -1)
 	peerAddr := addr[0] + ":0"
 
-	laddr, _ := net.ResolveTCPAddr("tcp4", peerAddr)      // This multiple-return business sometimes gets annoying
-	netl, err := net.ListenTCP(*defaultFam, laddr) // this is how we're ditching newListenProc
+	laddr, _ := net.ResolveTCPAddr("tcp4", peerAddr)
+	netl, err := net.ListenTCP(*defaultFam, laddr)
+	if err != nil {
+		log.Print("startSlave: ", err)
+		return
+	}
 	vitalData.ServerAddr = netl.Addr().String()
 	vitalData.HostAddr = master.LocalAddr().String()
 	vitalData.ParentAddr = master.RemoteAddr().String()
 	r := NewRpcClientServer(master, *binRoot)
 	initSlave(r, vitalData)
 	go registerSlaves()
+	/* wow. This used to be much smaller and needs to be redone. */
 	go func() {
 		for {
 			// Wait for a connection from the master
 			c, err := netl.AcceptTCP()
 			if err != nil {
-				log.Fatal("problem in netl.Accept()")
+				log.Print("problem in netl.Accept()")
+				return
 			}
 			Dprint(3, "Received connection from: ", c.RemoteAddr())
 
 				"R", // "R" = run a program
 			}
 			// Start the new process
-			p, err := os.StartProcess(os.Args[0], argv, &procattr)
+			p, err := os.StartProcess(*gprocBin, argv, &procattr)
 			if err != nil {
 				log.Fatal("startSlave: ", err)
 			} else {
+/*
+ * gproc, a Go reimplementation of the LANL version of bproc and the LANL XCPU software. 
+ * 
+ * This software is released under the GNU Lesser General Public License, version 2, incorporated herein by reference. 
+ *
+ * Copyright (2010) Sandia Corporation. Under the terms of Contract DE-AC04-94AL85000 with Sandia Corporation, 
+ * the U.S. Government retains certain rights in this software.
+ */
+/* 
+ * parameters: 
+ * expression to figure your level -- pass to forth
+ * parent. expression to name your parent. 
+ * id. expression to compute the id
+ * myhost. expression to compute host. Default to 'hostname' since the host can work that out. 
+ */
+package main
+
+import (
+	"os"
+	"fmt"
+	"flag"
+)
+
+func usage() {
+	flag.PrintDefaults()
+	os.Exit(2)
+}
+
+var (
+	lowNode      = flag.Int("l", 1, "Lowest node number")
+	highNode     = flag.Int("h", 40, "Highest node number")
+	debugLevel   = flag.Int("d", 0, "Debug level")
+	privateMount = flag.Bool("p", true, "private mounts")
+	runornot = flag.String("b", "20", "block size -- default 20")
+	myAddress = flag.String("myAddress", "hostname", "what to tell a node to compute it's own address") 
+)
+
+func runlevel(lowNode, highNode int, mod bool) {
+	reap := make(chan *os.Waitmsg, 0)
+	numspawn := 0
+	for i := lowNode; i <= highNode; i++ {
+		fmt.Printf("Check %d; mode %v; mod %v\n", i, (i%BLOCKSIZE == 0), mod)
+		if (i%BLOCKSIZE == 0) != mod {
+			continue
+		}
+		numspawn++
+		go func(anode int) {
+			node := fmt.Sprintf("root@kn%d", anode)
+
+                        Args := []string{"ssh", "-o", "StrictHostKeyCHecking=no", node, "./gproc_linux_amd64", 
+				"-parent='hostname base 20 roundup kn strcat 10.1.234.234 hostname base 20 % ifelse'", 
+				"-myId='hostname base 20 % 1  + hostname base 20 / hostname base    %  ifelse'", 
+				"-myAddress=hostname", 
+				fmt.Sprintf("-p=%v ", *privateMount), fmt.Sprintf("-debug=%d", *debugLevel), "s"}
+			f := []*os.File{nil, os.Stdout, os.Stderr}
+			fmt.Printf("Spawn to %v\n", node)
+			pid, err := os.StartProcess("/usr/bin/ssh", Args, &os.ProcAttr{Files: f})
+			if err != nil {
+				fmt.Print("StartProcess fails: ", err)
+			}
+
+			msg, err := os.Wait(pid.Pid, 0)
+			reap <- msg
+		}(i)
+	}
+	for numspawn > 0 {
+		msg := <-reap
+		fmt.Printf("Reaped %v\n", msg)
+		numspawn--
+	}
+}
+func main() {
+	flag.Usage = usage
+	flag.Parse()
+	/* use 1 for the top level. Use anything else for the next level down */
+	level1 := flag.Args()[0] == "1"
+	fmt.Printf("Start nodes %d to %d\n", *lowNode, *highNode)
+	runlevel(*lowNode, *highNode, level1)
+}