aboutsummaryrefslogblamecommitdiffstats
path: root/slack/commands.py
blob: b4bb1b5a1f571018fb44378973433e7a0227112a (plain) (tree)
1
2
3
4
5
6
7
8
9
10

                                  
             


                                 
                                                             


              
                                 
                                                                 
                               




                                                               
                                                
                                  
                                                    
                                              










                                                               








                                                                        










                                  


                                                                                
                                                                                 
                                                                    



                                            


                                                                       



                                                                                            
                                                 
 
                                                                                





                      







                                                                                       
                              






                                                           
                                                 





                                                                           
                  
                                                                                   





                      














                                                                   
                          

 
                  





                                                                   
                                       





                                                                   
                                       



                                                                   


                            


                                                                   



                                                                         
 
                                                  








                                                                  





                                                                                                               
                                                   


















                                                                                                                  
                                                   







                                                                                
                              











                                                                                                                        














                                                                   










                                                                             
                                           
                   











                                             
                       
                                      
               


                                                         



                                                  
                                    







                                                                                  



                                                                                      
                                                                                                





                                                                             
                                                                                              


                                                                            
                                                          
 


                                                                            
                         
                                                   




                                                                 
                                          
                                    
                                                                            

















                                                                                      
                                                                



                                                              


                                                                                        
                                                                   

                                                       
                                                      




                                                                                      
 








                                                                    

                                                              
                                                                           


                                                                   
                                                        
                                          
                                                                                     


                                         
                                                                



                                                                                



                                                                   
                                                              





                                                                                   

 
                                                                   



                                                                   





                                                                                 
 
                                             
                                         
                                                                    
 
                                                 
                                                                                   


                                                             


                                
                        
                             
                                                                     
     


















                                                                  






                                         
                                       


                                              
from __future__ import annotations

import pprint
import re
from dataclasses import dataclass
from functools import wraps
from typing import Any, Callable, Dict, List, Optional, Tuple

import weechat

from slack.log import print_error
from slack.python_compatibility import removeprefix, removesuffix
from slack.shared import shared
from slack.slack_conversation import (
    SlackConversation,
    get_conversation_from_buffer_pointer,
)
from slack.slack_user import name_from_user_info_without_spaces
from slack.slack_workspace import SlackWorkspace
from slack.task import create_task
from slack.util import get_callback_name, with_color
from slack.weechat_config import WeeChatOption

# Registry of every command declared with @weechat_command, keyed by the full
# space-separated command name (the function name without its "command_"
# prefix and with underscores replaced by spaces).
commands: Dict[str, Command] = {}


# def parse_help_docstring(cmd):
#     doc = textwrap.dedent(cmd.__doc__).strip().split("\n", 1)
#     cmd_line = doc[0].split(None, 1)
#     args = "".join(cmd_line[1:])
#     return cmd_line[0], args, doc[1].strip()


def parse_options(args: str):
    """
    Separate ``-name`` / ``-name=value`` options from the positional text.

    An option is only recognised when it is preceded by at least one space,
    so a dash-prefixed word at the very start of ``args`` stays in the
    positional text. Options given without a value map to ``None``; when an
    option is repeated, the last occurrence wins.

    Returns a tuple of (``args`` with all recognised options removed,
    mapping of option name to value).
    """
    option_pattern = re.compile(" +-([^ =]+)(?:=([^ ]+))?")

    options: Dict[str, Optional[str]] = {}
    for found in option_pattern.finditer(args):
        name, value = found.groups()
        options[name] = value

    positional = option_pattern.sub("", args)
    return positional, options


@dataclass
class Command:
    """Description of one registered command, as stored in ``commands``."""

    # Full command name, words separated by spaces.
    cmd: str
    # True when the name contains no space; only these are hooked in WeeChat.
    top_level: bool
    # Passed as-is to weechat.hook_command when the command is hooked.
    description: str
    # Passed as-is to weechat.hook_command when the command is hooked.
    args: str
    # Passed as-is to weechat.hook_command when the command is hooked.
    args_description: str
    # "|"-separated completion words offered for this command's arguments.
    completion: str
    # Handler, called with the buffer pointer and the raw argument string.
    cb: Callable[[str, str], None]

