Package x2go :: Module gevent_subprocess
[frames] | no frames]

Source Code for Module x2go.gevent_subprocess

  1  # -*- coding: utf-8 -*- 
  2   
  3  # Copyright (C) 2010-2015 by Mike Gabriel <mike.gabriel@das-netzwerkteam.de> 
  4  # 
  5  # Python X2Go is free software; you can redistribute it and/or modify 
  6  # it under the terms of the GNU Affero General Public License as published by 
  7  # the Free Software Foundation; either version 3 of the License, or 
  8  # (at your option) any later version. 
  9  # 
 10  # Python X2Go is distributed in the hope that it will be useful, 
 11  # but WITHOUT ANY WARRANTY; without even the implied warranty of 
 12  # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
 13  # GNU Affero General Public License for more details. 
 14  # 
 15  # You should have received a copy of the GNU Affero General Public License 
 16  # along with this program; if not, write to the 
 17  # Free Software Foundation, Inc., 
 18  # 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. 
 19   
 20  # gevent_subprocess was found here: http://groups.google.com/group/gevent/browse_thread/thread/dba1a5d29e0a60ff 
 21  # Mark Visser <mjmvisser@gmail.com> 
 22   
 23  ### 
 24  ### Mark Visser, Sat, 20 Nov 2010 13:30:16 -0500 
 25  ### 
 26  ### <quote> 
 27  ### Hi Mike, 
 28  ### 
 29  ### I hereby place that code snippet in the public domain, feel free to apply any license that is appropriate! 
 30  ### 
 31  ### cheers, 
 32  ### -Mark 
 33  ### </quote> 
 34  ### 
 35  ### Thus, I place myself as the copyright holder for code in this file 
 36  ### for cases in that it is used in context of the X2Go project. 
 37  ### 
 38   
 39   
 40  """Implementation of the standard :mod:`subprocess` module that spawns greenlets""" 
 41  import errno 
 42  import sys 
 43  import fcntl, os 
 44   
 45  _subprocess = __import__('subprocess') 
 46   
 47  from gevent import socket, select, hub 
 48   
 49  # Re-exported unchanged from the standard-library subprocess module; call/check_call are the stdlib functions themselves, not wrappers around the Popen class in this file 
 50  CalledProcessError = _subprocess.CalledProcessError 
 51  MAXFD = _subprocess.MAXFD 
 52  PIPE = _subprocess.PIPE 
 53  STDOUT = _subprocess.STDOUT 
 54  call = _subprocess.call 
 55  check_call = _subprocess.check_call 
 56  list2cmdline = _subprocess.list2cmdline 
 57   
 58   
