# -*- coding: utf-8 -*-

import logging
import shlex
import sys

from tornado import gen
from tornado.escape import to_unicode
from tornado.iostream import StreamClosedError
from tornado.process import Subprocess

from ...config import ConfigDict
from ...job import Job
from ...utils import str_value
from .config import AppConfig
from .logging import JaffleAppLogHandler

[docs]class BaseJaffleApp(object): """ Base class for Jaffle apps. """ completer_class = None lexer_class = None app_conf = None jobs = None def __init__(self, app_conf_data): """ Initializes BaseJaffleApp. Parameters ---------- app_conf_data : dict App configuration data. """ self.app_conf = AppConfig.from_dict(app_conf_data) self.ipython = get_ipython() # noqa logging.getLogger().handlers = [] self.log = logging.getLogger(self.app_name) level = str_value(self.conf.get('logger', {}).get('level', 'info')) self.log.setLevel(getattr(logging, level.upper())) self.log.handlers = [JaffleAppLogHandler(self.app_name, self.jaffle_port)] self.log.propagate = False = {} for job_name, job_data in self.jobs_conf.items(): logger = logging.getLogger(job_name) logger.parent = self.log logger.setLevel(self.log.level)[job_name] = Job(logger, job_name, job_data.get('command')) @property def app_name(self): """ Returns the app name. Returns ------- app_name : str App name. """ return self.app_conf.app_name @property def conf(self): """ Returns the app-specific config. Returns ------- conf : ConfigDict App-specific config. """ return self.app_conf.conf @property def options(self): """ Retruns options for the app. Returns ------- options : dict Options for the app. """ return self.app_conf.conf.get('options', ConfigDict()) @property def raw_namespace(self): """ Returns the raw namespace for string interpolation which does not include functions and variables. Returns ------- raw_namespace : dict Raw namespace for string interpolation. """ return self.app_conf.raw_namespace @property def runtime_variables(self): """ Returns runtime variables. Returns ------- runtime_variables : dict Runtime variables. """ return self.app_conf.runtime_variables @property def jaffle_port(self): """ Returns the Jaffle port. Returns ------- jaffle_port : int Jaffle port. """ return self.app_conf.jaffle_port @property def jobs_conf(self): """ Returns jobs config. Returns ------- jobs_conf : ConfigDict Jobs config. """ return self.app_conf.jobs_conf
[docs] def execute_code(self, code, *args, **kwargs): """ Executes a code. Parameters ---------- code : str Code to be executed. It will be formateed as ``code.format(*args, **kwargs)``. args : list Positional arguments to ``code.format()``. kwargs : dict Keyward arguments to ``code.formmat()``. Returns ------- future : tornado.gen.Future Future which will have the execution result. """ future = gen.Future() result = self.ipython.run_cell(code.format(*args, **kwargs)) future.set_result(result.result) return future
[docs] @gen.coroutine def execute_command(self, command, logger=None): """ Executes a command. Parameters ---------- command : str Command to be executed. logger : logging.Logger Logger. Returns ------- future : tornado.gen.Future Future which will have the execution result. """ log = logger or self.log log.debug('Executing command: %s', command) proc = Subprocess(shlex.split(command), stdout=Subprocess.STREAM, stderr=Subprocess.STREAM) try: while True: line_bytes = yield proc.stdout.read_until(b'\n') line = to_unicode(line_bytes).strip('\r\n') except StreamClosedError: pass
[docs] @gen.coroutine def execute_job(self, job_name): """ Executes a job. Parameters ---------- job_name : str Job to be executed. Returns ------- future : tornado.gen.Future Future which will have the execution result. """ job =[job_name] result = yield self.execute_command(job.command, logger=job.log) return result
def clear_module_cache(self, modules): """ Clears the module cache. This method deletes cache of imported modules from ``sys.modules`` whose name starts with specified modules. For example, ``jaffle`` is specified, ```` will also be deleted. Parameters ---------- modules : str[list] List of module (dot separated module names). """ def match(mod): return any([mod == m or mod.startswith('{}.'.format(m)) for m in modules]) self.log.debug('clearing module cache: %s', modules) for mod in [mod for mod in sys.modules if match(mod)]: self.log.debug(' clear: %s', mod) del sys.modules[mod]
[docs] @classmethod def command_to_code(self, app_name, command): """ Converts a command comes from ``jaffle attach <app>`` to a code to be executed. If the app supports ``jaffle attach``, this method must be implemented. Parameters ---------- app_name : str App name defined in jaffle.hcl. command : str Command name received from the shell of ``jaffle attach``. Returns ------- code : str Code to be executed. """ raise NotImplementedError('Must be implemented to support attaching')