Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code | Sign in
(685)

Delta Between Two Patch Sets: ssh/channel.go

Issue 14225043: code review 14225043: go.crypto/ssh: reimplement SSH connection protocol modu... (Closed)
Left Patch Set: diff -r 2cd6b3b93cdb https://code.google.com/p/go.crypto Created 10 years, 6 months ago
Right Patch Set: diff -r cd1eea1eb828 https://code.google.com/p/go.crypto Created 10 years, 5 months ago
Left:
Right:
Use n/p to move between diff chunks; N/P to move between comments. Please Sign in to add in-line comments.
Jump to:
Left: Side by side diff | Download
Right: Side by side diff | Download
« no previous file with change/comment | « no previous file | ssh/client.go » ('j') | no next file with change/comment »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
LEFTRIGHT
1 // Copyright 2011 The Go Authors. All rights reserved. 1 // Copyright 2011 The Go Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style 2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file. 3 // license that can be found in the LICENSE file.
4 4
5 package ssh 5 package ssh
6 6
7 import ( 7 import (
8 "encoding/binary"
8 "errors" 9 "errors"
9 "fmt" 10 "fmt"
10 "io" 11 "io"
12 "log"
11 "sync" 13 "sync"
12 "sync/atomic"
13 ) 14 )
14 15
15 // extendedDataTypeCode identifies an OpenSSL extended data type. See RFC 4254, 16 // A Channel is an ordered, reliable, duplex stream that is
16 // section 5.2. 17 // multiplexed over an SSH connection.
17 type extendedDataTypeCode uint32
18
19 const (
20 » // extendedDataStderr is the extended data type that is used for stderr.
21 » extendedDataStderr extendedDataTypeCode = 1
22
23 » // minPacketLength defines the smallest valid packet
24 » minPacketLength = 9
25 )
26
27 // A Channel is an ordered, reliable, duplex stream that is multiplexed over an
28 // SSH connection. Channel.Read can return a ChannelRequest as an error.
29 type Channel interface { 18 type Channel interface {
30 // Accept accepts the channel creation request. 19 // Accept accepts the channel creation request.
31 Accept() error 20 Accept() error
32 21
33 » // Reject rejects the channel creation request. After calling this, no 22 » // Reject rejects the channel creation request. After calling
34 » // other methods on the Channel may be called. If they are then the 23 » // this, no other methods on the Channel may be called.
35 » // peer is likely to signal a protocol error and drop the connection.
36 Reject(reason RejectionReason, message string) error 24 Reject(reason RejectionReason, message string) error
37 25
26 // Read may return a ChannelRequest as an error.
38 Read(data []byte) (int, error) 27 Read(data []byte) (int, error)
39 Write(data []byte) (int, error) 28 Write(data []byte) (int, error)
29
30 // Signals end of channel use. No data may be sent after this
31 // call.
40 Close() error 32 Close() error
41 33
42 » // Stderr returns an io.ReadWriter that writes to this channel with the 34 » // Stderr returns an io.Writer that writes to this channel with the
43 // extended data type set to stderr. 35 // extended data type set to stderr.
44 » Stderr() io.ReadWriter 36 » Stderr() io.Writer
45
46 » // SendRequest sends a channel request.
47 » SendRequest(name string, wantReply bool, payload []byte) (bool, error)
48
49 » // ReceivedRequests returns the channel for out-of-band
50 » // requests. For all requests that have WantReply, a call of
51 » // AckRequest should follow.
52 » ReceivedRequests() chan *ChannelRequest
53 37
54 // AckRequest either sends an ack or nack to the channel 38 // AckRequest either sends an ack or nack to the channel
55 // request. It should only be called if the last 39 // request. It should only be called if the last
56 // ChannelRequest had a WantReply 40 // ChannelRequest had a WantReply
57 AckRequest(ok bool) error 41 AckRequest(ok bool) error
58 42
59 // ChannelType returns the type of the channel, as supplied by the 43 // ChannelType returns the type of the channel, as supplied by the
60 // client. 44 // client.
61 ChannelType() string 45 ChannelType() string
62 » // ExtraData returns the arbitary payload for this channel, as supplied 46
47 » // ExtraData returns the arbitrary payload for this channel, as supplied
63 // by the client. This data is specific to the channel type. 48 // by the client. This data is specific to the channel type.
64 ExtraData() []byte 49 ExtraData() []byte
65 } 50 }
66 51
67 // ChannelRequest represents a request sent on a channel, outside of the normal 52 // ChannelRequest represents a request sent, outside of the normal
68 // stream of bytes. It may result from calling Read on a Channel. 53 // stream of bytes.
69 type ChannelRequest struct { 54 type ChannelRequest struct {
70 Request string 55 Request string
71 WantReply bool 56 WantReply bool
72 Payload []byte 57 Payload []byte
73 } 58 }
74 59
75 func (c ChannelRequest) Error() string { 60 func (c ChannelRequest) Error() string {
76 return "ssh: channel request received" 61 return "ssh: channel request received"
77 } 62 }
78 63
79 // RejectionReason is an enumeration used when rejecting channel creation 64 // RejectionReason is an enumeration used when rejecting channel creation
80 // requests. See RFC 4254, section 5.1. 65 // requests. See RFC 4254, section 5.1.
81 type RejectionReason uint32 66 type RejectionReason uint32
82 67
83 const ( 68 const (
84 Prohibited RejectionReason = iota + 1 69 Prohibited RejectionReason = iota + 1
85 ConnectionFailed 70 ConnectionFailed
86 UnknownChannelType 71 UnknownChannelType
87 ResourceShortage 72 ResourceShortage
88 ) 73 )
89 74
75 // String converts the rejection reason to human readable form.
90 func (r RejectionReason) String() string { 76 func (r RejectionReason) String() string {
91 switch r { 77 switch r {
92 case Prohibited: 78 case Prohibited:
93 » » return "Prohibited" 79 » » return "administratively prohibited"
94 case ConnectionFailed: 80 case ConnectionFailed:
95 » » return "ConnectionFailed" 81 » » return "connect failed"
96 case UnknownChannelType: 82 case UnknownChannelType:
97 » » return "UnknownChannelType" 83 » » return "unknown channel type"
98 case ResourceShortage: 84 case ResourceShortage:
99 » » return "ResourceShortage" 85 » » return "resource shortage"
100 } 86 }
101 return fmt.Sprintf("unknown reason %d", int(r)) 87 return fmt.Sprintf("unknown reason %d", int(r))
102 } 88 }
103 89
90 // channel implements the Channel
104 type channel struct { 91 type channel struct {
105 » packetConn packetConn // the underlying transport 92 » // R/O after creation
93 » chanType string
94 » extraData []byte
106 localId, remoteId uint32 95 localId, remoteId uint32
107 remoteWin window
108 maxPacket uint32 96 maxPacket uint32
109 » isClosed uint32 // atomic bool, non zero if true 97 » mux *mux
110 } 98
111 99 » // If set, we have called Accept or Reject on this channel
112 func (c *channel) sendWindowAdj(n int) error { 100 » decided bool
113 » msg := windowAdjustMsg{ 101
114 » » PeersId: c.remoteId, 102 » // Pending internal channel messages.
103 » msg chan interface{}
104
105 » // Pending user-serviceable messages.
106 » sentRequestMu sync.Mutex
107
108 » incomingRequests chan *ChannelRequest
109
110 » sentEOF bool
111
112 » // thread-safe data
113 » remoteWin window
114 » pending *buffer
115 » extPending *buffer
116
117 » // Protects all of the below.
118 » mu sync.Mutex
119 » myWindow uint32
120 » sentClose bool
121 }
122
123 func (c *channel) getWindowSpace(max uint32) (uint32, error) {
124 » // check if closed?
125 » return c.remoteWin.reserve(max), nil
126 }
127
128 // writePacket sends the packet over the wire. If the packet is a
129 // channel close, it updates sentClose. This method takes the lock
130 // c.mu.
131 func (c *channel) writePacket(packet []byte) error {
132 » if uint32(len(packet)) > c.maxPacket {
133 » » return fmt.Errorf("ssh: cannot write %d bytes, maxPacket is %d bytes", len(packet), c.maxPacket)
134 » }
135
136 » c.mu.Lock()
137 » if c.sentClose {
138 » » c.mu.Unlock()
139 » » return io.EOF
140 » }
141 » c.sentClose = (packet[0] == msgChannelClose)
142 » err := c.mux.conn.writePacket(packet)
143 » c.mu.Unlock()
144 » return err
145 }
146
147 func (c *channel) sendMessage(code byte, msg interface{}) error {
148 » if debug {
149 » » log.Printf("send %d: %#v", c.mux.chanList.offset, msg)
150 » }
151
152 » p := marshal(code, msg)
153 » binary.BigEndian.PutUint32(p[1:], c.remoteId)
154 » return c.writePacket(p)
155 }
156
157 func min(a uint32, b int) uint32 {
158 » if a < uint32(b) {
159 » » return a
160 » }
161 » return uint32(b)
162 }
163
164 func (c *channel) WriteExtended(data []byte, extendedCode uint32) (n int, err error) {
165 » if c.sentEOF {
166 » » return 0, io.EOF
167 » }
168 » // 1 byte message type, 4 bytes remoteId, 4 bytes data length
169 » opCode := byte(msgChannelData)
170 » headerLength := uint32(9)
171 » if extendedCode > 0 {
172 » » headerLength += 4
173 » » opCode = msgChannelExtendedData
174 » }
175
176 » for len(data) > 0 {
177 » » space := min(c.maxPacket-headerLength, len(data))
178 » » if space, err = c.getWindowSpace(space); err != nil {
179 » » » return n, err
180 » » }
181 » » todo := data
182 » » if uint32(len(todo)) > space {
183 » » » todo = todo[:space]
184 » » }
185
186 » » packet := make([]byte, headerLength+uint32(len(todo)))
187 » » packet[0] = opCode
188 » » marshalUint32(packet[1:], c.remoteId)
189 » » if extendedCode > 0 {
190 » » » marshalUint32(packet[5:], uint32(extendedCode))
191 » » }
192 » » marshalUint32(packet[headerLength-4:], uint32(len(todo)))
193 » » copy(packet[headerLength:], todo)
194 » » if err = c.writePacket(packet); err != nil {
195 » » » return n, err
196 » » }
197
198 » » n += len(todo)
199 » » data = data[len(todo):]
200 » }
201
202 » return n, err
203 }
204
205 func (c *channel) handleData(packet []byte) error {
206 » sz := 9
207 » if packet[0] == msgChannelExtendedData {
208 » » sz = 13
209 » }
210 » if len(packet) < sz {
211 » » // malformed data packet
212 » » return ParseError{packet[0]}
213 » }
214
215 » var extended uint32
216 » if sz > 9 {
217 » » extended = binary.BigEndian.Uint32(packet[5:])
218 » }
219
220 » length := binary.BigEndian.Uint32(packet[sz-4 : sz])
221 » if length == 0 {
222 » » return nil
223 » }
224 » data := packet[sz:]
225 » if length != uint32(len(data)) {
226 » » return errors.New("ssh: wrong packet length")
227 » }
228
229 » c.mu.Lock()
230 » if c.myWindow < length {
231 » » c.mu.Unlock()
232 » » // TODO(hanwen): should send Disconnect with reason?
233 » » return errors.New("ssh: remote side wrote too much")
234 » }
235 » c.myWindow -= length
236 » c.mu.Unlock()
237
238 » if extended == 1 {
239 » » c.extPending.write(data)
240 » } else if extended > 0 {
241 » » // discard other extended data.
242 » } else {
243 » » c.pending.write(data)
244 » }
245 » return nil
246 }
247
248 func (c *channel) adjustWindow(n uint32) error {
249 » c.mu.Lock()
250 » c.myWindow += uint32(n)
251 » c.mu.Unlock()
252 » return c.sendMessage(msgChannelWindowAdjust, windowAdjustMsg{
115 AdditionalBytes: uint32(n), 253 AdditionalBytes: uint32(n),
116 » } 254 » })
117 » return c.writePacket(marshal(msgChannelWindowAdjust, msg)) 255 }
118 } 256
119 257 func (c *channel) ReadExtended(data []byte, extended uint32) (n int, err error) {
120 // sendEOF sends EOF to the remote side. RFC 4254 Section 5.3 258 » if extended == 1 {
121 func (c *channel) sendEOF() error { 259 » » n, err = c.extPending.Read(data)
122 » return c.writePacket(marshal(msgChannelEOF, channelEOFMsg{ 260 » } else if extended == 0 {
123 » » PeersId: c.remoteId, 261 » » n, err = c.pending.Read(data)
124 » })) 262 » } else {
125 } 263 » » return 0, fmt.Errorf("ssh: extended code %d implemented", extended)
126 264 » }
127 // sendClose informs the remote side of our intent to close the channel. 265
128 func (c *channel) sendClose() error { 266 » if n > 0 {
129 » return c.packetConn.writePacket(marshal(msgChannelClose, channelCloseMsg{ 267 » » err = c.adjustWindow(uint32(n))
130 » » PeersId: c.remoteId, 268 » » // sendWindowAdjust can return io.EOF if the remote
131 » })) 269 » » // peer has closed the connection, however we want to
132 } 270 » » // defer forwarding io.EOF to the caller of Read until
133 271 » » // the buffer has been drained.
134 func (c *channel) sendChannelOpenFailure(reason RejectionReason, message string) error { 272 » » if n > 0 && err == io.EOF {
135 » reject := channelOpenFailureMsg{ 273 » » » err = nil
136 » » PeersId: c.remoteId, 274 » » }
137 » » Reason: reason, 275 » }
138 » » Message: message, 276
139 » » Language: "en", 277 » return n, err
140 » } 278 }
141 » return c.writePacket(marshal(msgChannelOpenFailure, reject)) 279
142 } 280 func (c *channel) handlePacket(packet []byte) error {
143 281 » if uint32(len(packet)) > c.maxPacket {
144 func (c *channel) writePacket(b []byte) error { 282 » » // TODO(hanwen): should send Disconnect?
145 » if c.closed() { 283 » » return errors.New("ssh: incoming packet exceeds maximum size")
146 » » return io.EOF 284 » }
147 » } 285
148 » if uint32(len(b)) > c.maxPacket { 286 » switch packet[0] {
149 » » return fmt.Errorf("ssh: cannot write %d bytes, maxPacket is %d bytes", len(b), c.maxPacket) 287 » case msgChannelData, msgChannelExtendedData:
150 » } 288 » » return c.handleData(packet)
151 » return c.packetConn.writePacket(b) 289 » case msgChannelClose:
152 } 290 » » // Ack the close.
153 291 » » c.sendMessage(msgChannelClose, channelCloseMsg{
154 func (c *channel) closed() bool { 292 » » » PeersId: c.remoteId})
155 » return atomic.LoadUint32(&c.isClosed) > 0 293
156 } 294 » » c.pending.eof()
157 295 » » c.extPending.eof()
158 func (c *channel) setClosed() bool { 296 » » close(c.msg)
159 » return atomic.CompareAndSwapUint32(&c.isClosed, 0, 1) 297 » » close(c.incomingRequests)
160 } 298 » » c.mux.chanList.remove(c.localId)
161 299
162 type serverChan struct { 300 » » return nil
163 » channel 301 » case msgChannelEOF:
164 » // immutable once created 302 » » // RFC 4254 is mute on how EOF affects dataExt messages but
165 » chanType string 303 » » // it is logical to signal EOF at the same time.
166 » extraData []byte 304 » » c.extPending.eof()
167 305
168 » serverConn *ServerConn 306 » » // For ServerConn, ChannelRequests are actually output
169 » myWindow uint32 307 » » // as Read error. This means that no requests can be
170 » theyClosed bool // indicates the close msg has been received from the remote side 308 » » // processed after EOF is sent, which is a bug
171 » theySentEOF bool 309 » » c.pending.eof()
172 » isDead uint32 310 » » return nil
173 » err error 311 » }
174 312
175 » pendingRequests []ChannelRequest 313 » decoded, err := decode(packet)
176 » pendingData []byte 314 » if err != nil {
177 » head, length int 315 » » return err
178 316 » }
179 » // This lock is inferior to serverConn.lock 317
180 » cond *sync.Cond 318 » switch msg := decoded.(type) {
181 } 319 » case *windowAdjustMsg:
182 320 » » if !c.remoteWin.add(msg.AdditionalBytes) {
183 func (c *serverChan) ReceivedRequests() chan *ChannelRequest { 321 » » » return fmt.Errorf("invalid window update %d", msg.AdditionalBytes)
184 » panic("not implemented") 322 » » }
323
324 » case *channelRequestMsg:
325 » » req := ChannelRequest{
326 » » » Request: msg.Request,
327 » » » WantReply: msg.WantReply,
328 » » » Payload: msg.RequestSpecificData,
329 » » }
330
331 » » c.incomingRequests <- &req
332 » default:
333 » » c.msg <- msg
334 » }
185 return nil 335 return nil
186 } 336 }
187 337
188 func (c *serverChan) Accept() error { 338 func newChannel(chanType string, extraData []byte) *channel {
189 » c.serverConn.lock.Lock() 339 » return &channel{
190 » defer c.serverConn.lock.Unlock() 340 » » remoteWin: window{Cond: newCond()},
191 341 » » myWindow: defaultWindowSize,
192 » if c.serverConn.err != nil { 342 » » pending: newBuffer(),
193 » » return c.serverConn.err 343 » » extPending: newBuffer(),
194 » } 344 » » incomingRequests: make(chan *ChannelRequest, 16),
195 345 » » msg: make(chan interface{}, 16),
346 » » chanType: chanType,
347 » » extraData: extraData,
348 » }
349 }
350
351 var errUndecided = errors.New("ssh: must Accept or Reject channel")
352 var errDecidedAlready = errors.New("ssh: can call Accept or Reject only once")
353
354 type extChannel struct {
355 » code uint32
356 » ch *channel
357 }
358
359 func (e *extChannel) Write(data []byte) (n int, err error) {
360 » return e.ch.WriteExtended(data, e.code)
361 }
362 func (e *extChannel) Read(data []byte) (n int, err error) {
363 » return e.ch.ReadExtended(data, e.code)
364 }
365
366 func (c *channel) Accept() error {
367 » if c.decided {
368 » » return errDecidedAlready
369 » }
196 confirm := channelOpenConfirmMsg{ 370 confirm := channelOpenConfirmMsg{
197 PeersId: c.remoteId, 371 PeersId: c.remoteId,
198 MyId: c.localId, 372 MyId: c.localId,
199 MyWindow: c.myWindow, 373 MyWindow: c.myWindow,
200 MaxPacketSize: c.maxPacket, 374 MaxPacketSize: c.maxPacket,
201 } 375 }
202 » return c.writePacket(marshal(msgChannelOpenConfirm, confirm)) 376 » c.decided = true
203 } 377 » if err := c.sendMessage(msgChannelOpenConfirm, confirm); err != nil {
204 378 » » return err
205 func (c *serverChan) Reject(reason RejectionReason, message string) error { 379 » }
206 » c.serverConn.lock.Lock() 380
207 » defer c.serverConn.lock.Unlock() 381 » return nil
208 382 }
209 » if c.serverConn.err != nil { 383
210 » » return c.serverConn.err 384 func (ch *channel) Reject(reason RejectionReason, message string) error {
211 » } 385 » if ch.decided {
212 386 » » return errDecidedAlready
213 » return c.sendChannelOpenFailure(reason, message) 387 » }
214 } 388 » reject := channelOpenFailureMsg{
215 389 » » PeersId: ch.remoteId,
216 func (c *serverChan) SendRequest(name string, wantReply bool, data []byte) (bool, error) { 390 » » Reason: reason,
217 » req := &channelRequestMsg{ 391 » » Message: message,
218 » » PeersId: c.remoteId, 392 » » Language: "en",
393 » }
394 » ch.decided = true
395 » return ch.sendMessage(msgChannelOpenFailure, reject)
396 }
397
398 func (ch *channel) Read(data []byte) (int, error) {
399 » if !ch.decided {
400 » » return 0, errUndecided
401 » }
402
403 » return ch.ReadExtended(data, 0)
404 }
405
406 func (ch *channel) Write(data []byte) (int, error) {
407 » if !ch.decided {
408 » » return 0, errUndecided
409 » }
410 » return ch.WriteExtended(data, 0)
411 }
412
413 func (ch *channel) CloseWrite() error {
414 » if !ch.decided {
415 » » return errUndecided
416 » }
417 » ch.sentEOF = true
418 » return ch.sendMessage(msgChannelEOF, channelEOFMsg{
419 » » PeersId: ch.remoteId})
420 }
421
422 func (ch *channel) Close() error {
423 » if !ch.decided {
424 » » return errUndecided
425 » }
426
427 » return ch.sendMessage(msgChannelClose, channelCloseMsg{
428 » » PeersId: ch.remoteId})
429 }
430
431 func (ch *channel) Extended(code uint32) io.ReadWriter {
432 » if !ch.decided {
433 » » return nil
434 » }
435 » return &extChannel{code, ch}
436 }
437
438 // SendRequest sends a channel request. If wantReply is set, it will
439 // wait for a reply and return the result as a boolean.
440 func (ch *channel) SendRequest(name string, wantReply bool, payload []byte) (bool, error) {
441 » if !ch.decided {
442 » » return false, errUndecided
443 » }
444
445 » if wantReply {
446 » » ch.sentRequestMu.Lock()
447 » » defer ch.sentRequestMu.Unlock()
448 » }
449
450 » msg := channelRequestMsg{
451 » » PeersId: ch.remoteId,
219 Request: name, 452 Request: name,
220 WantReply: wantReply, 453 WantReply: wantReply,
221 » » RequestSpecificData: data, 454 » » RequestSpecificData: payload,
222 » } 455 » }
223 » if req.WantReply { 456
224 » » panic("not implemented") 457 » if err := ch.sendMessage(msgChannelRequest, msg); err != nil {
225 » } 458 » » return false, err
226 » err := c.writePacket(marshal(msgChannelRequest, req)) 459 » }
227 » return false, err 460
228 } 461 » if wantReply {
229 462 » » m, ok := (<-ch.msg)
230 func (c *serverChan) handlePacket(packet interface{}) { 463 » » if !ok {
231 » c.cond.L.Lock() 464 » » » return false, io.EOF
232 » defer c.cond.L.Unlock() 465 » » }
233 466 » » switch m.(type) {
234 » switch packet := packet.(type) { 467 » » case *channelRequestFailureMsg:
235 » case *channelRequestMsg: 468 » » » return false, nil
236 » » req := ChannelRequest{ 469 » » case *channelRequestSuccessMsg:
237 » » » Request: packet.Request, 470 » » » return true, nil
238 » » » WantReply: packet.WantReply, 471 » » default:
239 » » » Payload: packet.RequestSpecificData, 472 » » » return false, fmt.Errorf("unexpected response %#v", m)
240 » » } 473 » » }
241 474 » }
242 » » c.pendingRequests = append(c.pendingRequests, req) 475
243 » » c.cond.Signal() 476 » return false, nil
244 » case *channelCloseMsg: 477 }
245 » » c.theyClosed = true 478
246 » » c.cond.Signal() 479 // AckRequest either sends an ack or nack to the channel request.
247 » case *channelEOFMsg: 480 func (ch *channel) AckRequest(ok bool) error {
248 » » c.theySentEOF = true 481 » if !ch.decided {
249 » » c.cond.Signal() 482 » » return errUndecided
250 » case *windowAdjustMsg: 483 » }
251 » » if !c.remoteWin.add(packet.AdditionalBytes) { 484
252 » » » panic("illegal window update") 485 » var msg interface{}
253 » » } 486 » var code byte
254 » default:
255 » » panic("unknown packet type")
256 » }
257 }
258
259 func (c *serverChan) handleData(data []byte) {
260 » c.cond.L.Lock()
261 » defer c.cond.L.Unlock()
262
263 » // The other side should never send us more than our window.
264 » if len(data)+c.length > len(c.pendingData) {
265 » » // TODO(agl): we should tear down the channel with a protocol
266 » » // error.
267 » » return
268 » }
269
270 » c.myWindow -= uint32(len(data))
271 » for i := 0; i < 2; i++ {
272 » » tail := c.head + c.length
273 » » if tail >= len(c.pendingData) {
274 » » » tail -= len(c.pendingData)
275 » » }
276 » » n := copy(c.pendingData[tail:], data)
277 » » data = data[n:]
278 » » c.length += n
279 » }
280
281 » c.cond.Signal()
282 }
283
284 func (c *serverChan) Stderr() io.ReadWriter {
285 » return extendedDataChannel{c: c, t: extendedDataStderr}
286 }
287
288 // extendedDataChannel is an io.Writer that writes any data to c as extended
289 // data of the given type.
290 type extendedDataChannel struct {
291 » t extendedDataTypeCode
292 » c *serverChan
293 }
294
295 func (edc extendedDataChannel) Read(data []byte) (n int, err error) {
296 » panic("unimplemented")
297 » return 0, nil
298 }
299
300 func (edc extendedDataChannel) Write(data []byte) (n int, err error) {
301 » const headerLength = 13 // 1 byte message type, 4 bytes remoteId, 4 bytes extended message type, 4 bytes data length
302 » c := edc.c
303 » for len(data) > 0 {
304 » » space := min(c.maxPacket-headerLength, len(data))
305 » » if space, err = c.getWindowSpace(space); err != nil {
306 » » » return 0, err
307 » » }
308 » » todo := data
309 » » if uint32(len(todo)) > space {
310 » » » todo = todo[:space]
311 » » }
312
313 » » packet := make([]byte, headerLength+len(todo))
314 » » packet[0] = msgChannelExtendedData
315 » » marshalUint32(packet[1:], c.remoteId)
316 » » marshalUint32(packet[5:], uint32(edc.t))
317 » » marshalUint32(packet[9:], uint32(len(todo)))
318 » » copy(packet[13:], todo)
319
320 » » if err = c.writePacket(packet); err != nil {
321 » » » return
322 » » }
323
324 » » n += len(todo)
325 » » data = data[len(todo):]
326 » }
327
328 » return
329 }
330
331 func (c *serverChan) Read(data []byte) (n int, err error) {
332 » n, err, windowAdjustment := c.read(data)
333
334 » if windowAdjustment > 0 {
335 » » packet := marshal(msgChannelWindowAdjust, windowAdjustMsg{
336 » » » PeersId: c.remoteId,
337 » » » AdditionalBytes: windowAdjustment,
338 » » })
339 » » err = c.writePacket(packet)
340 » }
341
342 » return
343 }
344
345 func (c *serverChan) read(data []byte) (n int, err error, windowAdjustment uint32) {
346 » c.cond.L.Lock()
347 » defer c.cond.L.Unlock()
348
349 » if c.err != nil {
350 » » return 0, c.err, 0
351 » }
352
353 » for {
354 » » if c.theySentEOF || c.theyClosed || c.dead() {
355 » » » return 0, io.EOF, 0
356 » » }
357
358 » » if len(c.pendingRequests) > 0 {
359 » » » req := c.pendingRequests[0]
360 » » » if len(c.pendingRequests) == 1 {
361 » » » » c.pendingRequests = nil
362 » » » } else {
363 » » » » oldPendingRequests := c.pendingRequests
364 » » » » c.pendingRequests = make([]ChannelRequest, len(oldPendingRequests)-1)
365 » » » » copy(c.pendingRequests, oldPendingRequests[1:])
366 » » » }
367
368 » » » return 0, req, 0
369 » » }
370
371 » » if c.length > 0 {
372 » » » tail := min(uint32(c.head+c.length), len(c.pendingData))
373 » » » n = copy(data, c.pendingData[c.head:tail])
374 » » » c.head += n
375 » » » c.length -= n
376 » » » if c.head == len(c.pendingData) {
377 » » » » c.head = 0
378 » » » }
379
380 » » » windowAdjustment = uint32(len(c.pendingData)-c.length) - c.myWindow
381 » » » if windowAdjustment < uint32(len(c.pendingData)/2) {
382 » » » » windowAdjustment = 0
383 » » » }
384 » » » c.myWindow += windowAdjustment
385
386 » » » return
387 » » }
388
389 » » c.cond.Wait()
390 » }
391
392 » panic("unreachable")
393 }
394
395 // getWindowSpace takes, at most, max bytes of space from the peer's window. It
396 // returns the number of bytes actually reserved.
397 func (c *serverChan) getWindowSpace(max uint32) (uint32, error) {
398 » if c.dead() || c.closed() {
399 » » return 0, io.EOF
400 » }
401 » return c.remoteWin.reserve(max), nil
402 }
403
404 func (c *serverChan) dead() bool {
405 » return atomic.LoadUint32(&c.isDead) > 0
406 }
407
408 func (c *serverChan) setDead() {
409 » atomic.StoreUint32(&c.isDead, 1)
410 }
411
412 func (c *serverChan) Write(data []byte) (n int, err error) {
413 » const headerLength = 9 // 1 byte message type, 4 bytes remoteId, 4 bytes data length
414 » for len(data) > 0 {
415 » » space := min(c.maxPacket-headerLength, len(data))
416 » » if space, err = c.getWindowSpace(space); err != nil {
417 » » » return 0, err
418 » » }
419 » » todo := data
420 » » if uint32(len(todo)) > space {
421 » » » todo = todo[:space]
422 » » }
423
424 » » packet := make([]byte, headerLength+len(todo))
425 » » packet[0] = msgChannelData
426 » » marshalUint32(packet[1:], c.remoteId)
427 » » marshalUint32(packet[5:], uint32(len(todo)))
428 » » copy(packet[9:], todo)
429
430 » » if err = c.writePacket(packet); err != nil {
431 » » » return
432 » » }
433
434 » » n += len(todo)
435 » » data = data[len(todo):]
436 » }
437
438 » return
439 }
440
441 // Close signals the intent to close the channel.
442 func (c *serverChan) Close() error {
443 » c.serverConn.lock.Lock()
444 » defer c.serverConn.lock.Unlock()
445
446 » if c.serverConn.err != nil {
447 » » return c.serverConn.err
448 » }
449
450 » if !c.setClosed() {
451 » » return errors.New("ssh: channel already closed")
452 » }
453 » return c.sendClose()
454 }
455
456 func (c *serverChan) AckRequest(ok bool) error {
457 » c.serverConn.lock.Lock()
458 » defer c.serverConn.lock.Unlock()
459
460 » if c.serverConn.err != nil {
461 » » return c.serverConn.err
462 » }
463
464 if !ok { 487 if !ok {
465 » » ack := channelRequestFailureMsg{ 488 » » code = msgChannelFailure
466 » » » PeersId: c.remoteId, 489 » » msg = channelRequestFailureMsg{
467 » » } 490 » » » PeersId: ch.remoteId,
468 » » return c.writePacket(marshal(msgChannelFailure, ack)) 491 » » }
469 » } 492 » } else {
470 493 » » code = msgChannelSuccess
471 » ack := channelRequestSuccessMsg{ 494 » » msg = channelRequestSuccessMsg{
472 » » PeersId: c.remoteId, 495 » » » PeersId: ch.remoteId,
473 » } 496 » » }
474 » return c.writePacket(marshal(msgChannelSuccess, ack)) 497 » }
475 } 498 » return ch.sendMessage(code, msg)
476 499 }
477 func (c *serverChan) ChannelType() string { 500
478 » return c.chanType 501 func (ch *channel) ChannelType() string {
479 } 502 » return ch.chanType
480 503 }
481 func (c *serverChan) ExtraData() []byte { 504
482 » return c.extraData 505 func (ch *channel) ExtraData() []byte {
483 } 506 » return ch.extraData
484 507 }
485 // A clientChan represents a single RFC 4254 channel multiplexed 508
486 // over a SSH connection. 509 // compatChannel is a hack to implement Channel's
487 type clientChan struct { 510 // handing of channel requests.
488 » channel 511 type compatChannel struct {
489 » stdin *chanWriter 512 » *channel
490 » stdout *chanReader 513 }
491 » stderr *chanReader 514
492 » msg chan interface{} 515 func newCompatChannel(ch *channel) *compatChannel {
493 } 516 » c := &compatChannel{ch}
494 517 » go c.loop()
495 // newClientChan returns a partially constructed *clientChan
496 // using the local id provided. To be usable clientChan.remoteId
497 // needs to be assigned once known.
498 func newClientChan(cc packetConn, id uint32) *clientChan {
499 » c := &clientChan{
500 » » channel: channel{
501 » » » packetConn: cc,
502 » » » localId: id,
503 » » » remoteWin: window{Cond: newCond()},
504 » » },
505 » » msg: make(chan interface{}, 16),
506 » }
507 » c.stdin = &chanWriter{
508 » » channel: &c.channel,
509 » }
510 » c.stdout = &chanReader{
511 » » channel: &c.channel,
512 » » buffer: newBuffer(),
513 » }
514 » c.stderr = &chanReader{
515 » » channel: &c.channel,
516 » » buffer: newBuffer(),
517 » }
518 return c 518 return c
519 } 519 }
520 520
521 // waitForChannelOpenResponse, if successful, fills out 521 func (c *compatChannel) loop() {
522 // the remoteId and records any initial window advertisement. 522 » for r := range c.channel.incomingRequests {
523 func (c *clientChan) waitForChannelOpenResponse() error { 523 » » c.channel.pending.addRequest(r)
524 » switch msg := (<-c.msg).(type) { 524 » }
525 » case *channelOpenConfirmMsg: 525 }
526 » » if msg.MaxPacketSize < minPacketLength || msg.MaxPacketSize > 1<<31 { 526
527 » » » return errors.New("ssh: invalid MaxPacketSize from peer") 527 func (c *compatChannel) Stderr() io.Writer {
528 » » } 528 » return c.Extended(1)
529 » » // fixup remoteId field 529 }
530 » » c.remoteId = msg.MyId
531 » » c.maxPacket = msg.MaxPacketSize
532 » » c.remoteWin.add(msg.MyWindow)
533 » » return nil
534 » case *channelOpenFailureMsg:
535 » » return errors.New(safeString(msg.Message))
536 » }
537 » return errors.New("ssh: unexpected packet")
538 }
539
540 // Close signals the intent to close the channel.
541 func (c *clientChan) Close() error {
542 » if !c.setClosed() {
543 » » return errors.New("ssh: channel already closed")
544 » }
545 » c.stdout.eof()
546 » c.stderr.eof()
547 » return c.sendClose()
548 }
549
550 // A chanWriter represents the stdin of a remote process.
551 type chanWriter struct {
552 » *channel
553 » // indicates the writer has been closed. eof is owned by the
554 » // caller of Write/Close.
555 » eof bool
556 }
557
558 // Write writes data to the remote process's standard input.
559 func (w *chanWriter) Write(data []byte) (written int, err error) {
560 » const headerLength = 9 // 1 byte message type, 4 bytes remoteId, 4 bytes data length
561 » for len(data) > 0 {
562 » » if w.eof || w.closed() {
563 » » » err = io.EOF
564 » » » return
565 » » }
566 » » // never send more data than maxPacket even if
567 » » // there is sufficent window.
568 » » n := min(w.maxPacket-headerLength, len(data))
569 » » r := w.remoteWin.reserve(n)
570 » » n = r
571 » » remoteId := w.remoteId
572 » » packet := []byte{
573 » » » msgChannelData,
574 » » » byte(remoteId >> 24), byte(remoteId >> 16), byte(remoteId >> 8), byte(remoteId),
575 » » » byte(n >> 24), byte(n >> 16), byte(n >> 8), byte(n),
576 » » }
577 » » if err = w.writePacket(append(packet, data[:n]...)); err != nil {
578 » » » break
579 » » }
580 » » data = data[n:]
581 » » written += int(n)
582 » }
583 » return
584 }
585
586 func min(a uint32, b int) uint32 {
587 » if a < uint32(b) {
588 » » return a
589 » }
590 » return uint32(b)
591 }
592
593 func (w *chanWriter) Close() error {
594 » w.eof = true
595 » return w.sendEOF()
596 }
597
598 // A chanReader represents stdout or stderr of a remote process.
599 type chanReader struct {
600 » *channel // the channel backing this reader
601 » *buffer
602 }
603
604 // Read reads data from the remote process's stdout or stderr.
605 func (r *chanReader) Read(buf []byte) (int, error) {
606 » n, err := r.buffer.Read(buf)
607 » if err != nil {
608 » » if err == io.EOF {
609 » » » return n, err
610 » » }
611 » » return 0, err
612 » }
613 » err = r.sendWindowAdj(n)
614 » if err == io.EOF && n > 0 {
615 » » // sendWindowAdjust can return io.EOF if the remote peer has
616 » » // closed the connection, however we want to defer forwarding io.EOF to the
617 » » // caller of Read until the buffer has been drained.
618 » » err = nil
619 » }
620 » return n, err
621 }
LEFTRIGHT
« no previous file | ssh/client.go » ('j') | Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Toggle Comments ('s')

Powered by Google App Engine
RSS Feeds Recent Issues | This issue
This is Rietveld f62528b