Commits

rmin...@gmail.com  committed e796baa

start.go: beginning of a program to use forth() and replace the various start*.go files
common.go, cleanup
main.go, call runSlave
master.go, slave.go: make things a bit more resilient: time out instead of stupidly dying.

We're seeing some hangs now, but overall the system has improved enough that it's in rc.local

  • Participants
  • Parent commits 08dec2b

Comments (0)

Files changed (5)

 	SendPrint(funcname, r, arg)
 	err := r.E.Encode(arg)
 	if err != nil {
-		log.Fatal(funcname, ": Send: ", err)
+		log.Print(funcname, ": Send: ", err)
 	}
 }
 
 	}
 }
 
-func newListenProc(jobname string, job func(c *RpcClientServer), srvaddr string) string {
-	/* it is important to return the listen address, if this function was called
-	 * with port 0
-	 */
-	Dprint(2, "newListenProc %v %v\n", *defaultFam, srvaddr)
-	netl, err := net.Listen(*defaultFam, srvaddr)
-	if err != nil {
-		log.Fatal("newListenProc: ", err)
-	}
-	go func() {
-		for {
-			c, err := netl.Accept()
-			if err != nil {
-				log.Fatal(jobname, ": ", err)
-			}
-			Dprint(2, jobname, ": ", c.RemoteAddr())
-			go job(NewRpcClientServer(c, *binRoot))
-		}
-	}()
-	return netl.Addr().String()
-}
-
 /*
  * This name isn't very good any more.
  * This function builds up a list of files that need to go out to the current node's sub-nodes.
 	ioProxyPort      = flag.String("iopp", "0", "io proxy port")
 	cmdPort          = flag.String("cmdport", "6666", "command port")
 	defaultFam = flag.String("fam", "tcp4", "network type")
+	gprocBin	= flag.String("gprocBin", "gproc", "name of gproc binary")
 	/* required in the command line */
 	parent           = flag.String("parent", "hostname", "parent for some configurations")
 	myAddress = flag.String("myAddress", "hostname", "Required set to my address")
 		if len(flag.Args()) != 1 {
 			flag.Usage()
 		}
-		startSlave()
+		runSlave()
 	case "EXEC", "exec", "e":
 		/* Issuing a command to run on the slaves */
 		if len(flag.Args()) < 3 {
 			Dprint(4, s, " failed")
 			si, ok := slaves.Get(s)
 			if ok {
+				log.Print("Remove slave ", s, " ", si)
 				slaves.Remove(si)
 			} else {
-				Dprint(4, "Could not find slave ", s, " to remove")
+				log.Print("Could not find slave ", s, " to remove")
 			}
 		}
 	}
 	"net"
 	"fmt"
 	"gob"
+	"time"
+	"rand"
 )
 
 var id string
 
