Enhancing our code

We also want to give our users a good experience so we are going to add a command-line parser so the users of our application can specify some parameters before starting the voting process. There will be only one argument that we are going to implement and that is --hashtags, where users can pass a space-separated list of hashtags.

With that said, we are going to define some rules for these arguments. First, we will limit the maximum number of hashtags that we are going to monitor, so we are going to add a rule that no more than four hashtags can be used.

If the user specifies more than four hashtags, we will simply display a warning on the Terminal and pick the first four hashtags. We also want to remove the duplicated hashtags.

When showing these warning messages that we talked about, we could simply print them on the Terminal and it would definitely work; however, we want to make things more interesting, so we are going to use the logging package to do it. Apart from that, implementing a proper logging will give us much more control over what kind of log we want to have and also how we want to present it to the users.

Before we start implementing the command-line parser, let's add the logger. Create a file called app_logger.py in the twittervotes/core directory with the following content:

import os
import logging
from logging.config import fileConfig


def get_logger():
core_dir = os.path.dirname(os.path.abspath(__file__))
file_path = os.path.join(core_dir, '..', 'logconfig.ini')
fileConfig(file_path)
return logging.getLogger('twitterVotesLogger')

This function doesn't do much but first we import the os module, then we import the logging package, and lastly, we import the function fileConfig, which reads the logging configuration from a config file. This configuration file has to be in the configparser format and you can get more information about this format at https://docs.python.org/3.6/library/logging.config.html#logging-config-fileformat.

After we read the configuration file, we just return a logger called twitterVotesLogger.

Let's see what the configuration file for our application looks like. Create a file called logconfig.ini in the twittervotes directory with the following content:

[loggers]
keys=root,twitterVotesLogger

[handlers]
keys=consoleHandler

[formatters]
keys=simpleFormatter

[logger_root]
level=INFO
handlers=consoleHandler

[logger_twitterVotesLogger]
level=INFO
handlers=consoleHandler
qualname=twitterVotesLogger


[handler_consoleHandler]
class=StreamHandler
level=INFO
formatter=simpleFormatter
args=(sys.stdout,)

[formatter_simpleFormatter]
format=[%(levelname)s] %(asctime)s - %(message)s
datefmt=%Y-%m-%d %H:%M:%S

So here we define two loggers, root and twitterVotesLogger; the loggers are responsible for exposing methods that we can use at runtime to log messages. It is also through the loggers that we can set the level of severity, for example, INFO, DEBUG and so on. Lastly, the logger passes the log messages along to the appropriated handler.

In the definition of our twitterVotesLogger, we set the level of severity to INFO, we set the handler to consoleHandler (we are going to describe this very soon), and we also set a qualified name that will be used when we want to get hold of the twitterVotesLogger

The last option for twitterVotesLoggers is propagate. Since the twitterVotesLogger is a child logger, we don't want the log message sent through the twittersVotesLogger to propagate to its ancestors. Without propagate set to 0, every log message would be shown twice since the twitterVotesLogger's ancestor is the root logger.

The next component in the logging configuration is the handler. Handlers are the component that sends the log messages of a specific logger to a destination. We defined a handler called consoleHandler of type StreamHandler, which is a built-in handler of the logging module. The StreamHandler sends out log messages to streams such as sys.stdout, sys.stderr, or a file. This is perfect for us because we want to send messages to the Terminal.

In the consoleHandler, we also set the severity level to INFO and also we set the formatter which is set to the customFormatter; then we set the value for args to (sys.stdout, ). Args specify where the log messages will be sent to; in this case, we set only sys.stdout but you can add multiple output streams if you need.

The last component of this configuration is the formatter customFormatter. Formatters simply define how the log message should be displayed. In our customFormatter, we just define how the message should be displayed and show the date format.

Now that we have the logging in place, let's add the functions that will parse the command line. Create a file cmdline_parser.py in twittervotes/core and add some imports:

from argparse import ArgumentParser

from .app_logger import get_logger

Then we will need to add a function that will validate the command-line arguments:

def validated_args(args):

logger = get_logger()

unique_hashtags = list(set(args.hashtags))

