import shutil
import time
from colorama import Fore, Style
from colorama import init as colorama_init
from emoji import emojize
from sugaroid.backend.sql import SqlDatabaseManagement
from sugaroid.brain.constants import SUGAROID_INTRO
from sugaroid.config.config import ConfigManager
from sugaroid.brain.brain import Neuron
from sugaroid.core.bot import SugaroidBot
from sugaroid.trainer.trainer import SugaroidTrainer
from sugaroid.core.statement import SugaroidStatement # noqa:
from chatterbot.trainers import ChatterBotCorpusTrainer
from chatterbot.trainers import ListTrainer
from sugaroid.brain.utils import LanguageProcessor
import logging
import coloredlogs
import os
import sys
import warnings
from sugaroid.version import VERSION
warnings.filterwarnings("ignore")
try:
from sugaroid.tts.tts import Text2Speech
AUD_DEPS = True
except ModuleNotFoundError:
AUD_DEPS = False
sugaroid_logger = logging.getLogger(__name__)
# enable the verbosity
if "--debug" in sys.argv or os.getenv("SUGAROID_DEBUG") == "1":
# set the verbosity
print("SUGAROID_DEBUG: Enabled")
verbosity = logging.INFO
logging.basicConfig(level=verbosity)
sugaroid_logger.setLevel(level=logging.INFO)
coloredlogs.install(level="INFO", logger=sugaroid_logger)
try:
from PyQt5 import QtCore, QtWidgets # noqa:
from PyQt5.QtWidgets import QApplication # noqa:
GUI_DEPS = True
except (ModuleNotFoundError, ImportError) as e:
print("warn: Could not import PyQt5", e)
GUI_DEPS = False
SPACY_LANG_PROCESSOR = LanguageProcessor()
[docs]class Sugaroid:
"""
Sugaroid Initates the chatbot class and connects logic Adapters together.
Initates the ConfigManager to store sugaroid data and connects scans
sys.argv
"""
def __init__(
self,
readonly: bool = True,
should_copy_db: bool = True,
db_name: str = "sugaroid.db",
):
self.trainer = None
self.corpusTrainer = None
self.neuron = None
self.audio = "audio" in sys.argv
self.cfgmgr = ConfigManager()
self.cfgpath = self.cfgmgr.get_cfgpath()
self.database = SqlDatabaseManagement(
os.path.join(self.cfgpath, "sugaroid_internal.db")
)
self.database_exists = os.path.exists(os.path.join(self.cfgpath, db_name))
self.should_copy_db = should_copy_db
self.db_name = db_name
if not self.database_exists:
self.corpus()
self.commands = [
"sugaroid.brain.command.CommandAdapter",
"sugaroid.brain.debug.DebugAdapter",
"sugaroid.brain.interrupt.InterruptAdapter",
"sugaroid.brain.learn.LearnAdapter",
"sugaroid.brain.swaglyrics.SwagLyricsAdapter",
"sugaroid.brain.yesno.BoolAdapter",
"sugaroid.brain.aki.AkinatorAdapter",
"sugaroid.brain.hangman.HangmanAdapter",
"sugaroid.brain.either.OrAdapter",
"sugaroid.brain.ok.OkayAdapter",
"sugaroid.brain.time.TimeAdapter",
"sugaroid.brain.bye.ByeAdapter",
"sugaroid.brain.whoami.WhoAdapter",
"sugaroid.brain.news.NewsAdapter",
"sugaroid.brain.joke.JokeAdapter",
"sugaroid.brain.play.PlayAdapter",
]
self.adapters = [
# "sugaroid.brain.convert.CurrencyAdapter",
"sugaroid.brain.trivia.TriviaAdapter",
"sugaroid.brain.let.LetAdapter",
"sugaroid.brain.whatwhat.WhatWhatAdapter",
"sugaroid.brain.waitwhat.WaitWhatAdapter",
"sugaroid.brain.assertive.AssertiveAdapter",
"sugaroid.brain.canmay.CanAdapter",
"sugaroid.brain.because.BecauseAdapter",
"sugaroid.brain.have.HaveAdapter",
"sugaroid.brain.rereversei.ReReverseAdapter",
"sugaroid.brain.reversethink.ReverseAdapter",
"sugaroid.brain.covid.CovidAdapter",
"sugaroid.brain.myname.MyNameAdapter",
"sugaroid.brain.iam.MeAdapter",
"sugaroid.brain.about.AboutAdapter",
"sugaroid.brain.wolfalpha.WolframAlphaAdapter",
"sugaroid.brain.wiki.WikiAdapter",
"sugaroid.brain.dolike.DoLikeAdapter",
"sugaroid.brain.feel.FeelAdapter",
"sugaroid.brain.areyou.AreYouAdapter",
"sugaroid.brain.dolike.DoLikeAdapter",
"sugaroid.brain.do.DoAdapter",
"sugaroid.brain.emotion.EmotionAdapter",
"sugaroid.brain.dis.DisAdapter",
"sugaroid.brain.twoword.TwoWordAdapter",
"sugaroid.brain.oneword.OneWordAdapter",
"sugaroid.brain.why.WhyWhenAdapter",
"sugaroid.brain.reader.ReaderAdapter",
"sugaroid.brain.imitate.ImitateAdapter",
"sugaroid.brain.fun.FunAdapter",
"chatterbot.logic.UnitConversion",
]
if self.audio:
self.tts = Text2Speech()
else:
self.tts = None
self.chatbot = SugaroidBot(
"Sugaroid",
storage_adapter="chatterbot.storage.SQLStorageAdapter",
logic_adapters=self.commands
+ [
{
"import_path": "chatterbot.logic.BestMatch",
"maximum_similarity_threshold": 0.80,
}
]
+ self.adapters,
database_uri=f"sqlite+pysqlite:///{self.cfgpath}/{self.db_name}",
read_only=readonly,
logger=sugaroid_logger,
)
self.chatbot.globals["adapters"] = self.adapters
self.read()
self.invoke_brain()
self.set_internet_media_from_environment()
[docs] def init_local_trainers(self):
"""
Init local trainers with minimum conversation
:return:
"""
# initialize the trainer
self.trainer = ListTrainer(self.chatbot)
self.corpusTrainer = ChatterBotCorpusTrainer(self.chatbot)
[docs] def list_train(self, li):
self.trainer.train(li)
[docs] def interrupt_ds(self):
self.chatbot.interrupt = 2
[docs] def disable_interrupt_ds(self):
self.chatbot.interrupt = 0
[docs] def toggle_discord(self):
self.chatbot.discord = not self.chatbot.discord
[docs] def append_author(self, author):
self.chatbot.authors.append(author)
[docs] def read(self):
"""
Train Sugaroid database from the sugaroid.trainer.json located in the configuration directory
if it exists. If the sugaroid.db file exists, the Database update is skipped
:return:
"""
if "train" in sys.argv:
from sugaroid.trainer.trainer import main as trainer
# FIXME replace with dynamic trainer i.e GUI + CLI
trainer()
else:
if self.database_exists:
print("Database already exists")
pass
else:
if self.trainer is None:
self.init_local_trainers()
st = SugaroidTrainer()
st.train(self.trainer)
self.corpus()
if "update" in sys.argv:
if self.trainer is None:
self.init_local_trainers()
st = SugaroidTrainer()
st.train(self.trainer)
[docs] def write(self):
raise NotImplementedError
[docs] def corpus(self):
"""
Train data if it doesn't exists.
Periodically update the data too
:return: True when the process is complete
"""
db_dir = os.path.join(os.path.dirname(__file__), "data", "sugaroid.db")
if self.should_copy_db:
pass # shutil.copy(db_dir, os.path.join(self.cfgpath, self.db_name))
return True
[docs] def invoke_brain(self):
"""
Initiate the Brain
:return:
"""
self.neuron = Neuron(self.chatbot)
[docs] def parse(self, args):
"""
Do a simple parsing of the init statement. Classify statement on the
type of input_statement
and confidence of each statement
:param args:
:return:
"""
if isinstance(args, str):
preflight_time = time.time()
response = self.neuron.parse(args)
# only keep last 4 responses
self.chatbot.globals["history"]["total"] = self.chatbot.globals["history"][
"total"
][-4:] + [response]
self.chatbot.globals["history"]["user"] = self.chatbot.globals["history"][
"user"
][-4:] + [args]
success_time = time.time()
delta_time = success_time - preflight_time
try:
_text_response = response.text
except AttributeError:
_text_response = response
assert isinstance(_text_response, str)
if "gui" not in sys.argv:
try:
self.database.append(
statement=_text_response,
in_reponse_to=args,
time=preflight_time,
processing_time=delta_time,
)
except Exception:
# to protect our system, we sh
pass
return response
else:
raise ValueError("Invalid data type passed to Sugaroid.parse")
[docs] def prompt_cli(self):
"""
Classic prompt for Sugaroid Command Line Interface
:return:
"""
response = self.parse(input(Fore.CYAN + "( ဖ‿ဖ) @> " + Fore.RESET))
return response
[docs] def display_cli(self, response):
"""
Classic display adapter for TTY Sugaroid Command Line Interface
:param response:
:return:
"""
try:
print(
Style.BRIGHT
+ Fore.GREEN
+ "sugaroid: "
+ Fore.RESET
+ Style.RESET_ALL
+ Fore.BLUE
+ emojize(response.text)
+ Fore.RESET
+ "\n"
)
except AttributeError:
print(
Style.BRIGHT
+ Fore.GREEN
+ "sugaroid: "
+ Fore.RESET
+ Style.RESET_ALL
+ Fore.BLUE
+ emojize(response)
+ Fore.RESET
+ "\n"
)
if self.audio:
self.tts.speak(str(response))
[docs] def loop_cli(self):
"""
Sugaroid loop_cli for Sugaroid Command Line Interface
:return:
"""
try:
while True:
self.display_cli(self.prompt_cli())
except (KeyboardInterrupt, EOFError):
self.database.close()
[docs] def loop_gui(self):
"""
Launch the sugaroid PyQt5 gui with Breeze theme and custom features
If PyQt5 not installed, raise ModuleNotFoundError
:return:
"""
if not GUI_DEPS:
raise ModuleNotFoundError(
"PyQt5 is not Installed. Install it by pip3 install PyQt5"
)
print("Launching GUI")
QApplication.setAttribute(QtCore.Qt.AA_EnableHighDpiScaling, True)
QApplication.setAttribute(
QtCore.Qt.AA_UseHighDpiPixmaps, True
) # use highdpi icons
app = QtWidgets.QApplication(sys.argv)
app.setStyle("Breeze")
from sugaroid.gui.ux import InterfaceSugaroidQt
window = InterfaceSugaroidQt(parent=self)
window.init()
try:
app.exec_()
except KeyboardInterrupt:
pass
self.database.close()
[docs]def gui_main():
sg = Sugaroid()
sg.loop_gui()
[docs]def main():
"""
Launch a cli / gui based on the sys.argv
Entrypoint in setup.py console_entry_points
:return:
"""
print(SUGAROID_INTRO)
colorama_init()
read_only = False if "update" in sys.argv else True
version = VERSION
if not os.getenv("DYNO"):
version = f"{version}.dev0 (local build)"
print("Sugaroid {} RO:{}\n".format(version, read_only))
sg = Sugaroid(readonly=read_only)
if "gui" in sys.argv:
os.environ["SUGAROID"] = "GUI"
sg.loop_gui()
else:
os.environ["SUGAROID"] = "CLI"
sg.loop_cli()
if __name__ == "__main__":
main()