def weechat_command(
    completion: str = "", min_args: int = 0, slack_buffer_required: bool = False
):
    """
    Decorator factory that registers a function as a command handler.

    The command name is derived from the function name: the ``command_``
    prefix is dropped and underscores become spaces. The registered callback
    parses options out of the raw argument string, splits the remaining text
    on single spaces (at most ``min_args`` splits) and refuses to run the
    handler when fewer than ``min_args`` positional arguments were supplied.

    ``completion`` is stored on the registered Command.
    ``slack_buffer_required`` is accepted but not used here.
    """

    def register(f: Callable[[str, List[str], Dict[str, Optional[str]]], None]):
        command_name = removeprefix(f.__name__, "command_").replace("_", " ")
        is_top_level = " " not in command_name

        @wraps(f)
        def run(buffer: str, args: str):
            positional, options = parse_options(args)
            arg_list = positional.split(" ", min_args)
            # Splitting an empty string still yields one element, so emptiness
            # has to be checked separately from the element count.
            has_enough = (not min_args or positional) and len(arg_list) >= min_args
            if not has_enough:
                print_error(
                    f'Too few arguments for command "/{command_name}" (help on command: /help {command_name})'
                )
                return
            return f(buffer, arg_list, options)

        commands[command_name] = Command(
            cmd=command_name,
            top_level=is_top_level,
            description="",
            args="",
            args_description="",
            completion=completion,
            cb=run,
        )

        return run

    return register


def list_workspaces(workspace_name: Optional[str] = None, detailed_list: bool = False):
    """
    Print every known workspace to the core buffer.

    ``workspace_name`` is accepted but not used; ``detailed_list`` is passed
    through to display_workspace for each workspace.
    """
    for header_line in ("", "All workspaces:"):
        weechat.prnt("", header_line)

    for known_workspace in shared.workspaces.values():
        display_workspace(known_workspace, detailed_list)


def display_workspace(workspace: SlackWorkspace, detailed_list: bool):
    """
    Print a one-line summary of ``workspace`` to the core buffer.

    Connected workspaces are marked with "*" and show the connection state
    and own nick; the channel and pv counts are fixed placeholders.
    ``detailed_list`` is not used here.
    """
    if not workspace.is_connected:
        weechat.prnt("", f"   {with_color('chat_server', workspace.name)}")
        return

    colored_name = with_color("chat_server", workspace.name)
    bracket_open = with_color("chat_delimiters", "[")
    bracket_close = with_color("chat_delimiters", "]")
    own_nick = workspace.my_user.nick()
    summary = (
        f" * {colored_name} {bracket_open}connected{bracket_close}"
        f", nick: {own_nick}, 0 channel(s), 0 pv"
    )
    weechat.prnt("", summary)


@weechat_command()
def command_slack(buffer: str, args: List[str], options: Dict[str, Optional[str]]):
    """
    Handle the bare top-level "slack" command.

    Currently a placeholder: it only writes "ran slack" using print().
    """
    print("ran slack")


@weechat_command("%(slack_workspaces)")
def command_slack_connect(
    buffer: str, args: List[str], options: Dict[str, Optional[str]]
):
    """
    Connect to the workspace named by the first argument, or to every known
    workspace when no name is given. The work is scheduled as a task.
    """

    async def connect():
        requested = args[0] if args else ""

        if not requested:
            # No name given: connect to everything we know about.
            for known_workspace in shared.workspaces.values():
                await known_workspace.connect()
            return

        target = shared.workspaces.get(requested)
        if not target:
            print_error(f'workspace "{requested}" not found')
            return
        await target.connect()

    create_task(connect())


@weechat_command()
def command_slack_workspace(
    buffer: str, args: List[str], options: Dict[str, Optional[str]]
):
    """Handle "slack workspace" without a subcommand: list all workspaces."""
    list_workspaces()


@weechat_command("%(slack_workspaces)")
def command_slack_workspace_list(
    buffer: str, args: List[str], options: Dict[str, Optional[str]]
):
    """Handle "slack workspace list": list all workspaces (arguments are ignored)."""
    list_workspaces()


@weechat_command("%(slack_workspaces)")
def command_slack_workspace_listfull(
    buffer: str, args: List[str], options: Dict[str, Optional[str]]
):
    """
    Handle "slack workspace listfull": list all workspaces with the
    detailed_list flag set (arguments are ignored).
    """
    list_workspaces(detailed_list=True)


@weechat_command(min_args=1)
def command_slack_workspace_add(
    buffer: str, args: List[str], options: Dict[str, Optional[str]]
):
    """
    Create a new workspace named by the first argument.

    Each ``-option[=value]`` whose name matches an attribute of the new
    workspace's config is applied to it; an option without a value is set
    to "on". Options that match no config attribute are skipped. Adding a
    name that already exists is reported as an error.
    """
    name = args[0]
    if name in shared.workspaces:
        print_error(f'workspace "{name}" already exists, can\'t add it!')
        return

    new_workspace = SlackWorkspace(name)
    shared.workspaces[name] = new_workspace

    for option_name, option_value in options.items():
        if not hasattr(new_workspace.config, option_name):
            continue
        config_option: WeeChatOption[Any] = getattr(new_workspace.config, option_name)
        config_option.value_set_as_str("on" if option_value is None else option_value)

    script_name = shared.SCRIPT_NAME
    name_color = weechat.color("chat_server")
    reset_color = weechat.color("reset")
    weechat.prnt(
        "", f"{script_name}: workspace added: {name_color}{name}{reset_color}"
    )


