A Discrete-Event Network Simulator
API
Loading...
Searching...
No Matches
ipython_view.py
Go to the documentation of this file.
1"""
2Backend to the console plugin.
3
4@author: Eitan Isaacson
5@organization: IBM Corporation
6@copyright: Copyright (c) 2007 IBM Corporation
7@license: BSD
8
9All rights reserved. This program and the accompanying materials are made
10available under the terms of the BSD which accompanies this distribution, and
11is available at U{http://www.opensource.org/licenses/bsd-license.php}
12"""
13
14# this file is a modified version of source code from the Accerciser project
15# https://wiki.gnome.org/Apps/Accerciser
16
17import gi
18
19gi.require_version("Gdk", "3.0")
20gi.require_version("Gtk", "3.0")
21import os
22import re
23import sys
24from functools import reduce
25from io import StringIO
26
27from gi.repository import Gdk, GLib, Gtk, Pango
28from packaging.version import Version
29
30## Try to import IPython
31try:
32 import IPython
33except ImportError:
34 ##@ var IPython
35 #
36 IPython = None
37
38
39## IterableIPShell class
41 ## @var IP
42 # IP
43 ## @var iter_more
44 # iterate more
45 ## @var history_level
46 # history level
47 ## @var complete_sep
48 # separators
49 ## @var no_input_splitter
50 # no input splitter
51 ## @var lines
52 # lines
53 ## @var indent_spaces
54 # indent spaces
55 ## @var prompt
56 # prompt
57 ## @var header
58 # header
59 ## @var config
60 # config
61 ## @var colors
62 # colors
63 ## @var raw_input
64 # raw input
65
67 self,
68 argv=[],
69 user_ns=None,
70 user_global_ns=None,
71 cin=None,
72 cout=None,
73 cerr=None,
74 input_func=None,
75 ):
76 """!
77 Constructor for the IterableIPShell class
78 @param self: this object
79 @param argv: Command line options for IPython
80 @param user_ns: User namespace.
81 @param user_global_ns: User global namespace.
82 @param cin: Console standard input.
83 @param cout: Console standard output.
84 @param cerr: Console standard error.
85 @param input_func: Replacement for builtin raw_input()
86 """
87 io = IPython.utils.io
88 if input_func:
89 IPython.terminal.interactiveshell.raw_input_original = input_func
90 if IPython.version_info < (8,):
91 if cin:
92 io.stdin = io.IOStream(cin)
93 if cout:
94 io.stdout = io.IOStream(cout)
95 if cerr:
96 io.stderr = io.IOStream(cerr)
97 else:
98 if cin:
99 sys.stdin = cin
100 if cout:
101 sys.stdout = cout
102 if cerr:
103 sys.stderr = cerr
104
105 # This is to get rid of the blockage that occurs during
106 # IPython.Shell.InteractiveShell.user_setup()
107 io.raw_input = lambda x: None
108
109 os.environ["TERM"] = "dumb"
110 excepthook = sys.excepthook
111
112 from traitlets.config.loader import Config
113
114 cfg = Config()
115 cfg.InteractiveShell.colors = "Linux"
116 cfg.Completer.use_jedi = False
117
118 if IPython.version_info < (8,):
119 # InteractiveShell's __init__ overwrites io.stdout,io.stderr with
120 # sys.stdout, sys.stderr, this makes sure they are right
121 old_stdout, old_stderr = sys.stdout, sys.stderr
122 sys.stdout, sys.stderr = io.stdout.stream, io.stderr.stream
123
124 # InteractiveShell inherits from SingletonConfigurable, so use instance()
125 #
126 self.IP = IPython.terminal.embed.InteractiveShellEmbed.instance(config=cfg, user_ns=user_ns)
127
128 if IPython.version_info < (8,):
129 sys.stdout, sys.stderr = old_stdout, old_stderr
130
131 self.IP.system = lambda cmd: self.shell(
132 self.IP.var_expand(cmd), header="IPython system call: "
133 )
134 # local_ns=user_ns)
135 # global_ns=user_global_ns)
136 # verbose=self.IP.rc.system_verbose)
137
138 self.IP.raw_input = input_func
139 sys.excepthook = excepthook
140 self.iter_more = 0
142 self.complete_sep = re.compile(r"[\s\{\}\[\]\‍(\‍)]")
143 self.updateNamespace({"exit": lambda: None})
144 self.updateNamespace({"quit": lambda: None})
145 # Workaround for updating namespace with sys.modules
146 #
147 self.__update_namespace()
148
149 # Avoid using input splitter when not really needed.
150 # Perhaps it could work even before 5.8.0
151 # But it definitely does not work any more with >= 7.0.0
152 self.no_input_splitter = Version(IPython.release.version) >= Version("5.8.0")
153 self.lines = []
155
157 """!
158 Update self.IP namespace for autocompletion with sys.modules
159 @return none
160 """
161 for k, v in list(sys.modules.items()):
162 if not "." in k:
163 self.IP.user_ns.update({k: v})
164
165 def execute(self):
166 """!
167 Executes the current line provided by the shell object.
168 @return none
169 """
170 self.history_level = 0
171
172 if IPython.version_info < (8,):
173 # this is needed because some functions in IPython use 'print' to print
174 # output (like 'who')
175
176 orig_stdout = sys.stdout
177 sys.stdout = IPython.utils.io.stdout
178
179 orig_stdin = sys.stdin
180 sys.stdin = IPython.utils.io.stdin
181
183
184 self.IP.hooks.pre_prompt_hook()
185 if self.iter_more:
186 try:
187 self.prompt = self.generatePrompt(True)
188 except:
189 self.IP.showtraceback()
190 if self.IP.autoindent:
191 self.IP.rl_do_indent = True
192
193 try:
194 line = self.IP.raw_input(self.prompt)
195 except KeyboardInterrupt:
196 self.IP.write("\nKeyboardInterrupt\n")
197 if self.no_input_splitter:
198 self.lines = []
199 else:
200 self.IP.input_splitter.reset()
201 except:
202 self.IP.showtraceback()
203 else:
204 if self.no_input_splitter:
205 self.lines.append(line)
206 (status, self.indent_spaces) = self.IP.check_complete("\n".join(self.lines))
207 self.iter_more = status == "incomplete"
208 else:
209 self.IP.input_splitter.push(line)
210 self.iter_more = self.IP.input_splitter.push_accepts_more()
211 if not self.iter_more:
212 if self.no_input_splitter:
213 source_raw = "\n".join(self.lines)
214 self.lines = []
215 else:
216 source_raw = self.IP.input_splitter.raw_reset()
217 self.IP.run_cell(source_raw, store_history=True)
218 self.IP.rl_do_indent = False
219 else:
220 # TODO: Auto-indent
221 #
222 self.IP.rl_do_indent = True
223 pass
224 self.prompt = self.generatePrompt(self.iter_more)
225
226 if IPython.version_info < (8,):
227 sys.stdout = orig_stdout
228 sys.stdin = orig_stdin
229
230 def generatePrompt(self, is_continuation):
231 """!
232 Generate prompt depending on is_continuation value
233
234 @param is_continuation
235 @return: The prompt string representation
236
237 """
238
239 if is_continuation:
240 prompt = "... "
241 else:
242 prompt = ">>> "
243 return prompt
244
245 def historyBack(self):
246 """!
247 Provides one history command back.
248
249 @param self this object
250 @return: The command string.
251 """
252 self.history_level -= 1
253 if not self._getHistory():
254 self.history_level += 1
255 return self._getHistory()
256
257 def historyForward(self):
258 """!
259 Provides one history command forward.
260
261 @param self this object
262 @return: The command string.
263 """
264 if self.history_level < 0:
265 self.history_level += 1
266 return self._getHistory()
267
268 def _getHistory(self):
269 """!
270 Gets the command string of the current history level.
271
272 @param self this object
273 @return: Historic command string.
274 """
275 try:
276 rv = self.IP.user_ns["In"][self.history_level].strip("\n")
277 except IndexError:
278 rv = ""
279 return rv
280
281 def updateNamespace(self, ns_dict):
282 """!
283 Add the current dictionary to the shell namespace.
284
285 @param ns_dict: A dictionary of symbol-values.
286 @return none
287 """
288 self.IP.user_ns.update(ns_dict)
289
290 def complete(self, line):
291 """!
292 Returns an auto completed line and/or possibilities for completion.
293
294 @param line: Given line so far.
295 @return: Line completed as for as possible, and possible further completions.
296 """
297 split_line = self.complete_sep.split(line)
298 if split_line[-1]:
299 possibilities = self.IP.complete(split_line[-1])
300 else:
301 completed = line
302 possibilities = ["", []]
303 if possibilities:
304
305 def _commonPrefix(str1, str2):
306 """!
307 Reduction function. returns common prefix of two given strings.
308
309 @param str1: First string.
310 @param str2: Second string
311 @return: Common prefix to both strings.
312 """
313 for i in range(len(str1)):
314 if not str2.startswith(str1[: i + 1]):
315 return str1[:i]
316 return str1
317
318 if possibilities[1]:
319 common_prefix = reduce(_commonPrefix, possibilities[1]) or split_line[-1]
320 completed = line[: -len(split_line[-1])] + common_prefix
321 else:
322 completed = line
323 else:
324 completed = line
325 return completed, possibilities[1]
326
327 def shell(self, cmd, verbose=0, debug=0, header=""):
328 """!
329 Replacement method to allow shell commands without them blocking.
330
331 @param cmd: Shell command to execute.
332 @param verbose: Verbosity
333 @param debug: Debug level
334 @param header: Header to be printed before output
335 @return none
336 """
337 stat = 0
338 if verbose or debug:
339 print(header + cmd)
340 # flush stdout so we don't mangle python's buffering
341 if not debug:
342 input, output = os.popen4(cmd)
343 print(output.read())
344 output.close()
345 input.close()
346
347
348## ConsoleView class
349class ConsoleView(Gtk.TextView):
350 ## @var ANSI_COLORS
351 # color list
352 ## @var text_buffer
353 # text buffer
354 ## @var mark
355 # scroll mark
356 ## @var color_pat
357 # color pattern
358 ## @var line_start
359 # line start
360 ## @var onKeyPress
361 # onKeyPress function
362 ## @var _write
363 # _write function
364 ## @var _showPrompt
365 # _showPrompt function
366 ## @var _changeLine
367 # _changeLine function
368 ## @var _showReturned
369 # _showReturned function
370 ## @var prompt
371 # prompt function
372 ## @var no_input_splitter
373 # no input splitter
374
375 """
376 Specialized text view for console-like workflow.
377
378 @cvar ANSI_COLORS: Mapping of terminal control sequence values to
379 tuples containing foreground and background color names.
380 @type ANSI_COLORS: dictionary
381
382 @ivar text_buffer: Widget's text buffer.
383 @type text_buffer: Gtk.TextBuffer
384 @ivar color_pat: Regex of terminal color pattern
385 @type color_pat: _sre.SRE_Pattern
386 @ivar mark: Scroll mark for automatic scrolling on input.
387 @type mark: Gtk.TextMark
388 @ivar line_start: Start of command line mark.
389 @type line_start: Gtk.TextMark
390 """
391 ANSI_COLORS = {
392 "0;30": ("Black", None),
393 "0;31": ("Red", None),
394 "0;32": ("Green", None),
395 "0;33": ("Brown", None),
396 "0;34": ("Blue", None),
397 "0;35": ("Purple", None),
398 "0;36": ("Cyan", None),
399 "0;37": ("LightGray", None),
400 "1;30": ("DarkGray", None),
401 "1;31": ("DarkRed", None),
402 "1;32": ("SeaGreen", None),
403 "1;33": ("Yellow", None),
404 "1;34": ("LightBlue", None),
405 "1;35": ("MediumPurple", None),
406 "1;36": ("LightCyan", None),
407 "1;37": ("White", None),
408 "38;5;124;43": ("DarkRed", "Yellow"),
409 "38;5;241": ("Gray", None),
410 "38;5;241;43": ("Gray", "Yellow"),
411 "39": ("Black", None),
412 "39;49": ("Red", "White"),
413 "43": (None, "Yellow"),
414 "49": (None, "White"),
415 }
416
417 def __init__(self):
418 """
419 Initialize console view.
420 """
421 Gtk.TextView.__init__(self)
422 self.set_monospace(True)
423 self.set_cursor_visible(True)
424 self.text_buffer = self.get_buffer()
425 self.mark = self.text_buffer.create_mark(
426 "scroll_mark", self.text_buffer.get_end_iter(), False
427 )
428 for code in self.ANSI_COLORS:
429 self.text_buffer.create_tag(
430 code,
431 foreground=self.ANSI_COLORS[code][0],
432 background=self.ANSI_COLORS[code][1],
433 weight=700,
434 )
435 self.text_buffer.create_tag("0")
436 self.text_buffer.create_tag("notouch", editable=False)
437 self.color_pat = re.compile(r"\x01?\x1b\[(.*?)m\x02?")
438 self.line_start = self.text_buffer.create_mark(
439 "line_start", self.text_buffer.get_end_iter(), True
440 )
441 self.connect("key-press-event", self.onKeyPress)
442
443 def write(self, text, editable=False):
444 """!
445 Write given text to buffer.
446
447 @param text: Text to append.
448 @param editable: If true, added text is editable.
449 @return none
450 """
451 GLib.idle_add(self._write, text, editable)
452
453 def _write(self, text, editable=False):
454 """!
455 Write given text to buffer.
456
457 @param text: Text to append.
458 @param editable: If true, added text is editable.
459 @return none
460 """
461 segments = self.color_pat.split(text)
462 segment = segments.pop(0)
463 start_mark = self.text_buffer.create_mark(None, self.text_buffer.get_end_iter(), True)
464 self.text_buffer.insert(self.text_buffer.get_end_iter(), segment)
465
466 if segments:
467 ansi_tags = self.color_pat.findall(text)
468 for tag in ansi_tags:
469 i = segments.index(tag)
470 self.text_buffer.insert_with_tags_by_name(
471 self.text_buffer.get_end_iter(), segments[i + 1], str(tag)
472 )
473 segments.pop(i)
474 if not editable:
475 self.text_buffer.apply_tag_by_name(
476 "notouch",
477 self.text_buffer.get_iter_at_mark(start_mark),
478 self.text_buffer.get_end_iter(),
479 )
480 self.text_buffer.delete_mark(start_mark)
481 self.scroll_mark_onscreen(self.mark)
482
483 def showPrompt(self, prompt):
484 """!
485 Prints prompt at start of line.
486
487 @param prompt: Prompt to print.
488 @return none
489 """
490 GLib.idle_add(self._showPrompt, prompt)
491
492 def _showPrompt(self, prompt):
493 """!
494 Prints prompt at start of line.
495
496 @param prompt: Prompt to print.
497 @return none
498 """
499 self._write(prompt)
500 self.text_buffer.move_mark(self.line_start, self.text_buffer.get_end_iter())
501
502 def changeLine(self, text):
503 """!
504 Replace currently entered command line with given text.
505
506 @param text: Text to use as replacement.
507 @return none
508 """
509 GLib.idle_add(self._changeLine, text)
510
511 def _changeLine(self, text):
512 """!
513 Replace currently entered command line with given text.
514
515 @param text: Text to use as replacement.
516 @return none
517 """
518 iter = self.text_buffer.get_iter_at_mark(self.line_start)
519 iter.forward_to_line_end()
520 self.text_buffer.delete(self.text_buffer.get_iter_at_mark(self.line_start), iter)
521 self._write(text, True)
522
523 def getCurrentLine(self):
524 """!
525 Get text in current command line.
526
527 @return Text of current command line.
528 """
529 rv = self.text_buffer.get_slice(
530 self.text_buffer.get_iter_at_mark(self.line_start),
531 self.text_buffer.get_end_iter(),
532 False,
533 )
534 return rv
535
536 def showReturned(self, text):
537 """!
538 Show returned text from last command and print new prompt.
539
540 @param text: Text to show.
541 @return none
542 """
543 GLib.idle_add(self._showReturned, text)
544
545 def _showReturned(self, text):
546 """!
547 Show returned text from last command and print new prompt.
548
549 @param text: Text to show.
550 @return none
551 """
552 iter = self.text_buffer.get_iter_at_mark(self.line_start)
553 iter.forward_to_line_end()
554 self.text_buffer.apply_tag_by_name(
555 "notouch", self.text_buffer.get_iter_at_mark(self.line_start), iter
556 )
557 self._write("\n" + text)
558 if text:
559 self._write("\n")
561 self.text_buffer.move_mark(self.line_start, self.text_buffer.get_end_iter())
562 self.text_buffer.place_cursor(self.text_buffer.get_end_iter())
563
564 if self.IP.rl_do_indent:
566 indentation = self.indent_spaces
567 else:
568 indentation = self.IP.input_splitter.indent_spaces * " "
569 self.text_buffer.insert_at_cursor(indentation)
570
571 def onKeyPress(self, widget, event):
572 """!
573 Key press callback used for correcting behavior for console-like
574 interfaces. For example 'home' should go to prompt, not to beginning of
575 line.
576
577 @param widget: Widget that key press accored in.
578 @param event: Event object
579 @return Return True if event should not trickle.
580 """
581 insert_mark = self.text_buffer.get_insert()
582 insert_iter = self.text_buffer.get_iter_at_mark(insert_mark)
583 selection_mark = self.text_buffer.get_selection_bound()
584 selection_iter = self.text_buffer.get_iter_at_mark(selection_mark)
585 start_iter = self.text_buffer.get_iter_at_mark(self.line_start)
586 if event.keyval == Gdk.KEY_Home:
587 if (
588 event.state & Gdk.ModifierType.CONTROL_MASK
589 or event.state & Gdk.ModifierType.MOD1_MASK
590 ):
591 pass
592 elif event.state & Gdk.ModifierType.SHIFT_MASK:
593 self.text_buffer.move_mark(insert_mark, start_iter)
594 return True
595 else:
596 self.text_buffer.place_cursor(start_iter)
597 return True
598 elif event.keyval == Gdk.KEY_Left:
599 insert_iter.backward_cursor_position()
600 if not insert_iter.editable(True):
601 return True
602 elif event.state & Gdk.ModifierType.CONTROL_MASK and event.keyval in [ord("L"), ord("l")]:
603 # clear previous output on Ctrl+L, but remember current input line + cursor position
604 cursor_offset = self.text_buffer.get_property("cursor-position")
605 cursor_pos_in_line = cursor_offset - start_iter.get_offset() + len(self.prompt)
606 current_input = self.text_buffer.get_text(
607 start_iter, self.text_buffer.get_end_iter(), False
608 )
609 self.text_buffer.set_text(self.prompt + current_input)
610 self.text_buffer.move_mark(
611 self.line_start, self.text_buffer.get_iter_at_offset(len(self.prompt))
612 )
613 self.text_buffer.place_cursor(self.text_buffer.get_iter_at_offset(cursor_pos_in_line))
614 return True
615 elif event.state & Gdk.ModifierType.CONTROL_MASK and event.keyval in [Gdk.KEY_k, Gdk.KEY_K]:
616 # clear text after input cursor on Ctrl+K
617 if insert_iter.editable(True):
618 self.text_buffer.delete(insert_iter, self.text_buffer.get_end_iter())
619 return True
620 elif event.state & Gdk.ModifierType.CONTROL_MASK and event.keyval == Gdk.KEY_C:
621 # copy selection on Ctrl+C (upper-case 'C' only)
622 self.text_buffer.copy_clipboard(Gtk.Clipboard.get(Gdk.SELECTION_CLIPBOARD))
623 return True
624 elif not event.string:
625 pass
626 elif start_iter.compare(insert_iter) <= 0 and start_iter.compare(selection_iter) <= 0:
627 pass
628 elif start_iter.compare(insert_iter) > 0 and start_iter.compare(selection_iter) > 0:
629 self.text_buffer.place_cursor(start_iter)
630 elif insert_iter.compare(selection_iter) < 0:
631 self.text_buffer.move_mark(insert_mark, start_iter)
632 elif insert_iter.compare(selection_iter) > 0:
633 self.text_buffer.move_mark(selection_mark, start_iter)
634
635 return self.onKeyPressExtend(event)
636
637 def onKeyPressExtend(self, event):
638 """!
639 For some reason we can't extend onKeyPress directly (bug #500900).
640 @param event key press
641 @return none
642 """
643 pass
644
645
646## IPythonView class
647class IPythonView(ConsoleView, IterableIPShell):
648 ## @var cout
649 # cout
650 ## @var interrupt
651 # interrupt
652 ## @var execute
653 # execute
654 ## @var prompt
655 # prompt
656 ## @var showPrompt
657 # show prompt
658 ## @var history_pos
659 # history list
660 ## @var window
661 # GTK Window
662 """
663 Sub-class of both modified IPython shell and L{ConsoleView} this makes
664 a GTK+ IPython console.
665 """
666
667 def __init__(self):
668 """
669 Initialize. Redirect I/O to console.
670 """
671 ConsoleView.__init__(self)
672 self.cout = StringIO()
673 IterableIPShell.__init__(self, cout=self.cout, cerr=self.cout, input_func=self.raw_input)
674 self.interrupt = False
675 self.execute()
676 self.prompt = self.generatePrompt(False)
677 self.cout.truncate(0)
678 self.showPrompt(self.prompt)
679
680 def raw_input(self, prompt=""):
681 """!
682 Custom raw_input() replacement. Gets current line from console buffer.
683
684 @param prompt: Prompt to print. Here for compatibility as replacement.
685 @return The current command line text.
686 """
687 if self.interrupt:
688 self.interrupt = False
689 raise KeyboardInterrupt
690 return self.getCurrentLine()
691
692 def onKeyPressExtend(self, event):
693 """!
694 Key press callback with plenty of shell goodness, like history,
695 autocompletions, etc.
696
697 @param event: Event object.
698 @return True if event should not trickle.
699 """
700
701 if event.get_state() & Gdk.ModifierType.CONTROL_MASK and event.keyval == 99:
702 self.interrupt = True
703 self._processLine()
704 return True
705 elif event.keyval == Gdk.KEY_Return:
706 self._processLine()
707 return True
708 elif event.keyval == Gdk.KEY_Up:
709 self.changeLine(self.historyBack())
710 return True
711 elif event.keyval == Gdk.KEY_Down:
712 self.changeLine(self.historyForward())
713 return True
714 elif event.keyval == Gdk.KEY_Tab:
715 if not self.getCurrentLine().strip():
716 return False
717 completed, possibilities = self.complete(self.getCurrentLine())
718 if len(possibilities) > 1:
719 slice = self.getCurrentLine()
720 self.write("\n")
721 for symbol in possibilities:
722 self.write(symbol + "\n")
723 self.showPrompt(self.prompt)
724 self.changeLine(completed or slice)
725 return True
726
727 def _processLine(self):
728 """!
729 Process current command line.
730 @return none
731 """
732 self.history_pos = 0
733 self.execute()
734 rv = self.cout.getvalue()
735 if rv:
736 rv = rv.strip("\n")
737 self.showReturned(rv)
738 self.cout.truncate(0)
739 self.cout.seek(0)
740
741
742if __name__ == "__main__":
743 window = Gtk.Window()
744 window.set_default_size(640, 320)
745 window.connect("delete-event", lambda x, y: Gtk.main_quit())
746 window.add(IPythonView())
747 window.show_all()
748 Gtk.main()
write(self, text, editable=False)
Write given text to buffer.
getCurrentLine(self)
Get text in current command line.
onKeyPressExtend(self, event)
For some reason we can't extend onKeyPress directly (bug #500900).
showReturned(self, text)
Show returned text from last command and print new prompt.
showPrompt(self, prompt)
Prints prompt at start of line.
changeLine(self, text)
Replace currently entered command line with given text.
_showReturned
_showReturned function
onKeyPressExtend(self, event)
Key press callback with plenty of shell goodness, like history, autocompletions, etc.
_processLine(self)
Process current command line.
raw_input(self, prompt="")
Custom raw_input() replacement.
historyBack(self)
Provides one history command back.
_getHistory(self)
Gets the command string of the current history level.
historyForward(self)
Provides one history command forward.
complete(self, line)
Returns an auto completed line and/or possibilities for completion.
updateNamespace(self, ns_dict)
Add the current dictionary to the shell namespace.
shell(self, cmd, verbose=0, debug=0, header="")
Replacement method to allow shell commands without them blocking.
__update_namespace(self)
Update self.IP namespace for autocompletion with sys.modules.
execute(self)
Executes the current line provided by the shell object.
__init__(self, argv=[], user_ns=None, user_global_ns=None, cin=None, cout=None, cerr=None, input_func=None)
Constructor for the IterableIPShell class.
generatePrompt(self, is_continuation)
Generate prompt depending on is_continuation value.
#define delete