-/* We will for now assume that addressing is symmetric, that is, if we Dial someone on
- * a certain address, that's the address they should Dial us on. This assumption has held
- * up well for quite some time. And, in fact, it makes no sense to do it any other way ...
- */
-/* note that we're going to be able to merge master and slave fairly soon, now that they do almost the same things. */
-func startSlave() {
+/* runSlave checks the slave's command-line configuration and then calls
+ * startSlave forever, pausing for a randomized interval each time it returns.
+ * It ends the process via log.Fatal when the configuration is unusable or
+ * when the sleep itself fails.
+ */
+func runSlave() {
+	/* some simple sanity checking */
+	if *DoPrivateMount == true && os.Getuid() != 0 {
+		log.Fatal("Slave: Need to run as root for private mounts")
+	}
 	if *parent == "" {
 		log.Fatal("Slave: must set parent IP with -parent switch")
 	}
 	if *myId == "" {
 		log.Fatal("Slave: must set myId with -myId switch")
 	}
+
+	/* Seed the generator per process. Left unseeded, every slave draws the
+	 * same sequence of delays, which defeats the randomization below.
+	 */
+	rand.Seed(time.Nanoseconds() ^ int64(os.Getpid()))
+
+	/* at this point everything is right. So go forever. */
+	for {
+		startSlave()
+		Dprint(2, "Slave returned; try a timeout")
+		/* sleep for a random time of roughly 10 to 60 seconds. random is necessary
+		 * because we have seen self-synchronization in earlier work.
+		 * 1<<30 ns (about 1.07 s) stands in for one second here.
+		 */
+		ns := int64(rand.Intn(50)+10) * int64(1<<30)
+		err := time.Sleep(ns)
+		/* if we can't sleep we're going to hammer the server. Bad idea. */
+		if err != nil {
+			/* report the value actually passed to Sleep, so the unit in the message is right */
+			log.Fatal("Slave: sleep failed for ", ns, " nanoseconds")
+		}
+	}
+}
+
+/* We will for now assume that addressing is symmetric, that is, if we Dial someone on
+ * a certain address, that's the address they should Dial us on. This assumption has held
+ * up well for quite some time. And, in fact, it makes no sense to do it any other way ...
+ */
+/* note that we're going to be able to merge master and slave fairly soon, now that they do almost the same things. */
+func startSlave() {
 	/* slight difference from master: we're ready when we start, since we run things */
 	vitalData := &vitalData{HostReady: true, Id: *myId}
 	masterAddr := *parent + ":" + *cmdPort
-	/* some simple sanity checking */
-	if *DoPrivateMount == true && os.Getuid() != 0 {
-		log.Fatal("Need to run as root for private mounts")
-	}
 	Dprint(2, "dialing masterAddr ", masterAddr)
 	master, err := Dial(*defaultFam, "", masterAddr)
 	if err != nil {
-		log.Fatal("startSlave: dialing:", err)
+		log.Print("startSlave: dialing:", err)
+		return
 	}
 
 	/* vitalData -- what we're doing here is assembling information for our parent. 
 	addr := strings.SplitN(master.LocalAddr().String(), ":", -1)
 	peerAddr := addr[0] + ":0"
 
-	laddr, _ := net.ResolveTCPAddr("tcp4", peerAddr)      // This multiple-return business sometimes gets annoying
-	netl, err := net.ListenTCP(*defaultFam, laddr) // this is how we're ditching newListenProc
+	laddr, _ := net.ResolveTCPAddr("tcp4", peerAddr)
+	netl, err := net.ListenTCP(*defaultFam, laddr)
+	if err != nil {
+		log.Print("startSlave: ", err)
+		return
+	}
 	vitalData.ServerAddr = netl.Addr().String()
 	vitalData.HostAddr = master.LocalAddr().String()
 	vitalData.ParentAddr = master.RemoteAddr().String()
 	r := NewRpcClientServer(master, *binRoot)
 	initSlave(r, vitalData)
 	go registerSlaves()
+	/* wow. This used to be much smaller and needs to be redone. */
 	go func() {
 		for {
 			// Wait for a connection from the master
 			c, err := netl.AcceptTCP()
 			if err != nil {
-				log.Fatal("problem in netl.Accept()")
+				log.Print("problem in netl.Accept()")
+				return
 			}
 			Dprint(3, "Received connection from: ", c.RemoteAddr())
 
 				"R", // "R" = run a program
 			}
 			// Start the new process
-			p, err := os.StartProcess(os.Args[0], argv, &procattr)
+			p, err := os.StartProcess(*gprocBin, argv, &procattr)
 			if err != nil {
 				log.Fatal("startSlave: ", err)
 			} else {
+/*
+ * gproc, a Go reimplementation of the LANL version of bproc and the LANL XCPU software. 
+ * 
+ * This software is released under the GNU Lesser General Public License, version 2, incorporated herein by reference. 
+ *
+ * Copyright (2010) Sandia Corporation. Under the terms of Contract DE-AC04-94AL85000 with Sandia Corporation, 
+ * the U.S. Government retains certain rights in this software.
+ */
+/* 
+ * parameters: 
+ * expression to figure your level -- pass to forth
+ * parent. expression to name your parent. 
+ * id. expression to compute the id
+ * myhost. expression to compute host. Default to 'hostname' since the host can work that out. 
+ */
+package main
+
+import (
+	"os"
+	"fmt"
+	"flag"
+)
+
+/* usage prints a one-line synopsis followed by the flag defaults, then exits
+ * with status 2. The synopsis names the positional level argument, which the
+ * flag defaults alone do not mention.
+ */
+func usage() {
+	fmt.Fprintf(os.Stderr, "usage: %s [flags] level   (level \"1\" is the top level)\n", os.Args[0])
+	flag.PrintDefaults()
+	os.Exit(2)
+}
+
+var ( // command-line flags for the node launcher
+	lowNode      = flag.Int("l", 1, "Lowest node number") // first node number runlevel considers
+	highNode     = flag.Int("h", 40, "Highest node number") // last node number runlevel considers (inclusive)
+	debugLevel   = flag.Int("d", 0, "Debug level") // passed as -debug on the ssh command line built in runlevel
+	privateMount = flag.Bool("p", true, "private mounts") // passed as -p on the ssh command line built in runlevel
+	runornot = flag.String("b", "20", "block size -- default 20") // NOTE(review): nothing visible reads this; runlevel uses BLOCKSIZE, which is not declared in the visible text -- confirm
+	myAddress = flag.String("myAddress", "hostname", "what to tell a node to compute it's own address") // NOTE(review): not read in the visible code; runlevel passes a literal -myAddress=hostname
+)
+
+/* runlevel starts one ssh session per selected node in lowNode..highNode
+ * (inclusive) and blocks until every started session has been reaped.
+ * A node is selected when (node%BLOCKSIZE == 0) equals mod.
+ * NOTE(review): BLOCKSIZE is not declared in the visible text; presumably it
+ * is defined elsewhere in the package -- confirm.
+ */
+func runlevel(lowNode, highNode int, mod bool) {
+	reap := make(chan *os.Waitmsg, 0)
+	numspawn := 0
+	for i := lowNode; i <= highNode; i++ {
+		fmt.Printf("Check %d; mode %v; mod %v\n", i, (i%BLOCKSIZE == 0), mod)
+		if (i%BLOCKSIZE == 0) != mod {
+			continue
+		}
+		numspawn++
+		go func(anode int) {
+			node := fmt.Sprintf("root@kn%d", anode)
+
+			Args := []string{"ssh", "-o", "StrictHostKeyCHecking=no", node, "./gproc_linux_amd64",
+				"-parent='hostname base 20 roundup kn strcat 10.1.234.234 hostname base 20 % ifelse'",
+				"-myId='hostname base 20 % 1  + hostname base 20 / hostname base    %  ifelse'",
+				"-myAddress=hostname",
+				fmt.Sprintf("-p=%v ", *privateMount), fmt.Sprintf("-debug=%d", *debugLevel), "s"}
+			f := []*os.File{nil, os.Stdout, os.Stderr}
+			fmt.Printf("Spawn to %v\n", node)
+			pid, err := os.StartProcess("/usr/bin/ssh", Args, &os.ProcAttr{Files: f})
+			if err != nil {
+				/* There is no process to wait for. Report the failure and hand the
+				 * reaper a nil message so the count still drains; touching pid
+				 * here would crash the launcher.
+				 */
+				fmt.Printf("StartProcess fails: %v\n", err)
+				reap <- nil
+				return
+			}
+
+			msg, err := os.Wait(pid.Pid, 0)
+			if err != nil {
+				fmt.Printf("Wait for %v fails: %v\n", node, err)
+			}
+			reap <- msg
+		}(i)
+	}
+	/* one message arrives per spawned goroutine, whether or not its ssh started */
+	for numspawn > 0 {
+		msg := <-reap
+		fmt.Printf("Reaped %v\n", msg)
+		numspawn--
+	}
+}
+/* main parses the flags and starts one level of nodes. The first positional
+ * argument selects the level: "1" for the top level, anything else for the
+ * next level down.
+ */
+func main() {
+	flag.Usage = usage
+	flag.Parse()
+	/* the level argument is required; indexing flag.Args() without it would panic */
+	if len(flag.Args()) < 1 {
+		usage()
+	}
+	/* use 1 for the top level. Use anything else for the next level down */
+	level1 := flag.Args()[0] == "1"
+	fmt.Printf("Start nodes %d to %d\n", *lowNode, *highNode)
+	runlevel(*lowNode, *highNode, level1)
+}