@weechat_command("%(slack_workspaces)", min_args=2)
def command_slack_workspace_rename(
    buffer: str, args: List[str], options: Dict[str, Optional[str]]
):
    """
    Rename the workspace ``args[0]`` to ``args[1]``.

    Prints an error and changes nothing when the old name is unknown or when
    the new name is already taken (which includes renaming a workspace to its
    own name).
    """
    old_name = args[0]
    new_name = args[1]
    workspace = shared.workspaces.get(old_name)
    if not workspace:
        print_error(f'workspace "{old_name}" not found for "workspace rename" command')
        return
    # Without this guard, renaming onto an existing name would silently
    # replace that workspace, and renaming to the same name would insert and
    # then delete the very same key, dropping the workspace from the registry.
    if new_name in shared.workspaces:
        print_error(
            f'workspace "{new_name}" already exists for "workspace rename" command'
        )
        return
    workspace.name = new_name
    shared.workspaces[new_name] = workspace
    del shared.workspaces[old_name]
    weechat.prnt(
        "",
        f"server {with_color('chat_server', old_name)} has been renamed to {with_color('chat_server', new_name)}",
    )
    # TODO: Rename buffers and config
    # TODO: Rename buffers and config


@weechat_command("%(slack_workspaces)", min_args=1)
def command_slack_workspace_del(
    buffer: str, args: List[str], options: Dict[str, Optional[str]]
):
    """
    Remove the workspace named by the first argument from the registry.

    Unknown workspaces and workspaces that are still connected are reported
    as errors and left untouched.
    """
    name = args[0]
    workspace = shared.workspaces.get(name)

    if not workspace:
        failure = f'workspace "{name}" not found for "workspace del" command'
    elif workspace.is_connected:
        failure = f'you can not delete server "{name}" because you are connected to it. Try "/slack disconnect {name}" first.'
    else:
        failure = None

    if failure is not None:
        print_error(failure)
        return

    # TODO: Delete config
    del shared.workspaces[name]
    weechat.prnt("", f"server {with_color('chat_server', name)} has been deleted")


@weechat_command("tasks|buffer")
def command_slack_debug(
    buffer: str, args: List[str], options: Dict[str, Optional[str]]
):
    """
    Print debugging information to the core buffer.

    "tasks" dumps the active tasks and futures; "buffer" prints the id of the
    conversation attached to the current buffer, if there is one. Any other
    argument does nothing.
    """
    topic = args[0]

    if topic == "buffer":
        conversation = get_conversation_from_buffer_pointer(buffer)
        if conversation:
            weechat.prnt("", f"Conversation id: {conversation.id}")
        return

    if topic != "tasks":
        return

    weechat.prnt("", "Active tasks:")
    weechat.prnt("", pprint.pformat(shared.active_tasks))
    weechat.prnt("", "Active futures:")
    weechat.prnt("", pprint.pformat(shared.active_futures))


def completion_slack_workspaces_cb(
    data: str, completion_item: str, buffer: str, completion: str
) -> int:
    """Completion callback that offers the name of every known workspace."""
    sort_position = weechat.WEECHAT_LIST_POS_SORT
    for name in shared.workspaces:
        weechat.completion_list_add(completion, name, 0, sort_position)
    return weechat.WEECHAT_RC_OK


def find_command(start_cmd: str, args: str) -> Optional[Tuple[Command, str]]:
    """
    Resolve the longest registered command that ``start_cmd`` plus the
    leading words of ``args`` spell out.

    Words of ``args`` are appended to the command name one at a time for as
    long as the result is a registered command. Returns the matching Command
    together with the unconsumed tail of ``args`` (starting at the first word
    that did not extend the command), or None when the resolved name is not
    registered.
    """
    resolved = start_cmd
    # If every word is consumed there is nothing left over.
    tail_start = len(args)

    for word in re.finditer("[^ ]+", args):
        candidate = f"{resolved} {word.group(0)}"
        if candidate not in commands:
            tail_start = word.start(0)
            break
        resolved = candidate

    command = commands.get(resolved)
    if command is None:
        return None
    return command, args[tail_start:]


