Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code | Sign in
(37)

Unified Diff: worker/apiuniter/uniter.go

Issue 13648044: api/uniter: ReadSettings to use params.Settings (Closed)
Patch Set: Created 11 years, 7 months ago
Use n/p to move between diff chunks; N/P to move between comments. Please Sign in to add in-line comments.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « worker/apiuniter/tools_test.go ('k') | worker/apiuniter/uniter_test.go » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
Index: worker/apiuniter/uniter.go
=== added file 'worker/apiuniter/uniter.go'
--- worker/apiuniter/uniter.go 1970-01-01 00:00:00 +0000
+++ worker/apiuniter/uniter.go 2013-09-10 12:21:02 +0000
@@ -0,0 +1,535 @@
+// Copyright 2012, 2013 Canonical Ltd.
+// Licensed under the AGPLv3, see LICENCE file for details.
+
+package apiuniter
+
+import (
+ stderrors "errors"
+ "fmt"
+ "math/rand"
+ "os"
+ "path/filepath"
+ "strings"
+ "time"
+
+ "launchpad.net/loggo"
+ "launchpad.net/tomb"
+
+ "launchpad.net/juju-core/agent/tools"
+ corecharm "launchpad.net/juju-core/charm"
+ "launchpad.net/juju-core/charm/hooks"
+ "launchpad.net/juju-core/cmd"
+ "launchpad.net/juju-core/errors"
+ "launchpad.net/juju-core/state"
+ "launchpad.net/juju-core/state/watcher"
+ "launchpad.net/juju-core/utils"
+ "launchpad.net/juju-core/utils/fslock"
+ "launchpad.net/juju-core/worker/apiuniter/charm"
+ "launchpad.net/juju-core/worker/apiuniter/hook"
+ "launchpad.net/juju-core/worker/apiuniter/jujuc"
+ "launchpad.net/juju-core/worker/apiuniter/relation"
+)
+
+var logger = loggo.GetLogger("juju.worker.uniter")
+
+// Uniter implements the capabilities of the unit agent. It is not intended to
+// implement the actual *behaviour* of the unit agent; that responsibility is
+// delegated to Mode values, which are expected to react to events and direct
+// the uniter's responses to them.
+type Uniter struct {
+ tomb tomb.Tomb
+ st *state.State
+ f *filter
+ unit *state.Unit
+ service *state.Service
+ relationers map[int]*Relationer
+ relationHooks chan hook.Info
+ uuid string
+
+ dataDir string
+ baseDir string
+ toolsDir string
+ relationsDir string
+ charm *charm.GitDir
+ bundles *charm.BundlesDir
+ deployer *charm.Deployer
+ s *State
+ sf *StateFile
+ rand *rand.Rand
+ hookLock *fslock.Lock
+
+ ranConfigChanged bool
+}
+
+// NewUniter creates a new Uniter which will install, run, and upgrade a
+// charm on behalf of the named unit, by executing hooks and operations
+// provoked by changes in st.
+func NewUniter(st *state.State, name string, dataDir string) *Uniter {
+ u := &Uniter{
+ st: st,
+ dataDir: dataDir,
+ }
+ go func() {
+ defer u.tomb.Done()
+ u.tomb.Kill(u.loop(name))
+ }()
+ return u
+}
+
+func (u *Uniter) loop(name string) (err error) {
+ if err = u.init(name); err != nil {
+ return err
+ }
+ logger.Infof("unit %q started", u.unit)
+
+ // Start filtering state change events for consumption by modes.
+ u.f, err = newFilter(u.st, name)
+ if err != nil {
+ return err
+ }
+ defer watcher.Stop(u.f, &u.tomb)
+ go func() {
+ u.tomb.Kill(u.f.Wait())
+ }()
+
+ // Announce our presence to the world.
+ pinger, err := u.unit.SetAgentAlive()
+ if err != nil {
+ return err
+ }
+ defer watcher.Stop(pinger, &u.tomb)
+
+ // Run modes until we encounter an error.
+ mode := ModeInit
+ for err == nil {
+ select {
+ case <-u.tomb.Dying():
+ err = tomb.ErrDying
+ default:
+ mode, err = mode(u)
+ }
+ }
+ logger.Infof("unit %q shutting down: %s", u.unit, err)
+ return err
+}
+
+func (u *Uniter) setupLocks() (err error) {
+ lockDir := filepath.Join(u.dataDir, "locks")
+ u.hookLock, err = fslock.NewLock(lockDir, "uniter-hook-execution")
+ if err != nil {
+ return err
+ }
+ if message := u.hookLock.Message(); u.hookLock.IsLocked() && message != "" {
+ // Look to see if it was us that held the lock before. If it was, we
+ // should be safe enough to break it, as it is likely that we died
+ // before unlocking, and have been restarted by upstart.
+ parts := strings.SplitN(message, ":", 2)
+ if len(parts) > 1 && parts[0] == u.unit.Name() {
+ if err := u.hookLock.BreakLock(); err != nil {
+ return err
+ }
+ }
+ }
+ return nil
+}
+
+func (u *Uniter) init(name string) (err error) {
+ defer utils.ErrorContextf(&err, "failed to initialize uniter for unit %q", name)
+ u.unit, err = u.st.Unit(name)
+ if err != nil {
+ return err
+ }
+ if err = u.setupLocks(); err != nil {
+ return err
+ }
+ ename := u.unit.Tag()
+ u.toolsDir = tools.ToolsDir(u.dataDir, ename)
+ if err := EnsureJujucSymlinks(u.toolsDir); err != nil {
+ return err
+ }
+ u.baseDir = filepath.Join(u.dataDir, "agents", ename)
+ u.relationsDir = filepath.Join(u.baseDir, "state", "relations")
+ if err := os.MkdirAll(u.relationsDir, 0755); err != nil {
+ return err
+ }
+ u.service, err = u.st.Service(u.unit.ServiceName())
+ if err != nil {
+ return err
+ }
+ var env *state.Environment
+ env, err = u.st.Environment()
+ if err != nil {
+ return err
+ }
+ u.uuid = env.UUID()
+ u.relationers = map[int]*Relationer{}
+ u.relationHooks = make(chan hook.Info)
+ u.charm = charm.NewGitDir(filepath.Join(u.baseDir, "charm"))
+ u.bundles = charm.NewBundlesDir(filepath.Join(u.baseDir, "state", "bundles"))
+ u.deployer = charm.NewDeployer(filepath.Join(u.baseDir, "state", "deployer"))
+ u.sf = NewStateFile(filepath.Join(u.baseDir, "state", "uniter"))
+ u.rand = rand.New(rand.NewSource(time.Now().Unix()))
+ return nil
+}
+
+func (u *Uniter) Kill() {
+ u.tomb.Kill(nil)
+}
+
+func (u *Uniter) Wait() error {
+ return u.tomb.Wait()
+}
+
+func (u *Uniter) Stop() error {
+ u.tomb.Kill(nil)
+ return u.Wait()
+}
+
+func (u *Uniter) String() string {
+ return "uniter for " + u.unit.Name()
+}
+
+func (u *Uniter) Dead() <-chan struct{} {
+ return u.tomb.Dead()
+}
+
+// writeState saves uniter state with the supplied values, and infers the appropriate
+// value of Started.
+func (u *Uniter) writeState(op Op, step OpStep, hi *hook.Info, url *corecharm.URL) error {
+ s := State{
+ Started: op == RunHook && hi.Kind == hooks.Start || u.s != nil && u.s.Started,
+ Op: op,
+ OpStep: step,
+ Hook: hi,
+ CharmURL: url,
+ }
+ if err := u.sf.Write(s.Started, s.Op, s.OpStep, s.Hook, s.CharmURL); err != nil {
+ return err
+ }
+ u.s = &s
+ return nil
+}
+
+// deploy deploys the supplied charm URL, and sets follow-up hook operation state
+// as indicated by reason.
+func (u *Uniter) deploy(curl *corecharm.URL, reason Op) error {
+ if reason != Install && reason != Upgrade {
+ panic(fmt.Errorf("%q is not a deploy operation", reason))
+ }
+ var hi *hook.Info
+ if u.s != nil && (u.s.Op == RunHook || u.s.Op == Upgrade) {
+ // If this upgrade interrupts a RunHook, we need to preserve the hook
+ // info so that we can return to the appropriate error state. However,
+ // if we're resuming (or have force-interrupted) an Upgrade, we also
+ // need to preserve whatever hook info was preserved when we initially
+ // started upgrading, to ensure we still return to the correct state.
+ hi = u.s.Hook
+ }
+ if u.s == nil || u.s.OpStep != Done {
+ // Get the new charm bundle before announcing intention to use it.
+ logger.Infof("fetching charm %q", curl)
+ sch, err := u.st.Charm(curl)
+ if err != nil {
+ return err
+ }
+ bun, err := u.bundles.Read(sch, u.tomb.Dying())
+ if err != nil {
+ return err
+ }
+ if err = u.deployer.Stage(bun, curl); err != nil {
+ return err
+ }
+
+ // Set the new charm URL - this returns when the operation is complete,
+ // at which point we can refresh the local copy of the unit to get a
+ // version with the correct charm URL, and can go ahead and deploy
+ // the charm proper.
+ if err := u.f.SetCharm(curl); err != nil {
+ return err
+ }
+ if err := u.unit.Refresh(); err != nil {
+ return err
+ }
+ logger.Infof("deploying charm %q", curl)
+ if err = u.writeState(reason, Pending, hi, curl); err != nil {
+ return err
+ }
+ if err = u.deployer.Deploy(u.charm); err != nil {
+ return err
+ }
+ if err = u.writeState(reason, Done, hi, curl); err != nil {
+ return err
+ }
+ }
+ logger.Infof("charm %q is deployed", curl)
+ status := Queued
+ if hi != nil {
+ // If a hook operation was interrupted, restore it.
+ status = Pending
+ } else {
+ // Otherwise, queue the relevant post-deploy hook.
+ hi = &hook.Info{}
+ switch reason {
+ case Install:
+ hi.Kind = hooks.Install
+ case Upgrade:
+ hi.Kind = hooks.UpgradeCharm
+ }
+ }
+ return u.writeState(RunHook, status, hi, nil)
+}
+
+// errHookFailed indicates that a hook failed to execute, but that the Uniter's
+// operation is not affected by the error.
+var errHookFailed = stderrors.New("hook execution failed")
+
+// runHook executes the supplied hook.Info in an appropriate hook context. If
+// the hook itself fails to execute, it returns errHookFailed.
+func (u *Uniter) runHook(hi hook.Info) (err error) {
+ // Prepare context.
+ if err = hi.Validate(); err != nil {
+ return err
+ }
+
+ hookName := string(hi.Kind)
+ relationId := -1
+ if hi.Kind.IsRelation() {
+ relationId = hi.RelationId
+ if hookName, err = u.relationers[relationId].PrepareHook(hi); err != nil {
+ return err
+ }
+ }
+ hctxId := fmt.Sprintf("%s:%s:%d", u.unit.Name(), hookName, u.rand.Int63())
+ // We want to make sure we don't block forever when locking, but take the
+ // tomb into account.
+ checkTomb := func() error {
+ select {
+ case <-u.tomb.Dying():
+ return tomb.ErrDying
+ default:
+ // no-op to fall through to return.
+ }
+ return nil
+ }
+ lockMessage := fmt.Sprintf("%s: running hook %q", u.unit.Name(), hookName)
+ if err = u.hookLock.LockWithFunc(lockMessage, checkTomb); err != nil {
+ return err
+ }
+ defer u.hookLock.Unlock()
+
+ ctxRelations := map[int]*ContextRelation{}
+ for id, r := range u.relationers {
+ ctxRelations[id] = r.Context()
+ }
+ apiAddrs, err := u.st.APIAddresses()
+ if err != nil {
+ return err
+ }
+ hctx := NewHookContext(u.unit, hctxId, u.uuid, relationId, hi.RemoteUnit,
+ ctxRelations, apiAddrs)
+
+ // Prepare server.
+ getCmd := func(ctxId, cmdName string) (cmd.Command, error) {
+ // TODO: switch to long-running server with single context;
+ // use nonce in place of context id.
+ if ctxId != hctxId {
+ return nil, fmt.Errorf("expected context id %q, got %q", hctxId, ctxId)
+ }
+ return jujuc.NewCommand(hctx, cmdName)
+ }
+ socketPath := filepath.Join(u.baseDir, "agent.socket")
+ // Use abstract namespace so we don't get stale socket files.
+ socketPath = "@" + socketPath
+ srv, err := jujuc.NewServer(getCmd, socketPath)
+ if err != nil {
+ return err
+ }
+ go srv.Run()
+ defer srv.Close()
+
+ // Run the hook.
+ if err := u.writeState(RunHook, Pending, &hi, nil); err != nil {
+ return err
+ }
+ logger.Infof("running %q hook", hookName)
+ if err := hctx.RunHook(hookName, u.charm.Path(), u.toolsDir, socketPath); err != nil {
+ logger.Errorf("hook failed: %s", err)
+ return errHookFailed
+ }
+ if err := u.writeState(RunHook, Done, &hi, nil); err != nil {
+ return err
+ }
+ logger.Infof("ran %q hook", hookName)
+ return u.commitHook(hi)
+}
+
+// commitHook ensures that state is consistent with the supplied hook, and
+// that the fact of the hook's completion is persisted.
+func (u *Uniter) commitHook(hi hook.Info) error {
+ logger.Infof("committing %q hook", hi.Kind)
+ if hi.Kind.IsRelation() {
+ if err := u.relationers[hi.RelationId].CommitHook(hi); err != nil {
+ return err
+ }
+ if hi.Kind == hooks.RelationBroken {
+ delete(u.relationers, hi.RelationId)
+ }
+ }
+ if err := u.charm.Snapshotf("Completed %q hook.", hi.Kind); err != nil {
+ return err
+ }
+ if hi.Kind == hooks.ConfigChanged {
+ u.ranConfigChanged = true
+ }
+ if err := u.writeState(Continue, Pending, &hi, nil); err != nil {
+ return err
+ }
+ logger.Infof("committed %q hook", hi.Kind)
+ return nil
+}
+
+// restoreRelations reconciles the supplied relation state dirs with the
+// remote state of the corresponding relations.
+func (u *Uniter) restoreRelations() error {
+ // TODO(dimitern): Get these from state, not from disk.
+ dirs, err := relation.ReadAllStateDirs(u.relationsDir)
+ if err != nil {
+ return err
+ }
+ for id, dir := range dirs {
+ remove := false
+ rel, err := u.st.Relation(id)
+ if errors.IsNotFoundError(err) {
+ remove = true
+ } else if err != nil {
+ return err
+ }
+ if err = u.addRelation(rel, dir); err == state.ErrCannotEnterScope {
+ remove = true
+ } else if err != nil {
+ return err
+ }
+ if remove {
+ // If the previous execution was interrupted in the process of
+ // joining or departing the relation, the directory will be empty
+ // and the state is sane.
+ if err := dir.Remove(); err != nil {
+ return fmt.Errorf("cannot synchronize relation state: %v", err)
+ }
+ }
+ }
+ return nil
+}
+
+// updateRelations responds to changes in the life states of the relations
+// with the supplied ids. If any id corresponds to an alive relation not
+// known to the unit, the uniter will join that relation and return its
+// relationer in the added list.
+func (u *Uniter) updateRelations(ids []int) (added []*Relationer, err error) {
+ for _, id := range ids {
+ if r, found := u.relationers[id]; found {
+ rel := r.ru.Relation()
+ if err := rel.Refresh(); err != nil {
+ return nil, fmt.Errorf("cannot update relation %q: %v", rel, err)
+ }
+ if rel.Life() == state.Dying {
+ if err := r.SetDying(); err != nil {
+ return nil, err
+ } else if r.IsImplicit() {
+ delete(u.relationers, id)
+ }
+ }
+ continue
+ }
+ // Relations that are not alive are simply skipped, because they
+ // were not previously known anyway.
+ rel, err := u.st.Relation(id)
+ if err != nil {
+ if errors.IsNotFoundError(err) {
+ continue
+ }
+ return nil, err
+ }
+ if rel.Life() != state.Alive {
+ continue
+ }
+ // Make sure we ignore relations not implemented by the unit's charm
+ ch, err := corecharm.ReadDir(u.charm.Path())
+ if err != nil {
+ return nil, err
+ }
+ if ep, err := rel.Endpoint(u.unit.ServiceName()); err != nil {
+ return nil, err
+ } else if !ep.ImplementedBy(ch) {
+ logger.Warningf("skipping relation with unknown endpoint %q", ep)
+ continue
+ }
+ dir, err := relation.ReadStateDir(u.relationsDir, id)
+ if err != nil {
+ return nil, err
+ }
+ err = u.addRelation(rel, dir)
+ if err == nil {
+ added = append(added, u.relationers[id])
+ continue
+ }
+ e := dir.Remove()
+ if err != state.ErrCannotEnterScope {
+ return nil, err
+ }
+ if e != nil {
+ return nil, e
+ }
+ }
+ if u.unit.IsPrincipal() {
+ return added, nil
+ }
+ // If no Alive relations remain between a subordinate unit's service
+ // and its principal's service, the subordinate must become Dying.
+ keepAlive := false
+ for _, r := range u.relationers {
+ scope := r.ru.Endpoint().Scope
+ if scope == corecharm.ScopeContainer && !r.dying {
+ keepAlive = true
+ break
+ }
+ }
+ if !keepAlive {
+ if err := u.unit.Destroy(); err != nil {
+ return nil, err
+ }
+ }
+ return added, nil
+}
+
+// addRelation causes the unit agent to join the supplied relation, and to
+// store persistent state in the supplied dir.
+func (u *Uniter) addRelation(rel *state.Relation, dir *relation.StateDir) error {
+ logger.Infof("joining relation %q", rel)
+ ru, err := rel.Unit(u.unit)
+ if err != nil {
+ return err
+ }
+ r := NewRelationer(ru, dir, u.relationHooks)
+ w := u.unit.Watch()
+ defer watcher.Stop(w, &u.tomb)
+ for {
+ select {
+ case <-u.tomb.Dying():
+ return tomb.ErrDying
+ case _, ok := <-w.Changes():
+ if !ok {
+ return watcher.MustErr(w)
+ }
+ if err := r.Join(); err == state.ErrCannotEnterScopeYet {
+ logger.Infof("cannot enter scope for relation %q; waiting for subordinate to be removed", rel)
+ continue
+ } else if err != nil {
+ return err
+ }
+ logger.Infof("joined relation %q", rel)
+ u.relationers[rel.Id()] = r
+ return nil
+ }
+ }
+}
« no previous file with comments | « worker/apiuniter/tools_test.go ('k') | worker/apiuniter/uniter_test.go » ('j') | no next file with comments »

Powered by Google App Engine
RSS Feeds Recent Issues | This issue
This is Rietveld f62528b