Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code | Sign in
(1257)

Delta Between Two Patch Sets: state/unit.go

Issue 12218043: state: Use machine addresses from unit
Left Patch Set: Created 11 years, 8 months ago
Right Patch Set: state: Use machine addresses from unit Created 11 years, 8 months ago
Left:
Right:
Use n/p to move between diff chunks; N/P to move between comments. Please Sign in to add in-line comments.
Jump to:
Left: Side by side diff | Download
Right: Side by side diff | Download
« no previous file with change/comment | « [revision details] ('k') | state/unit_test.go » ('j') | no next file with change/comment »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
LEFT | RIGHT
1 // Copyright 2012, 2013 Canonical Ltd.
2 // Licensed under the AGPLv3, see LICENCE file for details.
3
4 package state
5
6 import (
7 stderrors "errors"
8 "fmt"
9 "sort"
10 "time"
11
12 "labix.org/v2/mgo"
13 "labix.org/v2/mgo/bson"
14 "labix.org/v2/mgo/txn"
15
16 "launchpad.net/loggo"
17
18 "launchpad.net/juju-core/agent/tools"
19 "launchpad.net/juju-core/charm"
20 "launchpad.net/juju-core/constraints"
21 "launchpad.net/juju-core/errors"
22 "launchpad.net/juju-core/instance"
23 "launchpad.net/juju-core/names"
24 "launchpad.net/juju-core/state/api/params"
25 "launchpad.net/juju-core/state/presence"
26 "launchpad.net/juju-core/utils"
27 )
28
// unitLogger is the package-level logger used by unit code in this file;
// it is obtained from loggo under the name "juju.state.unit".
var unitLogger = loggo.GetLogger("juju.state.unit")
dimitern 2013/08/02 08:36:54 I think this should be "juju.state" and please mov
30
// AssignmentPolicy controls what machine a unit will be assigned to.
type AssignmentPolicy string

const (
	// AssignLocal indicates that all service units should be assigned
	// to machine 0.
	AssignLocal AssignmentPolicy = "local"

	// AssignClean indicates that every service unit should be assigned
	// to a machine which never previously has hosted any units, and that
	// new machines should be launched if required.
	AssignClean AssignmentPolicy = "clean"

	// AssignCleanEmpty indicates that every service unit should be assigned
	// to a machine which never previously has hosted any units, and which is
	// not currently hosting any containers, and that new machines should be
	// launched if required.
	AssignCleanEmpty AssignmentPolicy = "clean-empty"

	// AssignNew indicates that every service unit should be assigned to a new
	// dedicated machine. A new machine will be launched for each new unit.
	AssignNew AssignmentPolicy = "new"
)
53
// ResolvedMode describes the way state transition errors
// are resolved.
type ResolvedMode string

