demo_app.cluster module

This module houses utility functions for interacting with the cluster, along with portions of the web application; namely, the Websockets server and the Echo app implementation reside here.

demo_app.cluster.HIVE_TYPE_MAP = {'int': 'INT', 'map': 'STRING', 'decimal': 'DOUBLE', 'boolean': 'BOOLEAN', 'string': 'STRING'}

Maps datum types to Hive types

demo_app.cluster.SPARK_TYPE_MAP = {'int': 'Int', 'map': 'String', 'decimal': 'Double', 'boolean': 'Boolean', 'string': 'String'}

Maps datum types to Scala types
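
For example (assuming both maps are imported from demo_app.cluster), the same datum type resolves differently for each target:

    >>> HIVE_TYPE_MAP['decimal']
    'DOUBLE'
    >>> SPARK_TYPE_MAP['decimal']
    'Double'
    >>> HIVE_TYPE_MAP['map']  # map types are flattened to strings
    'STRING'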

class demo_app.cluster.ThreadedGenerator(schema, bps, outputs, data_pool_size=100)

Bases: threading.Thread

A generator which runs on a separate thread when generating data.

This thread will generate data indefinitely unless the program is killed or self.stop() is called.

Parameters:
  • schema (str) – The JSON schema for the data generator (a file path or a raw JSON string)
  • bps (int) – The number of bytes of data to produce each second.
  • outputs (list) –

    A list containing any of the following:

    • 'KAFKA'
    • 'FILE'
    • 'HTTP'
    • 'HDFS'

    This determines the locations where data is sent after it is generated. If the string is present then the data will be sent to the specified location. Values other than these are ignored.
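
A minimal usage sketch (the schema path, rate, and output list below are illustrative):

    from demo_app.cluster import ThreadedGenerator

    gen = ThreadedGenerator('schema.json', 1024, ['KAFKA', 'FILE'])
    gen.start()  # start() comes from threading.Thread and invokes run()
    # ... data is generated indefinitely ...
    gen.stop()   # clears the run flag so the thread can finish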

export_file(data)

Write out data from the generator to a file in CSV format

The file to write to is found in global.conf. Header lines are not written to the file. All data is appended to a single file. There is no rotation.

When a new data generator starts the file is essentially ‘wiped out’ so make sure to copy the data elsewhere before stopping/restarting the generator.

Parameters:data (dict/object) – A piece of data from the generator; it is written out as a CSV row for easier ingestion into tools such as Hive or Spark.
Returns:N/A
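
A rough sketch of the append-only write path described above (the file location is a stand-in for the value read from global.conf):

    import csv

    def append_row(data, path='/tmp/generator_output.csv'):
        # Append one record as a CSV row; no header line is ever written.
        with open(path, 'a', newline='') as f:
            csv.writer(f).writerow(data.values())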
export_hdfs(data)

Write out data from the generator to a file in CSV format in HDFS

The file to write to is found in global.conf. Header lines are not written to the file. All data is appended to a single file.

When a new data generator starts the file is essentially ‘wiped out’ so make sure to copy the data elsewhere before stopping/restarting the generator.

Parameters:data (dict/object) – A piece of data from the generator; it is written out as a CSV row for easier ingestion into tools such as Hive or Spark.
Returns:N/A
export_http(data)

Export data and POST it to an HTTP endpoint.

Data is ‘pooled’ before being sent in order to save resources and overhead on requests. The default pool value is 1000 records. This means for every 1000 pieces of data, one request will be made. The data is stored as JSON in the body of the request.

The caveat here is that if you stop the data generator the remaining data in the pool will not be sent.

Parameters:data (dict/object) – A piece of data to POST. If the pool is still below the pool size, the data is added to the ‘pool’ and we wait for more data to come in. When the threshold is reached, a single request with all of the pooled data is sent.
Returns:N/A
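
A sketch of the pooling behavior described above (the endpoint URL is illustrative, and the module-level pool is only for brevity):

    import requests

    POOL_SIZE = 1000  # default pool size per the description above
    _pool = []

    def pooled_post(record, endpoint='http://localhost:8080/ingest'):
        # Buffer records until the pool fills, then POST them as one
        # JSON array in the request body.
        _pool.append(record)
        if len(_pool) >= POOL_SIZE:
            requests.post(endpoint, json=_pool)
            del _pool[:]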
export_kafka(data)

Export a message to Kafka

This function utilizes the kafka-python library. It also reads the broker URL and topic name from global.conf.

Parameters:data (dict/object) – An object to encode from the generator to send to Kafka.
Returns:N/A
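
A hedged sketch of the send path with kafka-python (the broker address and topic name are stand-ins for the values read from global.conf):

    import json
    from kafka import KafkaProducer

    producer = KafkaProducer(bootstrap_servers='localhost:6667')

    def send_to_kafka(data, topic='demo_topic'):
        # Encode the record as JSON bytes and publish it to the topic.
        producer.send(topic, json.dumps(data).encode('utf-8'))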
run()

Run method for the thread implementation. Runs until stop() is called or the program is killed.

Parameters:N/A
stop()

Stops the generator by setting the flag to False. This causes the run method to exit and the thread to finish.

Parameters:N/A
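
The run()/stop() pair follows the usual stop-flag threading pattern; a self-contained sketch (the attribute names are assumptions, not the class’s actual internals):

    import threading
    import time

    class GeneratorSketch(threading.Thread):
        def __init__(self):
            super(GeneratorSketch, self).__init__()
            self._running = True

        def run(self):
            while self._running:  # loop until stop() clears the flag
                time.sleep(1)     # generate and export data here

        def stop(self):
            self._running = False  # run() exits on its next iteration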
class demo_app.cluster.WSDemoApp

Bases: object

This is the Websocket Demo Application.

Here we define the initialization of the server. We also determine how to handle a request to the websocket server. In this case we don’t have different routes on the websocket server other than the root path /.

Here we also define other functions, such as broadcast, for when we want to send information to clients.

Parameters:N/A
Returns:N/A
broadcast(message)

Broadcast a message to all server clients.

Parameters:message (str) – The string to broadcast to every client
Returns:N/A
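
What a broadcast loop over connected clients typically looks like (the client-list attribute is an assumption):

    class BroadcastSketch(object):
        def __init__(self):
            self.clients = []  # populated as clients connect

        def broadcast(self, message):
            for client in self.clients:
                client.send(message)  # ws4py's WebSocket.send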
class demo_app.cluster.WSDemoServer(host, port)

Bases: threading.Thread

A threaded wrapper around the Websockets server so that the Flask and Websockets servers can run in parallel.

Parameters:
  • host (str) – The host address for the Websockets server to bind to.
  • port (int) – The port number for the Websockets server to run on.
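
Minimal usage sketch (host and port values are illustrative):

    server = WSDemoServer('0.0.0.0', 8091)
    server.start()  # runs the Websockets server on its own thread
    server.broadcast('{"event": "ping"}')
    server.stop()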
broadcast(data)

A wrapper around the server’s broadcast method so that it can be easily accessed from the Flask application.

Parameters:data (str) – A string message to send to the client.
Returns:N/A
run()

Runs the threaded server

Parameters:N/A
Returns:N/A
stop()

Stops the websockets server

class demo_app.cluster.WSEcho(sock, protocols=None, extensions=None, environ=None, heartbeat_freq=None)

Bases: ws4py.websocket.WebSocket

The WebSocket handler for the WebSocket application

This class defines three methods required for the websocket server.

The three methods are

  • opened
  • closed
  • received_message
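
A hedged outline of a ws4py handler implementing these three methods (the environ key used for the client list is an assumption):

    from ws4py.websocket import WebSocket

    class EchoSketch(WebSocket):
        def opened(self):
            # Register the new client so broadcasts can reach it.
            self.environ.setdefault('ws_clients', []).append(self)

        def closed(self, code, reason=None):
            # Drop the client from the list on disconnect.
            clients = self.environ.get('ws_clients', [])
            if self in clients:
                clients.remove(self)

        def received_message(self, message):
            # Clients aren't expected to send data, so just log it.
            print('received: %s' % message)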
app_name = 'WebsocketApplication'
closed(code, reason=None)

Defines behavior for when a client disconnects from the websocket server

In this case when a client disconnects we check and remove them from the client list.

opened()

Defines behavior for when a new client connects to the server

In this case we add the client to our list of clients so we know who we can send messages to.

The clients are accessed through environ.

received_message(message)

Defines behavior for when a client sends a message to the server

In this case we don’t expect the clients to send us data so we just log the message.

demo_app.cluster.create_demo_kafka_topic()

Creates a Kafka topic for the demo if it doesn’t already exist.

The caveat in using this is that Kafka must be installed on the same machine as the demo, and thus on the same machine as Ambari as well. The function will try to start the Kafka service through Ambari, and once the service is started it will use the Kafka topics script to create the topic.

The name for the topic is specified in global.conf.

Parameters:N/A
Returns:True if the creation is successful. False otherwise.
Return type:bool
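
An illustrative sketch of the topic creation step (the script path, ZooKeeper address, and topic name are assumptions; the real topic name comes from global.conf):

    import subprocess

    def create_topic(topic='demo_topic'):
        # Shell out to Kafka's topic script; True means the topic was created.
        rc = subprocess.call([
            '/usr/hdp/current/kafka-broker/bin/kafka-topics.sh',
            '--create', '--zookeeper', 'localhost:2181',
            '--replication-factor', '1', '--partitions', '1',
            '--topic', topic,
        ])
        return rc == 0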
demo_app.cluster.generate_queries(schema, table_name='demo_table')

Generate test queries based on a configuration for the data generator

Currently supported components

  • Spark
  • Hive
Parameters:schema (str) – The schema for the generator as a JSON string
Returns:An object that holds keys for different objects, where each key points to a list of strings (queries) for various components.
Return type:dict
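
Minimal usage sketch (the schema field names and the keys of the returned dict are assumptions based on the supported components listed above):

    schema = '[{"fieldName": "age", "type": "int"}]'
    queries = generate_queries(schema, table_name='demo_table')
    for q in queries.get('hive', []):  # e.g. CREATE TABLE statements
        print(q)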
demo_app.cluster.get_kafka_topics()

List the Kafka topics on the current installation.

Requires that Kafka is installed on the same machine and Ambari is up and running. Will start the service and use the Kafka scripts to list out all of the topics.

Parameters:N/A
Returns:A two-element list: [0] contains all of the topics in a single string, typically separated by newlines; [1] contains any errors encountered while retrieving the topics.
Return type:list
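
Usage sketch based on the return layout described above:

    out, err = get_kafka_topics()
    if err:
        print('failed to list topics: %s' % err)
    else:
        for topic in out.splitlines():  # topics come back newline-separated
            print(topic)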
demo_app.cluster.kerberize()

Kerberize the cluster using a script. Untested. Can take 10-15 minutes.

This utilizes a script found at https://github.com/crazyadmins/useful-scripts/tree/master/ambari

If you’re running this script on a cluster, you should look in configuration/kerberos/ambari.props to make sure the proper values are present in the file, or else the script will fail.

Parameters:N/A
Returns:N/A