Part 21: Use Multi Threading To Analyse The Steem Blockchain In Parallel

This tutorial is part of a series in which different aspects of programming with steem-python are explained. Links to the other tutorials can be found in the curriculum section below. This part is a direct continuation of Part 19: Analysing The Steem Blockchain From A Custom Block Number For A Custom Block Count. Where the previous part focussed on accessing blocks on the Steem Blockchain one by one, this part looks at how that process can be parallelised.


What will I learn

  • Which data is suitable for parallelisation
  • Divide work between threads
  • How does a thread class work
  • Create threads
  • Prevent data corruption with a queue and lock
  • Merge data received from threads

Requirements

  • Python 3.6
  • steem-python

Difficulty

  • Intermediate

Tutorial

Setup

Download the file from GitHub. There is one file, multi_threaded.py, which contains the code. The script takes two command-line arguments: the number of blocks to analyse and the number of threads to use.

Run the script as follows:
> python multi_threaded.py 1000 8

Which data is suitable for parallelisation?

Parallelisation is a great way to improve efficiency and make use of all the cores in current CPUs. However, not all data is equally suited to it. The best candidates are pieces of data that do not depend on each other and can be processed and combined in any order without altering the outcome.

In this case we will be analysing blocks from the Steem Blockchain and counting how many times each operation is used. For this example it does not matter at which block the counting starts, or in which sequence the blocks are counted. As long as all blocks are counted, the end result will be the same. Therefore, this is perfect for parallelisation.
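
To make the order-independence concrete, here is a small sketch. The `blocks` list is made-up sample data (not real chain data) and `count_operations` is a hypothetical helper; the point is only that counting the same blocks in a different order gives the same totals.

```python
import random
from collections import Counter

# Hypothetical sample: each inner list holds the operation types found in one block.
blocks = [
    ["vote", "comment", "vote"],
    ["transfer", "vote"],
    ["custom_json", "comment"],
    ["vote"],
]


def count_operations(block_list):
    """Count how often each operation type occurs across the given blocks."""
    totals = Counter()
    for operations in block_list:
        totals.update(operations)
    return totals


in_order = count_operations(blocks)

# Count the same blocks again, but in a random order.
shuffled = blocks[:]
random.shuffle(shuffled)
out_of_order = count_operations(shuffled)

# The order in which blocks are processed does not change the totals.
print(in_order == out_of_order)
```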

Divide work between threads

For optimal performance the work has to be divided equally. The amount of work per thread, n, is the total block_count divided by amount_of_threads. For this to work n has to be a whole number, so choose block_count and amount_of_threads such that block_count is an exact multiple of amount_of_threads.

import sys

block_count         = int(sys.argv[1])
amount_of_threads   = int(sys.argv[2])

# integer division; assumes block_count is a multiple of amount_of_threads
n                   = block_count // amount_of_threads

Each thread has its own start_block and end_block. Because the start block itself is also counted, 1 has to be subtracted from n when calculating end_block, otherwise the range would overlap with that of the next thread.

start = initial_value

for each thread:
    start_block = start
    end_block   = start + n - 1
    start       = start + n
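
As a runnable sketch of the pseudocode above, the function below returns the (start_block, end_block) pair for every thread. The name `split_blocks` is hypothetical and not part of multi_threaded.py; rejecting counts that do not divide evenly is one possible way to enforce the whole-number requirement. The numbers used are the ones from the sample run shown later in this tutorial.

```python
def split_blocks(start, block_count, amount_of_threads):
    """Return a list of inclusive (start_block, end_block) ranges, one per thread."""
    if block_count % amount_of_threads != 0:
        raise ValueError("block_count must be a multiple of amount_of_threads")

    n = block_count // amount_of_threads
    ranges = []
    for _ in range(amount_of_threads):
        # The start block is counted too, hence the -1 for the end block.
        ranges.append((start, start + n - 1))
        start += n
    return ranges


ranges = split_blocks(19512130, 1000, 8)
print(ranges[0])   # range for thread 0
print(ranges[-1])  # range for thread 7
```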


How does a thread class work

To make threads, a subclass of threading.Thread has to be created, consisting of an __init__ method and a run() method. The __init__ method sets up all the unique and shared variables that the thread requires.

class myThread (threading.Thread):
    def __init__(self, thread_id, start_block, end_block, n, blockchain, workQueue, queueLock):
        threading.Thread.__init__(self)
        self.thread_id      = thread_id
        self.start_block    = start_block
        self.end_block      = end_block
        self.n              = n
        self.blockchain     = blockchain
        self.stream         = self.blockchain.stream_from(start_block=start_block, end_block=end_block)
        self.current_block  = self.start_block
        self.workQueue      = workQueue
        self.queueLock      = queueLock

        print (self.thread_id, self.start_block, self.end_block, '\n')

The run() method contains the work the thread performs. It is called automatically once the thread has been started with start().

    def run(self):
        data = {}
        for post in self.stream:
            if post['block'] != self.current_block:
                # Do stuff
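
The relationship between __init__, run() and start() can be tried out without a Steem node. The sketch below is a stripped-down stand-in, not the class from multi_threaded.py: `CounterThread` and its `operations` list are hypothetical and replace the blockchain stream with plain strings.

```python
import threading


class CounterThread(threading.Thread):
    def __init__(self, thread_id, operations):
        super().__init__()
        self.thread_id = thread_id
        self.operations = operations  # stand-in for the blockchain stream
        self.data = {}                # per-thread result, filled in by run()

    def run(self):
        # Executed in the new thread once start() has been called.
        for op_type in self.operations:
            self.data[op_type] = self.data.get(op_type, 0) + 1


thread = CounterThread(0, ["vote", "comment", "vote"])
thread.start()  # spawns the thread, which then calls run()
thread.join()   # wait until run() has finished
print(thread.data)
```

Note that run() is never called directly; calling it by hand would execute the work in the calling thread instead of a new one.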

Create threads

Create a list for the threads. Create each thread with its unique and shared variables, start it, and append it to the list.

threads = []

for x in range(0, amount_of_threads):
    thread = myThread(x, start, start + n - 1, n, blockchain, workQueue, queueLock)
    thread.start()
    threads.append(thread)
    start = start + n

Prevent data corruption with a queue and lock

The code is set up in such a way that each thread does all of its own computations. When it is done, it adds its data to a queue for the main thread to retrieve. Since multiple threads may finish at the same time, a locking mechanism is used to prevent data corruption. (Python's queue.Queue already does its own internal locking, so the explicit lock here is an extra precaution rather than a strict necessity.)

# variables
queueLock = threading.Lock()
workQueue = queue.Queue(amount_of_threads)

# locking/unlocking sequence
self.queueLock.acquire()
self.workQueue.put(data)
self.queueLock.release()
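
The acquire/release pair can also be written as a with statement, which releases the lock even if put() raises an exception. The sketch below is a self-contained illustration of that pattern; the `worker` function and its dummy counts are hypothetical and stand in for the real thread class.

```python
import queue
import threading

amount_of_threads = 4
queue_lock = threading.Lock()
work_queue = queue.Queue(amount_of_threads)


def worker(thread_id):
    # Stand-in for the real per-thread operation counts.
    data = {"vote": thread_id + 1}
    # The lock is acquired on entry and always released on exit.
    with queue_lock:
        work_queue.put(data)


threads = [threading.Thread(target=worker, args=(i,)) for i in range(amount_of_threads)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(work_queue.qsize())  # one result per thread
```

The queue is sized to hold exactly one result per thread, so put() never has to block while the lock is held.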

Merge data received from threads

The main thread waits for all the threads to finish and hand in their work.

# wait for threads
for t in threads:
    t.join()

Now it can retrieve all the finished work from the queue and merge it together.

merged_data = {}

while not workQueue.empty():
    data = workQueue.get()
    for key in data:
        if key not in merged_data:
            merged_data[key] = data[key]
        else:
            merged_data[key] += data[key]
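
As an alternative sketch, the same merge can be written with collections.Counter from the standard library, whose update() method adds counts for matching keys. The two dictionaries placed on the queue are made-up sample results, not real output.

```python
import queue
from collections import Counter

# Hypothetical results as two threads might have queued them.
work_queue = queue.Queue()
work_queue.put({"vote": 3, "comment": 1})
work_queue.put({"vote": 2, "transfer": 4})

merged_data = Counter()
while not work_queue.empty():
    # update() adds the counts instead of overwriting existing keys.
    merged_data.update(work_queue.get())

print(dict(merged_data))
```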

Running the script

Running the script analyses the set number of blocks back in time from the current head block. It divides the blocks over the amount_of_threads set and prints each thread along with the block range it has to process. During the process each thread reports its current progress. At the end the merged data is printed.

Try different values for block_count and amount_of_threads to see for yourself how much of a difference multi-threading makes for this type of work.
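
One way to get a feel for the difference without touching a Steem node is the timing sketch below. It assumes that fetching a block is mostly spent waiting on the network and imitates that with time.sleep; `fetch_block` and `run_with_threads` are hypothetical helpers, and real timings will depend on the node and connection used.

```python
import threading
import time


def fetch_block(block_num):
    # Stand-in for a network request to a Steem node.
    time.sleep(0.02)


def run_with_threads(block_numbers, amount_of_threads):
    """Process all blocks with the given number of threads, return elapsed seconds."""
    chunk = len(block_numbers) // amount_of_threads

    def work(part):
        for block_num in part:
            fetch_block(block_num)

    threads = []
    started = time.perf_counter()
    for i in range(amount_of_threads):
        part = block_numbers[i * chunk:(i + 1) * chunk]
        t = threading.Thread(target=work, args=(part,))
        t.start()
        threads.append(t)
    for t in threads:
        t.join()
    return time.perf_counter() - started


blocks = list(range(16))
single = run_with_threads(blocks, 1)
multi = run_with_threads(blocks, 8)
print(f"1 thread: {single:.2f}s, 8 threads: {multi:.2f}s")
```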

python multi_threaded.py 1000 8

0 19512130 19512254
1 19512255 19512379
2 19512380 19512504
3 19512505 19512629
4 19512630 19512754
5 19512755 19512879
6 19512880 19513004
7 19513005 19513129
...
Thread 2 is at block 19512445/19512504 51.20%
Thread 0 is at block 19512199/19512254 54.40%
Thread 3 is at block 19512573/19512629 53.60%
Thread 4 is at block 19512694/19512754 50.40%
Thread 7 is at block 19513071/19513129 52.00%
Thread 6 is at block 19512936/19513004 44.00%
Thread 5 is at block 19512822/19512879 52.80%
Thread 1 is at block 19512321/19512379 52.00%
...
{'custom_json': 17688, 'claim_reward_balance': 1569, 'vote': 23629, 'comment': 9053, 'transfer_to_vesting': 82, 'comment_options': 1213, 'limit_order_create': 126, 'fill_order': 74, 'return_vesting_delegation': 675, 'producer_reward': 1000, 'curation_reward': 4428, 'author_reward': 1687, 'transfer': 1615, 'comment_benefactor_reward': 335, 'fill_vesting_withdraw': 82, 'account_update': 327, 'account_create_with_delegation': 43, 'delete_comment': 66, 'fill_transfer_from_savings': 6, 'feed_publish': 98, 'account_witness_vote': 38, 'account_witness_proxy': 5, 'transfer_to_savings': 6, 'account_create': 4, 'limit_order_cancel': 29, 'delegate_vesting_shares': 10, 'withdraw_vesting': 17, 'transfer_from_savings': 3, 'cancel_transfer_from_savings': 2, 'witness_update': 1}

Curriculum

Set up:
Filtering
Voting
Posting
Constructing
Rewards
Transfers
Analysis

The code for this tutorial can be found on GitHub!

This tutorial was written by @juliank in conjunction with @amosbastian.



Posted on Utopian.io - Rewarding Open Source Contributors

I think multithreading is being confused here with parallelism and concurrency. They're not the same. Concurrency guarantees two processes are happening simultaneously while multithreading does not. Multi-core processors offer the possibility that two threads can be processed independently, and although Intel's Hyper-Threading can make multithreading on the same core come really close to multiprocessing or concurrency, it is not the same thing. When two threads are processed by the same core, this is done serially and not in parallel.

In a Nutshell

parallelization > multithreading

Good tutorial, I want to try this tutorial, thanks.

Thank you for the contribution. It has been approved.

You can contact us on Discord.
[utopian-moderator]

I like u, but i think so your info after comflik