const (
	// ResolvedNone is the empty mode; it is the zero value of ResolvedMode.
	ResolvedNone ResolvedMode = ""
	// ResolvedRetryHooks is the "retry-hooks" mode.
	// NOTE(review): presumably failed hooks are re-run on resolution — confirm.
	ResolvedRetryHooks ResolvedMode = "retry-hooks"
	// ResolvedNoHooks is the "no-hooks" mode.
	// NOTE(review): presumably failed hooks are skipped on resolution — confirm.
	ResolvedNoHooks ResolvedMode = "no-hooks"
)
63
// UnitSettings holds information about a service unit's settings within a
// relation.
type UnitSettings struct {
	// Version is a revision counter for the settings.
	// NOTE(review): exact source of this number is not visible here — confirm.
	Version int64
	// Settings maps setting names to their values.
	Settings map[string]interface{}
}
70
// unitDoc represents the internal state of a unit in MongoDB.
// Note the correspondence with UnitInfo in state/api/params.
type unitDoc struct {
	// Name is the unit name; it is stored as the document's _id.
	Name string `bson:"_id"`
	// Service is the name of the service the unit belongs to.
	Service string
	// Series is the series of the deployed charm.
	Series string
	// CharmURL is the charm the unit is currently using; nil until set.
	CharmURL *charm.URL
	// Principal is the name of the unit's principal; it is empty when the
	// unit is itself a principal.
	Principal string
	// Subordinates holds the names of the unit's subordinate units.
	Subordinates []string
	// PublicAddress and PrivateAddress are the addresses recorded directly
	// on the unit; an empty string means "not set".
	PublicAddress  string
	PrivateAddress string
	// MachineId is the id of the machine the unit is assigned to; it is
	// empty while the unit is unassigned.
	MachineId string
	// Resolved is the unit's current resolved mode.
	Resolved ResolvedMode
	// Tools describes the tools the unit agent is running; it is omitted
	// from the stored document when empty.
	Tools *tools.Tools `bson:",omitempty"`
	// Ports lists the ports opened by the unit.
	Ports []instance.Port
	// Life is the unit's lifecycle state.
	Life Life
	// TxnRevno is stored under the "txn-revno" key.
	TxnRevno int64 `bson:"txn-revno"`
	// PasswordHash is the hash of the unit agent's password, as produced
	// by utils.PasswordHash.
	PasswordHash string
}
90
// Unit represents the state of a service unit.
type Unit struct {
	// st is the State the unit was loaded from and operates against.
	st *State
	// doc is the locally cached copy of the unit's document; it is updated
	// by Refresh and by the mutating methods on success.
	doc unitDoc
	// annotator is embedded and initialised by newUnit with the unit's
	// global key and tag.
	annotator
}
97
98 func newUnit(st *State, udoc *unitDoc) *Unit {
99 unit := &Unit{
100 st: st,
101 doc: *udoc,
102 }
103 unit.annotator = annotator{
104 globalKey: unit.globalKey(),
105 tag: unit.Tag(),
106 st: st,
107 }
108 return unit
109 }
110
// Service returns the service the unit belongs to, looked up in state by
// the service name stored on the unit.
func (u *Unit) Service() (*Service, error) {
	return u.st.Service(u.doc.Service)
}
115
116 // ConfigSettings returns the complete set of service charm config settings
117 // available to the unit. Unset values will be replaced with the default
118 // value for the associated option, and may thus be nil when no default is
119 // specified.
120 func (u *Unit) ConfigSettings() (charm.Settings, error) {
121 if u.doc.CharmURL == nil {
122 return nil, fmt.Errorf("unit charm not set")
123 }
124 settings, err := readSettings(u.st, serviceSettingsKey(u.doc.Service, u. doc.CharmURL))
125 if err != nil {
126 return nil, err
127 }
128 charm, err := u.st.Charm(u.doc.CharmURL)
129 if err != nil {
130 return nil, err
131 }
132 result := charm.Config().DefaultSettings()
133 for name, value := range settings.Map() {
134 result[name] = value
135 }
136 return result, nil
137 }
138
// ServiceName returns the name of the service the unit belongs to, as
// recorded in the cached unit document.
func (u *Unit) ServiceName() string {
	return u.doc.Service
}
143
// Series returns the deployed charm's series, as recorded in the cached
// unit document.
func (u *Unit) Series() string {
	return u.doc.Series
}
148
// String returns the unit as a string; this is the unit's name, which is
// what formatting verbs such as %q print for a *Unit.
func (u *Unit) String() string {
	return u.doc.Name
}
153
// Name returns the unit name, as recorded in the cached unit document.
func (u *Unit) Name() string {
	return u.doc.Name
}
158
// unitGlobalKey returns the global database key for the named unit: the
// unit name prefixed with "u#".
func unitGlobalKey(name string) string {
	const unitKeyPrefix = "u#"
	return unitKeyPrefix + name
}
163
// globalKey returns the global database key for the unit, derived from
// the unit's name via unitGlobalKey.
func (u *Unit) globalKey() string {
	return unitGlobalKey(u.doc.Name)
}
168
// Life returns whether the unit is Alive, Dying or Dead, according to the
// cached unit document.
func (u *Unit) Life() Life {
	return u.doc.Life
}
173
174 // AgentTools returns the tools that the agent is currently running.
175 // It an error that satisfies IsNotFound if the tools have not yet been set.
176 func (u *Unit) AgentTools() (*tools.Tools, error) {
177 if u.doc.Tools == nil {
178 return nil, errors.NotFoundf("agent tools for unit %q", u)
179 }
180 tools := *u.doc.Tools
181 return &tools, nil
182 }
183
184 // SetAgentTools sets the tools that the agent is currently running.
185 func (u *Unit) SetAgentTools(t *tools.Tools) (err error) {
186 defer utils.ErrorContextf(&err, "cannot set agent tools for unit %q", u)
187 if t.Version.Series == "" || t.Version.Arch == "" {
188 return fmt.Errorf("empty series or arch")
189 }
190 ops := []txn.Op{{
191 C: u.st.units.Name,
192 Id: u.doc.Name,
193 Assert: notDeadDoc,
194 Update: D{{"$set", D{{"tools", t}}}},
195 }}
196 if err := u.st.runTransaction(ops); err != nil {
197 return onAbort(err, errDead)
198 }
199 tools := *t
200 u.doc.Tools = &tools
201 return nil
202 }
203
// SetMongoPassword sets the password the agent responsible for the unit
// should use to communicate with the state servers. Previous passwords
// are invalidated. The password is registered under the unit's tag.
func (u *Unit) SetMongoPassword(password string) error {
	return u.st.setMongoPassword(u.Tag(), password)
}
210
211 // SetPassword sets the password for the machine's agent.
212 func (u *Unit) SetPassword(password string) error {
213 hp := utils.PasswordHash(password)
214 ops := []txn.Op{{
215 C: u.st.units.Name,
216 Id: u.doc.Name,
217 Assert: notDeadDoc,
218 Update: D{{"$set", D{{"passwordhash", hp}}}},
219 }}
220 err := u.st.runTransaction(ops)
221 if err != nil {
222 return fmt.Errorf("cannot set password of unit %q: %v", u, onAbo rt(err, errDead))
223 }
224 u.doc.PasswordHash = hp
225 return nil
226 }
227
228 // PasswordValid returns whether the given password is valid
229 // for the given unit.
230 func (u *Unit) PasswordValid(password string) bool {
231 return utils.PasswordHash(password) == u.doc.PasswordHash
232 }
233
234 // Destroy, when called on a Alive unit, advances its lifecycle as far as
235 // possible; it otherwise has no effect. In most situations, the unit's
236 // life is just set to Dying; but if a principal unit that is not assigned
237 // to a provisioned machine is Destroyed, it will be removed from state
238 // directly.
239 func (u *Unit) Destroy() (err error) {
240 defer func() {
241 if err == nil {
242 // This is a white lie; the document might actually be r emoved.
243 u.doc.Life = Dying
244 }
245 }()
246 unit := &Unit{st: u.st, doc: u.doc}
247 for i := 0; i < 5; i++ {
248 switch ops, err := unit.destroyOps(); err {
249 case errRefresh:
250 case errAlreadyDying:
251 return nil
252 case nil:
253 if err := unit.st.runTransaction(ops); err != txn.ErrAbo rted {
254 return err
255 }
256 default:
257 return err
258 }
259 if err := unit.Refresh(); errors.IsNotFoundError(err) {
260 return nil
261 } else if err != nil {
262 return err
263 }
264 }
265 return ErrExcessiveContention
266 }
267
// destroyOps returns the operations required to destroy the unit. If it
// returns errRefresh, the unit should be refreshed and the destruction
// operations recalculated. It returns errAlreadyDying when the cached unit
// is not Alive, when its status document cannot be found, or when the unit
// turns out to have been removed already.
func (u *Unit) destroyOps() ([]txn.Op, error) {
	if u.doc.Life != Alive {
		return nil, errAlreadyDying
	}

	// Where possible, we'd like to be able to short-circuit unit destruction
	// such that units can be removed directly rather than waiting for their
	// agents to start, observe Dying, set Dead, and shut down; this takes a
	// long time and is vexing to users. This turns out to be possible if and
	// only if the unit agent has not yet set its status; this implies that the
	// most the unit could possibly have done is to run its install hook.
	//
	// There's no harm in removing a unit that's run its install hook only --
	// or, at least, there is no more harm than there is in removing a unit
	// that's run its stop hook, and that's the usual condition.
	//
	// Principals with subordinates are never eligible for this shortcut,
	// because the unit agent must inevitably have set a status before getting
	// to the point where it can actually create its subordinate.
	//
	// Subordinates should be eligible for the shortcut but are not currently
	// considered, on the basis that (1) they were created by active principals
	// and can be expected to be deployed pretty soon afterwards, so we don't
	// lose much time and (2) by maintaining this restriction, I can reduce
	// the number of tests that have to change and defer that improvement to
	// its own CL.
	minUnitsOp := minUnitsTriggerOp(u.st, u.ServiceName())
	// setDyingOps is the slow path: just mark the unit Dying.
	setDyingOps := []txn.Op{{
		C:      u.st.units.Name,
		Id:     u.doc.Name,
		Assert: isAliveDoc,
		Update: D{{"$set", D{{"life", Dying}}}},
	}, minUnitsOp}
	if u.doc.Principal != "" {
		// Subordinate unit: not eligible for the shortcut.
		return setDyingOps, nil
	} else if len(u.doc.Subordinates) != 0 {
		// Principal with subordinates: not eligible for the shortcut.
		return setDyingOps, nil
	}

	sdocId := u.globalKey()
	sdoc, err := getStatus(u.st, sdocId)
	if errors.IsNotFoundError(err) {
		return nil, errAlreadyDying
	} else if err != nil {
		return nil, err
	}
	if sdoc.Status != params.StatusPending {
		// The agent has already set a status; take the slow path.
		return setDyingOps, nil
	}
	// Fast path: assert the status is still pending and remove the unit
	// directly.
	ops := []txn.Op{{
		C:      u.st.statuses.Name,
		Id:     sdocId,
		Assert: D{{"status", params.StatusPending}},
	}, minUnitsOp}
	removeAsserts := append(isAliveDoc, unitHasNoSubordinates...)
	removeOps, err := u.removeOps(removeAsserts)
	if err == errAlreadyRemoved {
		return nil, errAlreadyDying
	} else if err != nil {
		return nil, err
	}
	return append(ops, removeOps...), nil
}
334
// errAlreadyRemoved is returned by removeOps when the unit's service can no
// longer be found, which implies the unit itself has already been removed.
var errAlreadyRemoved = stderrors.New("entity has already been removed")
336
337 // removeOps returns the operations necessary to remove the unit, assuming
338 // the supplied asserts apply to the unit document.
339 func (u *Unit) removeOps(asserts D) ([]txn.Op, error) {
340 svc, err := u.st.Service(u.doc.Service)
341 if errors.IsNotFoundError(err) {
342 // If the service has been removed, the unit must already have b een.
343 return nil, errAlreadyRemoved
344 } else if err != nil {
345 return nil, err
346 }
347 return svc.removeUnitOps(u, asserts)
348 }
349
// ErrUnitHasSubordinates is returned by EnsureDead when the unit cannot be
// made Dead because it still has subordinates.
var ErrUnitHasSubordinates = stderrors.New("unit has subordinates")

