-
-
Notifications
You must be signed in to change notification settings - Fork 87
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
ecb7162
commit 199cb24
Showing
1 changed file
with
329 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,329 @@ | ||
import re | ||
from copy import copy | ||
from typing import List, Tuple, Callable | ||
|
||
import numpy as np | ||
|
||
from Orange.widgets import gui | ||
from Orange.widgets.settings import DomainContextHandler | ||
from Orange.widgets.utils.concurrent import TaskState, ConcurrentWidgetMixin | ||
from Orange.widgets.widget import OWWidget | ||
from AnyQt.QtCore import QSize | ||
from AnyQt.QtWidgets import QGridLayout, QLabel, QLineEdit, QComboBox | ||
|
||
from orangecontrib.text import Corpus | ||
from orangewidget.settings import Setting, ContextSetting | ||
from orangewidget.utils.signals import Input, Output | ||
from orangewidget.utils.widgetpreview import WidgetPreview | ||
|
||
|
||
# Every statistic function returns a tuple: an np.ndarray with the computed
# values and a list with the names of the resulting variables. The names are
# produced by the function itself because some statistics will yield more
# than one variable (e.g. a count for each POS tag).
|
||
|
||
def number_of_words(corpus: Corpus, text: str, callback: Callable):
    """
    Count the tokens of every document in the corpus.

    Parameters
    ----------
    corpus
        Object whose ``tokens`` attribute yields one list of tokens per
        document.
    text
        Not used by this statistic.
    callback
        Called once per document, right before its tokens are counted.

    Returns
    -------
    Column vector (one row per document) with the token counts and a list
    holding the name of the resulting variable.
    """
    # TODO: discuss if ok on tokens
    counts = []
    for doc_tokens in corpus.tokens:
        callback()
        counts.append(len(doc_tokens))
    # np.c_ turns the flat list into a column: [1, 2, 3] -> [[1], [2], [3]]
    return np.c_[counts], ["Words count"]
|
||
|
||
def characters_count(corpus: Corpus, text: str, callback: Callable):
    """
    Sum the lengths of all tokens of every document in the corpus.

    Parameters
    ----------
    corpus
        Object whose ``tokens`` attribute yields one list of tokens per
        document.
    text
        Not used by this statistic.
    callback
        Called once per document, right before its characters are counted.

    Returns
    -------
    Column vector (one row per document) with the character counts and a
    list holding the name of the resulting variable.
    """
    # TODO: discuss if ok on tokens
    totals = []
    for doc_tokens in corpus.tokens:
        callback()
        totals.append(sum(map(len, doc_tokens)))
    column = np.c_[totals]
    return column, ["Characters count"]
|
||
|
||
def starts_with(corpus: Corpus, prefix: str, callback: Callable):
    """
    Count, per document, the tokens that begin with ``prefix``.

    Parameters
    ----------
    corpus
        Object whose ``tokens`` attribute yields one list of tokens per
        document.
    prefix
        String every counted token must start with.
    callback
        Called once per document, right before its tokens are inspected.

    Returns
    -------
    Column vector (one row per document) with the counts and a list holding
    the name of the resulting variable.
    """
    counts = []
    for doc_tokens in corpus.tokens:
        callback()
        hits = [token for token in doc_tokens if token.startswith(prefix)]
        counts.append(len(hits))
    return np.c_[counts], [f"Starts with {prefix}"]
|
||
|
||
def ends_with(corpus: Corpus, postfix: str, callback: Callable):
    """
    Count, per document, the tokens that end with ``postfix``.

    Parameters
    ----------
    corpus
        Object whose ``tokens`` attribute yields one list of tokens per
        document.
    postfix
        String every counted token must end with.
    callback
        Called once per document, right before its tokens are inspected.

    Returns
    -------
    Column vector (one row per document) with the counts and a list holding
    the name of the resulting variable.
    """
    def count_in_document(doc_tokens: List[str]):
        callback()
        matching = 0
        for token in doc_tokens:
            if token.endswith(postfix):
                matching += 1
        return matching

    per_document = [count_in_document(doc) for doc in corpus.tokens]
    return np.c_[per_document], [f"Ends with {postfix}"]
