import re
import sys
# from time import sleep

from chart import Chart
from symbols import Symbol
from log import log, set_log_level, DEBUG
from rules import Rules
from vertical import XmlTag


class ChartParser:
    """Driver around a chart: option handling, vertical-file input, output.

    The parsing loop in :meth:`parse` relies on ``self.agenda``,
    ``self.prepare_agenda()`` and ``self.propose_edges()``, none of which is
    defined in this class -- presumably a subclass supplies them (TODO
    confirm).
    """

    def __init__(self, rules=None, rule_files=('default',), argv=()):
        """Read command-line switches and load the grammar rules.

        Args:
            rules: rules handed directly to ``Rules``; only used when no rule
                files are given (neither via ``rule_files`` nor ``--rules``).
            rule_files: rule files to load unless ``--rules`` is present in
                ``argv``.
            argv: command line; ``argv[0]`` (the program name) is skipped.

        Raises:
            ValueError: if no rule files and no ``rules`` were supplied.
        """
        # The defaults are immutable tuples so that no list object is shared
        # between instances; hand lists on, as the original defaults were.
        if isinstance(rule_files, tuple):
            rule_files = list(rule_files)

        self.options = dict(
            debug=False,  # print under the hood information
            chart=False,
            edges=False,  # show each edge as it is handled and combined
            # original only, no dependencies yet (--dependency/-d)
            vertical=False,  # TODO: accept file names?
            phrasal_trees=False,  # constituency_trees
            dot=False,  # trees in the DOT format
            vertical_phrases=False,  # TODO: accept file names?
            interactive=False,  # stop after each processed sentence
        )
        # Filled by parse_and_evaluate(); created here so that the method can
        # also be called without going through parse_from_vertical().
        self.successful_edges = {}

        argv_rule_files = []
        expect_rules = False
        for arg in argv[1:]:
            if arg == '--rules':
                expect_rules = True
            elif expect_rules:
                argv_rule_files.append(arg)
                expect_rules = False
            elif arg == '--debug':
                self.options['debug'] = True
            elif arg == '--edges':
                self.options['edges'] = True
            elif arg == '--vertical':
                self.options['vertical'] = True
            elif arg == '--chart':
                self.options['chart'] = True
            elif arg == '--trees':
                self.options['phrasal_trees'] = True
            elif arg == '--dot':
                self.options['dot'] = True
            elif arg.startswith('--vertical-phrases'):
                # "--vertical-phrases=REGEX": the slice skips the switch and
                # the separator character; a bare switch selects all phrases.
                self.options['vertical_phrases'] = arg[19:] or True
                self.options['vertical'] = True
            # elif arg == '--interactive':
            #     self.options['interactive'] = True
            else:
                log.warning('Unknown parameter: ' + arg)
        if expect_rules:
            log.warning('Missing file name after --rules')

        if self.options['debug']:
            set_log_level(DEBUG)

        # Rule files (from argv first, then the argument) take precedence
        # over directly supplied rules.
        if argv_rule_files or rule_files:
            self.rules = Rules(files=argv_rule_files or rule_files)
        elif rules:
            self.rules = Rules(rules)
        else:
            raise ValueError('No grammar rules: pass rules, rule_files '
                             'or --rules FILE')
        self.rules.print()

    def prepare(self, tokens, root):
        """Store the tokens and the root symbol; number the tokens.

        Each token gets its zero-based position in ``token.begin``.
        """
        self.tokens = tokens
        for begin, token in enumerate(tokens):
            token.begin = begin
            # if self.options['debug']: log.info(Symbol(token))
        self.root = root
        # TODO: if there is no verb (or "lze", k6) among the tokens, run
        # parse(root='NP') and log 'No verb found, parsing as NP instead'.
        # Not needed right now because CLAUSE predicts NP as well, but it
        # will come in handy for vernacular and optional_punctuation.

    def parse(self, tokens, root='CLAUSE'):
        """Fill ``self.chart`` by processing the agenda until it is empty."""
        self.prepare(tokens, root)
        length = len(self.tokens)
        self.prepare_agenda(length)
        self.chart = Chart(length)
        while self.agenda:
            agenda_edge = self.agenda.popleft()
            # Edges are appended one by one, exactly as they are proposed.
            for new_edge in self.propose_edges(agenda_edge):
                self.agenda.append(new_edge)
            self.chart.append(agenda_edge)

    def parse_and_evaluate(self, tokens, root='CLAUSE', opening_tag=None,
                           closing_tag=None):
        """Parse ``tokens`` and emit the output selected in ``self.options``.

        An edge is successful when it is a closed edge starting at position 0
        and ending at the last token.  ``opening_tag``, when given, is
        annotated with ``passed`` (and ``analysis`` for vertical phrases); if
        it carries ``seq``, the successful edges are stored in
        ``self.successful_edges`` under that key.
        """
        self.parse(tokens, root)
        length = len(self.tokens)
        if self.options['chart']:
            self.chart.print()

        successful_edges = [
            edge for edge in self.chart.closed_edges[0]
            if edge.end == length
        ]
        if not successful_edges:
            log.error('\nNo edge covering the whole input!\n')
            # TODO: print the longest edges (those with the furthest
            # edge.end)
            # for edges in reversed(self.chart.open_edges):
            #     if edges:
            #         self.partial_edges = list(edges)
            #         break
            # else:
            #     self.partial_edges = []
            # for edge in self.partial_edges:
            #     log.info('Partial: %s', edge)

        if opening_tag:
            opening_tag['passed'] = 'yes' if successful_edges else 'no'

        for index, edge in enumerate(successful_edges, 1):
            log.info('\nPassed! %s\n', edge)
            if self.options['phrasal_trees']:
                self.print_graph(edge)
                log.info('\n')
            if self.options['dot']:
                self.print_dot(edge)
            if self.options['vertical_phrases']:
                # CLAUSE, ATTR_CLAUSE?, DEP_CLAUSE
                if opening_tag:
                    opening_tag['analysis'] = str(index)
                # TODO: check for duplicates (they get printed when not all
                # phrases are shown)
                self.print_vertical_phrases(edge, opening_tag, closing_tag,
                                            self.options['vertical_phrases'])

        # The plain vertical is printed for failed parses and whenever
        # --vertical is on without --vertical-phrases.
        if not successful_edges or (self.options['vertical'] and
                                    not self.options['vertical_phrases']):
            self.print_plain_vertical(opening_tag, closing_tag)

        if opening_tag and 'seq' in opening_tag:
            self.successful_edges[opening_tag['seq']] = successful_edges
        # else
        #     self.successful_edges[None] = successful_edges
        # if self.interactive:
        #     input()

    @staticmethod
    def _token_from_line(line):
        """Build a token ``Symbol`` from one non-markup vertical line.

        A line with at least three whitespace-separated columns is read as
        word, lemma and tag; anything shorter becomes both word and lemma and
        its tag is guessed.
        """
        try:
            word, lemma, pos_tag, *extra = line.split()
        except ValueError:
            word = lemma = line
            if line in ',.():?':
                pos_tag = 'kI'
                # road to hell :-D
                # TODO: improve the heuristic
            elif line[0] in '0123456789':
                log.warning('line with a number, correct? ' + line)
                pos_tag = 'k4'
            else:
                # e.g. "31. března"
                log.warning('unexpected line ' + line)
                pos_tag = 'k?'
        # TODO: Token will set its length itself
        return Symbol(word=word, lemma=lemma, tag=pos_tag, line=line,
                      length=1)

    def parse_from_vertical(self, lines=sys.stdin, root='CLAUSE'):
        """Read a vertical file and parse it sentence by sentence.

        ``<s>``/``<head>``, ``<doc>`` and ``<p>`` tags delimit sentences,
        ``<g/>`` glues the previous token to the next one and ``<phr>``
        wraps a multi-word expression into a single token.  The successful
        edges of every sentence end up in ``self.successful_edges``, keyed by
        a sequence number starting at 1.
        """
        tokens = []
        opening_tag = None
        phrase = None
        seq = 1
        self.successful_edges = {}

        for line in lines:
            line = line.strip()
            if line.startswith('<'):
                tag = XmlTag.parse(line)
                if tag.name in ('s', 'head'):
                    closing_tag = tag if tag.closing else None
                    if closing_tag or tokens:
                        self.parse_and_evaluate(tokens, root, opening_tag,
                                                closing_tag)
                        seq += 1
                        tokens = []
                        # sleep(0.1)
                    opening_tag = tag if tag.opening else None
                    if tag.opening:
                        opening_tag['seq'] = seq
                    continue

                if tag.name == 'g':  # and tag.empty
                    if tokens:
                        # the next token is "glued" to this one
                        tokens[-1].trailing_whitespace = False
                elif tag.name == 'phr':  # MWE
                    if tag.opening:
                        phrase = Symbol(word=[], lemma=tag['l'], tag=tag['t'],
                                        mwe=True)
                    elif phrase:
                        phrase['word'] = ' '.join(phrase['word'])
                        tokens.append(phrase)
                        phrase = None
                    else:
                        # a non-opening <phr> tag without a phrase in progress
                        log.warning('unexpected !')
                elif tag.name == 'doc':
                    if tokens:
                        self.parse_and_evaluate(tokens, root, opening_tag)
                        seq += 1
                        tokens = []
                    if self.options['vertical']:
                        print(tag, flush=True)
                elif tag.name == 'p':
                    if tokens:
                        # Without an open sentence tag, an opening <p> stands
                        # in for it.
                        surrogate_opening_tag = (
                            tag if not opening_tag and tag.opening
                            else opening_tag)
                        # There may be no tag at all (no sentence tag and a
                        # closing </p>); parse_and_evaluate() accepts None.
                        if surrogate_opening_tag is not None:
                            surrogate_opening_tag['seq'] = seq
                        self.parse_and_evaluate(tokens, root,
                                                surrogate_opening_tag)
                        seq += 1
                        tokens = []
                    elif tag.closing and self.options['vertical']:
                        print(tag, flush=True)
                elif tag.name == 'table':
                    # skip the whole content in some cases?
                    # in DESAM (source.130620.294.utf8) it occurs only once now
                    pass
                    # table = True
                elif tag.name == 'sign':
                    pass
                    # interesting: "Josef Juřeník, přednosta stanice",
                    # i.e. an apposition
            elif line:
                # TODO: phrase is a Symbol; is "is not None" necessary?
                if phrase is not None:
                    phrase['word'].append(line)
                else:
                    tokens.append(self._token_from_line(line))

        if tokens:
            self.parse_and_evaluate(tokens, root, opening_tag)

    def print_graph(self, edge, level=0):
        """Log ``edge`` and, recursively, its children as an indented tree.

        A token child is logged only when the edge has more than one child.
        """
        log.info('{0}{1}'.format(' '*level, edge))
        level += 1
        for child in edge:
            if isinstance(child.edge, Symbol):  # TODO: Token
                if len(edge) > 1:
                    log.info('{0}{1}'.format(' '*level, child))
            elif child.edge and child.length:
                self.print_graph(child.edge, level)

    def print_dot(self, edge, parent_edge=None):
        """Print the tree under ``edge`` to stdout as a DOT graph.

        The ``graph { ... }`` wrapper is printed only by the outermost call
        (``parent_edge is None``).
        """
        if parent_edge is None:
            print('graph {', flush=True)

        phrase_type = edge.left.get('type') or edge.left.get('coord_type')
        label = edge.left.phrase + (' ' + phrase_type if phrase_type and
                                    phrase_type is not True else '')
        print(' "{:d}" [label="{}"];'.format(edge.left, label), flush=True)

        for child in edge:
            if isinstance(child.edge, Symbol):  # TODO: Token
                print(' "{:d}" [label="{}"];'.format(
                    child, child.get('word', 'N/A')), flush=True)
            elif child.edge and child.length:
                self.print_dot(child.edge, edge)
            else:
                # nothing was printed for this child, so no connecting line
                continue
            print(' "{:d}" -- "{:d}";'.format(edge.left, child), flush=True)

        if parent_edge is None:
            print('}', flush=True)

    def print_plain_vertical(self, opening_tag, closing_tag):
        """Print the current tokens in vertical format between the tags."""
        if opening_tag:
            print(opening_tag, flush=True)
        for token in self.tokens:
            print(token.vertical(), flush=True)
        if closing_tag:
            print(closing_tag, flush=True)

    def print_vertical_phrases(self, edge, opening_tag=None, closing_tag=None,
                               phrases=True, level=1):
        """Print the tree under ``edge`` as a vertical with phrase tags.

        Args:
            edge: the edge to print.
            opening_tag: tag printed before everything else, if given.
            closing_tag: tag printed after everything else, if given.
            phrases: ``True`` to wrap every phrase, otherwise a regular
                expression matched (``re.match``) against the lower-cased
                phrase name to select the phrases to wrap.
            level: indentation level for the phrase tags.
        """
        if opening_tag:
            print(opening_tag, flush=True)

        phrase = edge.left.phrase.lower()
        if phrases is True or re.match(phrases, phrase):
            phrase_opening_tag = XmlTag(
                name=phrase, indentation_level=level,
                attributes=(
                    (attr, value) for (attr, value) in edge.left.items()
                    if value is not True))
            phrase_closing_tag = XmlTag(
                name=phrase, indentation_level=level, closing=True)
            print(phrase_opening_tag, flush=True)
            # the nesting level grows only when a tag is actually printed,
            # not across the board
            level += 1
        else:
            phrase_closing_tag = None

        for child in edge:
            if isinstance(child.edge, Symbol):  # TODO: Token
                if len(edge) >= 1:
                    print(child.vertical(), flush=True)
            elif child.edge and child.length:
                self.print_vertical_phrases(child.edge, phrases=phrases,
                                            level=level)

        if phrase_closing_tag:
            print(phrase_closing_tag, flush=True)
        if closing_tag:
            print(closing_tag, flush=True)