// unitHasNoSubordinates is a transaction assertion that matches a unit
// document whose "subordinates" field is either an empty array or absent.
var unitHasNoSubordinates = D{{
	"$or", []D{
		{{"subordinates", D{{"$size", 0}}}},
		{{"subordinates", D{{"$exists", false}}}},
	},
}}
358
359 // EnsureDead sets the unit lifecycle to Dead if it is Alive or Dying.
360 // It does nothing otherwise. If the unit has subordinates, it will
361 // return ErrUnitHasSubordinates.
362 func (u *Unit) EnsureDead() (err error) {
363 if u.doc.Life == Dead {
364 return nil
365 }
366 defer func() {
367 if err == nil {
368 u.doc.Life = Dead
369 }
370 }()
371 ops := []txn.Op{{
372 C: u.st.units.Name,
373 Id: u.doc.Name,
374 Assert: append(notDeadDoc, unitHasNoSubordinates...),
375 Update: D{{"$set", D{{"life", Dead}}}},
376 }}
377 if err := u.st.runTransaction(ops); err != txn.ErrAborted {
378 return err
379 }
380 if notDead, err := isNotDead(u.st.units, u.doc.Name); err != nil {
381 return err
382 } else if !notDead {
383 return nil
384 }
385 return ErrUnitHasSubordinates
386 }
387
388 // Remove removes the unit from state, and may remove its service as well, if
389 // the service is Dying and no other references to it exist. It will fail if
390 // the unit is not Dead.
391 func (u *Unit) Remove() (err error) {
392 defer utils.ErrorContextf(&err, "cannot remove unit %q", u)
393 if u.doc.Life != Dead {
394 return stderrors.New("unit is not dead")
395 }
396 unit := &Unit{st: u.st, doc: u.doc}
397 for i := 0; i < 5; i++ {
398 switch ops, err := unit.removeOps(isDeadDoc); err {
399 case errRefresh:
400 case errAlreadyRemoved:
401 return nil
402 case nil:
403 if err := u.st.runTransaction(ops); err != txn.ErrAborte d {
404 return err
405 }
406 default:
407 return err
408 }
409 if err := unit.Refresh(); errors.IsNotFoundError(err) {
410 return nil
411 } else if err != nil {
412 return err
413 }
414 }
415 return ErrExcessiveContention
416 }
417
// Resolved returns the resolved mode for the unit, as recorded in the
// cached unit document.
func (u *Unit) Resolved() ResolvedMode {
	return u.doc.Resolved
}
422
// IsPrincipal returns whether the unit is deployed in its own container,
// and can therefore have subordinate services deployed alongside it.
// A unit is a principal exactly when it has no principal recorded.
func (u *Unit) IsPrincipal() bool {
	return u.doc.Principal == ""
}
428
429 // SubordinateNames returns the names of any subordinate units.
430 func (u *Unit) SubordinateNames() []string {
431 names := make([]string, len(u.doc.Subordinates))
432 copy(names, u.doc.Subordinates)
433 return names
434 }
435
436 // DeployerTag returns the tag of the agent responsible for deploying
437 // the unit. If no such entity can be determined, false is returned.
438 func (u *Unit) DeployerTag() (string, bool) {
439 if u.doc.Principal != "" {
440 return names.UnitTag(u.doc.Principal), true
441 } else if u.doc.MachineId != "" {
442 return names.MachineTag(u.doc.MachineId), true
443 }
444 return "", false
445 }
1 446
2 // PublicAddress returns the public address of the unit and whether it is valid. 447 // PublicAddress returns the public address of the unit and whether it is valid.
3 func (u *Unit) PublicAddress() (string, bool) { 448 func (u *Unit) PublicAddress() (string, bool) {
4 » return u.doc.PublicAddress, u.doc.PublicAddress != "" 449 » publicAddress := u.doc.PublicAddress
450 » id := u.doc.MachineId
451 » if id != "" {
452 » » m, err := u.st.Machine(id)
453 » » if err != nil {
454 » » » unitLogger.Errorf("unit %v misses machine id %v", u, id)
wallyworld 2013/08/01 23:21:52 please include err in the logged message
455 » » » return "", false
wallyworld 2013/08/01 23:21:52 did we really want to return "" here? why not u.do
456 » » }
457 » » publicAddress = instance.SelectPublicAddress(m.Addresses())
thumper 2013/08/02 02:24:17 Is there likely to be a case where u.doc.PublicAdd
458 » }
459 » return publicAddress, publicAddress != ""
5 } 460 }
6 461
7 // PrivateAddress returns the private address of the unit and whether it is vali d. 462 // PrivateAddress returns the private address of the unit and whether it is vali d.
463 func (u *Unit) PrivateAddress() (string, bool) {
464 return u.doc.PrivateAddress, u.doc.PrivateAddress != ""
465 }
466
467 // Refresh refreshes the contents of the Unit from the underlying
468 // state. It an error that satisfies IsNotFound if the unit has been removed.
469 func (u *Unit) Refresh() error {
470 err := u.st.units.FindId(u.doc.Name).One(&u.doc)
471 if err == mgo.ErrNotFound {
472 return errors.NotFoundf("unit %q", u)
473 }
474 if err != nil {
475 return fmt.Errorf("cannot refresh unit %q: %v", u, err)
476 }
477 return nil
478 }
479
480 // Status returns the status of the unit's agent.
481 func (u *Unit) Status() (status params.Status, info string, err error) {
482 doc, err := getStatus(u.st, u.globalKey())
483 if err != nil {
484 return "", "", err
485 }
486 status = doc.Status
487 info = doc.StatusInfo
488 return
489 }
490
491 // SetStatus sets the status of the unit.
492 func (u *Unit) SetStatus(status params.Status, info string) error {
493 doc := statusDoc{
494 Status: status,
495 StatusInfo: info,
496 }
497 if err := doc.validateSet(); err != nil {
498 return err
499 }
500 ops := []txn.Op{{
501 C: u.st.units.Name,
502 Id: u.doc.Name,
503 Assert: notDeadDoc,
504 },
505 updateStatusOp(u.st, u.globalKey(), doc),
506 }
507 err := u.st.runTransaction(ops)
508 if err != nil {
509 return fmt.Errorf("cannot set status of unit %q: %v", u, onAbort (err, errDead))
510 }
511 return nil
512 }
513
514 // OpenPort sets the policy of the port with protocol and number to be opened.
515 func (u *Unit) OpenPort(protocol string, number int) (err error) {
516 port := instance.Port{Protocol: protocol, Number: number}
517 defer utils.ErrorContextf(&err, "cannot open port %v for unit %q", port, u)
518 ops := []txn.Op{{
519 C: u.st.units.Name,
520 Id: u.doc.Name,
521 Assert: notDeadDoc,
522 Update: D{{"$addToSet", D{{"ports", port}}}},
523 }}
524 err = u.st.runTransaction(ops)
525 if err != nil {
526 return onAbort(err, errDead)
527 }
528 found := false
529 for _, p := range u.doc.Ports {
530 if p == port {
531 break
532 }
533 }
534 if !found {
535 u.doc.Ports = append(u.doc.Ports, port)
536 }
537 return nil
538 }
539
540 // ClosePort sets the policy of the port with protocol and number to be closed.
541 func (u *Unit) ClosePort(protocol string, number int) (err error) {
542 port := instance.Port{Protocol: protocol, Number: number}
543 defer utils.ErrorContextf(&err, "cannot close port %v for unit %q", port , u)
544 ops := []txn.Op{{
545 C: u.st.units.Name,
546 Id: u.doc.Name,
547 Assert: notDeadDoc,
548 Update: D{{"$pull", D{{"ports", port}}}},
549 }}
550 err = u.st.runTransaction(ops)
551 if err != nil {
552 return onAbort(err, errDead)
553 }
554 newPorts := make([]instance.Port, 0, len(u.doc.Ports))
555 for _, p := range u.doc.Ports {
556 if p != port {
557 newPorts = append(newPorts, p)
558 }
559 }
560 u.doc.Ports = newPorts
561 return nil
562 }
563
564 // OpenedPorts returns a slice containing the open ports of the unit.
565 func (u *Unit) OpenedPorts() []instance.Port {
566 ports := append([]instance.Port{}, u.doc.Ports...)
567 SortPorts(ports)
568 return ports
569 }
570
571 // CharmURL returns the charm URL this unit is currently using.
572 func (u *Unit) CharmURL() (*charm.URL, bool) {
573 if u.doc.CharmURL == nil {
574 return nil, false
575 }
576 return u.doc.CharmURL, true
577 }
578
// SetCharmURL marks the unit as currently using the supplied charm URL.
// An error will be returned if the unit is dead, or the charm URL not known.
// The update is attempted up to five times; ErrExcessiveContention is
// returned if every transaction is aborted. On success the cached document
// is updated with the new URL.
func (u *Unit) SetCharmURL(curl *charm.URL) (err error) {
	defer func() {
		if err == nil {
			u.doc.CharmURL = curl
		}
	}()
	if curl == nil {
		return fmt.Errorf("cannot set nil charm url")
	}
	for i := 0; i < 5; i++ {
		if notDead, err := isNotDead(u.st.units, u.doc.Name); err != nil {
			return err
		} else if !notDead {
			return fmt.Errorf("unit %q is dead", u)
		}
		// Nothing to do if the stored document already carries this URL.
		sel := D{{"_id", u.doc.Name}, {"charmurl", curl}}
		if count, err := u.st.units.Find(sel).Count(); err != nil {
			return err
		} else if count == 1 {
			// Already set
			return nil
		}
		// The charm itself must be known to state.
		if count, err := u.st.charms.FindId(curl).Count(); err != nil {
			return err
		} else if count < 1 {
			return fmt.Errorf("unknown charm url %q", curl)
		}

		// Add a reference to the service settings for the new charm.
		incOp, err := settingsIncRefOp(u.st, u.doc.Service, curl, false)
		if err != nil {
			return err
		}

		// Set the new charm URL.
		differentCharm := D{{"charmurl", D{{"$ne", curl}}}}
		ops := []txn.Op{
			incOp,
			{
				C:      u.st.units.Name,
				Id:     u.doc.Name,
				Assert: append(notDeadDoc, differentCharm...),
				Update: D{{"$set", D{{"charmurl", curl}}}},
			}}
		if u.doc.CharmURL != nil {
			// Drop the reference to the old charm.
			decOps, err := settingsDecRefOps(u.st, u.doc.Service, u.doc.CharmURL)
			if err != nil {
				return err
			}
			ops = append(ops, decOps...)
		}
		// Any result other than an abort (including success) ends the loop.
		if err := u.st.runTransaction(ops); err != txn.ErrAborted {
			return err
		}
	}
	return ErrExcessiveContention
}
639
// AgentAlive returns whether the respective remote agent is alive, as
// reported by the state's presence watcher for the unit's global key.
func (u *Unit) AgentAlive() (bool, error) {
	return u.st.pwatcher.Alive(u.globalKey())
}
644
// Tag returns a name identifying the unit that is safe to use
// as a file name. The returned name will be different from other
// Tag values returned by any other entities from the same state.
// It is derived from the unit name via names.UnitTag.
func (u *Unit) Tag() string {
	return names.UnitTag(u.Name())
}
651
652 // WaitAgentAlive blocks until the respective agent is alive.
653 func (u *Unit) WaitAgentAlive(timeout time.Duration) (err error) {
654 defer utils.ErrorContextf(&err, "waiting for agent of unit %q", u)
655 ch := make(chan presence.Change)
656 u.st.pwatcher.Watch(u.globalKey(), ch)
657 defer u.st.pwatcher.Unwatch(u.globalKey(), ch)
658 for i := 0; i < 2; i++ {
659 select {
660 case change := <-ch:
661 if change.Alive {
662 return nil
663 }
664 case <-time.After(timeout):
665 return fmt.Errorf("still not alive after timeout")
666 case <-u.st.pwatcher.Dead():
667 return u.st.pwatcher.Err()
668 }
669 }
670 panic(fmt.Sprintf("presence reported dead status twice in a row for unit %q", u))
671 }
672
673 // SetAgentAlive signals that the agent for unit u is alive.
674 // It returns the started pinger.
675 func (u *Unit) SetAgentAlive() (*presence.Pinger, error) {
676 p := presence.NewPinger(u.st.presence, u.globalKey())
677 err := p.Start()
678 if err != nil {
679 return nil, err
680 }
681 return p, nil
682 }
683
// NotAssignedError indicates that a unit is not assigned to a machine (and, in
// the case of subordinate units, that the unit's principal is not assigned).
type NotAssignedError struct{ Unit *Unit }