if len(unique_hashtags) < len(args.hashtags):
logger.info(('Some hashtags passed as arguments were '
'duplicated and are going to be ignored'))

args.hashtags = unique_hashtags

if len(args.hashtags) > 4:
logger.error('Voting app accepts only 4 hashtags at the
time')
args.hashtags = args.hashtags[:4]

return args

validate_args functions have only one parameter and it is the arguments that have been parsed by the ArgumentParser. The first thing we do in this function is to get hold of the logger that we just created, so we can send log messages to inform the user about possible problems in the command-line arguments that have been passed to the application.

Next, we transform the list of hashtags into a set so all the duplicated hashtags are removed and then we transform it back to a list. After that, we check whether the number of unique hashtags is less than the original number of hashtags that have been passed on the command line. That means that we had duplication and we log a message to inform the user about that.

The last verification we do is to make sure that a maximum of four hashtags will be monitored by our application. If the number of items in the hashtag list is greater than four, then we slice the array, getting only the first four items, and we also log a message to inform the user that only four hashtags will be displayed.

Let's add another function, parse_commandline_args:

def parse_commandline_args():
argparser = ArgumentParser(
prog='twittervoting',
description='Collect votes using twitter hashtags.')

required = argparser.add_argument_group('require arguments')

required.add_argument(
'-ht', '--hashtags',
nargs='+',
required=True,
dest='hashtags',
help=('Space separated list specifying the '
'hashtags that will be used for the voting.\n'
'Type the hashtags without the hash symbol.'))

args = argparser.parse_args()

return validated_args(args)

We saw how the ArgumentParser works when we were developing the application in the first chapter, the weather application. However, we can still go through what this function does.

First, we define an ArgumentParser object, defining a name and a description, and we create a subgroup called required that, as the name suggests, will have all the required fields.

Note that we don't really need to create this extra group; however, I find that it helps to keep the code more organized and easier to maintain in case it is necessary to add new options in the future.

We define only one argument, hashtags. In the definition of the hashtags argument, there is an argument called nargs and we have set it to +; this means that I can pass an unlimited number of items separated by spaces, as follows:

--hashtags item1 item2 item3

The last thing we do in this function is to parse the arguments with the parse_args function and run the arguments through the validate_args function that has been shown previously.

Let's import the parse_commandline_args function in the __init__.py file in the twittervotes/core directory:

from .cmdline_parser import parse_commandline_args

Now we need to create a class that will help us to manage hashtags and perform tasks such as keeping the score count of hashtags, updating its value after every request. So let's go ahead and create a class called HashtagStatsManager. Create a file called hashtagstats_manager.py in twittervotes/core/twitter with the following content:

from .hashtag import Hashtag


class HashtagStatsManager:

def __init__(self, hashtags):

if not hashtags:
raise AttributeError('hashtags must be provided')

self._hashtags = {hashtag: Hashtag(hashtag) for hashtag in
hashtags}

def update(self, data):

hashtag, results = data

metadata = results.get('search_metadata')
refresh_url = metadata.get('refresh_url')
statuses = results.get('statuses')

total = len(statuses)

if total > 0:
self._hashtags.get(hashtag.name).total += total
self._hashtags.get(hashtag.name).refresh_url =
refresh_url


@property
def hashtags(self):
return self._hashtags

This class is also very simple: in the constructor, we get a list of hashtags and initialize a property, _hashtags, which will be a dictionary where the key is the name of the hashtag and the value is an instance of the Hashtag class.

The update method gets a tuple containing a Hashtag object and the results are returned by the Twitter API.  First, we unpack the tuple values and set it to the hashtag and results variables. The results dictionary has two items that are interesting to us. The first is the search_metadata; in this item, we will find the refresh_url and the statuses contain a list of all tweets that used the hashtag that we were searching for.

So we get the values for the search_metadata, the refresh_url, and lastly the statuses. Then we count how many items there are in the statuses list. If the number of items on the statuses list is greater than 0, we update the total count for the underlying hashtag as well as its refresh_url.

Then we import the HashtagStatsManager class that we just created in the __init__.py file in the twittervotes/core/twitter directory:

from .hashtagstats_manager import HashtagStatsManager

The heart of this application is the class Runner. This class will perform the execution of a function and queue it in the process pool. Every function will be executed in parallel in a different process, which will make the program much faster than if I executed these functions one by one.

Let's have a look at how the Runner class is implemented:

import concurrent.futures

from rx import Observable


class Runner:

def __init__(self, on_success, on_error, on_complete):
self._on_success = on_success
self._on_error = on_error
self._on_complete = on_complete

def exec(self, func, items):

observables = []

with concurrent.futures.ProcessPoolExecutor() as executor:
for item in items.values():
_future = executor.submit(func, item)
observables.append(Observable.from_future(_future))

all_observables = Observable.merge(observables)

all_observables.subscribe(self._on_success,
self._on_error,
self._on_complete)

The class Runner has an initializer taking three arguments; they are all functions that will be called in different statuses of the execution. on_success will be called when the execution of the item has been successful, on_error when the execution of one function has failed for some reason, and finally on_complete will be called when all the functions in the queue have been executed.

There is also a method called exec that takes a function as the first argument, which is the function that will be executed, and the second argument is a list of Hashtag instances.

There are a few interesting things in the Runner class. First, we are using the concurrent.futures module, which is a really nice addition to Python and has been around since Python 3.2; this module provides ways of executing callables asynchronously. 

The concurrent.futures module also provides the ThreadPoolExecutor, which will perform asynchronous executions using threads, and the ProcessPollExecutor, which uses a process. You can easily switch between these execution strategies according to your needs.

The rule of thumb is if your function is CPU-bound, it is a good idea to use ProcessPollExecutor, otherwise, you will suffer big performances issues because of the Python Global Interpreter Lock (GIL). For I/O-bound operations, I prefer using ThreadPoolExecutor.

If you want to read more about the GIL, you can check out the following wiki page: https://wiki.python.org/moin/GlobalInterpreterLock.

Since we are not doing any I/O-bound operations, we use ProcessPoolExecutor. Then, we loop through the values of the items, which is a dictionary containing all the hashtags that our application is monitoring. And for every hashtag, we pass it to the submit function of the ProcessPollExecutor along with the function that we want to execute; in our case, it will be the execute_request function defined in the core module of our application.

The submit function, instead of returning the value returned by the execute_request function, will return a future object, which encapsulates the asynchronous execution of the execute_request function. The future object provides methods to cancel an execution, check the status of the execution, get the results of the execution, and so on.

Now, we want a way to be notified when the executions change state or when they finish. That is where reactive programming comes in handy.

Here, we get the future object and create an Observable. Observables are the core of reactive programming. An Observable is an object that can be observed and emit events at any given time. When an Observable emits an event, all observers that subscribed to that Observable will be notified and react to those changes.

This is exactly what we are trying to achieve here: we have an array of future executions and we want to be notified when those executions change state. These states will be handled by the functions that we passed as an argument to the Runner initializer—_on_sucess, _on_error, and _on_complete.

Perfect! Let's import the Runner class into __init__.py in the twittervotes/core directory:

from .runner import Runner

The last piece of our project is to add the entry point of our application. We are going to add the user interface using the Tkinter package from the standard library. So let's start implementing it. Create a file called app.py in the twittervotes directory, and let's start by adding some imports:

from core import parse_commandline_args
from core import execute_request
from core import Runner

from core.twitter import HashtagStatsManager

from tkinter import Tk
from tkinter import Frame
from tkinter import Label
from tkinter import StringVar
from tkinter.ttk import Button

Here, we import the command-line argument parser that we created, execute_request to perform the requests to the Twitter API, and also the Runner class that will help us execute the requests to the Twitter API  in parallel.

We also import the HashtagStatsManager to manage the hashtag voting results for us.

Lastly, we have all the imports related to tkinter.

In the same file, let's create a class called Application as follows:

class Application(Frame):

def __init__(self, hashtags=[], master=None):
super().__init__(master)

self._manager = HashtagStatsManager(hashtags)

self._runner = Runner(self._on_success,
self._on_error,
self._on_complete)

self._items = {hashtag: StringVar() for hashtag in hashtags}
self.set_header()
self.create_labels()
self.pack()

self.button = Button(self, style='start.TButton',
text='Update',
command=self._fetch_data)
self.button.pack(side="bottom")

So here, we create a class, Application, that inherits from Frame. The initializer takes two arguments: hashtags, which are the hashtags that we are going to monitor, and the master argument, which is an object of type Tk.

Then we create an instance of HashtagStatsManager, passing the list of hashtags; we also create an instance of the Runner class passing three arguments. These arguments are functions that will be called when one execution finishes successfully, when the execution fails, and when all the executions are complete.

Then we have a dictionary comprehension that will create a dictionary where the keys are the hashtags and the values are a Tkinter variable of type string, which in the Tkinter world is called StringVar. We do that so it will be easier to update the labels with the results later on.

We call the methods set_header and create_labels that we are going to implement shortly and finally we call pack. The pack function will organize widgets such as buttons and labels and place them in the parent widget, in this case, the Application.

Then we define a button that will execute the function _fetch_data when clicked and we use pack to place the button at the bottom of the frame:

def set_header(self):
title = Label(self,
text='Voting for hasthags',
font=("Helvetica", 24),
height=4)
title.pack()

Here's the set_header method that I mentioned earlier; it simply creates Label objects and places them at the top of the frame.

Now we can add the create_labels method:

def create_labels(self):
for key, value in self._items.items():
label = Label(self,
textvariable=value,
font=("Helvetica", 20), height=3)
label.pack()
self._items[key].set(f'#{key}\nNumber of votes: 0')

The create_labels method loops through self._items, which, if you remember, is a dictionary where the key is the name of the hashtag and the value is a Tkinter variable of type string.

First, we create a Label, and the interesting part is the textvariable argument; we set it to value, which is a Tkinter variable related to a specific hashtag. Then we place the Label in the frame and, lastly, we set the value of the label using the function set.

Then we need to add a method that will update the Labels for us:

def _update_label(self, data):
hashtag, result = data

total = self._manager.hashtags.get(hashtag.name).total

self._items[hashtag.name].set(
f'#{hashtag.name}\nNumber of votes: {total}')

The _update_label, as the name suggests, updates the label of a specific hashtag. The data argument is the results returned by the Twitter API and we get the total number of the hashtags from the manager. Finally, we use the set function again to update the label.

Let's add another function that will actually do the work of sending the requests to the Twitter API:

def _fetch_data(self):
self._runner.exec(execute_request,
self._manager.hashtags)

This method will call the method exec of the Runner to execute the function that performs the requests to the Twitter API.

Then we need to define the methods that will handle the events emitted by the Observables created in the Runner class; we start by adding the method that will handle execution errors:

def _on_error(self, error_message):
raise Exception(error_message)

This is a helper method just to raise an exception in case something goes wrong with the execution of the requests.

Then we add another method that handles when the execution of an Observable has been successful:

def _on_success(self, data):
hashtag, _ = data
self._manager.update(data)
self._update_label(data)

The _on_success method is going to be called when one execution from the Runner finished successfully, and it will just update the manager with the new data and also update the label in the UI.

Lastly, we define a method that will handle when all the executions have been completed:

def _on_complete(self):
pass

The _on_complete will be called when all the executions of the Runner finish. We are not going to be using it so we just use the pass statement.

Now it is time to implement the function that will set up the application and initialize the UI—the function start_app:

def start_app(args):
root = Tk()

app = Application(hashtags=args.hashtags, master=root)
app.master.title("Twitter votes")
app.master.geometry("400x700+100+100")
app.mainloop()

This function creates the root application, sets the title, defines its dimensions, and also calls the mainloop function so the application keeps running.

The last piece is to define the main function:

def main():
args = parse_commandline_args()
start_app(args)


if __name__ == '__main__':
main()

The main function is pretty simple. First, we parse the command-line arguments, then we start the application, passing the command-line arguments to it.

Let's see the application in action! Run the following command:

python app.py --help

You will see the following output:

Let's say we want the voting process to run for 3 minutes and it will monitor the hashtags #debian, #ubuntu, and #arch:

python app.py --hashtags debian ubuntu arch

Then you should see the following UI:

And if you click the Update button, the count for every hashtag will be updated.