
Remote task executor
Using Django Channels for real-time task submission and monitoring.
Introduction
Channels is a Django application that uses WebSocket technology for real-time client-server communication alongside the basic HTTP request/response cycle. At the time of writing, Channels is stable enough to be a part of the basic Django installation. Using Channels, developers can create web applications that exchange objects with the server in real time. This lets users interact with the application beyond plain HTTP requests. Use cases include chat rooms, desktop notifications, etc.
With this article we hope to inspire some new capabilities for your projects. The code is straightforward and easy to begin with.
Unfortunately, the article is going to be relatively long, so let's start with a screenshot:
In this demo, an operator submits tasks that run in parallel. Each task has its own console tab. There is also an alert area (to the right) that informs the 'operators' group about the start and end of every task.
To submit a task you need to fill in the following fields:
- Server
- Job Type
- Job Name
- Job Body
Requirements
Imagine a team of operators who submit tasks to multiple remote Linux servers.
- An operator can choose a server and a job type (SQL, bash, Ansible playbook, JCL, or anything you can describe as a Jinja2 template of a shell script), write a task and submit it.
- The system must know how to run each type of task from a Linux shell.
- Every operator can submit multiple tasks from the same web page.
- Every operator can monitor the execution of each of their tasks via its stdout/stderr output.
- The whole team must be informed in real time when a task starts.
- The whole team must be informed in real time when a task completes, along with its exit code (success or failure).
Dependencies
We begin with a Linux machine with Python 3 and Python virtual environments installed.
Channels needs a message broker such as Redis, so:
ubuntu@ubuntuDev:~$ sudo apt-get install redis-server
Create a new virtual environment, activate it and install python packages:
ubuntu@ubuntuDev:~$ virtualenv -p python3 operations_env
ubuntu@ubuntuDev:~$ source operations_env/bin/activate
(operations_env) ubuntu@ubuntuDev:~$ pip install django channels asgi_redis
We also need paramiko and jinja2 for ssh connections and for templating our tasks:
(operations_env) ubuntu@ubuntuDev:~$ pip install paramiko jinja2
For the front end we use the following javascript libraries:
- jquery-3.1.1
- bootstrap-3.3.7
- reconnecting-websocket-1.0.0 (for keeping the web socket connection always alive)
- jquery.serialize-object-2.5.0
The Application
Start a new project and a new app:
(operations_env) ubuntu@ubuntuDev:~$ cd operations_env
(operations_env) ubuntu@ubuntuDev:~/operations_env$ django-admin startproject operations
(operations_env) ubuntu@ubuntuDev:~/operations_env$ cd operations/
(operations_env) ubuntu@ubuntuDev:~/operations_env/operations$ python manage.py startapp piston
Prepare the database (we use the default sqlite3):
(operations_env) ubuntu@ubuntuDev:~/operations_env/operations$ python manage.py migrate
Create the administrator:
(operations_env) ubuntu@ubuntuDev:~/operations_env/operations$ python manage.py createsuperuser
Now create two models, one for the servers and one for our job types. For this example we will save the credentials on the server model (in production there is an LDAP directory for that):
~/operations_env/operations/piston/models.py
from django.db import models


class JobType(models.Model):
    name = models.CharField(max_length=50)
    date_created = models.DateTimeField(auto_now_add=True)
    date_modified = models.DateTimeField(auto_now=True)
    description = models.TextField(blank=True, max_length=1024)
    # Jinja2 template of the shell script that runs this type of job
    template = models.TextField(max_length=1024)


class Server(models.Model):
    name = models.CharField(max_length=50)
    hostname = models.CharField(max_length=50)
    username = models.CharField(max_length=50)
    password = models.CharField(max_length=50)
We also need to register our models with the admin application:
~/operations_env/operations/piston/admin.py
from django.contrib import admin

from .models import JobType, Server


@admin.register(JobType)
class JobTypeAdmin(admin.ModelAdmin):
    list_display = ('name',)


@admin.register(Server)
class ServerAdmin(admin.ModelAdmin):
    list_display = ('name',)
Add your new application to settings.py:
~/operations_env/operations/operations/settings.py
INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'piston',
]
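Since the models were added after the first migrate, create and apply the migrations for the new app:
(operations_env) ubuntu@ubuntuDev:~/operations_env/operations$ python manage.py makemigrations piston
(operations_env) ubuntu@ubuntuDev:~/operations_env/operations$ python manage.py migrate
You are now ready to fire up the development server, log in, connect to the admin app and insert some new servers and job types: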
(operations_env) ubuntu@ubuntuDev:~/operations_env/operations$ python manage.py runserver
Every job type is a Jinja2 template of a bash script.
For example, an Ansible playbook job type may look like this:
source /opt/ansible/bin/activate
cat << EOT >> /opt/ansible/playbooks/{{ key }}
{{ body }}
EOT
ansible-playbook -i /opt/ansible/hosts /opt/ansible/playbooks/{{ key }}
RC=$?
rm /opt/ansible/playbooks/{{ key }}
exit $RC
... and a DB2 SQL script job type:
export LC_ALL=el_GR.UTF-8
source sqllib/db2profile
cat << EOT > $HOME/{{ key }}
{{ body }}
EOT
db2 +p< $HOME/{{ key }}
RC=$?
rm $HOME/{{ key }}
exit $RC
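To make the two template variables concrete, here is a minimal sketch of how such a job type turns into a runnable script. The article itself uses Jinja2 for this; the `render_job` helper below is a hypothetical, standard-library stand-in that only understands plain `{{ name }}` placeholders, and the key and body values are made up for illustration.

```python
import re

# Hypothetical stand-in for jinja2.Template(...).render(...): it only handles
# plain "{{ name }}" placeholders, which is all the job types above use.
PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def render_job(template, **variables):
    """Replace every {{ name }} placeholder with the matching variable."""
    def substitute(match):
        # Raises KeyError if the template asks for a variable we did not pass.
        return str(variables[match.group(1)])
    return PLACEHOLDER.sub(substitute, template)


# A shortened job type in the style of the DB2 example above.
JOB_TYPE = """cat << EOT > $HOME/{{ key }}
{{ body }}
EOT
db2 +p< $HOME/{{ key }}
RC=$?
rm $HOME/{{ key }}
exit $RC
"""

# The key and the body are illustrative values, not taken from a real run.
script = render_job(JOB_TYPE, key="abc123_1", body="select 1 from sysibm.sysdummy1;")
print(script)
```

The rendered script writes the task body to a file named after the key, runs it, keeps the exit code in RC, removes the file and exits with that code.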
Form, view and urls
Everything below is basic Django that you may already be familiar with:
~/operations_env/operations/piston/forms.py
from django import forms

from piston.models import JobType, Server


class SubmitForm(forms.Form):
    server = forms.ModelChoiceField(
        label='Server*',
        queryset=Server.objects.all().order_by('name'),
        required=True)
    job_type = forms.ModelChoiceField(
        label='Type*',
        queryset=JobType.objects.all().order_by('name'),
        required=True)
    job_name = forms.CharField(label='Name*', max_length=50, required=True)
    body = forms.CharField(
        label="Body*",
        widget=forms.Textarea(attrs={'rows': 10}),
        required=True)
~/operations_env/operations/piston/views.py
from django.views.generic.edit import FormView

from piston.forms import SubmitForm


class SubmitView(FormView):
    template_name = 'submit.html'
    form_class = SubmitForm

    def form_valid(self, form):
        return super(SubmitView, self).form_valid(form)
~/operations_env/operations/operations/urls.py
from django.conf.urls import url
from django.contrib import admin

from piston.views import SubmitView

urlpatterns = [
    url(r'^admin/', admin.site.urls),
    url(r'^submit/', SubmitView.as_view(), name='submit'),
]
The demo template
At this point there is some work to be done in order to deal with websockets. The template is basically an HTML form plus some JavaScript code that runs in two phases.
For the submission phase (on click of a button):
- We need to create an object with the form's data and serialize it.
- We need to create a unique id for each submission and include it in the serialized object. We do this for concurrency (see the comments at the end).
- We need to create a "console" area in the HTML page for each submission, waiting for "console" messages.
For the socket.receive phase (on arrival of a new message):
- We need to parse the message and decide whether it is a console message or an alert.
- A console message is appended to the related console, which is identified by the console id.
- An alert message is prepended to the alerts area.
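The two phases above boil down to a small JSON protocol. As a rough sketch (in Python rather than the JavaScript used in the template, so it can be run on its own), the functions below build a submission payload and classify an incoming message. The field names `server`, `job_type`, `job_name`, `body`, `sub_id`, `console`, `div_id` and `alert` are the ones used in this demo; the function names and sample values are hypothetical.

```python
import json


def build_submission(form_fields, counter):
    """Submission phase: serialize the form data plus a unique submission id."""
    payload = dict(form_fields)
    payload["sub_id"] = counter
    return json.dumps(payload)


def classify(raw):
    """Receive phase: return (kind, target element id, html) for a message."""
    data = json.loads(raw)
    if "console" in data:
        # Console lines are appended to the console tab named by div_id.
        return ("console", data["div_id"], data["console"])
    if "alert" in data:
        # Alerts are prepended to the shared alert area (id "messages").
        return ("alert", "messages", data["alert"])
    return ("ignored", None, None)


sent = build_submission(
    {"server": "db-server", "job_type": "bash", "job_name": "demo", "body": "uptime"},
    counter=1,
)
print(sent)
print(classify('{"console": "<samp>ok</samp>", "div_id": "console1", "color": "Green"}'))
print(classify('{"alert": "The job started."}'))
```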
Here is the template we use for this demo:
~/operations_env/operations/piston/templates/submit.html
<html>
<meta charset="UTF-8">
<link rel="stylesheet" href="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.7/css/bootstrap.min.css" integrity="sha384-BVYiiSIFeK1dGmJRAkycuHAHRg32OmUcww7on3RYdg4Va+PmSTsz/K68vbdEjh4u" crossorigin="anonymous">
<body>
<div class="container, col-md-9">
  <form class="form-horizontal" action="#" id="job_form">{% csrf_token %}
    <fieldset>
      {% for field in form %}
        {% if field.name == "server" %}
          <br>
          <select class="form-control" id="job_form" name="{{field.name}}">
            {% for choice in field.field.queryset %}
              <option>{{ choice.name }}</option>
            {% endfor %}
          </select>
        {% elif field.name == "job_type" %}
          <br>
          <select class="form-control" id="job_form" name="{{field.name}}">
            {% for choice in field.field.queryset %}
              <option>{{ choice.name }}</option>
            {% endfor %}
          </select>
        {% elif field.name == "job_name" %}
          <br>
          <input class="form-control" id="id_{{ field.name }}" name="{{field.name}}" placeholder="{{ field.label }}">
        {% elif field.name == "body" %}
          <br>
          <textarea class="form-control" id="id_{{ field.name }}" name="{{field.name}}" placeholder="{{ field.label }}" rows="10"></textarea>
        {% endif %}
      {% endfor %}
    </fieldset>
    <br>
    <div class="controls">
      <button type="button" class="btn btn-default" name="action" onclick="clsConsole();">CLEAR</button>
      <button type="button" class="btn btn-danger submit_btn" name="action" onclick="Submit();">SUBMIT!</button>
    </div>
  </form>
  <ul class="nav nav-tabs" role="tablist">
    <li><a href="#" class="add-console" style="display: none;"></a></li>
  </ul>
  <div class="tab-content">
  </div>
</div>
<div class="container, col-md-3"><br><div id="messages"></div></div>
</body>
<script src="https://code.jquery.com/jquery-3.1.1.slim.min.js"></script>
<script src="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.7/js/bootstrap.min.js" integrity="sha384-Tc5IQib027qvyjSMfHjOMaLkfuWVxZxUPnCJA7l2mCWNIpG9mGCD8wGNIcPD7Txa" crossorigin="anonymous"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/reconnecting-websocket/1.0.0/reconnecting-websocket.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/jquery-serialize-object/2.5.0/jquery.serialize-object.min.js"></script>
<script>
  // window.location.protocol includes the trailing colon ("http:" or "https:").
  // Use a secure websocket only when the page itself is served over https.
  var ws_scheme = window.location.protocol == "https:" ? "wss" : "ws";
  var ws_console = ws_scheme + '://' + window.location.host + '/submit/';
  var socket = new ReconnectingWebSocket(ws_console);
  var counter = 0;

  // Receive phase: route each message to a console tab or to the alert area.
  socket.onmessage = function(e) {
    var message_data = JSON.parse(e.data);
    if (typeof message_data['console'] !== 'undefined') {
      var console_line = message_data['console'];
      var div_id = message_data['div_id'];
      var color_id = message_data['color'];
      $("<div />", { id : div_id })
        .append(console_line)
        .appendTo("#" + div_id)
        .css('color', color_id);
    } else if (typeof message_data['alert'] !== 'undefined') {
      $('#messages').prepend(message_data['alert']);
    }
  };

  socket.onopen = function() {
    console.log("WS connected.");
  };
  if (socket.readyState == WebSocket.OPEN) socket.onopen();

  // Submission phase: serialize the form, tag it with a unique id,
  // create the console tab and send the data over the websocket.
  function Submit() {
    counter += 1;
    $("#sub_key").val("counter");
    formdataser = $('#job_form').serializeObject();
    formdataser["sub_id"] = counter;
    var formdata = JSON.stringify(formdataser);
    var id = $(".nav-tabs").children().length;
    var tabId = 'console' + counter;
    var jobname = formdataser["job_name"];
    $('.add-console').closest('li').before('<li><a href="#console' + counter + '">' + jobname + '</a></li>');
    $('.tab-content').append('<div class="tab-pane" id="' + tabId + '"><br></div>');
    $('.nav-tabs li:nth-child(' + id + ') a').click();
    socket.send(formdata);
  }

  $(".nav-tabs").on("click", "a", function (e) {
    e.preventDefault();
    if (!$(this).hasClass('add-console')) {
      $(this).tab('show');
    }
  })
  .on("click", "span", function () {
    var anchor = $(this).siblings('a');
    $(anchor.attr('href')).remove();
    $(this).parent().remove();
    $(".nav-tabs li").children('a').first().click();
  });

  // Remove every console tab and restore the hidden placeholder tab.
  function clsConsole() {
    $(".nav-tabs").html("");
    $(".nav-tabs").append("<li><a href='#' class='add-console' style='display: none;'></a></li>");
    $(".tab-content").html("");
  }
</script>
</html>
The channels
In order to play with Channels we need three things: configuration, routing and consumers.
A. The configuration in settings.py:
~/operations_env/operations/operations/settings.py
INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'channels',
    'piston',
]

CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "asgi_redis.RedisChannelLayer",
        "CONFIG": {
            "hosts": [("localhost", 6379)],
        },
        "ROUTING": "piston.routing.channel_routing",
    },
}
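The channel layer above assumes that Redis is listening on localhost:6379. A quick way to check this before starting the development server is a plain TCP connection test. This is only a sketch: the `broker_reachable` helper is hypothetical and not part of Channels or asgi_redis, and a successful connection shows that something is listening on the port, not that it is a healthy Redis.

```python
import socket


def broker_reachable(host="localhost", port=6379, timeout=1.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Connection refused, timeout, unknown host and similar errors.
        return False


# Same host and port as the "hosts" entry in CHANNEL_LAYERS.
print("Redis reachable:", broker_reachable("localhost", 6379))
```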
B. The file routing.py in our application directory:
~/operations_env/operations/piston/routing.py
from channels.routing import route

from piston.consumers import MyConsumer

channel_routing = [
    route("websocket.receive", MyConsumer),
]
C. And finally the file consumers.py in our application directory:
~/operations_env/operations/piston/consumers.py
from channels.generic.websockets import JsonWebsocketConsumer
from paramiko import SSHClient, AutoAddPolicy
from jinja2 import Template

from piston.models import JobType, Server

# Bootstrap alert markup: css class, label, job name, closing text.
ALERT_HTML = (
    '<div class="alert alert-%s alert-dismissible">'
    '<button type="button" class="close" data-dismiss="alert" aria-label="Close">'
    '<span aria-hidden="true">×</span></button>'
    '<strong>%s</strong> The job \'%s\' %s</div>'
)


class MyConsumer(JsonWebsocketConsumer):
    channel_session_user = True

    def connection_groups(self, **kwargs):
        """
        Called to return the list of groups to automatically add/remove
        this connection to/from.
        """
        return ["operators"]

    def connect(self, message, **kwargs):
        """
        Perform things on connection start
        """
        pass

    def receive(self, content, **kwargs):
        """
        Called when a message is received with decoded JSON content
        """
        ssh = SSHClient()
        ssh.set_missing_host_key_policy(AutoAddPolicy())
        server_name, job_type, job_name = content['server'], content['job_type'], content['job_name']
        body, sub_id = str(content['body']), content['sub_id']
        console_id = 'console' + str(sub_id)

        template = Template(JobType.objects.get(name=job_type).template)
        server = Server.objects.get(name=server_name)  # one query instead of three

        # Key that is unique per session and per submission
        channel_key = self.message.channel_session.session_key
        channel_user = self.message.user
        script_id = channel_key + '_' + str(sub_id)
        script = template.render(body=body, key=script_id)
        print(script)  # debug output on the worker console

        ssh.connect(server.hostname, username=server.username, password=server.password)
        ssh_stdin, ssh_stdout, ssh_stderr = ssh.exec_command(script)
        self.groupAlert(ALERT_HTML % ('info', 'Info!', job_name, 'started.'))

        # Stream stdout in green, then stderr in red, line by line
        for line in iter(ssh_stdout.readline, ''):
            self.conWrite('<samp>' + line + '</samp>', console_id, 'Green')
        for line in iter(ssh_stderr.readline, ''):
            self.conWrite('<samp>' + line + '</samp>', console_id, 'Red')

        # Report the exit code to the group and to the operator's console
        if ssh_stdout.channel.recv_exit_status() == 0:
            self.groupAlert(ALERT_HTML % ('success', 'Success!', job_name, 'completed successfully.'))
            self.conWrite('<p class="bg-success">The job completed OK!</p>', console_id, 'Black')
        else:
            self.groupAlert(ALERT_HTML % ('danger', 'Attention!', job_name, 'failed.'))
            self.conWrite('<p class="bg-danger">The job failed!</p>', console_id, 'Red')
        ssh.close()

    def conCreate(self, div_id, **kwargs):
        self.send({'div_id': div_id})

    def conWrite(self, line=None, div_id='console', color='Black', **kwargs):
        self.send({'console': line, 'div_id': div_id, 'color': color})

    def groupAlert(self, alert, **kwargs):
        self.group_send('operators', {'alert': alert})

    def disconnect(self, message, **kwargs):
        """
        Perform things on connection close
        """
        pass
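The order of the messages that `receive` produces can be tried out without an SSH server. The sketch below is an approximation under stated assumptions: `io.StringIO` objects stand in for the paramiko stdout/stderr streams, the exit status is passed in directly, the alert texts are shortened to plain strings, and the `stream_job` generator is a hypothetical helper, not part of the consumer.

```python
import io


def stream_job(stdout, stderr, exit_status, sub_id, job_name):
    """Yield the messages the consumer would send, in the same order."""
    div_id = "console" + str(sub_id)
    yield {"alert": "Info! The job '%s' started." % job_name}
    # iter(f.readline, '') keeps calling readline() until it returns '',
    # which is how file-like objects signal the end of the stream.
    for line in iter(stdout.readline, ""):
        yield {"console": "<samp>" + line + "</samp>", "div_id": div_id, "color": "Green"}
    for line in iter(stderr.readline, ""):
        yield {"console": "<samp>" + line + "</samp>", "div_id": div_id, "color": "Red"}
    if exit_status == 0:
        yield {"alert": "Success! The job '%s' completed successfully." % job_name}
    else:
        yield {"alert": "Attention! The job '%s' failed." % job_name}


# Fake output of a job: two stdout lines, one stderr line, exit code 0.
messages = list(stream_job(io.StringIO("line 1\nline 2\n"), io.StringIO("warning\n"), 0, 3, "demo"))
for message in messages:
    print(message)
```

Note that stdout is read to the end before stderr is touched, so the red lines always arrive after the green ones, regardless of the order in which the job wrote them.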
Comments
The consumers are where all the magic happens:
- We used generic consumers to avoid dealing with low-level operations like JSON decoding, character escaping, etc. Thankfully, the Channels developers provide all of these through generic consumers and some decorators.
- In our generic consumer "MyConsumer", every method is called on a websocket event, as the comments describe. The methods conWrite and groupAlert were written by us to write to a user's console and to alert the group, respectively.
- When we create the script, we render the template with two variables: body and key. The key is useful for tasks that run in parallel on the same server and need, for example, to create a temporary file or pipe. Because the key is unique for every submission, it ensures that such tasks do not collide.
- With the conWrite method we send the client a line of text, a console id and a font color.
- With the groupAlert method we send a line of text to the 'operators' group.
- We parse the JSON message from the client, run the task, and send back each line of the task's stdout and stderr as a message with the relevant font color.
- We also notify the group with a Bootstrap alert at the beginning and at the end of each task.
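The uniqueness of the key mentioned above comes from combining the session key with the per-page submission counter. A minimal sketch of that idea follows; the session key values are made up and `make_script_id` is a hypothetical helper that mirrors the string concatenation in the consumer.

```python
def make_script_id(session_key, sub_id):
    """Combine the session key and the submission counter into one key."""
    return session_key + "_" + str(sub_id)


# Two operators (two sessions), each submitting two tasks from their page.
ids = {
    make_script_id(session_key, sub_id)
    for session_key in ("sessA", "sessB")  # illustrative session keys
    for sub_id in (1, 2)
}
print(sorted(ids))
```

Each of the four submissions gets its own key, so a job type that writes to a file named after the key never clashes with another running task.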
Posted by Kostas Koutsogiannopoulos · Oct. 10, 2016