Neo4j와 아파치 스파크
일반 관찰
Apache Spark는 클러스터형 인메모리 데이터 처리 솔루션으로 많은 시스템에서 대규모 데이터 세트 처리를 쉽게 확장합니다. 또한 데이터에서 그래프 계산 작업을 실행하기 위한 두 가지 프레임워크인 GraphX 및 GraphFrames와 함께 제공됩니다.
다양한 방법으로 Spark와 통합할 수 있습니다. Neo4j로 가져올 원시 데이터를 사전 처리(집계, 필터링, 변환)하거나.
Spark는 외부 그래프 컴퓨팅 솔루션으로도 사용할 수 있습니다.
선택한 하위 그래프의 데이터를 Neo4j에서 Spark로 내보내기,
분석적 측면을 계산하고,
결과를 Neo4j에 다시 쓰기
Neo4j 작업 및 Cypher 쿼리에 사용됩니다.
Neo4j 자체는 중대형 그래프에서 신속하게 그래프 처리를 실행할 수 있습니다. 예를 들어 그래프 처리 프로젝트는 Neo4j 서버 확장 또는 사용자 정의 프로시저 로 dbpedia 데이터 세트(10M 노드, 125M 관계)에서 PageRank(5회 반복)를 실행할 수 있음을 보여줍니다 . Spark는 더 큰 데이터 세트나 더 집중적인 컴퓨팅 작업에 더 적합할 수 있습니다. |
Neo4j-스파크-커넥터
Neo4j Spark 커넥터 는 바이너리 Bolt 프로토콜을 사용하여 Neo4j 서버와 데이터를 주고받습니다.
이 페이지의 정보는 스파크 커넥터의 이전 버전(2.4.5 릴리스)을 참조합니다 . 최신 정보, 보다 쉽고 현대적인 API 는 Apache Spark용 Neo4j 커넥터를 참조하십시오 . |
RDD, DataFrame, GraphX 및 GraphFrames 용 Spark-2.0 API를 제공 하므로 Apache Spark에서 Neo4j 그래프 데이터를 사용하고 처리하는 방법을 자유롭게 선택할 수 있습니다.
spark.neo4j.bolt.*
Spark 구성 옵션 을 통해 Neo4j-URL, -user 및 -password를 구성합니다.
일반적인 사용법은 다음과 같습니다.
만들다
org.neo4j.spark.Neo4j(sc)
직접 쿼리 로
cypher(query,[params])
,nodes(query,[params])
, 또는rels(query,[params])
pattern("Label1",Seq("REL"),"Label2")pattern( ("Label1","prop1"),("REL","prop"),("Label2","prop2") )
선택적 으로 병렬 처리를 위해
partitions(n)
,batch(size)
, 정의rows(count)
반환할 데이터 유형 선택
loadRowRdd
,loadNodeRdds
,loadRelRdd
_loadRdd[T]
loadDataFrame
,loadDataFrame(schema)
loadGraph[VD,ED]
loadGraphFrame[VD,ED]
다음은 로드하는 기본 예입니다 RDD[Row]
.
스칼라클립 보드에 복사
org.neo4j.spark.Neo4j(sc).cypher("MATCH (n:Person) RETURN n.name").partitions(5).batch(10000).loadRowRdd
껍데기클립 보드에 복사
$SPARK_HOME/bin/spark-shell --conf spark.neo4j.bolt.password=<password> \
--packages neo4j-contrib:neo4j-spark-connector:2.0.0-M2,graphframes:graphframes:0.2.0-spark2.0-s_2.11
스칼라클립 보드에 복사
import org.neo4j.spark._
val neo = Neo4j(sc)
val rdd = neo.cypher("MATCH (n:Person) RETURN id(n) as id ").loadRowRdd
rdd.count
// inferred schema
rdd.first.schema.fieldNames
// => ["id"]
rdd.first.schema("id")
// => StructField(id,LongType,true)
neo.cypher("MATCH (n:Person) RETURN id(n)").loadRdd[Long].mean
// => res30: Double = 236696.5
neo.cypher("MATCH (n:Person) WHERE n.id <= {maxId} RETURN n.id").param("maxId", 10).loadRowRdd.count
// => res34: Long = 10
DataFrames 및 GraphX 에 대해 유사한 작업을 사용할 수 있습니다 . GraphX 통합을 통해 작업을 통해 Neo4j에 데이터를 다시 쓸 수도 있습니다 save
.
GraphFrames 를 사용하려면 패키지로 선언해야 합니다. 그런 다음 Neo4j의 그래프 데이터로 GraphFrame을 로드하고 그래프 알고리즘 또는 패턴 일치를 실행할 수 있습니다(후자는 Neo4j보다 느림).
스칼라클립 보드에 복사
import org.neo4j.spark._
val neo = Neo4j(sc)
import org.graphframes._
val graphFrame = neo.pattern(("Person","id"),("KNOWS",null), ("Person","id")).partitions(3).rows(1000).loadGraphFrame
graphFrame.vertices.count
// => 100
graphFrame.edges.count
// => 1000
val pageRankFrame = graphFrame.pageRank.maxIter(5).run()
val ranked = pageRankFrame.vertices
ranked.printSchema()
val top3 = ranked.orderBy(ranked.col("pagerank").desc).take(3)
// => top3: Array[org.apache.spark.sql.Row]
// => Array([236716,70,0.62285...], [236653,7,0.62285...], [236658,12,0.62285])
더 많은 예제와 세부 정보는 GitHub 리포지토리의 문서에서 찾을 수 있습니다.
Neo4j-Mazerunner
분석 그래프 처리에 대한 관심으로 Kenny Bastani 는 통합 솔루션 작업을 시작했습니다. 노드 또는 관계 목록과 같은 전용 데이터 세트를 Spark로 내보낼 수 있습니다.
다음 알고리즘을 지원합니다.
페이지 랭크
친밀도 중심성
매개 중심성
삼각형 계산
연결된 구성 요소
강력하게 연결된 구성 요소
그래프 처리 알고리즘을 실행한 후 결과는 Neo4j에 동시에 트랜잭션 방식으로 다시 기록됩니다.
이 접근 방식의 초점 중 하나는 데이터 안전에 있습니다. 따라서 영구 대기열(RabbitMQ)을 사용하여 Neo4j와 Spark 간에 데이터를 통신합니다.
인프라는 Docker 컨테이너를 사용하여 설정되며 Mazerunner Extension이 포함된 Spark, RabbitMQ, HDFS 및 Neo4j 전용 컨테이너가 있습니다.
자세한 내용은 프로젝트의 GitHub 페이지 에서 찾을 수 있습니다 .
데이터 전처리를 위한 Spark
원시 데이터(Chicago Crime 데이터 세트)를 Neo4j로 가져오기에 적합한 형식으로 사전 처리하는 한 가지 예가 Mark Needham 에 의해 시연되었습니다 . 그는 기존 데이터를 가져와 정리 및 집계하고 나중에 더 큰 파일로 다시 결합되는 조각을 출력하는 Spark 작업에 여러 기능을 결합했습니다.
접근 방식은 그의 블로그 게시물인 "Spark: Generating CSV Files to import into Neo4j" 에 자세히 설명되어 있습니다.
[광고] STEEM 개발자 커뮤니티에 참여 하시면, 다양한 혜택을 받을 수 있습니다.