// Error implements the error interface; the message names the unit that is
// not assigned.
func (e *NotAssignedError) Error() string {
	return fmt.Sprintf("unit %q is not assigned to a machine", e.Unit)
}
691
692 func IsNotAssigned(err error) bool {
693 _, ok := err.(*NotAssignedError)
694 return ok
695 }
696
697 // AssignedMachineId returns the id of the assigned machine.
698 func (u *Unit) AssignedMachineId() (id string, err error) {
699 if u.IsPrincipal() {
700 if u.doc.MachineId == "" {
701 return "", &NotAssignedError{u}
702 }
703 return u.doc.MachineId, nil
704 }
705 pudoc := unitDoc{}
706 err = u.st.units.Find(D{{"_id", u.doc.Principal}}).One(&pudoc)
707 if err == mgo.ErrNotFound {
708 return "", errors.NotFoundf("principal unit %q of %q", u.doc.Pri ncipal, u)
709 } else if err != nil {
710 return "", err
711 }
712 if pudoc.MachineId == "" {
713 return "", &NotAssignedError{u}
714 }
715 return pudoc.MachineId, nil
716 }
717
// Sentinel errors returned by the unit assignment helpers so that callers
// can tell the different failure causes apart.
var (
	// machineNotAliveErr: the target machine is not alive.
	machineNotAliveErr = stderrors.New("machine is not alive")
	// machineNotCleanErr: the target machine is dirty.
	machineNotCleanErr = stderrors.New("machine is dirty")
	// unitNotAliveErr: the unit being assigned is not alive.
	unitNotAliveErr = stderrors.New("unit is not alive")
	// alreadyAssignedErr: the unit is already assigned to a machine.
	alreadyAssignedErr = stderrors.New("unit is already assigned to a machine")
	// inUseErr: an unused machine was required but the machine is in use.
	inUseErr = stderrors.New("machine is not unused")
)
725
// assignToMachine is the internal version of AssignToMachine,
// also used by AssignToUnusedMachine. It returns specific errors
// in some cases:
// - machineNotAliveErr when the machine is not alive.
// - unitNotAliveErr when the unit is not alive.
// - alreadyAssignedErr when the unit has already been assigned
// - inUseErr when the machine already has a unit assigned (if unused is true)
//
// Assigning a unit to the machine it is already assigned to is a no-op.
func (u *Unit) assignToMachine(m *Machine, unused bool) (err error) {
	if u.doc.Series != m.doc.Series {
		return fmt.Errorf("series does not match")
	}
	if u.doc.MachineId != "" {
		if u.doc.MachineId != m.Id() {
			return alreadyAssignedErr
		}
		return nil
	}
	if u.doc.Principal != "" {
		return fmt.Errorf("unit is a subordinate")
	}
	// The machine must have the JobHostUnits job to accept units.
	canHost := false
	for _, j := range m.doc.Jobs {
		if j == JobHostUnits {
			canHost = true
			break
		}
	}
	if !canHost {
		return fmt.Errorf("machine %q cannot host units", m)
	}
	// Unit assertion: alive, and either unassigned or already on m.
	assert := append(isAliveDoc, D{
		{"$or", []D{
			{{"machineid", ""}},
			{{"machineid", m.Id()}},
		}},
	}...)
	// Machine assertion: alive, and additionally clean when an unused
	// machine was requested.
	massert := isAliveDoc
	if unused {
		massert = append(massert, D{{"clean", D{{"$ne", false}}}}...)
	}
	ops := []txn.Op{{
		C:      u.st.units.Name,
		Id:     u.doc.Name,
		Assert: assert,
		Update: D{{"$set", D{{"machineid", m.doc.Id}}}},
	}, {
		C:      u.st.machines.Name,
		Id:     m.doc.Id,
		Assert: massert,
		Update: D{{"$addToSet", D{{"principals", u.doc.Name}}}, {"$set", D{{"clean", false}}}},
	}}
	err = u.st.runTransaction(ops)
	if err == nil {
		u.doc.MachineId = m.doc.Id
		m.doc.Clean = false
		return nil
	}
	if err != txn.ErrAborted {
		return err
	}
	// The transaction was aborted; reload both entities to work out which
	// assertion failed and report the matching sentinel error.
	u0, err := u.st.Unit(u.Name())
	if err != nil {
		return err
	}
	m0, err := u.st.Machine(m.Id())
	if err != nil {
		return err
	}
	switch {
	case u0.Life() != Alive:
		return unitNotAliveErr
	case m0.Life() != Alive:
		return machineNotAliveErr
	case u0.doc.MachineId != "" || !unused:
		return alreadyAssignedErr
	}
	return inUseErr
}
804
805 func assignContextf(err *error, unit *Unit, target string) {
806 if *err != nil {
807 *err = fmt.Errorf("cannot assign unit %q to %s: %v", unit, targe t, *err)
808 }
809 }
810
// AssignToMachine assigns this unit to a given machine. The machine is not
// required to be unused. Any error is wrapped with the unit and machine
// being assigned.
func (u *Unit) AssignToMachine(m *Machine) (err error) {
	defer assignContextf(&err, u, fmt.Sprintf("machine %s", m))
	return u.assignToMachine(m, false)
}
816
817 // assignToNewMachine assigns the unit to a machine created according to the sup plied params,
818 // with the supplied constraints.
819 func (u *Unit) assignToNewMachine(params *AddMachineParams, cons constraints.Val ue) (err error) {
820 ops, instData, containerParams, err := u.st.addMachineContainerOps(param s, cons)
821 if err != nil {
822 return err
823 }
824 mdoc := &machineDoc{
825 Series: u.doc.Series,
826 ContainerType: string(params.ContainerType),
827 Jobs: []MachineJob{JobHostUnits},
828 Principals: []string{u.doc.Name},
829 Clean: false,
830 }
831 mdoc, machineOps, err := u.st.addMachineOps(mdoc, instData, cons, contai nerParams)
832 if err != nil {
833 return err
834 }
835 ops = append(ops, machineOps...)
836 isUnassigned := D{{"machineid", ""}}
837 asserts := append(isAliveDoc, isUnassigned...)
838 // Ensure the host machine is really clean.
839 if params.ParentId != "" {
840 ops = append(ops, txn.Op{
841 C: u.st.machines.Name,
842 Id: params.ParentId,
843 Assert: D{{"clean", true}},
844 }, txn.Op{
845 C: u.st.containerRefs.Name,
846 Id: params.ParentId,
847 Assert: D{hasNoContainersTerm},
848 })
849 }
850 ops = append(ops, txn.Op{
851 C: u.st.units.Name,
852 Id: u.doc.Name,
853 Assert: asserts,
854 Update: D{{"$set", D{{"machineid", mdoc.Id}}}},
855 })
856 err = u.st.runTransaction(ops)
857 if err == nil {
858 u.doc.MachineId = mdoc.Id
859 return nil
860 } else if err != txn.ErrAborted {
861 return err
862 }
863 // If we assume that the machine ops will never give us an operation tha t
864 // would fail (because the machine id that it has is unique), then the o nly
865 // reasons that the transaction could have been aborted are:
866 // * the unit is no longer alive
867 // * the unit has been assigned to a different machine
868 // * the parent machine we want to create a container on was clean but became dirty
869 unit, err := u.st.Unit(u.Name())
870 if err != nil {
871 return err
872 }
873 switch {
874 case unit.Life() != Alive:
875 return unitNotAliveErr
876 case unit.doc.MachineId != "":
877 return alreadyAssignedErr
878 }
879 if params.ParentId != "" {
880 m, err := u.st.Machine(params.ParentId)
881 if err != nil {
882 return err
883 }
884 if !m.Clean() {
885 return machineNotCleanErr
886 }
887 containers, err := m.Containers()
888 if err != nil {
889 return err
890 }
891 if len(containers) > 0 {
892 return machineNotCleanErr
893 }
894 }
895 // Other error condition not considered.
896 return fmt.Errorf("unknown error")
897 }
898
899 // constraints is a helper function to return a unit's deployment constraints.
900 func (u *Unit) constraints() (*constraints.Value, error) {
901 cons, err := readConstraints(u.st, u.globalKey())
902 if errors.IsNotFoundError(err) {
903 // Lack of constraints indicates lack of unit.
904 return nil, errors.NotFoundf("unit")
905 } else if err != nil {
906 return nil, err
907 }
908 return &cons, nil
909 }
910
// AssignToNewMachineOrContainer assigns the unit to a new machine, with
// constraints determined according to the service and environment
// constraints at the time of unit creation.
// If a container is required, a clean, empty machine instance is required on
// which to create the container. An existing clean, empty instance is first
// searched for, and if not found, a new one is created.
func (u *Unit) AssignToNewMachineOrContainer() (err error) {
	defer assignContextf(&err, u, "new machine or container")
	if u.doc.Principal != "" {
		return fmt.Errorf("unit is a subordinate")
	}
	cons, err := u.constraints()
	if err != nil {
		return err
	}
	if !cons.HasContainer() {
		return u.AssignToNewMachine()
	}

	// Find a clean, empty machine on which to create a container.
	var host machineDoc
	// The host itself is searched for using the unit's constraints with
	// the container constraint replaced by instance.NONE.
	hostCons := *cons
	noContainer := instance.NONE
	hostCons.Container = &noContainer
	query, err := u.findCleanMachineQuery(true, &hostCons)
	if err != nil {
		return err
	}
	err = query.One(&host)
	if err == mgo.ErrNotFound {
		// No existing clean, empty machine so create a new one.
		// The container constraint will be used by AssignToNewMachine to
		// create the required container.
		return u.AssignToNewMachine()
	} else if err != nil {
		return err
	}
	params := &AddMachineParams{
		Series:        u.doc.Series,
		ParentId:      host.Id,
		ContainerType: *cons.Container,
		Jobs:          []MachineJob{JobHostUnits},
	}
	err = u.assignToNewMachine(params, *cons)
	if err == machineNotCleanErr {
		// The clean machine was used before we got a chance to use it so just
		// stick the unit on a new machine.
		return u.AssignToNewMachine()
	}
	return err
}
960
961 // AssignToNewMachine assigns the unit to a new machine, with constraints
962 // determined according to the service and environment constraints at the
963 // time of unit creation.
964 func (u *Unit) AssignToNewMachine() (err error) {
965 defer assignContextf(&err, u, "new machine")
966 if u.doc.Principal != "" {
967 return fmt.Errorf("unit is a subordinate")
968 }
969 // Get the ops necessary to create a new machine, and the machine doc th at
970 // will be added with those operations (which includes the machine id).
971 cons, err := u.constraints()
972 if err != nil {
973 return err
974 }
975 var containerType instance.ContainerType
976 // Configure to create a new container if required.
977 if cons.HasContainer() {
978 containerType = *cons.Container
979 }
980 params := &AddMachineParams{
981 Series: u.doc.Series,
982 ContainerType: containerType,
983 Jobs: []MachineJob{JobHostUnits},
984 }
985 err = u.assignToNewMachine(params, *cons)
986 return err
987 }
988
// noCleanMachines is the error returned by assignToCleanMaybeEmptyMachine
// when it runs out of candidate machines without assigning the unit.
var noCleanMachines = stderrors.New("all eligible machines in use")
990
991 // AssignToCleanMachine assigns u to a machine which is marked as clean. A machi ne
992 // is clean if it has never had any principal units assigned to it.
993 // If there are no clean machines besides any machine(s) running JobHostEnviron,
994 // an error is returned.
995 // This method does not take constraints into consideration when choosing a
996 // machine (lp:1161919).
997 func (u *Unit) AssignToCleanMachine() (m *Machine, err error) {
998 return u.assignToCleanMaybeEmptyMachine(false)
999 }
1000
1001 // AssignToCleanMachine assigns u to a machine which is marked as clean and is a lso
1002 // not hosting any containers. A machine is clean if it has never had any princi pal units
1003 // assigned to it. If there are no clean machines besides any machine(s) running JobHostEnviron,
1004 // an error is returned.
1005 // This method does not take constraints into consideration when choosing a
1006 // machine (lp:1161919).
1007 func (u *Unit) AssignToCleanEmptyMachine() (m *Machine, err error) {
1008 return u.assignToCleanMaybeEmptyMachine(true)
1009 }
1010
// hasContainerTerm is a query term matching documents whose "children"
// field exists and is not an array of size 0.
var hasContainerTerm = bson.DocElem{
	"$and", []D{
		{{"children", D{{"$not", D{{"$size", 0}}}}}},
		{{"children", D{{"$exists", true}}}},
	}}