|
||
|
||
def contains(corpus: Corpus, text: str, callback: Callable):
    """
    Count, per document, the tokens that have ``text`` as a substring.

    Parameters
    ----------
    corpus
        Object whose ``tokens`` attribute yields one list of tokens per
        document.
    text
        Substring searched for in every token.
    callback
        Called once per document, right before its tokens are inspected.

    Returns
    -------
    Column vector (one row per document) with the counts and a list holding
    the name of the resulting variable.
    """
    counts = []
    for doc_tokens in corpus.tokens:
        callback()
        counts.append(sum(1 for token in doc_tokens if text in token))
    variable_names = [f"Contains {text}"]
    return np.c_[counts], variable_names
|
||
|
||
def regex(corpus: Corpus, expression: str, callback: Callable):
    """
    Count, per document, the tokens matched by a regular expression.

    The expression is applied with ``re.match`` semantics, i.e. it has to
    match at the beginning of the token.

    Parameters
    ----------
    corpus
        Object whose ``tokens`` attribute yields one list of tokens per
        document.
    expression
        Regular expression; it is compiled once before the documents are
        processed.
    callback
        Called once per document, right before its tokens are inspected.

    Returns
    -------
    Column vector (one row per document) with the counts and a list holding
    the name of the resulting variable.
    """
    matches_at_start = re.compile(expression).match

    counts = []
    for doc_tokens in corpus.tokens:
        callback()
        counts.append(
            len([token for token in doc_tokens if matches_at_start(token)])
        )
    return np.c_[counts], [f"Regex {expression}"]
|
||
|
||
# Registry of the available statistics. Each entry is
# (name of the statistics, function to compute, needs text box)
STATISTICS = [
    ("Words count", number_of_words, False),
    ("Characters count", characters_count, False),
    ("Starts with", starts_with, True),
    ("Ends with", ends_with, True),
    ("Contains", contains, True),
    ("Regex", regex, True),
]
# Column-wise views of the registry; position i in each list belongs to
# the i-th entry of STATISTICS.
STATISTICS_NAMES, STATISTICS_FUNCTIONS, STATISTICS_NEEDS_TB = (
    list(column) for column in zip(*STATISTICS)
)
|
||
|
||
def run(corpus: Corpus, statistics: Tuple[int, str], state: TaskState) -> None:
    """
    Compute the requested statistics and report each one as a partial result.

    Parameters
    ----------
    corpus
        The corpus on which the computation is held.
    statistics
        Sequence of statistic pairs to be computed:
        (statistics id, string pattern). The id is an index into
        STATISTICS_FUNCTIONS.
    state
        State used to report progress and partial results. Every partial
        result is the triple (statistics id, string pattern, result of the
        statistic function).
    """
    # every statistic function calls the callback once per document, so
    # there is one progress tick per (document, statistic) pair
    n_ticks = len(corpus) * len(statistics)
    progress = iter(np.linspace(0, 100, n_ticks))

    def advance():
        state.set_progress_value(next(progress))

    for stat_id, pattern in statistics:
        compute = STATISTICS_FUNCTIONS[stat_id]
        outcome = compute(corpus, pattern, advance)
        state.set_partial_result((stat_id, pattern, outcome))