class Popen(object):
    """Wrapper around :class:`subprocess.Popen` that cooperates with gevent.

    A real ``subprocess.Popen`` object is created and kept privately; any
    attribute not defined on this class is looked up on that object. The
    child's pipes are switched to non-blocking mode, and the methods that
    would otherwise block (``communicate``, ``wait``) go through
    ``gevent.socket``, ``gevent.select`` and ``gevent.hub`` instead.
    """

    def __init__(self, *args, **kwargs):
        """Spawn the child process.

        All positional and keyword arguments are passed through unchanged
        to ``subprocess.Popen``.
        """
        # delegate to an actual Popen object
        self.__p = _subprocess.Popen(*args, **kwargs)
        # make the file handles nonblocking
        for pipe in (self.stdin, self.stdout, self.stderr):
            if pipe is not None:
                self._set_nonblocking(pipe)

    @staticmethod
    def _set_nonblocking(f):
        """Add ``O_NONBLOCK`` to the status flags of file object *f*."""
        # Read the current flags first so that flags which are already set
        # are preserved rather than overwritten by F_SETFL.
        flags = fcntl.fcntl(f, fcntl.F_GETFL)
        fcntl.fcntl(f, fcntl.F_SETFL, flags | os.O_NONBLOCK)

    def __getattr__(self, name):
        """Delegate lookup of unknown attributes to the real Popen object.

        Raises ``AttributeError`` if the wrapped object has not been set
        (yet), instead of recursing into ``__getattr__`` forever.
        """
        try:
            # '_Popen__p' is the name-mangled form of self.__p; going through
            # __dict__ avoids re-entering __getattr__ when it is missing.
            wrapped = self.__dict__['_Popen__p']
        except KeyError:
            raise AttributeError(name)
        return getattr(wrapped, name)

    def _write_pipe(self, f, input):
        """Write all of *input* to file object *f*, then close *f*.

        Writing is done with ``os.write`` on the underlying descriptor.
        While data is still pending, ``socket.wait_write`` is used to wait
        until the descriptor is writable again. *f* is closed even when
        *input* is empty or ``None``.
        """
        if input:
            fd = f.fileno()
            bytes_total = len(input)
            bytes_written = 0
            while bytes_written < bytes_total:
                try:
                    # f.write() doesn't return anything, so use os.write.
                    bytes_written += os.write(fd, input[bytes_written:])
                except (IOError, OSError) as ex:
                    # os.write reports errors as OSError; IOError is kept
                    # for backward compatibility.
                    if ex.errno != errno.EAGAIN:
                        raise
                    sys.exc_clear()
                # Only wait while there is still something to write; waiting
                # after the last byte would stall for no reason.
                if bytes_written < bytes_total:
                    socket.wait_write(fd)
        f.close()

    def _read_pipe(self, f):
        """Read from file object *f* until EOF, close it, return the data.

        ``EAGAIN`` errors are swallowed; after every read attempt
        ``socket.wait_read`` is used to wait for the descriptor to become
        readable. The chunks are joined into a single string.
        """
        chunks = []
        while True:
            try:
                chunk = f.read(4096)
                if not chunk:
                    break
                chunks.append(chunk)
            except IOError as ex:
                if ex.errno != errno.EAGAIN:
                    raise
                sys.exc_clear()
            socket.wait_read(f.fileno())
        f.close()
        return ''.join(chunks)

    def communicate(self, input=None):
        """Send *input* to the child, collect its output and wait for it.

        Returns a ``(stdout, stderr)`` tuple; an element is ``None`` when
        the corresponding pipe was not read.
        """
        # Optimization: If we are only using one pipe, or no pipe at
        # all, using select() is unnecessary.
        if [self.stdin, self.stdout, self.stderr].count(None) >= 2:
            stdout = None
            stderr = None
            if self.stdin:
                self._write_pipe(self.stdin, input)
            elif self.stdout:
                stdout = self._read_pipe(self.stdout)
            elif self.stderr:
                stderr = self._read_pipe(self.stderr)
            self.wait()
            return (stdout, stderr)
        else:
            return self._communicate(input)

    def _communicate(self, input):
        """Multiplex stdin/stdout/stderr of the child via ``select.select``.

        Used by ``communicate`` when more than one pipe is in use. Returns
        a ``(stdout, stderr)`` tuple after waiting for the child.
        """
        # identical to original... all the heavy lifting is done
        # in gevent.select.select
        read_set = []
        write_set = []
        stdout = None  # Return
        stderr = None  # Return

        if self.stdin:
            # Flush stdin buffer.
            self.stdin.flush()
            if input:
                write_set.append(self.stdin)
            else:
                self.stdin.close()
        if self.stdout:
            read_set.append(self.stdout)
            stdout = []
        if self.stderr:
            read_set.append(self.stderr)
            stderr = []

        input_offset = 0
        while read_set or write_set:
            try:
                rlist, wlist, xlist = select.select(read_set, write_set, [])
            except select.error as e:
                # An interrupted select is simply retried.
                if e.args[0] == errno.EINTR:
                    continue
                raise

            if self.stdin in wlist:
                # When select has indicated that the file is writable,
                # we can write up to PIPE_BUF bytes without risk
                # blocking.  POSIX defines PIPE_BUF >= 512
                bytes_written = os.write(self.stdin.fileno(),
                                         buffer(input, input_offset, 512))
                input_offset += bytes_written
                if input_offset >= len(input):
                    self.stdin.close()
                    write_set.remove(self.stdin)

            if self.stdout in rlist:
                data = os.read(self.stdout.fileno(), 1024)
                if data == "":
                    # Empty read means EOF on this pipe.
                    self.stdout.close()
                    read_set.remove(self.stdout)
                stdout.append(data)

            if self.stderr in rlist:
                data = os.read(self.stderr.fileno(), 1024)
                if data == "":
                    self.stderr.close()
                    read_set.remove(self.stderr)
                stderr.append(data)

        # All data exchanged.  Translate lists into strings.
        if stdout is not None:
            stdout = ''.join(stdout)
        if stderr is not None:
            stderr = ''.join(stderr)

        # Translate newlines, if requested.  We cannot let the file
        # object do the translation: It is based on stdio, which is
        # impossible to combine with select (unless forcing no
        # buffering).
        if self.universal_newlines and hasattr(file, 'newlines'):
            if stdout:
                stdout = self._translate_newlines(stdout)
            if stderr:
                stderr = self._translate_newlines(stderr)

        self.wait()
        return (stdout, stderr)

    def wait(self, check_interval=0.01):
        """Wait for the child to terminate and return its exit status.

        ``poll()`` is called every *check_interval* seconds, with
        ``hub.sleep`` in between. Returns ``-1`` when polling fails with
        ``ECHILD``.
        """
        # non-blocking, use hub.sleep
        try:
            while True:
                status = self.poll()
                # Compare against None rather than ">= 0": a negative status
                # is a finished child too and must end the loop.
                if status is not None:
                    return status
                hub.sleep(check_interval)
        except OSError as e:
            if e.errno == errno.ECHILD:
                # no child process, this happens if the child process
                # already died and has been cleaned up
                return -1
            raise
215