Left: | ||
Right: |
OLD | NEW |
---|---|
1 // Copyright 2011 The Go Authors. All rights reserved. | 1 // Copyright 2011 The Go Authors. All rights reserved. |
2 // Use of this source code is governed by a BSD-style | 2 // Use of this source code is governed by a BSD-style |
3 // license that can be found in the LICENSE file. | 3 // license that can be found in the LICENSE file. |
4 | 4 |
5 package ssh | 5 package ssh |
6 | 6 |
7 import ( | 7 import ( |
8 "encoding/binary" | |
8 "errors" | 9 "errors" |
9 "fmt" | 10 "fmt" |
10 "io" | 11 "io" |
12 "log" | |
11 "sync" | 13 "sync" |
12 "sync/atomic" | |
13 ) | 14 ) |
14 | 15 |
15 // extendedDataTypeCode identifies an OpenSSL extended data type. See RFC 4254, | 16 // A Channel is an ordered, reliable, duplex stream that is |
16 // section 5.2. | 17 // multiplexed over an SSH connection. |
17 type extendedDataTypeCode uint32 | |
18 | |
19 const ( | |
20 » // extendedDataStderr is the extended data type that is used for stderr. | |
21 » extendedDataStderr extendedDataTypeCode = 1 | |
22 | |
23 » // minPacketLength defines the smallest valid packet | |
24 » minPacketLength = 9 | |
25 ) | |
26 | |
27 // A Channel is an ordered, reliable, duplex stream that is multiplexed over an | |
28 // SSH connection. Channel.Read can return a ChannelRequest as an error. | |
29 type Channel interface { | 18 type Channel interface { |
jpsugar
2013/10/17 00:51:02
I'm not sure I'm a fan of this model where both na
hanwen-google
2013/10/17 14:59:08
It minimizes API breakage, but I agree your sugges
| |
30 // Accept accepts the channel creation request. | 19 // Accept accepts the channel creation request. |
31 Accept() error | 20 Accept() error |
32 » // Reject rejects the channel creation request. After calling this, no | 21 |
33 » // other methods on the Channel may be called. If they are then the | 22 » // Reject rejects the channel creation request. After calling |
34 » // peer is likely to signal a protocol error and drop the connection. | 23 » // this, no other methods on the Channel may be called. |
35 Reject(reason RejectionReason, message string) error | 24 Reject(reason RejectionReason, message string) error |
36 | 25 |
37 » // Read may return a ChannelRequest as an error. | 26 » // NOSUBMIT - should add ReadExtended/WriteExtended here? |
27 | |
38 Read(data []byte) (int, error) | 28 Read(data []byte) (int, error) |
39 Write(data []byte) (int, error) | 29 Write(data []byte) (int, error) |
30 | |
31 // Sends EOF to the other side. | |
32 CloseWrite() error | |
33 | |
34 // Signals end of channel use. No data may be sent after this | |
35 // call. | |
40 Close() error | 36 Close() error |
41 | 37 |
42 » // Stderr returns an io.Writer that writes to this channel with the | 38 » // Stderr returns an io.ReadWriter that writes to this channel with the |
jpsugar
2013/10/17 00:51:02
This comment only talks about writing. It would be
hanwen-google
2013/10/17 14:59:08
Done.
| |
43 // extended data type set to stderr. | 39 // extended data type set to stderr. |
44 » Stderr() io.Writer | 40 » Stderr() io.ReadWriter |
45 | 41 |
46 » // AckRequest either sends an ack or nack to the channel request. | 42 » // SendRequest sends a channel request. |
43 » SendRequest(name string, wantReply bool, payload []byte) (bool, error) | |
44 | |
45 » // IncomingRequests returns the channel for out-of-band | |
46 » // requests. For all requests received that have WantReply, a | |
47 » // call of AckRequest should follow. | |
48 » IncomingRequests() chan *ChannelRequest | |
49 | |
50 » // AckRequest either sends an ack or nack to the channel | |
51 » // request. It should only be called if the last | |
52 » // ChannelRequest had a WantReply | |
47 AckRequest(ok bool) error | 53 AckRequest(ok bool) error |
48 | 54 |
49 // ChannelType returns the type of the channel, as supplied by the | 55 // ChannelType returns the type of the channel, as supplied by the |
50 // client. | 56 // client. |
51 ChannelType() string | 57 ChannelType() string |
58 | |
52 // ExtraData returns the arbitrary payload for this channel, as supplied | 59 // ExtraData returns the arbitrary payload for this channel, as supplied |
53 // by the client. This data is specific to the channel type. | 60 // by the client. This data is specific to the channel type. |
54 ExtraData() []byte | 61 ExtraData() []byte |
55 } | 62 } |
56 | 63 |
57 // ChannelRequest represents a request sent on a channel, outside of the normal | 64 // ChannelRequest represents a request sent, outside of the normal |
58 // stream of bytes. It may result from calling Read on a Channel. | 65 // stream of bytes. Requests may either be channel specific, or global |
59 type ChannelRequest struct { | 66 type ChannelRequest struct { |
60 Request string | 67 Request string |
61 WantReply bool | 68 WantReply bool |
62 Payload []byte | 69 Payload []byte |
63 } | 70 } |
64 | 71 |
65 func (c ChannelRequest) Error() string { | |
66 return "ssh: channel request received" | |
67 } | |
68 | |
69 // RejectionReason is an enumeration used when rejecting channel creation | 72 // RejectionReason is an enumeration used when rejecting channel creation |
70 // requests. See RFC 4254, section 5.1. | 73 // requests. See RFC 4254, section 5.1. |
71 type RejectionReason uint32 | 74 type RejectionReason uint32 |
72 | 75 |
73 const ( | 76 const ( |
74 Prohibited RejectionReason = iota + 1 | 77 Prohibited RejectionReason = iota + 1 |
75 ConnectionFailed | 78 ConnectionFailed |
76 UnknownChannelType | 79 UnknownChannelType |
77 ResourceShortage | 80 ResourceShortage |
78 ) | 81 ) |
79 | 82 |
80 // String converts the rejection reason to human readable form. | 83 // String converts the rejection reason to human readable form. |
81 func (r RejectionReason) String() string { | 84 func (r RejectionReason) String() string { |
82 switch r { | 85 switch r { |
83 case Prohibited: | 86 case Prohibited: |
84 return "administratively prohibited" | 87 return "administratively prohibited" |
85 case ConnectionFailed: | 88 case ConnectionFailed: |
86 return "connect failed" | 89 return "connect failed" |
87 case UnknownChannelType: | 90 case UnknownChannelType: |
88 return "unknown channel type" | 91 return "unknown channel type" |
89 case ResourceShortage: | 92 case ResourceShortage: |
90 return "resource shortage" | 93 return "resource shortage" |
91 } | 94 } |
92 return fmt.Sprintf("unknown reason %d", int(r)) | 95 return fmt.Sprintf("unknown reason %d", int(r)) |
93 } | 96 } |
94 | 97 |
98 // channel implements the Channel | |
95 type channel struct { | 99 type channel struct { |
96 » packetConn // the underlying transport | 100 » // R/O after creation |
101 » chanType string | |
102 » extraData []byte | |
97 localId, remoteId uint32 | 103 localId, remoteId uint32 |
98 remoteWin window | |
99 maxPacket uint32 | 104 maxPacket uint32 |
100 » isClosed uint32 // atomic bool, non zero if true | 105 » mux *mux |
106 | |
107 » // If set, we have called Accept or Reject on this channel | |
108 » decided bool | |
109 | |
110 » // Pending internal channel messages. | |
111 » msg chan interface{} | |
112 | |
113 » // Pending user-serviceable messages. | |
114 » sentRequestMu sync.Mutex | |
115 » pendingRequests chan *ChannelRequest | |
116 | |
117 » sentEOF bool | |
118 | |
119 » // thread-safe data | |
120 » remoteWin window | |
121 » pending *buffer | |
122 » extPending *buffer | |
123 | |
124 » // Protects all of the below. | |
125 » mu sync.Mutex | |
126 » myWindow uint32 | |
127 » sentClose bool | |
101 } | 128 } |
102 | 129 |
103 func (c *channel) sendWindowAdj(n int) error { | 130 func (c *channel) getWindowSpace(max uint32) (uint32, error) { |
104 » msg := windowAdjustMsg{ | 131 » // check if closed? |
105 » » PeersId: c.remoteId, | |
106 » » AdditionalBytes: uint32(n), | |
107 » } | |
108 » return c.writePacket(marshal(msgChannelWindowAdjust, msg)) | |
109 } | |
110 | |
111 // sendEOF sends EOF to the remote side. RFC 4254 Section 5.3 | |
112 func (c *channel) sendEOF() error { | |
113 » return c.writePacket(marshal(msgChannelEOF, channelEOFMsg{ | |
114 » » PeersId: c.remoteId, | |
115 » })) | |
116 } | |
117 | |
118 // sendClose informs the remote side of our intent to close the channel. | |
119 func (c *channel) sendClose() error { | |
120 » return c.packetConn.writePacket(marshal(msgChannelClose, channelCloseMsg { | |
121 » » PeersId: c.remoteId, | |
122 » })) | |
123 } | |
124 | |
125 func (c *channel) sendChannelOpenFailure(reason RejectionReason, message string) error { | |
126 » reject := channelOpenFailureMsg{ | |
127 » » PeersId: c.remoteId, | |
128 » » Reason: reason, | |
129 » » Message: message, | |
130 » » Language: "en", | |
131 » } | |
132 » return c.writePacket(marshal(msgChannelOpenFailure, reject)) | |
133 } | |
134 | |
135 func (c *channel) writePacket(b []byte) error { | |
136 » if c.closed() { | |
137 » » return io.EOF | |
138 » } | |
139 » if uint32(len(b)) > c.maxPacket { | |
140 » » return fmt.Errorf("ssh: cannot write %d bytes, maxPacket is %d b ytes", len(b), c.maxPacket) | |
141 » } | |
142 » return c.packetConn.writePacket(b) | |
143 } | |
144 | |
145 func (c *channel) closed() bool { | |
146 » return atomic.LoadUint32(&c.isClosed) > 0 | |
147 } | |
148 | |
149 func (c *channel) setClosed() bool { | |
150 » return atomic.CompareAndSwapUint32(&c.isClosed, 0, 1) | |
151 } | |
152 | |
153 type serverChan struct { | |
154 » channel | |
155 » // immutable once created | |
156 » chanType string | |
157 » extraData []byte | |
158 | |
159 » serverConn *ServerConn | |
160 » myWindow uint32 | |
161 » theyClosed bool // indicates the close msg has been received from the r emote side | |
162 » theySentEOF bool | |
163 » isDead uint32 | |
164 » err error | |
165 | |
166 » pendingRequests []ChannelRequest | |
167 » pendingData []byte | |
168 » head, length int | |
169 | |
170 » // This lock is inferior to serverConn.lock | |
171 » cond *sync.Cond | |
172 } | |
173 | |
174 func (c *serverChan) Accept() error { | |
175 » c.serverConn.lock.Lock() | |
176 » defer c.serverConn.lock.Unlock() | |
177 | |
178 » if c.serverConn.err != nil { | |
179 » » return c.serverConn.err | |
180 » } | |
181 | |
182 » confirm := channelOpenConfirmMsg{ | |
183 » » PeersId: c.remoteId, | |
184 » » MyId: c.localId, | |
185 » » MyWindow: c.myWindow, | |
186 » » MaxPacketSize: c.maxPacket, | |
187 » } | |
188 » return c.writePacket(marshal(msgChannelOpenConfirm, confirm)) | |
189 } | |
190 | |
191 func (c *serverChan) Reject(reason RejectionReason, message string) error { | |
192 » c.serverConn.lock.Lock() | |
193 » defer c.serverConn.lock.Unlock() | |
194 | |
195 » if c.serverConn.err != nil { | |
196 » » return c.serverConn.err | |
197 » } | |
198 | |
199 » return c.sendChannelOpenFailure(reason, message) | |
200 } | |
201 | |
202 func (c *serverChan) handlePacket(packet interface{}) { | |
203 » c.cond.L.Lock() | |
204 » defer c.cond.L.Unlock() | |
205 | |
206 » switch packet := packet.(type) { | |
207 » case *channelRequestMsg: | |
208 » » req := ChannelRequest{ | |
209 » » » Request: packet.Request, | |
210 » » » WantReply: packet.WantReply, | |
211 » » » Payload: packet.RequestSpecificData, | |
212 » » } | |
213 | |
214 » » c.pendingRequests = append(c.pendingRequests, req) | |
215 » » c.cond.Signal() | |
216 » case *channelCloseMsg: | |
217 » » c.theyClosed = true | |
218 » » c.cond.Signal() | |
219 » case *channelEOFMsg: | |
220 » » c.theySentEOF = true | |
221 » » c.cond.Signal() | |
222 » case *windowAdjustMsg: | |
223 » » if !c.remoteWin.add(packet.AdditionalBytes) { | |
224 » » » panic("illegal window update") | |
225 » » } | |
226 » default: | |
227 » » panic("unknown packet type") | |
228 » } | |
229 } | |
230 | |
231 func (c *serverChan) handleData(data []byte) { | |
232 » c.cond.L.Lock() | |
233 » defer c.cond.L.Unlock() | |
234 | |
235 » // The other side should never send us more than our window. | |
236 » if len(data)+c.length > len(c.pendingData) { | |
237 » » // TODO(agl): we should tear down the channel with a protocol | |
238 » » // error. | |
239 » » return | |
240 » } | |
241 | |
242 » c.myWindow -= uint32(len(data)) | |
243 » for i := 0; i < 2; i++ { | |
244 » » tail := c.head + c.length | |
245 » » if tail >= len(c.pendingData) { | |
246 » » » tail -= len(c.pendingData) | |
247 » » } | |
248 » » n := copy(c.pendingData[tail:], data) | |
249 » » data = data[n:] | |
250 » » c.length += n | |
251 » } | |
252 | |
253 » c.cond.Signal() | |
254 } | |
255 | |
256 func (c *serverChan) Stderr() io.Writer { | |
257 » return extendedDataChannel{c: c, t: extendedDataStderr} | |
258 } | |
259 | |
260 // extendedDataChannel is an io.Writer that writes any data to c as extended | |
261 // data of the given type. | |
262 type extendedDataChannel struct { | |
263 » t extendedDataTypeCode | |
264 » c *serverChan | |
265 } | |
266 | |
267 func (edc extendedDataChannel) Write(data []byte) (n int, err error) { | |
268 » const headerLength = 13 // 1 byte message type, 4 bytes remoteId, 4 byte s extended message type, 4 bytes data length | |
269 » c := edc.c | |
270 » for len(data) > 0 { | |
271 » » space := min(c.maxPacket-headerLength, len(data)) | |
272 » » if space, err = c.getWindowSpace(space); err != nil { | |
273 » » » return 0, err | |
274 » » } | |
275 » » todo := data | |
276 » » if uint32(len(todo)) > space { | |
277 » » » todo = todo[:space] | |
278 » » } | |
279 | |
280 » » packet := make([]byte, headerLength+len(todo)) | |
281 » » packet[0] = msgChannelExtendedData | |
282 » » marshalUint32(packet[1:], c.remoteId) | |
283 » » marshalUint32(packet[5:], uint32(edc.t)) | |
284 » » marshalUint32(packet[9:], uint32(len(todo))) | |
285 » » copy(packet[13:], todo) | |
286 | |
287 » » if err = c.writePacket(packet); err != nil { | |
288 » » » return | |
289 » » } | |
290 | |
291 » » n += len(todo) | |
292 » » data = data[len(todo):] | |
293 » } | |
294 | |
295 » return | |
296 } | |
297 | |
298 func (c *serverChan) Read(data []byte) (n int, err error) { | |
299 » n, err, windowAdjustment := c.read(data) | |
300 | |
301 » if windowAdjustment > 0 { | |
302 » » packet := marshal(msgChannelWindowAdjust, windowAdjustMsg{ | |
303 » » » PeersId: c.remoteId, | |
304 » » » AdditionalBytes: windowAdjustment, | |
305 » » }) | |
306 » » err = c.writePacket(packet) | |
307 » } | |
308 | |
309 » return | |
310 } | |
311 | |
312 func (c *serverChan) read(data []byte) (n int, err error, windowAdjustment uint3 2) { | |
313 » c.cond.L.Lock() | |
314 » defer c.cond.L.Unlock() | |
315 | |
316 » if c.err != nil { | |
317 » » return 0, c.err, 0 | |
318 » } | |
319 | |
320 » for { | |
321 » » if c.theySentEOF || c.theyClosed || c.dead() { | |
322 » » » return 0, io.EOF, 0 | |
323 » » } | |
324 | |
325 » » if len(c.pendingRequests) > 0 { | |
326 » » » req := c.pendingRequests[0] | |
327 » » » if len(c.pendingRequests) == 1 { | |
328 » » » » c.pendingRequests = nil | |
329 » » » } else { | |
330 » » » » oldPendingRequests := c.pendingRequests | |
331 » » » » c.pendingRequests = make([]ChannelRequest, len(o ldPendingRequests)-1) | |
332 » » » » copy(c.pendingRequests, oldPendingRequests[1:]) | |
333 » » » } | |
334 | |
335 » » » return 0, req, 0 | |
336 » » } | |
337 | |
338 » » if c.length > 0 { | |
339 » » » tail := min(uint32(c.head+c.length), len(c.pendingData)) | |
340 » » » n = copy(data, c.pendingData[c.head:tail]) | |
341 » » » c.head += n | |
342 » » » c.length -= n | |
343 » » » if c.head == len(c.pendingData) { | |
344 » » » » c.head = 0 | |
345 » » » } | |
346 | |
347 » » » windowAdjustment = uint32(len(c.pendingData)-c.length) - c.myWindow | |
348 » » » if windowAdjustment < uint32(len(c.pendingData)/2) { | |
349 » » » » windowAdjustment = 0 | |
350 » » » } | |
351 » » » c.myWindow += windowAdjustment | |
352 | |
353 » » » return | |
354 » » } | |
355 | |
356 » » c.cond.Wait() | |
357 » } | |
358 | |
359 » panic("unreachable") | |
360 } | |
361 | |
362 // getWindowSpace takes, at most, max bytes of space from the peer's window. It | |
363 // returns the number of bytes actually reserved. | |
364 func (c *serverChan) getWindowSpace(max uint32) (uint32, error) { | |
365 » if c.dead() || c.closed() { | |
366 » » return 0, io.EOF | |
367 » } | |
368 return c.remoteWin.reserve(max), nil | 132 return c.remoteWin.reserve(max), nil |
369 } | 133 } |
370 | 134 |
371 func (c *serverChan) dead() bool { | 135 // writePacket sends the packet over the wire. If the packet is a |
372 » return atomic.LoadUint32(&c.isDead) > 0 | 136 // channel close, it updates sentClose. This method takes the lock |
137 // c.mu. | |
138 func (c *channel) writePacket(packet []byte) error { | |
139 » if uint32(len(packet)) > c.maxPacket { | |
140 » » return fmt.Errorf("ssh: cannot write %d bytes, maxPacket is %d b ytes", len(packet), c.maxPacket) | |
141 » } | |
142 | |
143 » c.mu.Lock() | |
144 » if c.sentClose { | |
145 » » c.mu.Unlock() | |
146 » » return io.EOF | |
147 » } | |
148 » c.sentClose = (packet[0] == msgChannelClose) | |
149 » err := c.mux.conn.writePacket(packet) | |
150 » c.mu.Unlock() | |
151 » return err | |
373 } | 152 } |
374 | 153 |
375 func (c *serverChan) setDead() { | 154 func (c *channel) sendMessage(code byte, msg interface{}) error { |
376 » atomic.StoreUint32(&c.isDead, 1) | 155 » if debug { |
377 } | 156 » » log.Printf("send %d: %#v", c.mux.chanList.offset, msg) |
378 | |
379 func (c *serverChan) Write(data []byte) (n int, err error) { | |
380 » const headerLength = 9 // 1 byte message type, 4 bytes remoteId, 4 bytes data length | |
381 » for len(data) > 0 { | |
382 » » space := min(c.maxPacket-headerLength, len(data)) | |
383 » » if space, err = c.getWindowSpace(space); err != nil { | |
384 » » » return 0, err | |
385 » » } | |
386 » » todo := data | |
387 » » if uint32(len(todo)) > space { | |
388 » » » todo = todo[:space] | |
389 » » } | |
390 | |
391 » » packet := make([]byte, headerLength+len(todo)) | |
392 » » packet[0] = msgChannelData | |
393 » » marshalUint32(packet[1:], c.remoteId) | |
394 » » marshalUint32(packet[5:], uint32(len(todo))) | |
395 » » copy(packet[9:], todo) | |
396 | |
397 » » if err = c.writePacket(packet); err != nil { | |
398 » » » return | |
399 » » } | |
400 | |
401 » » n += len(todo) | |
402 » » data = data[len(todo):] | |
403 } | 157 } |
404 | 158 |
405 » return | 159 » p := marshal(code, msg) |
406 } | 160 » binary.BigEndian.PutUint32(p[1:], c.remoteId) |
407 | 161 » return c.writePacket(p) |
408 // Close signals the intent to close the channel. | |
409 func (c *serverChan) Close() error { | |
410 » c.serverConn.lock.Lock() | |
411 » defer c.serverConn.lock.Unlock() | |
412 | |
413 » if c.serverConn.err != nil { | |
414 » » return c.serverConn.err | |
415 » } | |
416 | |
417 » if !c.setClosed() { | |
418 » » return errors.New("ssh: channel already closed") | |
419 » } | |
420 » return c.sendClose() | |
421 } | |
422 | |
423 func (c *serverChan) AckRequest(ok bool) error { | |
424 » c.serverConn.lock.Lock() | |
425 » defer c.serverConn.lock.Unlock() | |
426 | |
427 » if c.serverConn.err != nil { | |
428 » » return c.serverConn.err | |
429 » } | |
430 | |
431 » if !ok { | |
432 » » ack := channelRequestFailureMsg{ | |
433 » » » PeersId: c.remoteId, | |
434 » » } | |
435 » » return c.writePacket(marshal(msgChannelFailure, ack)) | |
436 » } | |
437 | |
438 » ack := channelRequestSuccessMsg{ | |
439 » » PeersId: c.remoteId, | |
440 » } | |
441 » return c.writePacket(marshal(msgChannelSuccess, ack)) | |
442 } | |
443 | |
444 func (c *serverChan) ChannelType() string { | |
445 » return c.chanType | |
446 } | |
447 | |
448 func (c *serverChan) ExtraData() []byte { | |
449 » return c.extraData | |
450 } | |
451 | |
452 // A clientChan represents a single RFC 4254 channel multiplexed | |
453 // over a SSH connection. | |
454 type clientChan struct { | |
455 » channel | |
456 » stdin *chanWriter | |
457 » stdout *chanReader | |
458 » stderr *chanReader | |
459 » msg chan interface{} | |
460 } | |
461 | |
462 // newClientChan returns a partially constructed *clientChan | |
463 // using the local id provided. To be usable clientChan.remoteId | |
464 // needs to be assigned once known. | |
465 func newClientChan(cc packetConn, id uint32) *clientChan { | |
466 » c := &clientChan{ | |
467 » » channel: channel{ | |
468 » » » packetConn: cc, | |
469 » » » localId: id, | |
470 » » » remoteWin: window{Cond: newCond()}, | |
471 » » }, | |
472 » » msg: make(chan interface{}, 16), | |
473 » } | |
474 » c.stdin = &chanWriter{ | |
475 » » channel: &c.channel, | |
476 » } | |
477 » c.stdout = &chanReader{ | |
478 » » channel: &c.channel, | |
479 » » buffer: newBuffer(), | |
480 » } | |
481 » c.stderr = &chanReader{ | |
482 » » channel: &c.channel, | |
483 » » buffer: newBuffer(), | |
484 » } | |
485 » return c | |
486 } | |
487 | |
488 // waitForChannelOpenResponse, if successful, fills out | |
489 // the remoteId and records any initial window advertisement. | |
490 func (c *clientChan) waitForChannelOpenResponse() error { | |
491 » switch msg := (<-c.msg).(type) { | |
492 » case *channelOpenConfirmMsg: | |
493 » » if msg.MaxPacketSize < minPacketLength || msg.MaxPacketSize > 1< <31 { | |
494 » » » return errors.New("ssh: invalid MaxPacketSize from peer" ) | |
495 » » } | |
496 » » // fixup remoteId field | |
497 » » c.remoteId = msg.MyId | |
498 » » c.maxPacket = msg.MaxPacketSize | |
499 » » c.remoteWin.add(msg.MyWindow) | |
500 » » return nil | |
501 » case *channelOpenFailureMsg: | |
502 » » return errors.New(safeString(msg.Message)) | |
503 » } | |
504 » return errors.New("ssh: unexpected packet") | |
505 } | |
506 | |
507 // Close signals the intent to close the channel. | |
508 func (c *clientChan) Close() error { | |
509 » if !c.setClosed() { | |
510 » » return errors.New("ssh: channel already closed") | |
511 » } | |
512 » c.stdout.eof() | |
513 » c.stderr.eof() | |
514 » return c.sendClose() | |
515 } | |
516 | |
517 // A chanWriter represents the stdin of a remote process. | |
518 type chanWriter struct { | |
519 » *channel | |
520 » // indicates the writer has been closed. eof is owned by the | |
521 » // caller of Write/Close. | |
522 » eof bool | |
523 } | |
524 | |
525 // Write writes data to the remote process's standard input. | |
526 func (w *chanWriter) Write(data []byte) (written int, err error) { | |
527 » const headerLength = 9 // 1 byte message type, 4 bytes remoteId, 4 bytes data length | |
528 » for len(data) > 0 { | |
529 » » if w.eof || w.closed() { | |
530 » » » err = io.EOF | |
531 » » » return | |
532 » » } | |
533 » » // never send more data than maxPacket even if | |
534 » » // there is sufficient window. | |
535 » » n := min(w.maxPacket-headerLength, len(data)) | |
536 » » r := w.remoteWin.reserve(n) | |
537 » » n = r | |
538 » » remoteId := w.remoteId | |
539 » » packet := []byte{ | |
540 » » » msgChannelData, | |
541 » » » byte(remoteId >> 24), byte(remoteId >> 16), byte(remoteI d >> 8), byte(remoteId), | |
542 » » » byte(n >> 24), byte(n >> 16), byte(n >> 8), byte(n), | |
543 » » } | |
544 » » if err = w.writePacket(append(packet, data[:n]...)); err != nil { | |
545 » » » break | |
546 » » } | |
547 » » data = data[n:] | |
548 » » written += int(n) | |
549 » } | |
550 » return | |
551 } | 162 } |
552 | 163 |
553 func min(a uint32, b int) uint32 { | 164 func min(a uint32, b int) uint32 { |
554 if a < uint32(b) { | 165 if a < uint32(b) { |
555 return a | 166 return a |
556 } | 167 } |
557 return uint32(b) | 168 return uint32(b) |
558 } | 169 } |
559 | 170 |
560 func (w *chanWriter) Close() error { | 171 func (c *channel) WriteExtended(data []byte, extendedCode uint32) (n int, err er ror) { |
561 » w.eof = true | 172 » if c.sentEOF { |
562 » return w.sendEOF() | 173 » » return 0, io.EOF |
563 } | 174 » } |
564 | 175 » // 1 byte message type, 4 bytes remoteId, 4 bytes data length |
565 // A chanReader represents stdout or stderr of a remote process. | 176 » opCode := byte(msgChannelData) |
566 type chanReader struct { | 177 » headerLength := uint32(9) |
567 » *channel // the channel backing this reader | 178 » if extendedCode > 0 { |
568 » *buffer | 179 » » headerLength += 4 |
569 } | 180 » » opCode = msgChannelExtendedData |
570 | 181 » } |
571 // Read reads data from the remote process's stdout or stderr. | 182 |
572 func (r *chanReader) Read(buf []byte) (int, error) { | 183 » for len(data) > 0 { |
573 » n, err := r.buffer.Read(buf) | 184 » » space := min(c.maxPacket-headerLength, len(data)) |
185 » » if space, err = c.getWindowSpace(space); err != nil { | |
186 » » » return n, err | |
187 » » } | |
188 » » todo := data | |
189 » » if uint32(len(todo)) > space { | |
190 » » » todo = todo[:space] | |
191 » » } | |
192 | |
193 » » packet := make([]byte, headerLength+uint32(len(todo))) | |
194 » » packet[0] = opCode | |
195 » » marshalUint32(packet[1:], c.remoteId) | |
196 » » if extendedCode > 0 { | |
197 » » » marshalUint32(packet[5:], uint32(extendedCode)) | |
198 » » } | |
199 » » marshalUint32(packet[headerLength-4:], uint32(len(todo))) | |
200 » » copy(packet[headerLength:], todo) | |
201 » » if err = c.writePacket(packet); err != nil { | |
202 » » » return n, err | |
203 » » } | |
204 | |
205 » » n += len(todo) | |
206 » » data = data[len(todo):] | |
207 » } | |
208 | |
209 » return n, err | |
210 } | |
211 | |
212 func (c *channel) handleData(packet []byte) error { | |
213 » sz := 9 | |
214 » if packet[0] == msgChannelExtendedData { | |
215 » » sz = 13 | |
216 » } | |
217 » if len(packet) < sz { | |
218 » » // malformed data packet | |
219 » » return ParseError{packet[0]} | |
220 » } | |
221 | |
222 » var extended uint32 | |
223 » if sz > 9 { | |
224 » » extended = binary.BigEndian.Uint32(packet[5:]) | |
225 » } | |
226 | |
227 » length := binary.BigEndian.Uint32(packet[sz-4 : sz]) | |
228 » if length == 0 { | |
229 » » return nil | |
230 » } | |
231 » data := packet[sz:] | |
232 » if length != uint32(len(data)) { | |
233 » » return errors.New("ssh: wrong packet length") | |
234 » } | |
235 | |
236 » c.mu.Lock() | |
237 » if c.myWindow < length { | |
238 » » c.mu.Unlock() | |
239 » » // TODO(hanwen): should send Disconnect with reason? | |
240 » » return errors.New("ssh: remote side wrote too much") | |
241 » } | |
242 » c.myWindow -= length | |
243 » c.mu.Unlock() | |
244 | |
245 » if extended == 1 { | |
246 » » c.extPending.write(data) | |
247 » } else if extended > 0 { | |
248 » » // discard other extended data. | |
249 » } else { | |
250 » » c.pending.write(data) | |
251 » } | |
252 » return nil | |
253 } | |
254 | |
255 func (c *channel) adjustWindow(n uint32) error { | |
256 » c.mu.Lock() | |
257 » c.myWindow += uint32(n) | |
258 » c.mu.Unlock() | |
259 » return c.sendMessage(msgChannelWindowAdjust, windowAdjustMsg{ | |
260 » » AdditionalBytes: uint32(n), | |
261 » }) | |
262 } | |
263 | |
264 func (c *channel) ReadExtended(data []byte, extended uint32) (n int, err error) { | |
265 » if extended == 1 { | |
266 » » n, err = c.extPending.Read(data) | |
267 » } else if extended == 0 { | |
268 » » n, err = c.pending.Read(data) | |
269 » } | |
270 | |
271 » if n > 0 { | |
272 » » err = c.adjustWindow(uint32(n)) | |
273 » » // sendWindowAdjust can return io.EOF if the remote | |
274 » » // peer has closed the connection, however we want to | |
275 » » // defer forwarding io.EOF to the caller of Read until | |
276 » » // the buffer has been drained. | |
277 » » if n > 0 && err == io.EOF { | |
278 » » » err = nil | |
279 » » } | |
280 » } | |
281 | |
282 » return n, err | |
283 } | |
284 | |
285 func (c *channel) handlePacket(packet []byte) error { | |
286 » if uint32(len(packet)) > c.maxPacket { | |
287 » » // TODO(hanwen): should send Disconnect? | |
288 » » return errors.New("ssh: incoming packet exceeds maximum size") | |
289 » } | |
290 | |
291 » switch packet[0] { | |
292 » case msgChannelData, msgChannelExtendedData: | |
293 » » return c.handleData(packet) | |
294 » case msgChannelClose: | |
295 » » // Ack the close. | |
296 » » c.sendMessage(msgChannelClose, channelCloseMsg{ | |
297 » » » PeersId: c.remoteId}) | |
298 | |
299 » » c.pending.eof() | |
300 » » c.extPending.eof() | |
301 » » close(c.msg) | |
302 » » close(c.pendingRequests) | |
303 » » c.mux.chanList.remove(c.localId) | |
304 | |
305 » » return nil | |
306 » case msgChannelEOF: | |
307 » » c.pending.eof() | |
308 » » // RFC 4254 is mute on how EOF affects dataExt messages but | |
309 » » // it is logical to signal EOF at the same time. | |
310 » » c.extPending.eof() | |
311 » » return nil | |
312 » } | |
313 | |
314 » decoded, err := decode(packet) | |
574 if err != nil { | 315 if err != nil { |
575 » » if err == io.EOF { | 316 » » return err |
576 » » » return n, err | 317 » } |
577 » » } | 318 |
578 » » return 0, err | 319 » switch msg := decoded.(type) { |
579 » } | 320 » case *windowAdjustMsg: |
580 » err = r.sendWindowAdj(n) | 321 » » if !c.remoteWin.add(msg.AdditionalBytes) { |
581 » if err == io.EOF && n > 0 { | 322 » » » return fmt.Errorf("invalid window update %d", msg.Additi onalBytes) |
582 » » // sendWindowAdjust can return io.EOF if the remote peer has | 323 » » } |
583 » » // closed the connection, however we want to defer forwarding io .EOF to the | 324 |
584 » » // caller of Read until the buffer has been drained. | 325 » case *channelRequestMsg: |
585 » » err = nil | 326 » » req := ChannelRequest{ |
586 » } | 327 » » » Request: msg.Request, |
587 » return n, err | 328 » » » WantReply: msg.WantReply, |
588 } | 329 » » » Payload: msg.RequestSpecificData, |
330 » » } | |
331 | |
332 » » c.pendingRequests <- &req | |
333 » default: | |
334 » » c.msg <- msg | |
335 » } | |
336 » return nil | |
337 } | |
338 | |
339 func newChannel(chanType string, extraData []byte) *channel { | |
340 » return &channel{ | |
341 » » remoteWin: window{Cond: newCond()}, | |
342 » » myWindow: defaultWindowSize, | |
343 » » pending: newBuffer(), | |
344 » » extPending: newBuffer(), | |
345 » » pendingRequests: make(chan *ChannelRequest, 16), | |
346 » » msg: make(chan interface{}, 16), | |
347 » » chanType: chanType, | |
348 » » extraData: extraData, | |
349 » } | |
350 } | |
351 | |
352 var errUndecided = errors.New("ssh: must Accept or Reject channel") | |
353 var errDecidedAlready = errors.New("ssh: can call Accept or Reject only once") | |
354 | |
355 type extChannel channel | |
356 | |
357 func (e *extChannel) Write(data []byte) (n int, err error) { | |
358 » return ((*channel)(e)).WriteExtended(data, 1) | |
359 } | |
360 func (e *extChannel) Read(data []byte) (n int, err error) { | |
361 » return ((*channel)(e)).ReadExtended(data, 1) | |
362 } | |
363 | |
364 func (c *channel) Accept() error { | |
365 » if c.decided { | |
366 » » return errDecidedAlready | |
367 » } | |
368 » confirm := channelOpenConfirmMsg{ | |
369 » » PeersId: c.remoteId, | |
370 » » MyId: c.localId, | |
371 » » MyWindow: c.myWindow, | |
372 » » MaxPacketSize: c.maxPacket, | |
373 » } | |
374 » c.decided = true | |
375 » return c.sendMessage(msgChannelOpenConfirm, confirm) | |
376 } | |
377 | |
378 func (ch *channel) Reject(reason RejectionReason, message string) error { | |
379 » if ch.decided { | |
380 » » return errDecidedAlready | |
381 » } | |
382 » reject := channelOpenFailureMsg{ | |
383 » » PeersId: ch.remoteId, | |
384 » » Reason: reason, | |
385 » » Message: message, | |
386 » » Language: "en", | |
387 » } | |
388 » ch.decided = true | |
389 » return ch.sendMessage(msgChannelOpenFailure, reject) | |
390 } | |
391 | |
392 func (ch *channel) Read(data []byte) (int, error) { | |
393 » if !ch.decided { | |
394 » » return 0, errUndecided | |
395 » } | |
396 | |
397 » return ch.ReadExtended(data, 0) | |
398 } | |
399 | |
400 func (ch *channel) IncomingRequests() chan *ChannelRequest { | |
401 » return ch.pendingRequests | |
402 } | |
403 | |
404 func (ch *channel) Write(data []byte) (int, error) { | |
405 » if !ch.decided { | |
406 » » return 0, errUndecided | |
407 » } | |
408 » return ch.WriteExtended(data, 0) | |
409 } | |
410 | |
411 func (ch *channel) CloseWrite() error { | |
412 » if !ch.decided { | |
413 » » return errUndecided | |
414 » } | |
415 » ch.sentEOF = true | |
416 » return ch.sendMessage(msgChannelEOF, channelEOFMsg{ | |
417 » » PeersId: ch.remoteId}) | |
418 } | |
419 | |
420 func (ch *channel) Close() error { | |
421 » if !ch.decided { | |
422 » » return errUndecided | |
423 » } | |
424 | |
425 » return ch.sendMessage(msgChannelClose, channelCloseMsg{ | |
426 » » PeersId: ch.remoteId}) | |
427 } | |
428 | |
429 func (ch *channel) Stderr() io.ReadWriter { | |
430 » if !ch.decided { | |
431 » » return nil | |
432 » } | |
433 » return (*extChannel)(ch) | |
434 } | |
435 | |
436 // SendRequest sends a channel request. If wantReply is set, it will | |
437 // wait for a reply and return the result as a boolean. | |
438 func (ch *channel) SendRequest(name string, wantReply bool, payload []byte) (boo l, error) { | |
439 » if !ch.decided { | |
440 » » return false, errUndecided | |
441 » } | |
442 | |
443 » if wantReply { | |
444 » » ch.sentRequestMu.Lock() | |
445 » » defer ch.sentRequestMu.Unlock() | |
446 » } | |
447 | |
448 » msg := channelRequestMsg{ | |
449 » » PeersId: ch.remoteId, | |
450 » » Request: name, | |
451 » » WantReply: wantReply, | |
452 » » RequestSpecificData: payload, | |
453 » } | |
454 | |
455 » if err := ch.sendMessage(msgChannelRequest, msg); err != nil { | |
456 » » return false, err | |
457 » } | |
458 | |
459 » if wantReply { | |
460 » » m, ok := (<-ch.msg) | |
461 » » if !ok { | |
462 » » » return false, io.EOF | |
463 » » } | |
464 » » switch m.(type) { | |
465 » » case *channelRequestFailureMsg: | |
466 » » » return false, nil | |
467 » » case *channelRequestSuccessMsg: | |
468 » » » return true, nil | |
469 » » default: | |
470 » » » return false, fmt.Errorf("unexpected response %#v", m) | |
471 » » } | |
472 » } | |
473 | |
474 » return false, nil | |
475 } | |
476 | |
477 // AckRequest either sends an ack or nack to the channel request. | |
478 func (ch *channel) AckRequest(ok bool) error { | |
479 » if !ch.decided { | |
480 » » return errUndecided | |
481 » } | |
482 | |
483 » var msg interface{} | |
484 » var code byte | |
485 » if !ok { | |
486 » » code = msgChannelFailure | |
487 » » msg = channelRequestFailureMsg{ | |
488 » » » PeersId: ch.remoteId, | |
489 » » } | |
490 » } else { | |
491 » » code = msgChannelSuccess | |
492 » » msg = channelRequestSuccessMsg{ | |
493 » » » PeersId: ch.remoteId, | |
494 » » } | |
495 » } | |
496 » return ch.sendMessage(code, msg) | |
497 } | |
498 | |
499 func (ch *channel) ChannelType() string { | |
500 » return ch.chanType | |
501 } | |
502 | |
503 func (ch *channel) ExtraData() []byte { | |
504 » return ch.extraData | |
505 } | |
OLD | NEW |