def command_cb(data: str, buffer: str, args: str) -> int:
    """
    WeeChat command callback: dispatch to the registered subcommand handler.

    ``data`` carries the top-level command name. An error is printed when no
    registered command matches.
    """
    resolved = find_command(data, args)
    if not resolved:
        print_error(
            f'Error with command "/{data} {args}" (help on command: /help {data})'
        )
        return weechat.WEECHAT_RC_OK

    command, cmd_args = resolved
    command.cb(buffer, cmd_args)
    return weechat.WEECHAT_RC_OK


def completion_list_add(completion: str, word: str, nick_completion: int, where: str):
    """
    Add ``word`` to a completion list, expanding the ``%(slack_workspaces)``
    placeholder into the names of all known workspaces.
    """
    if word != "%(slack_workspaces)":
        # TODO: Consider WeeChat version support, in < 2.9 one must use hook_completion_list_add
        weechat.completion_list_add(completion, word, nick_completion, where)
        return

    completion_slack_workspaces_cb("", "slack_workspaces", "", completion)


def completion_slack_workspace_commands_cb(
    data: str, completion_item: str, buffer: str, completion: str
) -> int:
    """
    Completion callback for the arguments of the registered commands.

    The command already typed (ignoring the word being completed) is
    resolved with find_command. If that command has registered subcommands,
    the next word of each one is offered; otherwise the command's own
    "|"-separated completion entries are offered.
    """
    # TODO: Consider WeeChat version support, in < 2.9 one must use hook_completion_get_string
    base_command = weechat.completion_get_string(completion, "base_command")
    base_word = weechat.completion_get_string(completion, "base_word")
    args = weechat.completion_get_string(completion, "args")
    args_without_base_word = removesuffix(args, base_word)

    found_cmd_with_args = find_command(base_command, args_without_base_word)
    if not found_cmd_with_args:
        return weechat.WEECHAT_RC_OK

    command = found_cmd_with_args[0]
    # Require the separating space so that a command which merely shares a
    # textual prefix (e.g. "... listfull" vs "... list") is not mistaken for
    # a subcommand.
    subcommand_prefix = f"{command.cmd} "
    # dict.fromkeys keeps first-seen order while dropping repeated words
    # (several subcommands can share the same next word).
    next_words = list(
        dict.fromkeys(
            removeprefix(cmd, subcommand_prefix).split(" ")[0]
            for cmd in commands
            if cmd.startswith(subcommand_prefix)
        )
    )

    if next_words:
        for word in next_words:
            completion_list_add(completion, word, 0, weechat.WEECHAT_LIST_POS_SORT)
    else:
        for arg in command.completion.split("|"):
            completion_list_add(completion, arg, 0, weechat.WEECHAT_LIST_POS_SORT)

    return weechat.WEECHAT_RC_OK


def completion_irc_channels_cb(
    data: str, completion_item: str, buffer: str, completion: str
) -> int:
    """
    Completion callback hooked for "irc_channels".

    Adds the fixed entry "#asd" when the buffer's full name starts with
    "core."; does nothing for other buffers.
    """
    full_name = weechat.buffer_get_string(buffer, "full_name")
    if not full_name.startswith("core."):
        return weechat.WEECHAT_RC_OK

    weechat.completion_list_add(completion, "#asd", 0, weechat.WEECHAT_LIST_POS_SORT)
    return weechat.WEECHAT_RC_OK


def complete_input(conversation: SlackConversation, query: str):
    """
    Write the currently selected completion value into the input line.

    Does nothing unless a completion is active and has values. The text
    before the cursor has a trailing ``query`` removed and the selected value
    appended; the cursor is moved to just after the inserted value.
    """
    if (
        conversation.completion_context != "ACTIVE_COMPLETION"
        or not conversation.completion_values
    ):
        return

    pointer = conversation.buffer_pointer
    current_input = weechat.buffer_get_string(pointer, "input")
    cursor = weechat.buffer_get_integer(conversation.buffer_pointer, "input_pos")
    chosen = conversation.completion_values[conversation.completion_index]

    head = removesuffix(current_input[:cursor], query)
    tail = current_input[cursor:]
    updated_cursor = cursor - len(query) + len(chosen)

    with conversation.completing():
        weechat.buffer_set(conversation.buffer_pointer, "input", head + chosen + tail)
        weechat.buffer_set(
            conversation.buffer_pointer, "input_pos", str(updated_cursor)
        )


def nick_suffix():
    """Return the value of WeeChat's ``weechat.completion.nick_completer`` option."""
    completer_option = weechat.config_get("weechat.completion.nick_completer")
    return weechat.config_string(completer_option)


