Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code | Sign in
(1472)

Side by Side Diff: ssh/channel.go

Issue 14225043: code review 14225043: go.crypto/ssh: reimplement SSH connection protocol modu... (Closed)
Patch Set: diff -r 5ff5636e18c9 https://code.google.com/p/go.crypto Created 10 years, 5 months ago
Left:
Right:
Use n/p to move between diff chunks; N/P to move between comments. Please Sign in to add in-line comments.
Jump to:
View unified diff | Download patch
« no previous file with comments | « no previous file | ssh/client.go » ('j') | ssh/client.go » ('J')
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 // Copyright 2011 The Go Authors. All rights reserved. 1 // Copyright 2011 The Go Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style 2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file. 3 // license that can be found in the LICENSE file.
4 4
5 package ssh 5 package ssh
6 6
7 import ( 7 import (
8 "encoding/binary"
8 "errors" 9 "errors"
9 "fmt" 10 "fmt"
10 "io" 11 "io"
12 "log"
11 "sync" 13 "sync"
12 "sync/atomic"
13 ) 14 )
14 15
15 // extendedDataTypeCode identifies an OpenSSL extended data type. See RFC 4254, 16 // A Channel is an ordered, reliable, duplex stream that is
16 // section 5.2. 17 // multiplexed over an SSH connection.
17 type extendedDataTypeCode uint32
18
19 const (
20 » // extendedDataStderr is the extended data type that is used for stderr.
21 » extendedDataStderr extendedDataTypeCode = 1
22
23 » // minPacketLength defines the smallest valid packet
24 » minPacketLength = 9
25 )
26
27 // A Channel is an ordered, reliable, duplex stream that is multiplexed over an
28 // SSH connection. Channel.Read can return a ChannelRequest as an error.
29 type Channel interface { 18 type Channel interface {
jpsugar 2013/10/17 00:51:02 I'm not sure I'm a fan of this model where both na
hanwen-google 2013/10/17 14:59:08 It minimizes API breakage, but I agree your sugges
30 // Accept accepts the channel creation request. 19 // Accept accepts the channel creation request.
31 Accept() error 20 Accept() error
32 » // Reject rejects the channel creation request. After calling this, no 21
33 » // other methods on the Channel may be called. If they are then the 22 » // Reject rejects the channel creation request. After calling
34 » // peer is likely to signal a protocol error and drop the connection. 23 » // this, no other methods on the Channel may be called.
35 Reject(reason RejectionReason, message string) error 24 Reject(reason RejectionReason, message string) error
36 25
37 » // Read may return a ChannelRequest as an error. 26 » // NOSUBMIT - should add ReadExtended/WriteExtended here?
27
38 Read(data []byte) (int, error) 28 Read(data []byte) (int, error)
39 Write(data []byte) (int, error) 29 Write(data []byte) (int, error)
30
31 // Sends EOF to the other side.
32 CloseWrite() error
33
34 // Signals end of channel use. No data may be sent after this
35 // call.
40 Close() error 36 Close() error
41 37
42 » // Stderr returns an io.Writer that writes to this channel with the 38 » // Stderr returns an io.ReadWriter that writes to this channel with the
jpsugar 2013/10/17 00:51:02 This comment only talks about writing. It would be
hanwen-google 2013/10/17 14:59:08 Done.
43 // extended data type set to stderr. 39 // extended data type set to stderr.
44 » Stderr() io.Writer 40 » Stderr() io.ReadWriter
45 41
46 » // AckRequest either sends an ack or nack to the channel request. 42 » // SendRequest sends a channel request.
43 » SendRequest(name string, wantReply bool, payload []byte) (bool, error)
44
45 » // IncomingRequests returns the channel for out-of-band
46 » // requests. For all requests received that have WantReply, a
47 » // call of AckRequest should follow.
48 » IncomingRequests() chan *ChannelRequest
49
50 » // AckRequest either sends an ack or nack to the channel
51 » // request. It should only be called if the last
52 » // ChannelRequest had a WantReply
47 AckRequest(ok bool) error 53 AckRequest(ok bool) error
48 54
49 // ChannelType returns the type of the channel, as supplied by the 55 // ChannelType returns the type of the channel, as supplied by the
50 // client. 56 // client.
51 ChannelType() string 57 ChannelType() string
58
52 // ExtraData returns the arbitrary payload for this channel, as supplied 59 // ExtraData returns the arbitrary payload for this channel, as supplied
53 // by the client. This data is specific to the channel type. 60 // by the client. This data is specific to the channel type.
54 ExtraData() []byte 61 ExtraData() []byte
55 } 62 }
56 63
57 // ChannelRequest represents a request sent on a channel, outside of the normal 64 // ChannelRequest represents a request sent, outside of the normal
58 // stream of bytes. It may result from calling Read on a Channel. 65 // stream of bytes. Requests may either be channel specific, or global
59 type ChannelRequest struct { 66 type ChannelRequest struct {
60 Request string 67 Request string
61 WantReply bool 68 WantReply bool
62 Payload []byte 69 Payload []byte
63 } 70 }
64 71
65 func (c ChannelRequest) Error() string {
66 return "ssh: channel request received"
67 }
68
69 // RejectionReason is an enumeration used when rejecting channel creation 72 // RejectionReason is an enumeration used when rejecting channel creation
70 // requests. See RFC 4254, section 5.1. 73 // requests. See RFC 4254, section 5.1.
71 type RejectionReason uint32 74 type RejectionReason uint32
72 75
73 const ( 76 const (
74 Prohibited RejectionReason = iota + 1 77 Prohibited RejectionReason = iota + 1
75 ConnectionFailed 78 ConnectionFailed
76 UnknownChannelType 79 UnknownChannelType
77 ResourceShortage 80 ResourceShortage
78 ) 81 )
79 82
80 // String converts the rejection reason to human readable form. 83 // String converts the rejection reason to human readable form.
81 func (r RejectionReason) String() string { 84 func (r RejectionReason) String() string {
82 switch r { 85 switch r {
83 case Prohibited: 86 case Prohibited:
84 return "administratively prohibited" 87 return "administratively prohibited"
85 case ConnectionFailed: 88 case ConnectionFailed:
86 return "connect failed" 89 return "connect failed"
87 case UnknownChannelType: 90 case UnknownChannelType:
88 return "unknown channel type" 91 return "unknown channel type"
89 case ResourceShortage: 92 case ResourceShortage:
90 return "resource shortage" 93 return "resource shortage"
91 } 94 }
92 return fmt.Sprintf("unknown reason %d", int(r)) 95 return fmt.Sprintf("unknown reason %d", int(r))
93 } 96 }
94 97
98 // channel implements the Channel
95 type channel struct { 99 type channel struct {
96 » packetConn // the underlying transport 100 » // R/O after creation
101 » chanType string
102 » extraData []byte
97 localId, remoteId uint32 103 localId, remoteId uint32
98 remoteWin window
99 maxPacket uint32 104 maxPacket uint32
100 » isClosed uint32 // atomic bool, non zero if true 105 » mux *mux
106
107 » // If set, we have called Accept or Reject on this channel
108 » decided bool
109
110 » // Pending internal channel messages.
111 » msg chan interface{}
112
113 » // Pending user-serviceable messages.
114 » sentRequestMu sync.Mutex
115 » pendingRequests chan *ChannelRequest
116
117 » sentEOF bool
118
119 » // thread-safe data
120 » remoteWin window
121 » pending *buffer
122 » extPending *buffer
123
124 » // Protects all of the below.
125 » mu sync.Mutex
126 » myWindow uint32
127 » sentClose bool
101 } 128 }
102 129
103 func (c *channel) sendWindowAdj(n int) error { 130 func (c *channel) getWindowSpace(max uint32) (uint32, error) {
104 » msg := windowAdjustMsg{ 131 » // check if closed?
105 » » PeersId: c.remoteId,
106 » » AdditionalBytes: uint32(n),
107 » }
108 » return c.writePacket(marshal(msgChannelWindowAdjust, msg))
109 }
110
111 // sendEOF sends EOF to the remote side. RFC 4254 Section 5.3
112 func (c *channel) sendEOF() error {
113 » return c.writePacket(marshal(msgChannelEOF, channelEOFMsg{
114 » » PeersId: c.remoteId,
115 » }))
116 }
117
118 // sendClose informs the remote side of our intent to close the channel.
119 func (c *channel) sendClose() error {
120 » return c.packetConn.writePacket(marshal(msgChannelClose, channelCloseMsg {
121 » » PeersId: c.remoteId,
122 » }))
123 }
124
125 func (c *channel) sendChannelOpenFailure(reason RejectionReason, message string) error {
126 » reject := channelOpenFailureMsg{
127 » » PeersId: c.remoteId,
128 » » Reason: reason,
129 » » Message: message,
130 » » Language: "en",
131 » }
132 » return c.writePacket(marshal(msgChannelOpenFailure, reject))
133 }
134
135 func (c *channel) writePacket(b []byte) error {
136 » if c.closed() {
137 » » return io.EOF
138 » }
139 » if uint32(len(b)) > c.maxPacket {
140 » » return fmt.Errorf("ssh: cannot write %d bytes, maxPacket is %d b ytes", len(b), c.maxPacket)
141 » }
142 » return c.packetConn.writePacket(b)
143 }
144
145 func (c *channel) closed() bool {
146 » return atomic.LoadUint32(&c.isClosed) > 0
147 }
148
149 func (c *channel) setClosed() bool {
150 » return atomic.CompareAndSwapUint32(&c.isClosed, 0, 1)
151 }
152
153 type serverChan struct {
154 » channel
155 » // immutable once created
156 » chanType string
157 » extraData []byte
158
159 » serverConn *ServerConn
160 » myWindow uint32
161 » theyClosed bool // indicates the close msg has been received from the r emote side
162 » theySentEOF bool
163 » isDead uint32
164 » err error
165
166 » pendingRequests []ChannelRequest
167 » pendingData []byte
168 » head, length int
169
170 » // This lock is inferior to serverConn.lock
171 » cond *sync.Cond
172 }
173
174 func (c *serverChan) Accept() error {
175 » c.serverConn.lock.Lock()
176 » defer c.serverConn.lock.Unlock()
177
178 » if c.serverConn.err != nil {
179 » » return c.serverConn.err
180 » }
181
182 » confirm := channelOpenConfirmMsg{
183 » » PeersId: c.remoteId,
184 » » MyId: c.localId,
185 » » MyWindow: c.myWindow,
186 » » MaxPacketSize: c.maxPacket,
187 » }
188 » return c.writePacket(marshal(msgChannelOpenConfirm, confirm))
189 }
190
191 func (c *serverChan) Reject(reason RejectionReason, message string) error {
192 » c.serverConn.lock.Lock()
193 » defer c.serverConn.lock.Unlock()
194
195 » if c.serverConn.err != nil {
196 » » return c.serverConn.err
197 » }
198
199 » return c.sendChannelOpenFailure(reason, message)
200 }
201
202 func (c *serverChan) handlePacket(packet interface{}) {
203 » c.cond.L.Lock()
204 » defer c.cond.L.Unlock()
205
206 » switch packet := packet.(type) {
207 » case *channelRequestMsg:
208 » » req := ChannelRequest{
209 » » » Request: packet.Request,
210 » » » WantReply: packet.WantReply,
211 » » » Payload: packet.RequestSpecificData,
212 » » }
213
214 » » c.pendingRequests = append(c.pendingRequests, req)
215 » » c.cond.Signal()
216 » case *channelCloseMsg:
217 » » c.theyClosed = true
218 » » c.cond.Signal()
219 » case *channelEOFMsg:
220 » » c.theySentEOF = true
221 » » c.cond.Signal()
222 » case *windowAdjustMsg:
223 » » if !c.remoteWin.add(packet.AdditionalBytes) {
224 » » » panic("illegal window update")
225 » » }
226 » default:
227 » » panic("unknown packet type")
228 » }
229 }
230
231 func (c *serverChan) handleData(data []byte) {
232 » c.cond.L.Lock()
233 » defer c.cond.L.Unlock()
234
235 » // The other side should never send us more than our window.
236 » if len(data)+c.length > len(c.pendingData) {
237 » » // TODO(agl): we should tear down the channel with a protocol
238 » » // error.
239 » » return
240 » }
241
242 » c.myWindow -= uint32(len(data))
243 » for i := 0; i < 2; i++ {
244 » » tail := c.head + c.length
245 » » if tail >= len(c.pendingData) {
246 » » » tail -= len(c.pendingData)
247 » » }
248 » » n := copy(c.pendingData[tail:], data)
249 » » data = data[n:]
250 » » c.length += n
251 » }
252
253 » c.cond.Signal()
254 }
255
256 func (c *serverChan) Stderr() io.Writer {
257 » return extendedDataChannel{c: c, t: extendedDataStderr}
258 }
259
260 // extendedDataChannel is an io.Writer that writes any data to c as extended
261 // data of the given type.
262 type extendedDataChannel struct {
263 » t extendedDataTypeCode
264 » c *serverChan
265 }
266
267 func (edc extendedDataChannel) Write(data []byte) (n int, err error) {
268 » const headerLength = 13 // 1 byte message type, 4 bytes remoteId, 4 byte s extended message type, 4 bytes data length
269 » c := edc.c
270 » for len(data) > 0 {
271 » » space := min(c.maxPacket-headerLength, len(data))
272 » » if space, err = c.getWindowSpace(space); err != nil {
273 » » » return 0, err
274 » » }
275 » » todo := data
276 » » if uint32(len(todo)) > space {
277 » » » todo = todo[:space]
278 » » }
279
280 » » packet := make([]byte, headerLength+len(todo))
281 » » packet[0] = msgChannelExtendedData
282 » » marshalUint32(packet[1:], c.remoteId)
283 » » marshalUint32(packet[5:], uint32(edc.t))
284 » » marshalUint32(packet[9:], uint32(len(todo)))
285 » » copy(packet[13:], todo)
286
287 » » if err = c.writePacket(packet); err != nil {
288 » » » return
289 » » }
290
291 » » n += len(todo)
292 » » data = data[len(todo):]
293 » }
294
295 » return
296 }
297
298 func (c *serverChan) Read(data []byte) (n int, err error) {
299 » n, err, windowAdjustment := c.read(data)
300
301 » if windowAdjustment > 0 {
302 » » packet := marshal(msgChannelWindowAdjust, windowAdjustMsg{
303 » » » PeersId: c.remoteId,
304 » » » AdditionalBytes: windowAdjustment,
305 » » })
306 » » err = c.writePacket(packet)
307 » }
308
309 » return
310 }
311
312 func (c *serverChan) read(data []byte) (n int, err error, windowAdjustment uint3 2) {
313 » c.cond.L.Lock()
314 » defer c.cond.L.Unlock()
315
316 » if c.err != nil {
317 » » return 0, c.err, 0
318 » }
319
320 » for {
321 » » if c.theySentEOF || c.theyClosed || c.dead() {
322 » » » return 0, io.EOF, 0
323 » » }
324
325 » » if len(c.pendingRequests) > 0 {
326 » » » req := c.pendingRequests[0]
327 » » » if len(c.pendingRequests) == 1 {
328 » » » » c.pendingRequests = nil
329 » » » } else {
330 » » » » oldPendingRequests := c.pendingRequests
331 » » » » c.pendingRequests = make([]ChannelRequest, len(o ldPendingRequests)-1)
332 » » » » copy(c.pendingRequests, oldPendingRequests[1:])
333 » » » }
334
335 » » » return 0, req, 0
336 » » }
337
338 » » if c.length > 0 {
339 » » » tail := min(uint32(c.head+c.length), len(c.pendingData))
340 » » » n = copy(data, c.pendingData[c.head:tail])
341 » » » c.head += n
342 » » » c.length -= n
343 » » » if c.head == len(c.pendingData) {
344 » » » » c.head = 0
345 » » » }
346
347 » » » windowAdjustment = uint32(len(c.pendingData)-c.length) - c.myWindow
348 » » » if windowAdjustment < uint32(len(c.pendingData)/2) {
349 » » » » windowAdjustment = 0
350 » » » }
351 » » » c.myWindow += windowAdjustment
352
353 » » » return
354 » » }
355
356 » » c.cond.Wait()
357 » }
358
359 » panic("unreachable")
360 }
361
362 // getWindowSpace takes, at most, max bytes of space from the peer's window. It
363 // returns the number of bytes actually reserved.
364 func (c *serverChan) getWindowSpace(max uint32) (uint32, error) {
365 » if c.dead() || c.closed() {
366 » » return 0, io.EOF
367 » }
368 return c.remoteWin.reserve(max), nil 132 return c.remoteWin.reserve(max), nil
369 } 133 }
370 134
371 func (c *serverChan) dead() bool { 135 // writePacket sends the packet over the wire. If the packet is a
372 » return atomic.LoadUint32(&c.isDead) > 0 136 // channel close, it updates sentClose. This method takes the lock
137 // c.mu.
138 func (c *channel) writePacket(packet []byte) error {
139 » if uint32(len(packet)) > c.maxPacket {
140 » » return fmt.Errorf("ssh: cannot write %d bytes, maxPacket is %d b ytes", len(packet), c.maxPacket)
141 » }
142
143 » c.mu.Lock()
144 » if c.sentClose {
145 » » c.mu.Unlock()
146 » » return io.EOF
147 » }
148 » c.sentClose = (packet[0] == msgChannelClose)
149 » err := c.mux.conn.writePacket(packet)
150 » c.mu.Unlock()
151 » return err
373 } 152 }
374 153
375 func (c *serverChan) setDead() { 154 func (c *channel) sendMessage(code byte, msg interface{}) error {
376 » atomic.StoreUint32(&c.isDead, 1) 155 » if debug {
377 } 156 » » log.Printf("send %d: %#v", c.mux.chanList.offset, msg)
378
379 func (c *serverChan) Write(data []byte) (n int, err error) {
380 » const headerLength = 9 // 1 byte message type, 4 bytes remoteId, 4 bytes data length
381 » for len(data) > 0 {
382 » » space := min(c.maxPacket-headerLength, len(data))
383 » » if space, err = c.getWindowSpace(space); err != nil {
384 » » » return 0, err
385 » » }
386 » » todo := data
387 » » if uint32(len(todo)) > space {
388 » » » todo = todo[:space]
389 » » }
390
391 » » packet := make([]byte, headerLength+len(todo))
392 » » packet[0] = msgChannelData
393 » » marshalUint32(packet[1:], c.remoteId)
394 » » marshalUint32(packet[5:], uint32(len(todo)))
395 » » copy(packet[9:], todo)
396
397 » » if err = c.writePacket(packet); err != nil {
398 » » » return
399 » » }
400
401 » » n += len(todo)
402 » » data = data[len(todo):]
403 } 157 }
404 158
405 » return 159 » p := marshal(code, msg)
406 } 160 » binary.BigEndian.PutUint32(p[1:], c.remoteId)
407 161 » return c.writePacket(p)
408 // Close signals the intent to close the channel.
409 func (c *serverChan) Close() error {
410 » c.serverConn.lock.Lock()
411 » defer c.serverConn.lock.Unlock()
412
413 » if c.serverConn.err != nil {
414 » » return c.serverConn.err
415 » }
416
417 » if !c.setClosed() {
418 » » return errors.New("ssh: channel already closed")
419 » }
420 » return c.sendClose()
421 }
422
423 func (c *serverChan) AckRequest(ok bool) error {
424 » c.serverConn.lock.Lock()
425 » defer c.serverConn.lock.Unlock()
426
427 » if c.serverConn.err != nil {
428 » » return c.serverConn.err
429 » }
430
431 » if !ok {
432 » » ack := channelRequestFailureMsg{
433 » » » PeersId: c.remoteId,
434 » » }
435 » » return c.writePacket(marshal(msgChannelFailure, ack))
436 » }
437
438 » ack := channelRequestSuccessMsg{
439 » » PeersId: c.remoteId,
440 » }
441 » return c.writePacket(marshal(msgChannelSuccess, ack))
442 }
443
444 func (c *serverChan) ChannelType() string {
445 » return c.chanType
446 }
447
448 func (c *serverChan) ExtraData() []byte {
449 » return c.extraData
450 }
451
452 // A clientChan represents a single RFC 4254 channel multiplexed
453 // over a SSH connection.
454 type clientChan struct {
455 » channel
456 » stdin *chanWriter
457 » stdout *chanReader
458 » stderr *chanReader
459 » msg chan interface{}
460 }
461
462 // newClientChan returns a partially constructed *clientChan
463 // using the local id provided. To be usable clientChan.remoteId
464 // needs to be assigned once known.
465 func newClientChan(cc packetConn, id uint32) *clientChan {
466 » c := &clientChan{
467 » » channel: channel{
468 » » » packetConn: cc,
469 » » » localId: id,
470 » » » remoteWin: window{Cond: newCond()},
471 » » },
472 » » msg: make(chan interface{}, 16),
473 » }
474 » c.stdin = &chanWriter{
475 » » channel: &c.channel,
476 » }
477 » c.stdout = &chanReader{
478 » » channel: &c.channel,
479 » » buffer: newBuffer(),
480 » }
481 » c.stderr = &chanReader{
482 » » channel: &c.channel,
483 » » buffer: newBuffer(),
484 » }
485 » return c
486 }
487
488 // waitForChannelOpenResponse, if successful, fills out
489 // the remoteId and records any initial window advertisement.
490 func (c *clientChan) waitForChannelOpenResponse() error {
491 » switch msg := (<-c.msg).(type) {
492 » case *channelOpenConfirmMsg:
493 » » if msg.MaxPacketSize < minPacketLength || msg.MaxPacketSize > 1< <31 {
494 » » » return errors.New("ssh: invalid MaxPacketSize from peer" )
495 » » }
496 » » // fixup remoteId field
497 » » c.remoteId = msg.MyId
498 » » c.maxPacket = msg.MaxPacketSize
499 » » c.remoteWin.add(msg.MyWindow)
500 » » return nil
501 » case *channelOpenFailureMsg:
502 » » return errors.New(safeString(msg.Message))
503 » }
504 » return errors.New("ssh: unexpected packet")
505 }
506
507 // Close signals the intent to close the channel.
508 func (c *clientChan) Close() error {
509 » if !c.setClosed() {
510 » » return errors.New("ssh: channel already closed")
511 » }
512 » c.stdout.eof()
513 » c.stderr.eof()
514 » return c.sendClose()
515 }
516
517 // A chanWriter represents the stdin of a remote process.
518 type chanWriter struct {
519 » *channel
520 » // indicates the writer has been closed. eof is owned by the
521 » // caller of Write/Close.
522 » eof bool
523 }
524
525 // Write writes data to the remote process's standard input.
526 func (w *chanWriter) Write(data []byte) (written int, err error) {
527 » const headerLength = 9 // 1 byte message type, 4 bytes remoteId, 4 bytes data length
528 » for len(data) > 0 {
529 » » if w.eof || w.closed() {
530 » » » err = io.EOF
531 » » » return
532 » » }
533 » » // never send more data than maxPacket even if
534 » » // there is sufficient window.
535 » » n := min(w.maxPacket-headerLength, len(data))
536 » » r := w.remoteWin.reserve(n)
537 » » n = r
538 » » remoteId := w.remoteId
539 » » packet := []byte{
540 » » » msgChannelData,
541 » » » byte(remoteId >> 24), byte(remoteId >> 16), byte(remoteI d >> 8), byte(remoteId),
542 » » » byte(n >> 24), byte(n >> 16), byte(n >> 8), byte(n),
543 » » }
544 » » if err = w.writePacket(append(packet, data[:n]...)); err != nil {
545 » » » break
546 » » }
547 » » data = data[n:]
548 » » written += int(n)
549 » }
550 » return
551 } 162 }
552 163
553 func min(a uint32, b int) uint32 { 164 func min(a uint32, b int) uint32 {
554 if a < uint32(b) { 165 if a < uint32(b) {
555 return a 166 return a
556 } 167 }
557 return uint32(b) 168 return uint32(b)
558 } 169 }
559 170
560 func (w *chanWriter) Close() error { 171 func (c *channel) WriteExtended(data []byte, extendedCode uint32) (n int, err er ror) {
561 » w.eof = true 172 » if c.sentEOF {
562 » return w.sendEOF() 173 » » return 0, io.EOF
563 } 174 » }
564 175 » // 1 byte message type, 4 bytes remoteId, 4 bytes data length
565 // A chanReader represents stdout or stderr of a remote process. 176 » opCode := byte(msgChannelData)
566 type chanReader struct { 177 » headerLength := uint32(9)
567 » *channel // the channel backing this reader 178 » if extendedCode > 0 {
568 » *buffer 179 » » headerLength += 4
569 } 180 » » opCode = msgChannelExtendedData
570 181 » }
571 // Read reads data from the remote process's stdout or stderr. 182
572 func (r *chanReader) Read(buf []byte) (int, error) { 183 » for len(data) > 0 {
573 » n, err := r.buffer.Read(buf) 184 » » space := min(c.maxPacket-headerLength, len(data))
185 » » if space, err = c.getWindowSpace(space); err != nil {
186 » » » return n, err
187 » » }
188 » » todo := data
189 » » if uint32(len(todo)) > space {
190 » » » todo = todo[:space]
191 » » }
192
193 » » packet := make([]byte, headerLength+uint32(len(todo)))
194 » » packet[0] = opCode
195 » » marshalUint32(packet[1:], c.remoteId)
196 » » if extendedCode > 0 {
197 » » » marshalUint32(packet[5:], uint32(extendedCode))
198 » » }
199 » » marshalUint32(packet[headerLength-4:], uint32(len(todo)))
200 » » copy(packet[headerLength:], todo)
201 » » if err = c.writePacket(packet); err != nil {
202 » » » return n, err
203 » » }
204
205 » » n += len(todo)
206 » » data = data[len(todo):]
207 » }
208
209 » return n, err
210 }
211
212 func (c *channel) handleData(packet []byte) error {
213 » sz := 9
214 » if packet[0] == msgChannelExtendedData {
215 » » sz = 13
216 » }
217 » if len(packet) < sz {
218 » » // malformed data packet
219 » » return ParseError{packet[0]}
220 » }
221
222 » var extended uint32
223 » if sz > 9 {
224 » » extended = binary.BigEndian.Uint32(packet[5:])
225 » }
226
227 » length := binary.BigEndian.Uint32(packet[sz-4 : sz])
228 » if length == 0 {
229 » » return nil
230 » }
231 » data := packet[sz:]
232 » if length != uint32(len(data)) {
233 » » return errors.New("ssh: wrong packet length")
234 » }
235
236 » c.mu.Lock()
237 » if c.myWindow < length {
238 » » c.mu.Unlock()
239 » » // TODO(hanwen): should send Disconnect with reason?
240 » » return errors.New("ssh: remote side wrote too much")
241 » }
242 » c.myWindow -= length
243 » c.mu.Unlock()
244
245 » if extended == 1 {
246 » » c.extPending.write(data)
247 » } else if extended > 0 {
248 » » // discard other extended data.
249 » } else {
250 » » c.pending.write(data)
251 » }
252 » return nil
253 }
254
255 func (c *channel) adjustWindow(n uint32) error {
256 » c.mu.Lock()
257 » c.myWindow += uint32(n)
258 » c.mu.Unlock()
259 » return c.sendMessage(msgChannelWindowAdjust, windowAdjustMsg{
260 » » AdditionalBytes: uint32(n),
261 » })
262 }
263
264 func (c *channel) ReadExtended(data []byte, extended uint32) (n int, err error) {
265 » if extended == 1 {
266 » » n, err = c.extPending.Read(data)
267 » } else if extended == 0 {
268 » » n, err = c.pending.Read(data)
269 » }
270
271 » if n > 0 {
272 » » err = c.adjustWindow(uint32(n))
273 » » // sendWindowAdjust can return io.EOF if the remote
274 » » // peer has closed the connection, however we want to
275 » » // defer forwarding io.EOF to the caller of Read until
276 » » // the buffer has been drained.
277 » » if n > 0 && err == io.EOF {
278 » » » err = nil
279 » » }
280 » }
281
282 » return n, err
283 }
284
285 func (c *channel) handlePacket(packet []byte) error {
286 » if uint32(len(packet)) > c.maxPacket {
287 » » // TODO(hanwen): should send Disconnect?
288 » » return errors.New("ssh: incoming packet exceeds maximum size")
289 » }
290
291 » switch packet[0] {
292 » case msgChannelData, msgChannelExtendedData:
293 » » return c.handleData(packet)
294 » case msgChannelClose:
295 » » // Ack the close.
296 » » c.sendMessage(msgChannelClose, channelCloseMsg{
297 » » » PeersId: c.remoteId})
298
299 » » c.pending.eof()
300 » » c.extPending.eof()
301 » » close(c.msg)
302 » » close(c.pendingRequests)
303 » » c.mux.chanList.remove(c.localId)
304
305 » » return nil
306 » case msgChannelEOF:
307 » » c.pending.eof()
308 » » // RFC 4254 is mute on how EOF affects dataExt messages but
309 » » // it is logical to signal EOF at the same time.
310 » » c.extPending.eof()
311 » » return nil
312 » }
313
314 » decoded, err := decode(packet)
574 if err != nil { 315 if err != nil {
575 » » if err == io.EOF { 316 » » return err
576 » » » return n, err 317 » }
577 » » } 318
578 » » return 0, err 319 » switch msg := decoded.(type) {
579 » } 320 » case *windowAdjustMsg:
580 » err = r.sendWindowAdj(n) 321 » » if !c.remoteWin.add(msg.AdditionalBytes) {
581 » if err == io.EOF && n > 0 { 322 » » » return fmt.Errorf("invalid window update %d", msg.Additi onalBytes)
582 » » // sendWindowAdjust can return io.EOF if the remote peer has 323 » » }
583 » » // closed the connection, however we want to defer forwarding io .EOF to the 324
584 » » // caller of Read until the buffer has been drained. 325 » case *channelRequestMsg:
585 » » err = nil 326 » » req := ChannelRequest{
586 » } 327 » » » Request: msg.Request,
587 » return n, err 328 » » » WantReply: msg.WantReply,
588 } 329 » » » Payload: msg.RequestSpecificData,
330 » » }
331
332 » » c.pendingRequests <- &req
333 » default:
334 » » c.msg <- msg
335 » }
336 » return nil
337 }
338
339 func newChannel(chanType string, extraData []byte) *channel {
340 » return &channel{
341 » » remoteWin: window{Cond: newCond()},
342 » » myWindow: defaultWindowSize,
343 » » pending: newBuffer(),
344 » » extPending: newBuffer(),
345 » » pendingRequests: make(chan *ChannelRequest, 16),
346 » » msg: make(chan interface{}, 16),
347 » » chanType: chanType,
348 » » extraData: extraData,
349 » }
350 }
351
352 var errUndecided = errors.New("ssh: must Accept or Reject channel")
353 var errDecidedAlready = errors.New("ssh: can call Accept or Reject only once")
354
355 type extChannel channel
356
357 func (e *extChannel) Write(data []byte) (n int, err error) {
358 » return ((*channel)(e)).WriteExtended(data, 1)
359 }
360 func (e *extChannel) Read(data []byte) (n int, err error) {
361 » return ((*channel)(e)).ReadExtended(data, 1)
362 }
363
364 func (c *channel) Accept() error {
365 » if c.decided {
366 » » return errDecidedAlready
367 » }
368 » confirm := channelOpenConfirmMsg{
369 » » PeersId: c.remoteId,
370 » » MyId: c.localId,
371 » » MyWindow: c.myWindow,
372 » » MaxPacketSize: c.maxPacket,
373 » }
374 » c.decided = true
375 » return c.sendMessage(msgChannelOpenConfirm, confirm)
376 }
377
378 func (ch *channel) Reject(reason RejectionReason, message string) error {
379 » if ch.decided {
380 » » return errDecidedAlready
381 » }
382 » reject := channelOpenFailureMsg{
383 » » PeersId: ch.remoteId,
384 » » Reason: reason,
385 » » Message: message,
386 » » Language: "en",
387 » }
388 » ch.decided = true
389 » return ch.sendMessage(msgChannelOpenFailure, reject)
390 }
391
392 func (ch *channel) Read(data []byte) (int, error) {
393 » if !ch.decided {
394 » » return 0, errUndecided
395 » }
396
397 » return ch.ReadExtended(data, 0)
398 }
399
400 func (ch *channel) IncomingRequests() chan *ChannelRequest {
401 » return ch.pendingRequests
402 }
403
404 func (ch *channel) Write(data []byte) (int, error) {
405 » if !ch.decided {
406 » » return 0, errUndecided
407 » }
408 » return ch.WriteExtended(data, 0)
409 }
410
411 func (ch *channel) CloseWrite() error {
412 » if !ch.decided {
413 » » return errUndecided
414 » }
415 » ch.sentEOF = true
416 » return ch.sendMessage(msgChannelEOF, channelEOFMsg{
417 » » PeersId: ch.remoteId})
418 }
419
420 func (ch *channel) Close() error {
421 » if !ch.decided {
422 » » return errUndecided
423 » }
424
425 » return ch.sendMessage(msgChannelClose, channelCloseMsg{
426 » » PeersId: ch.remoteId})
427 }
428
429 func (ch *channel) Stderr() io.ReadWriter {
430 » if !ch.decided {
431 » » return nil
432 » }
433 » return (*extChannel)(ch)
434 }
435
436 // SendRequest sends a channel request. If wantReply is set, it will
437 // wait for a reply and return the result as a boolean.
438 func (ch *channel) SendRequest(name string, wantReply bool, payload []byte) (boo l, error) {
439 » if !ch.decided {
440 » » return false, errUndecided
441 » }
442
443 » if wantReply {
444 » » ch.sentRequestMu.Lock()
445 » » defer ch.sentRequestMu.Unlock()
446 » }
447
448 » msg := channelRequestMsg{
449 » » PeersId: ch.remoteId,
450 » » Request: name,
451 » » WantReply: wantReply,
452 » » RequestSpecificData: payload,
453 » }
454
455 » if err := ch.sendMessage(msgChannelRequest, msg); err != nil {
456 » » return false, err
457 » }
458
459 » if wantReply {
460 » » m, ok := (<-ch.msg)
461 » » if !ok {
462 » » » return false, io.EOF
463 » » }
464 » » switch m.(type) {
465 » » case *channelRequestFailureMsg:
466 » » » return false, nil
467 » » case *channelRequestSuccessMsg:
468 » » » return true, nil
469 » » default:
470 » » » return false, fmt.Errorf("unexpected response %#v", m)
471 » » }
472 » }
473
474 » return false, nil
475 }
476
477 // AckRequest either sends an ack or nack to the channel request.
478 func (ch *channel) AckRequest(ok bool) error {
479 » if !ch.decided {
480 » » return errUndecided
481 » }
482
483 » var msg interface{}
484 » var code byte
485 » if !ok {
486 » » code = msgChannelFailure
487 » » msg = channelRequestFailureMsg{
488 » » » PeersId: ch.remoteId,
489 » » }
490 » } else {
491 » » code = msgChannelSuccess
492 » » msg = channelRequestSuccessMsg{
493 » » » PeersId: ch.remoteId,
494 » » }
495 » }
496 » return ch.sendMessage(code, msg)
497 }
498
499 func (ch *channel) ChannelType() string {
500 » return ch.chanType
501 }
502
503 func (ch *channel) ExtraData() []byte {
504 » return ch.extraData
505 }
OLDNEW
« no previous file with comments | « no previous file | ssh/client.go » ('j') | ssh/client.go » ('J')

Powered by Google App Engine
RSS Feeds Recent Issues | This issue
This is Rietveld f62528b