Neo4j와 아파치 스파크

in #kr-dev2 years ago



일반 관찰

Apache Spark는 클러스터형 인메모리 데이터 처리 솔루션으로 많은 시스템에서 대규모 데이터 세트 처리를 쉽게 확장합니다. 또한 데이터에서 그래프 계산 작업을 실행하기 위한 두 가지 프레임워크인 GraphX ​​및 GraphFrames와 함께 제공됩니다.

다양한 방법으로 Spark와 통합할 수 있습니다. Neo4j로 가져올 원시 데이터를 사전 처리(집계, 필터링, 변환)하거나.

Spark는 외부 그래프 컴퓨팅 솔루션으로도 사용할 수 있습니다.

  1. 선택한 하위 그래프의 데이터를 Neo4j에서 Spark로 내보내기,

  2. 분석적 측면을 계산하고,

  3. 결과를 Neo4j에 다시 쓰기

  4. Neo4j 작업 및 Cypher 쿼리에 사용됩니다.

Neo4j 자체는 중대형 그래프에서 신속하게 그래프 처리를 실행할 수 있습니다. 예를 들어 그래프 처리 프로젝트는 Neo4j 서버 확장 또는 사용자 정의 프로시저 로 dbpedia 데이터 세트(10M 노드, 125M 관계)에서 PageRank(5회 반복)를 실행할 수 있음을 보여줍니다 . Spark는 더 큰 데이터 세트나 더 집중적인 컴퓨팅 작업에 더 적합할 수 있습니다.

Neo4j-스파크-커넥터

Neo4j Spark 커넥터 는 바이너리 Bolt 프로토콜을 사용하여 Neo4j 서버와 데이터를 주고받습니다.

이 페이지의 정보는 스파크 커넥터의 이전 버전(2.4.5 릴리스)을 참조합니다 . 최신 정보, 보다 쉽고 현대적인 API 는 Apache Spark용 Neo4j 커넥터를 참조하십시오 .

RDD, DataFrame, GraphX ​​및 GraphFrames 용 Spark-2.0 API를 제공 하므로 Apache Spark에서 Neo4j 그래프 데이터를 사용하고 처리하는 방법을 자유롭게 선택할 수 있습니다.

spark.neo4j.bolt.*Spark 구성 옵션 을 통해 Neo4j-URL, -user 및 -password를 구성합니다.

일반적인 사용법은 다음과 같습니다.

  1. 만들다org.neo4j.spark.Neo4j(sc)

  2. 직접 쿼리 로 cypher(query,[params])nodes(query,[params]), 또는rels(query,[params])
    pattern("Label1",Seq("REL"),"Label2")pattern( ("Label1","prop1"),("REL","prop"),("Label2","prop2") )

  3. 선택적 으로 병렬 처리를 위해 partitions(n)batch(size), 정의rows(count)

  4. 반환할 데이터 유형 선택

    • loadRowRddloadNodeRddsloadRelRdd_loadRdd[T]

    • loadDataFrame,loadDataFrame(schema)

    • loadGraph[VD,ED]

    • loadGraphFrame[VD,ED]

다음은 로드하는 기본 예입니다 RDD[Row].

스칼라클립 보드에 복사

org.neo4j.spark.Neo4j(sc).cypher("MATCH (n:Person) RETURN n.name").partitions(5).batch(10000).loadRowRdd

껍데기클립 보드에 복사

$SPARK_HOME/bin/spark-shell --conf spark.neo4j.bolt.password=<password> \
--packages neo4j-contrib:neo4j-spark-connector:2.0.0-M2,graphframes:graphframes:0.2.0-spark2.0-s_2.11

스칼라클립 보드에 복사

import org.neo4j.spark._

val neo = Neo4j(sc)

val rdd = neo.cypher("MATCH (n:Person) RETURN id(n) as id ").loadRowRdd
rdd.count

