[개발] steem 계정명 기준으로 모든 컨텐츠 정보 읽어들이기
개요
nodejs 로 작성한 소스로, 해당 소스를 통해 steem block 에 기록된 계정명 기준 모든 컨텐츠(포스팅) 정보를 읽어들일 수 있도록 구성 하였습니다. 우선 파일에 처리 내역을 기록하도록 구성 하였는데, 이후 DB 에 정보를 기록할 예정 입니다. ( docker를 활용하여 mongo db 로 기록 )
이후 crontab 에 해당 작업을 등록하여 주기적으로 작업을 처리하도록 구성할 예정 입니다. ( 최초 작업 처리 시 시간이 많이 소요되는 부분에 유의 )
관련 소스
////////////////////////////////////////////////////////////
//
// information (소개)
//
/*
steem 의 계정 기준 정보를 읽어들여 DB에 적재한다.
*/
////////////////////////////////////////////////////////////
//
// require
//
const dateformat = require('dateformat'); // 날짜유틸
const steem = require('steem'); // STEEM BLOCK CHAIN
const wfile = require('../util/wlib/wfile');
const path = require('path');
////////////////////////////////////////////////////////////
//
// config
//
require('dotenv').config();
steem.api.setOptions({ url: 'https://api.justyy.com' }); // api.justyy.com , api.steemit.com
////////////////////////////////////////////////////////////
//
// const
//
/**
* 환경 설정용 값
*/
const __CFG = {
CD : {
OK : 0,
PASS : {
ALREADY_LOADED : 100,
},
ERR : {
MAX_RETRY_OVER : -100,
ACCOUNT_NOT_FOUND_OR_NO_ACTIONS : -200,
NOT_EXIST_CONTENTS : -300,
},
},
COUNT : {
MAX_RETRY : 3,
DATA_LOAD : 10,
},
TIME : {
SLEEP_SEC : 1000 * 3,
},
PATH : {
DATA_ROOT : path.join(__dirname, '../static/ah'),
},
};
////////////////////////////////////////////////////////////
//
// variables
//
////////////////////////////////////////////////////////////
//
// private functions
//
function _removeDuplicated(arr, key){
let temp = [];
let keys = [];
for(let a of arr){
if(!keys.includes(a[key]) ){
keys.push(a[key]);
temp.push(a);
}
}
return temp;
}
////////////////////////////////////////////////////////////
//
// public functions
//
/**
* 해당 계정기준 최종 히스토리 인덱스 정보를 리턴
* ( 가끔 해당 번호가 오락 가락 하니 수신 측에서는 방어 코드 필요 )
* @param {String} author 계정명
* @returns 히스토리 최종 인덱스 번호
*/
function getAccountHistoryIdx(author){
return new Promise((resolve, reject)=>{
let _fn = (author, retry=0) => {
steem.api.getAccountHistoryAsync(author, -1, 0)
.then(res=>{
if(res.length==0){
resolve({cd: __CFG.CD.ERR.ACCOUNT_NOT_FOUND_OR_NO_ACTIONS, message:'ACCOUNT_NOT_FOUND_OR_NO_ACTIONS'});
}else{
resolve({cd: __CFG.CD.OK, message:'ok', data:res[0][0]});
}
})
.catch(async err=>{
retry++;
if(retry==__CFG.COUNT.MAX_RETRY){
reject({cd: __CFG.CD.ERR.MAX_RETRY_OVER, message:'MAX_RETRY_OVER'});
}else{
console.error('getgetAccountHistoryIdx', err.toString());
await new Promise(resolve=>setTimeout(resolve, __CFG.TIME.SLEEP_SEC));
_fn(author, retry);
}
});
};
_fn(author);
});
}
/**
* 계정 이력 정보를 모두 읽어들인다
* 기존에 읽었던 부분은 따로 기록
* @param {String} author
* @returns 계정 이력 정보
*/
function loadAccountHistory(author, prevIdx=0){
return new Promise( async (resolve, reject)=>{
// 최종 계정 히스토리 인덱스 정보 확보
let _lastIdx = await getAccountHistoryIdx(author);
if(_lastIdx.cd != 0 ){
return reject(_lastIdx);
}
// 사용 변수 정의
let lastIdx = _lastIdx.data;
let limit = (lastIdx - prevIdx > 999)? 999 : lastIdx - prevIdx;
let fromIdx = prevIdx==0?limit:prevIdx+1;
// 유효성 검증 - 이미 다 읽어들인 내용일 경우
if(lastIdx==prevIdx || limit<1){
return resolve({ cd: __CFG.CD.PASS.ALREADY_LOADED, message:'ALREADY_LOADED'});
}
let _fn = async (author, fromIdx, limit, prevData, lastIdx) => {
let ratio = fromIdx==0?0:parseFloat( (fromIdx*100/lastIdx).toFixed(2) );
console.log('ratio', ratio>100?100:ratio);
try{
let ori = await steem.api.getAccountHistoryAsync(author, fromIdx, limit);
let filtered = ori
.filter(([idx,data])=>data.op[0]=='comment' && data.op[1].title!='')
.map(([idx,data])=>{
let d = new Date(`${data.timestamp}Z`);
return {
id:idx,
timestamp:d.getTime()/1000,
timekr:dateformat(d,'yy.mm.dd HH:MM:ss'),
author:author,
permlink:data.op[1].permlink,
title:data.op[1].title,
body:data.op[1].body,
}; // 과거 작성 당시 정보만 기록 수정 시 @@ 등으로 표기됨
});
if(ratio>=100){
prevData = prevData.concat(filtered);
// prevData = _removeDuplicated(prevData, 'permlink');
resolve({cd:__CFG.CD.OK,message:'OK',data:{author, lastIdx, prevData}});
}else{
// load next
fromIdx = ori[ori.length-1][0] + limit + 1;
prevData = prevData.concat(filtered);
_fn(author, fromIdx, limit, prevData, lastIdx);
}
}catch(err){
console.error('loadAccountHistory', err.toString());
await new Promise(resolve=>setTimeout(resolve, __CFG.TIME.SLEEP_SEC));
_fn(author, fromIdx, limit, prevData, lastIdx);
}
};
_fn(author, fromIdx, limit, [], lastIdx);
});
}
/**
* 해당 계정기준 최종 히스토리 인덱스 정보를 리턴
* ( 가끔 해당 번호가 오락 가락 하니 수신 측에서는 방어 코드 필요 )
* @param {String} author 계정명
* @returns 히스토리 최종 인덱스 번호
*/
function getContents(author, permlink){
return new Promise((resolve, reject)=>{
let _fn = (author, permlink, retry=0) => {
steem.api.getContentAsync(author, permlink)
.then(res=>{
if(res.id==0){
resolve({cd: __CFG.CD.ERR.NOT_EXIST_CONTENTS, message:'NOT_EXIST_CONTENTS'});
}else{
let filtered = {
id : res.id,
author : res.author,
permlink : res.permlink,
category : res.category,
title : res.title,
body : res.body,
json_metadata : res.json_metadata,
created : res.created,
last_update : res.last_update,
url : res.url,
}; // 추후 대상 추출 시 idx 역순으로 해서 출력
resolve({cd: __CFG.CD.OK, message:'OK', data:filtered});
}
})
.catch(async err=>{
retry++;
if(retry==__CFG.COUNT.MAX_RETRY){
reject({cd: __CFG.CD.ERR.MAX_RETRY_OVER, message:"MAX_RETRY_OVER"});
}else{
console.error('getContents', err.toString());
await new Promise(resolve=>setTimeout(resolve, __CFG.TIME.SLEEP_SEC));
_fn(author, permlink, retry);
}
});
};
_fn(author, permlink);
});
}
/**
* 계정명 기준 컨텐츠 정보를 읽어들여 파일로 기록한다
* @param {String} author 계정명
* @returns -
*/
async function saveContents(author){
try{
let prevIdx = wfile.readJson(`${__CFG.PATH.DATA_ROOT}/${author}.index`, {index:0});
console.log(`start ::: saveContents - ${author} / prevIdx - ${prevIdx.index}`);
let ah = await loadAccountHistory(author, prevIdx.index);
if(ah.cd ==__CFG.CD.PASS.ALREADY_LOADED){
console.log(`end ::: saveContents - already loaded / lastIdx - ${prevIdx.index}`);
return;
}
let lastIdx = ah.data.lastIdx;
let prevData = ah.data.prevData;
// ah 정보 업데이트
let ahPrev = wfile.readJson(`${__CFG.PATH.DATA_ROOT}/${author}.ah`, []);
let ahCombined = ahPrev.concat(prevData);
wfile.write(`${__CFG.PATH.DATA_ROOT}/${author}.ah`, JSON.stringify(ahCombined,null,2) );
console.log(`data ::: saveContents prev - ${ahPrev.length}, combined - ${ahCombined.length}`);
// 이전 index update
wfile.write(`${__CFG.PATH.DATA_ROOT}/${author}.index`, JSON.stringify({index:lastIdx},null,2));
console.log(`end ::: saveContents - ${author} / lastIdx - ${lastIdx}`);
// contents 정보 생성
reloadContents(author, prevData);
}catch(e){
console.log(e);
}
}
/**
* author.ah 파일을 읽어들여 해당 정보 기준 contents 정보를 갱신 처리 한다
* @param {String} author 계정명
* @param {Array} prevData 읽어들인 정보
*/
async function reloadContents(author, prevData){
let pushed = [];
prevData = prevData || wfile.readJson(`${__CFG.PATH.DATA_ROOT}/${author}.ah`, []);
let readCount = 0;
// 디렉토리 생성
wfile.makeFolder(`${__CFG.PATH.DATA_ROOT}/${author}`);
// 컨텐츠 읽어들이기
let _fn = async () => {
let contents = await Promise.all(pushed);
for(let c of contents){
if(c.cd==CD_OK){
wfile.write(`${__CFG.PATH.DATA_ROOT}/${c.data.author}/${c.data.permlink}.content`, JSON.stringify(c.data,null,2));
}
}
readCount+= pushed.length;
pushed = [];
console.log(`read : ${readCount} / ${prevData.length}`);
};
// contents 정보 생성
// 한번에 모든 컨텐츠를 읽어들이면 문제가 발생함.
// getContents RPCError: Unable to acquire database lock
try{
for(let p of prevData){
pushed.push(getContents(p.author, p.permlink));
if(pushed.length==__CFG.COUNT.DATA_LOAD) {
await _fn();
}
}
if(pushed.length>0) {
await _fn();
}
}catch(e){
console.log("reloadContents", e);
}
}
////////////////////////////////////////////////////////////
//
// init
//
async function init(){
saveContents('krnews');
}
init();
@wonsama transfered 6 KRWP to @krwp.burn. voting percent : 8.44%, voting power : 95.83%, steem power : 1739948.09, STU KRW : 1200.
@wonsama staking status : 1142.929 KRWP
@wonsama limit for KRWP voting service : 5.714 KRWP (rate : 0.005)
What you sent : 6 KRWP
Refund balance : 0.285 KRWP [52537582 - 1f8ee45fd095beb20676c5be4fa1ba98d2d75f87]