|
||
|
||
class OWStatistics(OWWidget, ConcurrentWidgetMixin):
    """
    Widget that appends per-document statistics to a corpus as new features.

    The user edits a list of rules, each a pair (statistics id, pattern).
    The statistics are computed in a background task (``run``); results are
    cached in ``result_dict`` keyed by rule, so a rule that was already
    computed for the current corpus is not computed again.
    """
    name = "Statistics"
    description = "Create new statistic variables for documents."
    keywords = []

    class Inputs:
        corpus = Input("Corpus", Corpus)

    class Outputs:
        corpus = Output("Corpus", Corpus)

    want_main_area = False
    settingsHandler = DomainContextHandler()

    # settings
    active_rules: List[Tuple[int, str]] = ContextSetting([(0, ""), (1, "")])
    # rules active at time of apply clicked (None until the first apply)
    applied_rules: List[Tuple[int, str]] = None
    autocommit = Setting(True)

    def __init__(self):
        OWWidget.__init__(self)
        ConcurrentWidgetMixin.__init__(self)
        self.corpus = None
        # cache of computed statistics: rule -> (data, variable names);
        # created per instance so that widgets never share one dictionary
        self.result_dict = {}

        # the list with combos from the widget
        self.combos = []
        # the list with line edits from the widget
        self.line_edits = []
        # the list of buttons in front of controls that removes them
        self.remove_buttons = []

        self._init_controls()

    def _init_controls(self):
        """Build the control area: the rules box and the Apply button."""
        self._init_statistics_box()
        box = gui.hBox(self.controlArea)
        gui.rubber(box)
        gui.button(
            box, self, "Apply",
            autoDefault=False,
            width=180,
            callback=self.apply
        )

    def _init_statistics_box(self):
        """Build the grid with one row per rule and the "+" button below it."""
        patternbox = gui.vBox(self.controlArea, box=True)
        self.rules_box = rules_box = QGridLayout()
        patternbox.layout().addLayout(self.rules_box)
        box = gui.hBox(patternbox)
        gui.button(
            box, self, "+", callback=self._add_row, autoDefault=False,
            flat=True,
            minimumSize=(QSize(20, 20)))
        gui.rubber(box)
        self.rules_box.setColumnMinimumWidth(1, 70)
        self.rules_box.setColumnMinimumWidth(0, 10)
        self.rules_box.setColumnStretch(0, 1)
        self.rules_box.setColumnStretch(1, 1)
        self.rules_box.setColumnStretch(2, 100)
        rules_box.addWidget(QLabel("Feature"), 0, 1)
        rules_box.addWidget(QLabel("Pattern"), 0, 2)
        self._update_rules()

    def adjust_n_rule_rows(self):
        """
        Add or remove rows so that there is exactly one per active rule and
        update the controls of every row to show its rule.
        """
        def _add_line():
            # row 0 of the grid holds the column headers
            n_lines = len(self.combos) + 1

            # add delete symbol
            button = gui.button(
                None, self, label='×', flat=True, height=20,
                styleSheet='* {font-size: 16pt; color: silver}'
                           '*:hover {color: black}',
                autoDefault=False, callback=self._remove_row)
            button.setMinimumSize(QSize(12, 20))
            self.rules_box.addWidget(button, n_lines, 0)
            self.remove_buttons.append(button)

            # add statistics type dropdown
            combo = QComboBox()
            combo.addItems(STATISTICS_NAMES)
            combo.currentIndexChanged.connect(self._sync_edit_combo)
            self.rules_box.addWidget(combo, n_lines, 1)
            self.combos.append(combo)

            # add line edit for pattern
            line_edit = QLineEdit()
            self.rules_box.addWidget(line_edit, n_lines, 2)
            line_edit.textChanged.connect(self._sync_edit_line)
            self.line_edits.append(line_edit)

        def _remove_line():
            # rows are always dropped from the end; the remaining rows are
            # refreshed from active_rules afterwards
            self.combos.pop().deleteLater()
            self.line_edits.pop().deleteLater()
            self.remove_buttons.pop().deleteLater()

        def _sync_rows_with_rules():
            # TODO: write it differently - check create class
            for rule, combo, line_edit in zip(
                    self.active_rules, self.combos, self.line_edits
            ):
                stat_idx, pattern = rule
                combo.setCurrentIndex(stat_idx)  # update combo
                line_edit.setText(pattern)  # update line edit
                # the pattern is shown only for statistics that use it
                line_edit.setVisible(STATISTICS_NEEDS_TB[stat_idx])

        n = len(self.active_rules)
        while n > len(self.combos):
            _add_line()
        while len(self.combos) > n:
            _remove_line()
        _sync_rows_with_rules()

    def _update_rules(self):
        """Bring the rows of the rules box in line with active_rules."""
        self.adjust_n_rule_rows()

    def _add_row(self):
        """Append a new rule with the first statistic and an empty pattern."""
        self.active_rules.append((0, ""))
        self.adjust_n_rule_rows()

    def _remove_row(self):
        """Remove the rule whose delete button emitted the signal."""
        remove_idx = self.remove_buttons.index(self.sender())
        del self.active_rules[remove_idx]
        self.adjust_n_rule_rows()

    def _sync_edit_combo(self):
        """Store the statistic selected in the sending combo in its rule."""
        combo = self.sender()
        edit_index = self.combos.index(combo)
        self.active_rules[edit_index] = (
            combo.currentIndex(), self.active_rules[edit_index][1]
        )
        # the visibility of the pattern edit depends on the statistic
        self.adjust_n_rule_rows()

    def _sync_edit_line(self):
        """Store the text of the sending line edit in its rule."""
        line_edit = self.sender()
        edit_index = self.line_edits.index(line_edit)
        self.active_rules[edit_index] = (
            self.active_rules[edit_index][0], line_edit.text()
        )

    @Inputs.corpus
    def set_data(self, corpus):
        """
        Accept a new corpus (or None when the input is removed).

        Cached results are dropped since they belong to the previous corpus.
        """
        # a task that is still running computes on the previous corpus; its
        # partial results must not end up in the fresh cache
        self.cancel()
        self.corpus = corpus
        self.result_dict = {}  # empty computational results when new data
        if corpus is None:
            self.Outputs.corpus.send(None)
        elif self.autocommit:
            self.apply()

    def apply(self):
        """
        This function is called when user click apply button. It starts
        the computation. When computation is finished results are shown
        on the output - on_done.
        """
        self.cancel()  # cancel task since user clicked apply again
        if self.corpus is None:
            # nothing to compute on; run would fail on len(None)
            self.Outputs.corpus.send(None)
            return
        self.applied_rules = copy(self.active_rules)
        # only rules without a cached result are computed; dict.fromkeys
        # drops repeated rules while keeping their order
        rules_to_compute = list(dict.fromkeys(
            r for r in self.active_rules if r not in self.result_dict
        ))
        self.start(run, self.corpus, rules_to_compute)

    def on_exception(self, exception) -> None:
        """Re-raise the exception reported for the background task."""
        raise exception

    def on_partial_result(
            self, result: Tuple[int, str, Tuple[np.ndarray, List[str]]]
    ):
        """
        Cache the result of one finished statistic.

        ``result`` is the triple (statistics id, pattern, (data, variable
        names)) reported by ``run``.
        """
        statistic, pattern, computed = result
        self.result_dict[(statistic, pattern)] = computed

    def on_done(self, result: None):
        """Send the output and drop cached results that are not needed."""
        # join results
        self.output_results()

        # remove unnecessary results from dict - it can happen that user
        # already removes the statistic from gui but it is still computed.
        # A new dict is built instead of deleting in a loop: deleting keys
        # while iterating over the dict raises RuntimeError.
        self.result_dict = {
            rule: computed for rule, computed in self.result_dict.items()
            if rule in self.active_rules
        }

    def output_results(self):
        """
        Send a copy of the corpus extended with the applied statistics.

        When none of the applied rules has a result the copy is sent
        without new attributes; without a corpus None is sent.
        """
        if self.corpus is None:
            self.Outputs.corpus.send(None)
            return

        # Only the new columns are collected. extend_attributes adds them
        # to the attributes the corpus already has, so passing
        # self.corpus.X as well would repeat the existing columns and give
        # more columns than variable names.
        to_stack = []
        attributes = []
        for rule in self.applied_rules or []:
            # check for safety reasons - in practice should not happen
            if rule in self.result_dict:
                data, variables = self.result_dict[rule]
                to_stack.append(data)
                attributes += variables
        # here we will use extend_attributes function - this function add
        # attributes to existing corpus so it must be copied first
        # TODO: when change of preprocessing is finished change this function
        #  to have inplace parameter which is False by default,
        #  also I would prefer extend_attributes where you give variables
        #  instead of strings on input
        new_corpus = self.corpus.copy()
        if to_stack:  # np.hstack raises ValueError on an empty list
            new_corpus.extend_attributes(np.hstack(to_stack), attributes)
        self.Outputs.corpus.send(new_corpus)
|
||
|
||
if __name__ == "__main__":
    # Manual preview: run the widget with the 'book-excerpts' corpus on input.
    preview = WidgetPreview(OWStatistics)
    preview.run(Corpus.from_file('book-excerpts'))