1016
// hasNoContainersTerm is a query term matching documents whose "children"
// field is either an array of size 0 or absent altogether.
var hasNoContainersTerm = bson.DocElem{
	"$or", []D{
		{{"children", D{{"$size", 0}}}},
		{{"children", D{{"$exists", false}}}},
	}}
1022
1023 // findCleanMachineQuery returns a Mongo query to find clean (and possibly empty ) machines with
1024 // characteristics matching the specified constraints.
1025 func (u *Unit) findCleanMachineQuery(requireEmpty bool, cons *constraints.Value) (*mgo.Query, error) {
1026 // Select all machines that can accept principal units and are clean.
1027 var containerRefs []machineContainers
1028 // If we need empty machines, first build up a list of machine ids which have containers
1029 // so we can exclude those.
1030 if requireEmpty {
1031 err := u.st.containerRefs.Find(D{hasContainerTerm}).Select(bson. M{"_id": 1}).All(&containerRefs)
1032 if err != nil {
1033 return nil, err
1034 }
1035 }
1036 var machinesWithContainers = make([]string, len(containerRefs))
1037 for i, cref := range containerRefs {
1038 machinesWithContainers[i] = cref.Id
1039 }
1040 terms := D{
1041 {"life", Alive},
1042 {"series", u.doc.Series},
1043 {"jobs", []MachineJob{JobHostUnits}},
1044 {"clean", true},
1045 {"_id", D{{"$nin", machinesWithContainers}}},
1046 }
1047 // Add the container filter term if necessary.
1048 var containerType instance.ContainerType
1049 if cons.Container != nil {
1050 containerType = *cons.Container
1051 }
1052 if containerType == instance.NONE {
1053 terms = append(terms, bson.DocElem{"containertype", ""})
1054 } else if containerType != "" {
1055 terms = append(terms, bson.DocElem{"containertype", string(conta inerType)})
1056 }
1057
1058 // Find the ids of machines which satisfy any required hardware constrai nts.
1059 // If there is no instanceData for a machine, that machine is not consid ered as suitable for
1060 // deploying the unit. This can happen if the machine is not yet provisi oned. It may be that
1061 // when the machine is provisioned it will be found to be suitable, but we don't know that right
1062 // now and it's best to err on the side of caution and exclude such mach ines.
1063 var suitableInstanceData []instanceData
1064 var suitableTerms D
1065 if cons.Arch != nil && *cons.Arch != "" {
1066 suitableTerms = append(suitableTerms, bson.DocElem{"arch", *cons .Arch})
1067 }
1068 if cons.Mem != nil && *cons.Mem > 0 {
1069 suitableTerms = append(suitableTerms, bson.DocElem{"mem", D{{"$g te", *cons.Mem}}})
1070 }
1071 if cons.CpuCores != nil && *cons.CpuCores > 0 {
1072 suitableTerms = append(suitableTerms, bson.DocElem{"cpucores", D {{"$gte", *cons.CpuCores}}})
1073 }
1074 if cons.CpuPower != nil && *cons.CpuPower > 0 {
1075 suitableTerms = append(suitableTerms, bson.DocElem{"cpupower", D {{"$gte", *cons.CpuPower}}})
1076 }
1077 if len(suitableTerms) > 0 {
1078 err := u.st.instanceData.Find(suitableTerms).Select(bson.M{"_id" : 1}).All(&suitableInstanceData)
1079 if err != nil {
1080 return nil, err
1081 }
1082 var suitableIds = make([]string, len(suitableInstanceData))
1083 for i, m := range suitableInstanceData {
1084 suitableIds[i] = m.Id
1085 }
1086 terms = append(terms, bson.DocElem{"_id", D{{"$in", suitableIds} }})
1087 }
1088 return u.st.machines.Find(terms), nil
1089 }
1090
1091 // assignToCleanMaybeEmptyMachine implements AssignToCleanMachine and AssignToCl eanEmptyMachine.
1092 // A 'machine' may be a machine instance or container depending on the service c onstraints.
1093 func (u *Unit) assignToCleanMaybeEmptyMachine(requireEmpty bool) (m *Machine, er r error) {
1094 context := "clean"
1095 if requireEmpty {
1096 context += ", empty"
1097 }
1098 context += " machine"
1099
1100 if u.doc.Principal != "" {
1101 err = fmt.Errorf("unit is a subordinate")
1102 assignContextf(&err, u, context)
1103 return nil, err
1104 }
1105
1106 // Get the unit constraints to see what deployment requirements we have to adhere to.
1107 cons, err := u.constraints()
1108 if err != nil {
1109 assignContextf(&err, u, context)
1110 return nil, err
1111 }
1112 query, err := u.findCleanMachineQuery(requireEmpty, cons)
1113 if err != nil {
1114 assignContextf(&err, u, context)
1115 return nil, err
1116 }
1117
1118 // TODO use Batch(1). See https://bugs.launchpad.net/mgo/+bug/1053509
1119 // TODO(rog) Fix so this is more efficient when there are concurrent use s.
1120 // Possible solution: pick the highest and the smallest id of all
1121 // unused machines, and try to assign to the first one >= a random id in the
1122 // middle.
1123 iter := query.Batch(2).Prefetch(0).Iter()
1124 var mdoc machineDoc
1125 for iter.Next(&mdoc) {
1126 m := newMachine(u.st, &mdoc)
1127 err := u.assignToMachine(m, true)
1128 if err == nil {
1129 return m, nil
1130 }
1131 if err != inUseErr && err != machineNotAliveErr {
1132 assignContextf(&err, u, context)
1133 return nil, err
1134 }
1135 }
1136 if err := iter.Err(); err != nil {
1137 assignContextf(&err, u, context)
1138 return nil, err
1139 }
1140 return nil, noCleanMachines
1141 }
1142
1143 // UnassignFromMachine removes the assignment between this unit and the
1144 // machine it's assigned to.
1145 func (u *Unit) UnassignFromMachine() (err error) {
1146 // TODO check local machine id and add an assert that the
1147 // machine id is as expected.
1148 ops := []txn.Op{{
1149 C: u.st.units.Name,
1150 Id: u.doc.Name,
1151 Assert: txn.DocExists,
1152 Update: D{{"$set", D{{"machineid", ""}}}},
1153 }}
1154 if u.doc.MachineId != "" {
1155 ops = append(ops, txn.Op{
1156 C: u.st.machines.Name,
1157 Id: u.doc.MachineId,
1158 Assert: txn.DocExists,
1159 Update: D{{"$pull", D{{"principals", u.doc.Name}}}},
1160 })
1161 }
1162 err = u.st.runTransaction(ops)
1163 if err != nil {
1164 return fmt.Errorf("cannot unassign unit %q from machine: %v", u, onAbort(err, errors.NotFoundf("machine")))
1165 }
1166 u.doc.MachineId = ""
1167 return nil
1168 }
1169
1170 // SetPublicAddress sets the public address of the unit.
1171 func (u *Unit) SetPublicAddress(address string) (err error) {
1172 ops := []txn.Op{{
1173 C: u.st.units.Name,
1174 Id: u.doc.Name,
1175 Assert: txn.DocExists,
1176 Update: D{{"$set", D{{"publicaddress", address}}}},
1177 }}
1178 if err := u.st.runTransaction(ops); err != nil {
1179 return fmt.Errorf("cannot set public address of unit %q: %v", u, onAbort(err, errors.NotFoundf("unit")))
1180 }
1181 u.doc.PublicAddress = address
1182 return nil
1183 }
1184
1185 // SetPrivateAddress sets the private address of the unit.
1186 func (u *Unit) SetPrivateAddress(address string) error {
1187 ops := []txn.Op{{
1188 C: u.st.units.Name,
1189 Id: u.doc.Name,
1190 Assert: notDeadDoc,
1191 Update: D{{"$set", D{{"privateaddress", address}}}},
1192 }}
1193 err := u.st.runTransaction(ops)
1194 if err != nil {
1195 return fmt.Errorf("cannot set private address of unit %q: %v", u , onAbort(err, errors.NotFoundf("unit")))
1196 }
1197 u.doc.PrivateAddress = address
1198 return nil
1199 }
1200
1201 // Resolve marks the unit as having had any previous state transition
1202 // problems resolved, and informs the unit that it may attempt to
1203 // reestablish normal workflow. The retryHooks parameter informs
1204 // whether to attempt to reexecute previous failed hooks or to continue
1205 // as if they had succeeded before.
1206 func (u *Unit) Resolve(retryHooks bool) error {
1207 status, _, err := u.Status()
1208 if err != nil {
1209 return err
1210 }
1211 if status != params.StatusError {
1212 return fmt.Errorf("unit %q is not in an error state", u)
1213 }
1214 mode := ResolvedNoHooks
1215 if retryHooks {
1216 mode = ResolvedRetryHooks
1217 }
1218 return u.SetResolved(mode)
1219 }
1220
1221 // SetResolved marks the unit as having had any previous state transition
1222 // problems resolved, and informs the unit that it may attempt to
1223 // reestablish normal workflow. The resolved mode parameter informs
1224 // whether to attempt to reexecute previous failed hooks or to continue
1225 // as if they had succeeded before.
1226 func (u *Unit) SetResolved(mode ResolvedMode) (err error) {
1227 defer utils.ErrorContextf(&err, "cannot set resolved mode for unit %q", u)
1228 switch mode {
1229 case ResolvedRetryHooks, ResolvedNoHooks:
1230 default:
1231 return fmt.Errorf("invalid error resolution mode: %q", mode)
1232 }
1233 // TODO(fwereade): assert unit has error status.
1234 resolvedNotSet := D{{"resolved", ResolvedNone}}
1235 ops := []txn.Op{{
1236 C: u.st.units.Name,
1237 Id: u.doc.Name,
1238 Assert: append(notDeadDoc, resolvedNotSet...),
1239 Update: D{{"$set", D{{"resolved", mode}}}},
1240 }}
1241 if err := u.st.runTransaction(ops); err == nil {
1242 u.doc.Resolved = mode
1243 return nil
1244 } else if err != txn.ErrAborted {
1245 return err
1246 }
1247 if ok, err := isNotDead(u.st.units, u.doc.Name); err != nil {
1248 return err
1249 } else if !ok {
1250 return errDead
1251 }
1252 // For now, the only remaining assert is that resolved was unset.
1253 return fmt.Errorf("already resolved")
1254 }
1255
1256 // ClearResolved removes any resolved setting on the unit.
1257 func (u *Unit) ClearResolved() error {
1258 ops := []txn.Op{{
1259 C: u.st.units.Name,
1260 Id: u.doc.Name,
1261 Assert: txn.DocExists,
1262 Update: D{{"$set", D{{"resolved", ResolvedNone}}}},
1263 }}
1264 err := u.st.runTransaction(ops)
1265 if err != nil {
1266 return fmt.Errorf("cannot clear resolved mode for unit %q: %v", u, errors.NotFoundf("unit"))
1267 }
1268 u.doc.Resolved = ResolvedNone
1269 return nil
1270 }
1271
1272 type portSlice []instance.Port
1273
1274 func (p portSlice) Len() int { return len(p) }
1275 func (p portSlice) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
1276 func (p portSlice) Less(i, j int) bool {
1277 p1 := p[i]
1278 p2 := p[j]
1279 if p1.Protocol != p2.Protocol {
1280 return p1.Protocol < p2.Protocol
1281 }
1282 return p1.Number < p2.Number
1283 }
1284
1285 // SortPorts sorts the given ports, first by protocol,
1286 // then by number.
1287 func SortPorts(ports []instance.Port) {
1288 sort.Sort(portSlice(ports))
1289 }
LEFTRIGHT

Powered by Google App Engine
RSS Feeds Recent Issues | This issue
This is Rietveld f62528b