Source code for sugaroid.sugaroid

import shutil
import time

from colorama import Fore, Style
from colorama import init as colorama_init
from emoji import emojize

from sugaroid.backend.sql import SqlDatabaseManagement
from sugaroid.brain.constants import SUGAROID_INTRO
from sugaroid.config.config import ConfigManager
from sugaroid.brain.brain import Neuron
from import SugaroidBot
from sugaroid.trainer.trainer import SugaroidTrainer
from sugaroid.core.statement import SugaroidStatement  # noqa:
from chatterbot.trainers import ChatterBotCorpusTrainer
from chatterbot.trainers import ListTrainer
from sugaroid.brain.utils import LanguageProcessor
import logging
import coloredlogs
import os
import sys
import warnings
from sugaroid.version import VERSION


    from sugaroid.tts.tts import Text2Speech

    AUD_DEPS = True
except ModuleNotFoundError:
    AUD_DEPS = False

sugaroid_logger = logging.getLogger(__name__)

# enable the verbosity
if "--debug" in sys.argv or os.getenv("SUGAROID_DEBUG") == "1":
    # set the verbosity
    print("SUGAROID_DEBUG: Enabled")
    verbosity = logging.INFO
    coloredlogs.install(level="INFO", logger=sugaroid_logger)

    from PyQt5 import QtCore, QtWidgets  # noqa:
    from PyQt5.QtWidgets import QApplication  # noqa:

    GUI_DEPS = True
except (ModuleNotFoundError, ImportError) as e:
    print("warn: Could not import PyQt5", e)
    GUI_DEPS = False

SPACY_LANG_PROCESSOR = LanguageProcessor()

