Source code for ipypublish.convert.main

#!/usr/bin/env python
# import base64
from contextlib import contextmanager
from typing import List, Tuple, Union, Dict  # noqa: F401
import logging
import os
import time
import sys
import inspect

import traitlets as T
from traitlets import default, validate, TraitError

# from traitlets import validate
from traitlets.config.configurable import Configurable
from traitlets.config import Config
from jsonextended import edict
from six import string_types
import jsonschema

import ipypublish
from ipypublish.utils import (
    pathlib,
    handle_error,
    read_file_from_directory,
    get_module_path,
    get_valid_filename,
    find_entry_point,
)
from ipypublish import schema
from ipypublish.convert.nbmerge import merge_notebooks
from ipypublish.convert.config_manager import (
    get_export_config_path,
    load_export_config,
    load_template,
    create_exporter_cls,
)


def dict_to_config(config, unflatten=True, key_as_tuple=False):
    if unflatten:
        config = edict.unflatten(config, key_as_tuple=key_as_tuple, delim=".")
    return Config(config)
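
# An illustrative sketch of what ``dict_to_config`` does (the section and
# option names below are hypothetical, not part of ipypublish): a flattened,
# dot-delimited mapping is unflattened into nested sections before being
# wrapped in a traitlets ``Config``:
#
#     >>> cfg = dict_to_config({"MyExporter.template_file": "my_template"})
#     >>> cfg.MyExporter.template_file
#     'my_template'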

class IpyPubMain(Configurable):

    conversion = T.Unicode(
        "latex_ipypublish_main", help="key or path to conversion configuration"
    ).tag(config=True)

    plugin_folder_paths = T.Set(
        T.Unicode(),
        default_value=(),
        help="a list of folders containing conversion configurations",
    ).tag(config=True)

    @validate("plugin_folder_paths")
    def _validate_plugin_folder_paths(self, proposal):
        folder_paths = proposal["value"]
        for path in folder_paths:
            if not os.path.exists(path):
                raise TraitError(
                    "the configuration folder path does not exist: "
                    "{}".format(path)
                )
        return proposal["value"]

    outpath = T.Union(
        [T.Unicode(), T.Instance(pathlib.Path)],
        allow_none=True,
        default_value=None,
        help="path to output converted files",
    ).tag(config=True)

    folder_suffix = T.Unicode(
        "_files",
        help=(
            "suffix for the folder name where content will be dumped "
            "(e.g. internal images). "
            "It will be a sanitized version of the input filename, "
            "followed by the suffix"
        ),
    ).tag(config=True)

    ignore_prefix = T.Unicode(
        "_", help="prefixes to ignore when finding notebooks to merge"
    ).tag(config=True)

    meta_path_placeholder = T.Unicode(
        "${meta_path}",
        help=(
            "all string values in the export configuration containing "
            "this placeholder will be replaced with the path to the "
            "notebook from which the metadata was obtained"
        ),
    ).tag(config=True)

    files_folder_placeholder = T.Unicode(
        "${files_path}",
        help=(
            "all string values in the export configuration containing "
            "this placeholder will be replaced with the path "
            "(relative to outpath) to the folder where files will be dumped"
        ),
    ).tag(config=True)

    validate_nb_metadata = T.Bool(
        True,
        help=(
            "before running the exporter, validate that "
            "the notebook level metadata is valid against the schema"
        ),
    ).tag(config=True)

    pre_conversion_funcs = T.Dict(
        help=(
            "a mapping of file extensions to functions that can convert "
            "that file type: Instance(nbformat.NotebookNode) = func(pathstr)"
        )
    ).tag(config=True)

    @default("pre_conversion_funcs")
    def _default_pre_conversion_funcs(self):
        try:
            import jupytext  # noqa: F401
        except ImportError:
            return {}

        try:
            from jupytext import read
        except ImportError:
            # fall back to the older jupytext API name,
            # which is deprecated in newer versions
            from jupytext import readf as read  # noqa: F401

        return {".Rmd": read, ".md": read}

    @validate("pre_conversion_funcs")
    def _validate_pre_conversion_funcs(self, proposal):
        for ext, func in proposal["value"].items():
            if not ext.startswith("."):
                raise TraitError(
                    "the extension key should start with a '.': "
                    "{}".format(ext)
                )
            try:
                func("string")
                # TODO this should be done safely with inspect,
                # but there is no obvious solution
                # to check if it only requires one string argument
            except TypeError:
                raise TraitError(
                    "the function for {} can not be "
                    "called with a single string arg: "
                    "{}".format(ext, func)
                )
            except Exception:
                pass
        return proposal["value"]

    log_to_stdout = T.Bool(True, help="whether to log to sys.stdout").tag(config=True)

    log_level_stdout = T.Enum(
        ["debug", "info", "warning", "error", "DEBUG", "INFO", "WARNING", "ERROR"],
        default_value="INFO",
        help="the logging level to output to stdout",
    ).tag(config=True)

    log_stdout_formatstr = T.Unicode("%(levelname)s:%(name)s:%(message)s").tag(
        config=True
    )

    log_to_file = T.Bool(False, help="whether to log to file").tag(config=True)

    log_level_file = T.Enum(
        ["debug", "info", "warning", "error", "DEBUG", "INFO", "WARNING", "ERROR"],
        default_value="INFO",
        help="the logging level to output to file",
    ).tag(config=True)

    log_file_path = T.Unicode(
        None,
        allow_none=True,
        help="if None, will output to {outdir}/{ipynb_name}.nbpub.log",
    ).tag(config=True)

    log_file_formatstr = T.Unicode("%(levelname)s:%(name)s:%(message)s").tag(
        config=True
    )

    default_ppconfig_kwargs = T.Dict(
        trait=T.Bool(),
        default_value=(
            ("pdf_in_temp", False),
            ("pdf_debug", False),
            ("launch_browser", False),
        ),
        help=(
            "convenience arguments for constructing the post-processors "
            "default configuration"
        ),
    ).tag(config=True)

    default_pporder_kwargs = T.Dict(
        trait=T.Bool(),
        default_value=(
            ("dry_run", False),
            ("clear_existing", False),
            ("dump_files", False),
            ("create_pdf", False),
            ("serve_html", False),
            ("slides", False),
        ),
        help=(
            "convenience arguments for constructing the post-processors "
            "default list"
        ),
    ).tag(config=True)

    # TODO validate that default_ppconfig/pporder_kwargs can be parsed to funcs

    default_exporter_config = T.Dict(help="default configuration for exporters").tag(
        config=True
    )

    @default("default_exporter_config")
    def _default_exporter_config(self):
        temp = "${files_path}/{unique_key}_{cell_index}_{index}{extension}"
        return {"ExtractOutputPreprocessor": {"output_filename_template": temp}}

    def _create_default_ppconfig(
        self, pdf_in_temp=False, pdf_debug=False, launch_browser=False
    ):
        """create a default config for postprocessors"""
        return Config(
            {
                "PDFExport": {
                    "files_folder": "${files_path}",
                    "convert_in_temp": pdf_in_temp,
                    "debug_mode": pdf_debug,
                    "open_in_browser": launch_browser,
                    "skip_mime": False,
                },
                "RunSphinx": {"open_in_browser": launch_browser},
                "RemoveFolder": {"files_folder": "${files_path}"},
                "CopyResourcePaths": {"files_folder": "${files_path}"},
                "ConvertBibGloss": {"files_folder": "${files_path}"},
            }
        )

    def _create_default_pporder(
        self,
        dry_run=False,
        clear_existing=False,
        dump_files=False,
        create_pdf=False,
        serve_html=False,
        slides=False,
    ):
        """create a default list of postprocessors to run"""
        default_pprocs = [
            "remove-blank-lines",
            "remove-trailing-space",
            "filter-output-files",
        ]
        if slides:
            default_pprocs.append("fix-slide-refs")
        if not dry_run:
            if clear_existing:
                default_pprocs.append("remove-folder")
            default_pprocs.append("write-text-file")
            if dump_files or create_pdf or serve_html:
                default_pprocs.extend(
                    ["write-resource-files", "copy-resource-paths", "convert-bibgloss"]
                )
            if create_pdf:
                default_pprocs.append("pdf-export")
            elif serve_html:
                default_pprocs.append("reveal-server")

        return default_pprocs

    @property
    def logger(self):
        return logging.getLogger("ipypublish")

    @contextmanager
    def _log_handlers(self, ipynb_name, outdir):

        root = logging.getLogger()
        root_level = root.level
        log_handlers = []

        try:
            root.setLevel(logging.DEBUG)

            if self.log_to_stdout:
                # setup logging to terminal
                slogger = logging.StreamHandler(sys.stdout)
                slogger.setLevel(getattr(logging, self.log_level_stdout.upper()))
                formatter = logging.Formatter(self.log_stdout_formatstr)
                slogger.setFormatter(formatter)
                slogger.propagate = False
                root.addHandler(slogger)
                log_handlers.append(slogger)

            if self.log_to_file:
                # setup logging to file
                if self.log_file_path:
                    path = self.log_file_path
                else:
                    path = os.path.join(outdir, ipynb_name + ".nbpub.log")

                if not os.path.exists(os.path.dirname(path)):
                    os.makedirs(os.path.dirname(path))

                flogger = logging.FileHandler(path, "w")
                flogger.setLevel(getattr(logging, self.log_level_file.upper()))
                formatter = logging.Formatter(self.log_file_formatstr)
                flogger.setFormatter(formatter)
                flogger.propagate = False
                root.addHandler(flogger)
                log_handlers.append(flogger)

            yield

        finally:
            root.setLevel(root_level)
            for handler in log_handlers:
                handler.close()
                root.removeHandler(handler)

    def __init__(self, config=None):
        """
        Public constructor

        Parameters
        ----------
        config: traitlets.config.Config
            User configuration instance.

        """
        # with_default_config = self.default_config
        # if config:
        #     with_default_config.merge(config)
        if config is None:
            config = {}
        if not isinstance(config, Config):
            config = Config(config)
        with_default_config = config

        super(IpyPubMain, self).__init__(config=with_default_config)

    def __call__(self, ipynb_path, nb_node=None):
        """see IpyPubMain.publish"""
        return self.publish(ipynb_path, nb_node)
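
    # A minimal construction sketch: the constructor accepts either a plain
    # dict or a traitlets ``Config`` (the section name is real, the values
    # below are illustrative), and calling the instance delegates to
    # ``publish``:
    #
    #     publisher = IpyPubMain(config={"IpyPubMain": {"log_to_file": True}})
    #     publisher("notebook.ipynb")  # equivalent to publisher.publish(...)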

    def publish(self, ipynb_path, nb_node=None):
        """convert one or more Jupyter notebooks to a published format

        paths can be a string of an existing file or folder,
        or a pathlib.Path like object

        all files linked in the documents are placed into a single files_folder

        Parameters
        ----------
        ipynb_path: str or pathlib.Path
            notebook file or directory
        nb_node: None or nbformat.NotebookNode
            a pre-converted notebook

        Returns
        -------
        outdata: dict
            containing keys;
            "outpath", "exporter", "stream", "main_filepath", "resources"

        """
        # setup the input and output paths
        if isinstance(ipynb_path, string_types):
            ipynb_path = pathlib.Path(ipynb_path)
        ipynb_name, ipynb_ext = os.path.splitext(ipynb_path.name)
        outdir = (
            os.path.join(os.getcwd(), "converted")
            if self.outpath is None
            else str(self.outpath)
        )

        with self._log_handlers(ipynb_name, outdir):

            if not ipynb_path.exists() and not nb_node:
                handle_error(
                    "the notebook path does not exist: {}".format(ipynb_path),
                    IOError,
                    self.logger,
                )

            # log start of conversion
            self.logger.info(
                "started ipypublish v{0} at {1}".format(
                    ipypublish.__version__, time.strftime("%c")
                )
            )
            self.logger.info(
                "logging to: {}".format(
                    os.path.join(outdir, ipynb_name + ".nbpub.log")
                )
            )
            self.logger.info("running for ipynb(s) at: {0}".format(ipynb_path))
            self.logger.info(
                "with conversion configuration: {0}".format(self.conversion)
            )

            if nb_node is None and ipynb_ext in self.pre_conversion_funcs:
                func = self.pre_conversion_funcs[ipynb_ext]
                self.logger.info(
                    "running pre-conversion with: {}".format(inspect.getmodule(func))
                )
                try:
                    nb_node = func(ipynb_path)
                except Exception as err:
                    handle_error(
                        "pre-conversion failed for {}: {}".format(ipynb_path, err),
                        err,
                        self.logger,
                    )

            # doesn't work with folders
            # if (ipynb_ext != ".ipynb" and nb_node is None):
            #     handle_error(
            #         'the file extension is not associated with any '
            #         'pre-converter: {}'.format(ipynb_ext),
            #         TypeError, self.logger)

            if nb_node is None:
                # merge all notebooks
                # TODO allow notebooks to remain separate
                # (would require creating a main.tex with the preamble in etc)
                # Could make everything a 'PyProcess',
                # with support for multiple streams
                final_nb, meta_path = merge_notebooks(
                    ipynb_path, ignore_prefix=self.ignore_prefix
                )
            else:
                final_nb, meta_path = (nb_node, ipynb_path)

            # validate the notebook metadata against the schema
            if self.validate_nb_metadata:
                nb_metadata_schema = read_file_from_directory(
                    get_module_path(schema),
                    "doc_metadata.schema.json",
                    "doc_metadata.schema",
                    self.logger,
                    interp_ext=True,
                )
                try:
                    jsonschema.validate(final_nb.metadata, nb_metadata_schema)
                except jsonschema.ValidationError as err:
                    handle_error(
                        "validation of notebook level metadata failed: {}\n"
                        "see the doc_metadata.schema.json for full spec".format(
                            err.message
                        ),
                        jsonschema.ValidationError,
                        logger=self.logger,
                    )

            # set text replacements for export configuration
            replacements = {
                self.meta_path_placeholder: str(meta_path),
                self.files_folder_placeholder: "{}{}".format(
                    get_valid_filename(ipynb_name), self.folder_suffix
                ),
            }

            self.logger.debug("notebooks meta path: {}".format(meta_path))

            # load configuration file
            (
                exporter_cls,
                jinja_template,
                econfig,
                pprocs,
                pconfig,
            ) = self._load_config_file(replacements)

            # run nbconvert
            self.logger.info("running nbconvert")
            exporter, stream, resources = self.export_notebook(
                final_nb, exporter_cls, econfig, jinja_template
            )

            # postprocess results
            main_filepath = os.path.join(outdir, ipynb_name + exporter.file_extension)

            for post_proc_name in pprocs:
                proc_class = find_entry_point(
                    post_proc_name,
                    "ipypublish.postprocessors",
                    self.logger,
                    "ipypublish",
                )
                proc = proc_class(pconfig)
                stream, main_filepath, resources = proc.postprocess(
                    stream, exporter.output_mimetype, main_filepath, resources
                )

            self.logger.info("process finished successfully")

        return {
            "outpath": outdir,
            "exporter": exporter,
            "stream": stream,
            "main_filepath": main_filepath,
            "resources": resources,
        }

    def _load_config_file(self, replacements):
        # find conversion configuration
        self.logger.info(
            "finding conversion configuration: {}".format(self.conversion)
        )
        export_config_path = None
        if isinstance(self.conversion, string_types):
            outformat_path = pathlib.Path(self.conversion)
        else:
            outformat_path = self.conversion
        if outformat_path.exists():  # TODO use pathlib approach
            # if outformat is a path that exists, use that
            export_config_path = outformat_path
        else:
            # else search internally
            export_config_path = get_export_config_path(
                self.conversion, self.plugin_folder_paths
            )

        if export_config_path is None:
            handle_error(
                "could not find conversion configuration: {}".format(self.conversion),
                IOError,
                self.logger,
            )

        # read conversion configuration and create
        self.logger.info("loading conversion configuration")
        data = load_export_config(export_config_path)
        self.logger.info("creating exporter")
        exporter_cls = create_exporter_cls(data["exporter"]["class"])
        self.logger.info("creating template and loading filters")
        template_name = "template_file"
        jinja_template = load_template(template_name, data["template"])
        self.logger.info("creating process configuration")
        export_config = self._create_export_config(
            data["exporter"], template_name, replacements
        )
        pprocs, pproc_config = self._create_pproc_config(
            data.get("postprocessors", {}), replacements
        )

        return (exporter_cls, jinja_template, export_config, pprocs, pproc_config)

    def _create_export_config(self, exporter_data, template_name, replacements):
        # type: (dict, str, Dict[str, str]) -> Config
        config = {}
        exporter_name = exporter_data["class"].split(".")[-1]

        config[exporter_name + ".template_file"] = template_name
        config[exporter_name + ".filters"] = exporter_data.get("filters", [])

        preprocessors = []
        for preproc in exporter_data.get("preprocessors", []):
            preprocessors.append(preproc["class"])
            preproc_name = preproc["class"].split(".")[-1]
            for name, val in preproc.get("args", {}).items():
                config[preproc_name + "." + name] = val

        config[exporter_name + ".preprocessors"] = preprocessors

        for name, val in exporter_data.get("other_args", {}).items():
            config[name] = val

        final_config = self.default_exporter_config
        final_config.update(config)

        replace_placeholders(final_config, replacements)

        return dict_to_config(final_config, True)

    def _create_pproc_config(self, pproc_data, replacements):

        if "order" in pproc_data:
            pprocs_list = pproc_data["order"]
        else:
            pprocs_list = self._create_default_pporder(**self.default_pporder_kwargs)

        pproc_config = self._create_default_ppconfig(**self.default_ppconfig_kwargs)

        if "config" in pproc_data:
            override_config = pproc_data["config"]
            pproc_config.update(override_config)

        replace_placeholders(pproc_config, replacements)

        return pprocs_list, pproc_config
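
    # For reference, a sketch of the shape of the configuration data consumed
    # above (as returned by ``load_export_config``); the keys mirror what
    # ``_load_config_file`` and ``_create_export_config`` read, while the
    # class path and ellipses are illustrative placeholders:
    #
    #     {
    #         "exporter": {
    #             "class": "nbconvert.exporters.LatexExporter",
    #             "filters": [...],
    #             "preprocessors": [{"class": "...", "args": {...}}],
    #             "other_args": {...},
    #         },
    #         "template": {...},
    #         "postprocessors": {"order": [...], "config": {...}},
    #     }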

    def export_notebook(self, final_nb, exporter_cls, config, jinja_template):

        kwargs = {"config": config}
        if jinja_template is not None:
            kwargs["extra_loaders"] = [jinja_template]
        try:
            exporter = exporter_cls(**kwargs)
        except TypeError:
            self.logger.warning(
                "the exporter class can not be passed "
                "the arguments: {}".format(list(kwargs.keys()))
            )
            exporter = exporter_cls()

        body, resources = exporter.from_notebook_node(final_nb)
        return exporter, body, resources
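
# A minimal end-to-end usage sketch (the notebook path is hypothetical;
# ``conversion`` and ``outpath`` are the traits defined above, and the
# returned dict is as documented in ``IpyPubMain.publish``):
#
#     config = Config(
#         {"IpyPubMain": {"conversion": "latex_ipypublish_main",
#                         "outpath": "converted"}}
#     )
#     outdata = IpyPubMain(config=config).publish("notebook.ipynb")
#     print(outdata["main_filepath"])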

def replace_placeholders(mapping, replacements):
    """recurse through a mapping and perform (in-place) string replacements

    Parameters
    ----------
    mapping:
        any object which has an items() attribute
    replacements: dict
        {placeholder: replacement}

    """
    for key, val in mapping.items():
        if isinstance(val, string_types):
            for instr, outstr in replacements.items():
                val = val.replace(instr, outstr)
            mapping[key] = val
        elif hasattr(val, "items"):
            replace_placeholders(val, replacements)
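
# An example of the in-place replacement performed by ``replace_placeholders``
# (the values mirror the defaults used in ``_create_default_ppconfig``):
#
#     >>> cfg = {"PDFExport": {"files_folder": "${files_path}"}}
#     >>> replace_placeholders(cfg, {"${files_path}": "notebook_files"})
#     >>> cfg["PDFExport"]["files_folder"]
#     'notebook_files'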