// inferred schema
rdd.first.schema.fieldNames
// => ["id"]
rdd.first.schema("id")
// => StructField(id,LongType,true)

neo.cypher("MATCH (n:Person) RETURN id(n)").loadRdd[Long].mean
// => res30: Double = 236696.5

neo.cypher("MATCH (n:Person) WHERE n.id <= {maxId} RETURN n.id").param("maxId", 10).loadRowRdd.count
// => res34: Long = 10

DataFrames 및 GraphX ​​에 대해 유사한 작업을 사용할 수 있습니다 . GraphX ​​통합을 통해 작업을 통해 Neo4j에 데이터를 다시 쓸 수도 있습니다 save.

GraphFrames 를 사용하려면 패키지로 선언해야 합니다. 그런 다음 Neo4j의 그래프 데이터로 GraphFrame을 로드하고 그래프 알고리즘 또는 패턴 일치를 실행할 수 있습니다(후자는 Neo4j보다 느림).

스칼라클립 보드에 복사

import org.neo4j.spark._

val neo = Neo4j(sc)

import org.graphframes._

val graphFrame = neo.pattern(("Person","id"),("KNOWS",null), ("Person","id")).partitions(3).rows(1000).loadGraphFrame

graphFrame.vertices.count
// => 100
graphFrame.edges.count
// => 1000

val pageRankFrame = graphFrame.pageRank.maxIter(5).run()
val ranked = pageRankFrame.vertices
ranked.printSchema()

val top3 = ranked.orderBy(ranked.col("pagerank").desc).take(3)
// => top3: Array[org.apache.spark.sql.Row]
// => Array([236716,70,0.62285...], [236653,7,0.62285...], [236658,12,0.62285])

더 많은 예제와 세부 정보는 GitHub 리포지토리의 문서에서 찾을 수 있습니다.

Neo4j-Mazerunner

분석 그래프 처리에 대한 관심으로 Kenny Bastani 는 통합 솔루션 작업을 시작했습니다. 노드 또는 관계 목록과 같은 전용 데이터 세트를 Spark로 내보낼 수 있습니다.

다음 알고리즘을 지원합니다.

  • 페이지 랭크

  • 친밀도 중심성

  • 매개 중심성

  • 삼각형 계산

  • 연결된 구성 요소

  • 강력하게 연결된 구성 요소

그래프 처리 알고리즘을 실행한 후 결과는 Neo4j에 동시에 트랜잭션 방식으로 다시 기록됩니다.

이 접근 방식의 초점 중 하나는 데이터 안전에 있습니다. 따라서 영구 대기열(RabbitMQ)을 사용하여 Neo4j와 Spark 간에 데이터를 통신합니다.

인프라는 Docker 컨테이너를 사용하여 설정되며 Mazerunner Extension이 포함된 Spark, RabbitMQ, HDFS 및 Neo4j 전용 컨테이너가 있습니다.

자세한 내용은 프로젝트의 GitHub 페이지 에서 찾을 수 있습니다 .

데이터 전처리를 위한 Spark

원시 데이터(Chicago Crime 데이터 세트)를 Neo4j로 가져오기에 적합한 형식으로 사전 처리하는 한 가지 예가 Mark Needham 에 의해 시연되었습니다 . 그는 기존 데이터를 가져와 정리 및 집계하고 나중에 더 큰 파일로 다시 결합되는 조각을 출력하는 Spark 작업에 여러 기능을 결합했습니다.

접근 방식은 그의 블로그 게시물인 "Spark: Generating CSV Files to import into Neo4j" 에 자세히 설명되어 있습니다.



출처 : https://neo4j.com/developer/apache-spark/

Sort:  

[광고] STEEM 개발자 커뮤니티에 참여 하시면, 다양한 혜택을 받을 수 있습니다.

Coin Marketplace

STEEM 0.17
TRX 0.16
JST 0.029
BTC 76027.52
ETH 2923.44
USDT 1.00
SBD 2.62