Files
sublime-text-3/Packages/SublimeREPL/repls/subprocess_repl.py

232 lines
7.4 KiB
Python

# -*- coding: utf-8 -*-
# Copyright (c) 2011, Wojciech Bederski (wuub.net)
# All rights reserved.
# See LICENSE.txt for details.
from __future__ import absolute_import, unicode_literals, print_function, division
import subprocess
import os
import sys
from .repl import Repl
import signal
from sublime import load_settings
from .autocomplete_server import AutocompleteServer
from .killableprocess import Popen
PY3 = sys.version_info[0] == 3
if os.name == 'posix':
POSIX = True
import fcntl
import select
else:
POSIX = False
class Unsupported(Exception):
def __init__(self, msgs):
super(Unsupported, self).__init__()
self.msgs = msgs
def __repr__(self):
return "\n".join(self.msgs)
def win_find_executable(executable, env):
"""Explicetely looks for executable in env["PATH"]"""
if os.path.dirname(executable):
return executable # executable is already absolute filepath
path = env.get("PATH", "")
pathext = env.get("PATHEXT") or ".EXE"
dirs = path.split(os.path.pathsep)
(base, ext) = os.path.splitext(executable)
if ext:
extensions = [ext]
else:
extensions = pathext.split(os.path.pathsep)
for directory in dirs:
for extension in extensions:
filepath = os.path.join(directory, base + extension)
if os.path.exists(filepath):
return filepath
return None
class SubprocessRepl(Repl):
TYPE = "subprocess"
def __init__(self, encoding, cmd=None, env=None, cwd=None, extend_env=None, soft_quit="", autocomplete_server=False, **kwds):
super(SubprocessRepl, self).__init__(encoding, **kwds)
settings = load_settings('SublimeREPL.sublime-settings')
if cmd[0] == "[unsupported]":
raise Unsupported(cmd[1:])
self._autocomplete_server = None
if autocomplete_server:
self._autocomplete_server = AutocompleteServer(self, settings.get("autocomplete_server_ip"))
self._autocomplete_server.start()
env = self.env(env, extend_env, settings)
env[b"SUBLIMEREPL_AC_PORT"] = str(self.autocomplete_server_port()).encode("utf-8")
env[b"SUBLIMEREPL_AC_IP"] = settings.get("autocomplete_server_ip").encode("utf-8")
if PY3:
strings_env = {}
for k, v in env.items():
strings_env[k.decode("utf-8")] = v.decode("utf-8")
env = strings_env
self._cmd = self.cmd(cmd, env)
self._soft_quit = soft_quit
self._killed = False
self.popen = Popen(
self._cmd,
startupinfo=self.startupinfo(settings),
creationflags=self.creationflags(settings),
bufsize=1,
cwd=self.cwd(cwd, settings),
env=env,
stderr=subprocess.STDOUT,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE)
if POSIX:
flags = fcntl.fcntl(self.popen.stdout, fcntl.F_GETFL)
fcntl.fcntl(self.popen.stdout, fcntl.F_SETFL, flags | os.O_NONBLOCK)
def autocomplete_server_port(self):
if not self._autocomplete_server:
return None
return self._autocomplete_server.port()
def autocomplete_available(self):
if not self._autocomplete_server:
return False
return self._autocomplete_server.connected()
def autocomplete_completions(self, whole_line, pos_in_line, prefix, whole_prefix, locations):
return self._autocomplete_server.complete(
whole_line=whole_line,
pos_in_line=pos_in_line,
prefix=prefix,
whole_prefix=whole_prefix,
locations=locations,
)
def cmd(self, cmd, env):
"""On Linux and OSX just returns cmd, on windows it has to find
executable in env because of this: http://bugs.python.org/issue8557"""
if os.name != "nt":
return cmd
if isinstance(cmd, str):
_cmd = [cmd]
else:
_cmd = cmd
executable = win_find_executable(_cmd[0], env)
if executable:
_cmd[0] = executable
return _cmd
def cwd(self, cwd, settings):
if cwd and os.path.exists(cwd):
return cwd
return None
def env(self, env, extend_env, settings):
updated_env = env if env else os.environ.copy()
default_extend_env = settings.get("default_extend_env")
if default_extend_env:
updated_env.update(self.interpolate_extend_env(updated_env, default_extend_env))
if extend_env:
updated_env.update(self.interpolate_extend_env(updated_env, extend_env))
bytes_env = {}
for k, v in list(updated_env.items()):
try:
enc_k = self.encoder(str(k))[0]
enc_v = self.encoder(str(v))[0]
except UnicodeDecodeError:
continue #f*** it, we'll do it live
else:
bytes_env[enc_k] = enc_v
return bytes_env
def interpolate_extend_env(self, env, extend_env):
"""Interpolates (subst) values in extend_env.
Mostly for path manipulation"""
new_env = {}
for key, val in list(extend_env.items()):
new_env[key] = str(val).format(**env)
return new_env
def startupinfo(self, settings):
startupinfo = None
if os.name == 'nt':
from .killableprocess import STARTUPINFO, STARTF_USESHOWWINDOW
startupinfo = STARTUPINFO()
startupinfo.dwFlags |= STARTF_USESHOWWINDOW
startupinfo.wShowWindow |= 1 # SW_SHOWNORMAL
return startupinfo
def creationflags(self, settings):
creationflags = 0
if os.name == "nt":
creationflags = 0x8000000 # CREATE_NO_WINDOW
return creationflags
def name(self):
if self.external_id:
return self.external_id
if isinstance(self._cmd, str):
return self._cmd
return " ".join([str(x) for x in self._cmd])
def is_alive(self):
return self.popen.poll() is None
def read_bytes(self):
out = self.popen.stdout
if POSIX:
while True:
i, _, _ = select.select([out], [], [])
if i:
return out.read(4096)
else:
# this is windows specific problem, that you cannot tell if there
# are more bytes ready, so we read only 1 at a times
while True:
byte = self.popen.stdout.read(1)
if byte == b'\r':
# f'in HACK, for \r\n -> \n translation on windows
# I tried universal_endlines but it was pain and misery! :'(
continue
return byte
def write_bytes(self, bytes):
si = self.popen.stdin
si.write(bytes)
si.flush()
def kill(self):
self._killed = True
self.write(self._soft_quit)
self.popen.kill()
def available_signals(self):
signals = {}
for k, v in list(signal.__dict__.items()):
if not k.startswith("SIG"):
continue
signals[k] = v
return signals
def send_signal(self, sig):
if sig == signal.SIGTERM:
self._killed = True
if self.is_alive():
self.popen.send_signal(sig)