[docs]class Sugaroid: """ Sugaroid Initates the chatbot class and connects logic Adapters together. Initates the ConfigManager to store sugaroid data and connects scans sys.argv """ def __init__( self, readonly: bool = True, should_copy_db: bool = True, db_name: str = "sugaroid.db", ): self.trainer = None self.corpusTrainer = None self.neuron = None = "audio" in sys.argv self.cfgmgr = ConfigManager() self.cfgpath = self.cfgmgr.get_cfgpath() self.database = SqlDatabaseManagement( os.path.join(self.cfgpath, "sugaroid_internal.db") ) self.database_exists = os.path.exists(os.path.join(self.cfgpath, db_name)) self.should_copy_db = should_copy_db self.db_name = db_name if not self.database_exists: self.corpus() self.commands = [ "sugaroid.brain.command.CommandAdapter", "sugaroid.brain.debug.DebugAdapter", "sugaroid.brain.interrupt.InterruptAdapter", "sugaroid.brain.learn.LearnAdapter", "sugaroid.brain.swaglyrics.SwagLyricsAdapter", "sugaroid.brain.yesno.BoolAdapter", "sugaroid.brain.aki.AkinatorAdapter", "sugaroid.brain.hangman.HangmanAdapter", "sugaroid.brain.either.OrAdapter", "sugaroid.brain.ok.OkayAdapter", "sugaroid.brain.time.TimeAdapter", "sugaroid.brain.bye.ByeAdapter", "sugaroid.brain.whoami.WhoAdapter", "", "sugaroid.brain.joke.JokeAdapter", "", ] self.adapters = [ # "sugaroid.brain.convert.CurrencyAdapter", "sugaroid.brain.trivia.TriviaAdapter", "sugaroid.brain.let.LetAdapter", "sugaroid.brain.whatwhat.WhatWhatAdapter", "sugaroid.brain.waitwhat.WaitWhatAdapter", "sugaroid.brain.assertive.AssertiveAdapter", "sugaroid.brain.canmay.CanAdapter", "sugaroid.brain.because.BecauseAdapter", "sugaroid.brain.have.HaveAdapter", "sugaroid.brain.rereversei.ReReverseAdapter", "sugaroid.brain.reversethink.ReverseAdapter", "sugaroid.brain.covid.CovidAdapter", "sugaroid.brain.myname.MyNameAdapter", "sugaroid.brain.iam.MeAdapter", "sugaroid.brain.about.AboutAdapter", "sugaroid.brain.wolfalpha.WolframAlphaAdapter", "", "sugaroid.brain.dolike.DoLikeAdapter", "sugaroid.brain.feel.FeelAdapter", "sugaroid.brain.areyou.AreYouAdapter", "sugaroid.brain.dolike.DoLikeAdapter", "", "sugaroid.brain.emotion.EmotionAdapter", "sugaroid.brain.dis.DisAdapter", "sugaroid.brain.twoword.TwoWordAdapter", "sugaroid.brain.oneword.OneWordAdapter", "sugaroid.brain.why.WhyWhenAdapter", "sugaroid.brain.reader.ReaderAdapter", "sugaroid.brain.imitate.ImitateAdapter", "", "chatterbot.logic.UnitConversion", ] if self.tts = Text2Speech() else: self.tts = None self.chatbot = SugaroidBot( "Sugaroid", storage_adapter="", logic_adapters=self.commands + [ { "import_path": "chatterbot.logic.BestMatch", "maximum_similarity_threshold": 0.80, } ] + self.adapters, database_uri=f"sqlite+pysqlite:///{self.cfgpath}/{self.db_name}", read_only=readonly, logger=sugaroid_logger, ) self.chatbot.globals["adapters"] = self.adapters self.invoke_brain() self.set_internet_media_from_environment()
[docs] def init_local_trainers(self): """ Init local trainers with minimum conversation :return: """ # initialize the trainer self.trainer = ListTrainer(self.chatbot) self.corpusTrainer = ChatterBotCorpusTrainer(self.chatbot)
[docs] def list_train(self, li): self.trainer.train(li)
[docs] def interrupt_ds(self): self.chatbot.interrupt = 2
[docs] def disable_interrupt_ds(self): self.chatbot.interrupt = 0
[docs] def toggle_discord(self): self.chatbot.discord = not self.chatbot.discord
[docs] def append_author(self, author): self.chatbot.authors.append(author)
[docs] def read(self): """ Train Sugaroid database from the sugaroid.trainer.json located in the configuration directory if it exists. If the sugaroid.db file exists, the Database update is skipped :return: """ if "train" in sys.argv: from sugaroid.trainer.trainer import main as trainer # FIXME replace with dynamic trainer i.e GUI + CLI trainer() else: if self.database_exists: print("Database already exists") pass else: if self.trainer is None: self.init_local_trainers() st = SugaroidTrainer() st.train(self.trainer) self.corpus() if "update" in sys.argv: if self.trainer is None: self.init_local_trainers() st = SugaroidTrainer() st.train(self.trainer)
[docs] def write(self): raise NotImplementedError
[docs] def corpus(self): """ Train data if it doesn't exists. Periodically update the data too :return: True when the process is complete """ db_dir = os.path.join(os.path.dirname(__file__), "data", "sugaroid.db") if self.should_copy_db: pass # shutil.copy(db_dir, os.path.join(self.cfgpath, self.db_name)) return True
[docs] def invoke_brain(self): """ Initiate the Brain :return: """ self.neuron = Neuron(self.chatbot)
[docs] def parse(self, args): """ Do a simple parsing of the init statement. Classify statement on the type of input_statement and confidence of each statement :param args: :return: """ if isinstance(args, str): preflight_time = time.time() response = self.neuron.parse(args) # only keep last 4 responses self.chatbot.globals["history"]["total"] = self.chatbot.globals["history"][ "total" ][-4:] + [response] self.chatbot.globals["history"]["user"] = self.chatbot.globals["history"][ "user" ][-4:] + [args] success_time = time.time() delta_time = success_time - preflight_time try: _text_response = response.text except AttributeError: _text_response = response assert isinstance(_text_response, str) if "gui" not in sys.argv: try: self.database.append( statement=_text_response, in_reponse_to=args, time=preflight_time, processing_time=delta_time, ) except Exception: # to protect our system, we sh pass return response else: raise ValueError("Invalid data type passed to Sugaroid.parse")
[docs] def prompt_cli(self): """ Classic prompt for Sugaroid Command Line Interface :return: """ response = self.parse(input(Fore.CYAN + "( ဖ‿ဖ) @> " + Fore.RESET)) return response
[docs] def display_cli(self, response): """ Classic display adapter for TTY Sugaroid Command Line Interface :param response: :return: """ try: print( Style.BRIGHT + Fore.GREEN + "sugaroid: " + Fore.RESET + Style.RESET_ALL + Fore.BLUE + emojize(response.text) + Fore.RESET + "\n" ) except AttributeError: print( Style.BRIGHT + Fore.GREEN + "sugaroid: " + Fore.RESET + Style.RESET_ALL + Fore.BLUE + emojize(response) + Fore.RESET + "\n" ) if self.tts.speak(str(response))
[docs] def loop_cli(self): """ Sugaroid loop_cli for Sugaroid Command Line Interface :return: """ try: while True: self.display_cli(self.prompt_cli()) except (KeyboardInterrupt, EOFError): self.database.close()
[docs] def loop_gui(self): """ Launch the sugaroid PyQt5 gui with Breeze theme and custom features If PyQt5 not installed, raise ModuleNotFoundError :return: """ if not GUI_DEPS: raise ModuleNotFoundError( "PyQt5 is not Installed. Install it by pip3 install PyQt5" ) print("Launching GUI") QApplication.setAttribute(QtCore.Qt.AA_EnableHighDpiScaling, True) QApplication.setAttribute( QtCore.Qt.AA_UseHighDpiPixmaps, True ) # use highdpi icons app = QtWidgets.QApplication(sys.argv) app.setStyle("Breeze") from sugaroid.gui.ux import InterfaceSugaroidQt window = InterfaceSugaroidQt(parent=self) window.init() try: app.exec_() except KeyboardInterrupt: pass self.database.close()
[docs] def set_internet_media_from_environment(self): if os.getenv("DISCORD_TOKEN") or os.getenv("TELEGRAM_TOKEN"): # supports media self.chatbot.globals["media"] = True self.chatbot.globals["rich"] = True"Media support enabled")
[docs]def gui_main(): sg = Sugaroid() sg.loop_gui()
[docs]def main(): """ Launch a cli / gui based on the sys.argv Entrypoint in console_entry_points :return: """ print(SUGAROID_INTRO) colorama_init() read_only = False if "update" in sys.argv else True version = VERSION if not os.getenv("DYNO"): version = f"{version}.dev0 (local build)" print("Sugaroid {} RO:{}\n".format(version, read_only)) sg = Sugaroid(readonly=read_only) if "gui" in sys.argv: os.environ["SUGAROID"] = "GUI" sg.loop_gui() else: os.environ["SUGAROID"] = "CLI" sg.loop_cli()
if __name__ == "__main__": main()