async def complete_user_next(
    conversation: SlackConversation, query: str, is_first_word: bool
):
    """
    Start a user-name completion for ``query`` or advance to the next result.

    With no completion in progress, the workspace API is searched for users
    matching ``query`` and the results become the completion values. With a
    completion already active, the selection moves to the next value and
    wraps around at the end. In both cases the selected value is then written
    to the input line via complete_input.
    """
    if conversation.completion_context == "NO_COMPLETION":
        # Mark the search as in flight before awaiting it.
        conversation.completion_context = "PENDING_COMPLETION"
        search = await conversation.workspace.api.fetch_users_search(query)
        # The context changed while the search was awaited: drop the result.
        if conversation.completion_context != "PENDING_COMPLETION":
            return
        conversation.completion_context = "ACTIVE_COMPLETION"
        # A nick at the start of the line gets the configured nick completer
        # appended; anywhere else it gets a plain space.
        suffix = nick_suffix() if is_first_word else " "
        conversation.completion_values = [
            name_from_user_info_without_spaces(conversation.workspace, user) + suffix
            for user in search["results"]
        ]
        conversation.completion_index = 0
    elif conversation.completion_context == "ACTIVE_COMPLETION":
        conversation.completion_index += 1
        # Wrap around to the first value after the last one.
        if conversation.completion_index >= len(conversation.completion_values):
            conversation.completion_index = 0

    # complete_input is a no-op unless the completion is active and has values.
    complete_input(conversation, query)


def complete_previous(conversation: SlackConversation, query: str):
    """
    Step back to the previous completion value, wrapping to the last one.

    Returns WEECHAT_RC_OK_EAT when a completion was active and has been
    handled, and WEECHAT_RC_OK otherwise.
    """
    if conversation.completion_context != "ACTIVE_COMPLETION":
        return weechat.WEECHAT_RC_OK

    previous_index = conversation.completion_index - 1
    if previous_index < 0:
        previous_index = len(conversation.completion_values) - 1
    conversation.completion_index = previous_index

    complete_input(conversation, query)
    return weechat.WEECHAT_RC_OK_EAT


def input_complete_cb(data: str, buffer: str, command: str) -> int:
    """
    Callback for "/input complete_*" that handles "@name" completion in
    conversation buffers.

    Only acts when the buffer belongs to a conversation and the word before
    the cursor starts with "@". While a completion is active, the last two
    space-separated pieces before the cursor are taken as the word.
    "/input complete_next" schedules complete_user_next and eats the
    command; any other matching command is handled by complete_previous.
    """
    conversation = get_conversation_from_buffer_pointer(buffer)
    if not conversation:
        return weechat.WEECHAT_RC_OK

    current_input = weechat.buffer_get_string(buffer, "input")
    cursor = weechat.buffer_get_integer(buffer, "input_pos")
    before_cursor = current_input[:cursor]

    if conversation.completion_context == "ACTIVE_COMPLETION":
        pieces_back = 2
    else:
        pieces_back = 1
    pieces = before_cursor.split(" ")
    current_word = " ".join(pieces[-pieces_back:])

    if not current_word.startswith("@"):
        return weechat.WEECHAT_RC_OK

    query = current_word[1:]
    if command != "/input complete_next":
        return complete_previous(conversation, query)

    is_first_word = current_word == before_cursor
    create_task(complete_user_next(conversation, query, is_first_word))
    return weechat.WEECHAT_RC_OK_EAT


def register_commands():
    """
    Install all WeeChat hooks for this module: the input-completion command
    hook, the three completion hooks, and one command hook per registered
    top-level command.
    """
    weechat.hook_command_run(
        "/input complete_*", get_callback_name(input_complete_cb), ""
    )

    completion_hooks = (
        (
            "slack_workspaces",
            "Slack workspaces (internal names)",
            completion_slack_workspaces_cb,
        ),
        (
            "slack_commands",
            "completions for Slack commands",
            completion_slack_workspace_commands_cb,
        ),
        (
            "irc_channels",
            "channels on all Slack workspaces",
            completion_irc_channels_cb,
        ),
    )
    for item, description, callback in completion_hooks:
        weechat.hook_completion(item, description, get_callback_name(callback), "")

    for name, command in commands.items():
        if not command.top_level:
            continue
        # The registry key is passed as callback data so that command_cb
        # knows which top-level command was invoked.
        weechat.hook_command(
            command.cmd,
            command.description,
            command.args,
            command.args_description,
            "%(slack_commands)|%*",
            get_callback_name(command_cb),
            name,
        )