Part 21: Use Multi Threading To Analyse The Steem Blockchain In Parallel
This tutorial is part of a series where different aspects of programming with steem-python
are explained. Links to the other tutorials can be found in the curriculum section below. This part is a direct continuation on Part 19: Analysing The Steem Blockchain From A Custom Block Number For A Custom Block Count. Where the previous part focussed on how to access blocks on the Steem Blockchain
1 by 1 this part will look how this process can be parallelised.
What will I learn
- Which data is suitable for parallelisation
- Divide work between threads
- How does a thread class work
- Create threads
- Prevent data corruption with a queue and lock
- Merge data received from threads
Requirements
- Python3.6
steem-python
Difficulty
- Intermediate
Tutorial
Setup
Download the file from Github. There is 1 file multi_threaded.py
which contains the code. The file takes 2 arguments from the command line which sets the amount of blocks
to analyse and how many threads
to use.
Run scripts as following:
> python multi_threaded.py 1000 8
Which data is suitable for parallelization?
Parallelisation
is great to improve efficiency and use all the cores in current CPUs. However, not all data is equal. The most optimal data for parallelisation
is data that is not related to each other and can be combined in infinite ways without altering the outcome.
In this case we will be analysing blocks from the Steem Blockchain
and counting how many times each operation
is used. For this example it does not matter at which block
the counting is started, or in which sequence the blocks are counted. As long as all blocks
are counted the end result will be the same. Therefor, this is perfect for parallelisation
.
Divide work between threads
For optimal performance work has to be divided equally. The amount of work per thread n
can be calculated by taking the total block_count
and dividing this by the amount_of_threads
. For this to work n
has to be a round number, so choose the block_count
and amount_of_threads
accordingly.
block_count = int(sys.argv[1])
amount_of_threads = int(sys.argv[2])
n = int(block_count/amount_of_threads)
Each thread has it's own start_block
and end_block
. To prevent overlap, since the first block is also counted. 1 has to be subtracted from n
.
start = initial_value
for each thread:
start_block = start
end_block = start + n -1
start = start + n
How does a thread class work
To make threads a theading.thread class has to be created. Consisting of an __init__
part and a run()
function. The __init__
section contains all unique and shared variables that the thread requires.
class myThread (threading.Thread):
def __init__(self, thread_id, start_block, end_block, n, blockchain, workQueue, queueLock):
threading.Thread.__init__(self)
self.thread_id = thread_id
self.start_block = start_block
self.end_block = end_block
self.n = n
self.blockchain = blockchain
self.stream = self.blockchain.stream_from(start_block=start_block, end_block=end_block)
self.current_block = self.start_block
self.workQueue = workQueue
self.queueLock = queueLock
print (self.thread_id, self.start_block, self.end_block, '\n')
The run()
function is used to make the thread do stuff and is called automatically.
def run(self):
data = {}
for post in self.stream:
if post['block'] != self.current_block:
# Do stuff
Create threads
Create a list for the threads
. Create each thread
with it's unique and shared variables. Start the thread and append it to the list.
threads = []
for x in range(0, amount_of_threads):
thread = myThread(x, start, start+ n-1, n, blockchain, workQueue, queueLock)
thread.start()
threads.append(thread)
start = start + n
Prevent data corruption with a queue and lock
The code is set up in such a way that each thread does all it own computations. Then when it is done it adds its data to a queue
for the main thread to retrieve from. Since it is possible that multiple threads finish at the same time a locking mechanise is required to prevent data corruption.
# variables
queueLock = threading.Lock()
workQueue = queue.Queue(amount_of_threads)
# locking/unlocking sequence
self.queueLock.acquire()
self.workQueue.put(data)
self.queueLock.release()
Merge data received from threads
The main threads waits for all the threads to finish to return work.
# wait for threads
for t in threads:
t.join()
Now it can retrieve all the finished work from the queue
and merge it together.
merged_data = {}
while not workQueue.empty():
data = workQueue.get()
for key in data:
if key not in merged_data:
merged_data[key] = data[key]
else:
merged_data[key] += data[key]
Running the script
Running the script will analyse the set amount of blocks
back in time from the current head block
. It will divide the blocks
over the amount_of_threads
set and prints out each thread
and the work this thread
has to do. During the process each thread updates it's current progress. At the end the merged data is printed.
Test for yourself for different block_counts
and amount_of_threads
how much of a difference multi threading yields for this type op work.
python multi_threaded.py 1000 8
0 19512130 19512254
1 19512255 19512379
2 19512380 19512504
3 19512505 19512629
4 19512630 19512754
5 19512755 19512879
6 19512880 19513004
7 19513005 19513129
...
Thread 2 is at block 19512445/19512504 51.20%
Thread 0 is at block 19512199/19512254 54.40%
Thread 3 is at block 19512573/19512629 53.60%
Thread 4 is at block 19512694/19512754 50.40%
Thread 7 is at block 19513071/19513129 52.00%
Thread 6 is at block 19512936/19513004 44.00%
Thread 5 is at block 19512822/19512879 52.80%
Thread 1 is at block 19512321/19512379 52.00%
...
'custom_json': 17688, 'claim_reward_balance': 1569, 'vote': 23629, 'comment': 9053, 'transfer_to_vesting': 82, 'comment_options': 1213, 'limit_order_create': 126, 'fill_order': 74, 'return_vesting_delegation': 675, 'producer_reward': 1000, 'curation_reward': 4428, 'author_reward': 1687, 'transfer': 1615, 'comment_benefactor_reward': 335, 'fill_vesting_withdraw': 82, 'account_update': 327, 'account_create_with_delegation': 43, 'delete_comment': 66, 'fill_transfer_from_savings': 6, 'feed_publish': 98, 'account_witness_vote': 38, 'account_witness_proxy': 5, 'transfer_to_savings': 6, 'account_create': 4, 'limit_order_cancel': 29, 'delegate_vesting_shares': 10, 'withdraw_vesting': 17, 'transfer_from_savings': 3, 'cancel_transfer_from_savings': 2, 'witness_update': 1}
Curriculum
Set up:
- Part 0: How To Install Steem-python, The Official Steem Library For Python
- Part 1: How To Configure The Steempy CLI Wallet And Upvote An Article With Steem-Python
Filtering
- Part 2: How To Stream And Filter The Blockchain Using Steem-Python
- Part 6: How To Automatically Reply To Mentions Using Steem-Python
Voting
- Part 3: Creating A Dynamic Autovoter That Runs 24/7
- Part 4: How To Follow A Voting Trail Using Steem-Python
- Part 8: How To Create Your Own Upvote Bot Using Steem-Python
Posting
- Part 5: Post An Article Directly To The Steem Blockchain And Automatically Buy Upvotes From Upvote Bots
- Part 7: How To Schedule Posts And Manually Upvote Posts For A Variable Voting Weight With Steem-Python
Constructing
Rewards
- Part 9: How To Calculate A Post's Total Rewards Using Steem-Python
- Part 12: How To Estimate Curation Rewards Using Steem-Python
- Part 14: How To Estimate All Rewards In Last N Days Using Steem-Python
- Part 20: Plotting Account's Total Generated Post Rewards Since Creation
Transfers
- Part 11: How To Build A List Of Transfers And Broadcast These In One Transaction With Steem-Python
- Part 13: Upvote Posts In Batches Based On Current Voting Power With Steem-Python
Analysis
- Part 15: How To Check If An Account Is Following Back And Retrieve Mutual Followers/Following Between Two Accounts
- Part 16: How To Analyse A User's Vote History In A Specific Time Period Using Steem-Python
- Part 18: How To Analyse An Account's Resteemers Using Steem-Python
- Part 19: Analysing The Steem Blockchain From A Custom Block Number For A Custom Block Count
The code for this tutorial can be found on GitHub!
This tutorial was written by @juliank in conjunction with @amosbastian.
Posted on Utopian.io - Rewarding Open Source Contributors
I think here multithreading is confused with parallelism and concurrency. They're not the same. Concurrency guarantees two processes are happening simultaneously while multithreading does not. Multi-core processors offer the possibility that two threads can be processed independently and intel's Hyperthreading can make multithreading on the same core really close to multiprocessing or concurrency, it is not. When two threads are processed by the same core, this is done serially and not in parallel.
In a Nutshell
parallelization > multithreading
Good tutorial, i want try this tutorial, thank
Thank you for the contribution. It has been approved.
You can contact us on Discord.
[utopian-moderator]
I like u, but i think so your info after comflik
Hey @steempytutorials I am @utopian-io. I have just upvoted you!
Achievements
Suggestions
Get Noticed!
Community-Driven Witness!
I am the first and only Steem Community-Driven Witness. Participate on Discord. Lets GROW TOGETHER!
Up-vote this comment to grow my power and help Open Source contributions like this one. Want to chat? Join me on Discord https://discord.gg/Pc8HG9x