맛동산

OrientDB with Python - Simple CRUD Class, query_async 사용하기 본문

DB/OrientDB

OrientDB with Python - Simple CRUD Class, query_async 사용하기

오지고지리고알파고포켓몬고 2017. 12. 9. 20:47

1. 이번 글의 목표


지난글(OrientDB with Python - pyorient 사용하기)에서 pyorient 라이브러리를 사용하여 OrientDB와 통신하는 법을 다뤄봤습니다.


이번에는 pyorient를 사용하여, CRUD를 수행할 수 있는 간단한 class를 구현하여 지난번엔 다루지 않았던 update와 delete를 포함한 동작들을 자세히 다뤄보고, query_async 메소드를 사용하여 read 과정을 비동기로 처리하는 내용을 구현하겠습니다.


삽입, 조회, 수정, 삭제에 사용되는 각 함수들의 query를 읽어보시면 독자님들의 어플리케이션에서도 OrientDB를 문제없이 적용하실 수 있을거라 생각합니다.



2. query_async, 어디에 쓸 수 있을까?


저의 CorpusTest라는 데이터베이스에는 50만 건의 명사 단어가 있는데, select * 요청시 데이터를 불러오기 위해 약 5초 이상 대기시간이 소요됩니다.


query()나 command() 메소드로 50만 건의 데이터를 불러와서 콘솔에 print한다면, 매번 5초 이상 pyorient client가 데이터를 읽어오기를 기다렸다가 출력을 수행해야 합니다.

이렇게 데이터를 읽어오는 동안 소요되는 대기시간을 병목현상이라고 할 수 있습니다.


하지만 비동기 쿼리인 query_async()를 사용하면 병목현상 없이 query가 수행되는 족족 결과를 print할 수 있습니다.


웹 서비스는 사용자의 요청 -> 서버에서 요청 처리 -> 처리결과 응답으로 이루어지는데, 사용자가 많은 서비스에서 위 케이스처럼 병목현상이 발생할 경우 대기시간 초과로 사용자의 요청이 끊겨버리거나, 읽는 작업이 완료되기 전에 추가 요청이 누적될 경우 이를 처리할 자원이 부족하여 심지어 서비스가 죽는 상황이 발생하기도 합니다.


파이썬 웹에서는 병목현상으로 인한 서비스 불안정을 방지하기 위해 celery 등을 사용하여 이를 처리합니다. 토네이도 프레임워크 역시 이를 방지하기 위한 설계를 갖고 있습니다.


로직에 따라 적용 가능 부분이 다르겠지만 OrientDB를 사용한다면 비동기 쿼리(query_async)로 병목현상에 대해 좀 더 방어적인 프로그램을 작성할 수 있습니다.



3. 코드

 
import pyorient

class MyError(Exception):
    def __init__(self, msg):
        self.msg = msg

    def __str__(self):
        return self.msg

class OrientHandler:
    # 선언
    __id = None
    __pw = None
    __host = None
    __port = None
    __db_name = None

    __client = None
    __session_id = None

    # 생성자
    def __init__(self, id, pw, host, port, db_name):
        self.setConnInfo(id, pw, host, port, db_name)
        self.createConn()
        self.openDB()

    # 연결 정보 설정
    def setConnInfo(self, id, pw, host, port, db_name):
        self.__id = id; self.__pw = pw; self.__host = host; self.__port = port; self.__db_name=db_name

    # 커넥션 생성
    def createConn(self):
        self.__client = pyorient.OrientDB(self.__host, self.__port)
        self.__session_id = self.__client.connect(self.__id, self.__pw)

    # 데이터베이스 열기
    def openDB(self):
        # Call by ref로 return할 필요는 없습니다
        self.__client.db_open(self.__db_name, self.__id, self.__pw)

    # 연결종료
    def close(self):
        self.__client.close()

    # where문 조립
    def __assembleWhere(self, where):
        w = ''
        for i, k_v in enumerate(where.items()):
            w += ' %s = "%s" '%k_v
            if(i<len(where)-1):
                w += 'and'
        return w

    # Create
    def create(self, class_name, values):
        try:
            if values:
                self.__client.command('insert into %s content %s'%(class_name, values))
            else:
                raise MyError("please check parameter : values")
        except Exception as e:
            print('[error : create]',e)

    # Read
    def read(self, class_name, where):
        results = []
        try:
            if where:
                w = self.__assembleWhere(where)
                results = self.__client.command('select * from %s where %s'%(class_name, w))
            else:
                results = self.__client.command('select * from %s'%class_name)
        except Exception as e:
            print('[error : read]',e)
        finally:
            return results

    # Update
    def update(self, class_name, set, where):
        try:
            if set and where: # 조건절이 있을때
                w = self.__assembleWhere(where)
                self.__client.command('update %s merge %s where %s'%(class_name, set, w))
            elif set and not where: # 조건절이 없을때
                self.__client.command('update %s merge %s' % (class_name, set))
            else:
                raise MyError("please check parameter : set")
        except Exception as e:
            print('[error : update]', e)

    # Delete
    # where 인자가 없으면 클래스 내용이 전부 제거됩니다
    def delete(self, class_name, where):
        try:
            if where:
                w = self.__assembleWhere(where)
                self.__client.command('delete from %s where %s'%(class_name, w))
            else:
                self.__client.command('delete from %s'%class_name)
        except Exception as e:
            print('[error : delete]', e)


    # query_async
    def async(self, class_name):
        query = 'select * from %s'%class_name
        self.__client.query_async(query, -1, "*:-1", self.__callback)

    def __callback(self, record):
        print(record)



## end of CrudClass

def showResult(results):
    for r in results:
        print(r)
    print()

if __name__=="__main__":
    id = "your id"
    pw = "your pw"
    host = "localhost"
    port = 2424

    pytest = OrientHandler(id, pw, host, port, 'pytest')

    # create
    pytest.create('test1', {'name': '안기모'})

    # read
    results = pytest.read('test1', None)
    showResult(results)
    #{'@test1': {'name': '김철수'}, 'version': 1, 'rid': '#26:0'}
    #{'@test1': {'name': '박근혜', 'addr': '503'}, 'version': 1, 'rid': '#26:1'}
    #{'@test1': {'name': '김영희', 'age': 23}, 'version': 1, 'rid': '#27:0'}
    #{'@test1': {'name': '안기모'}, 'version': 1, 'rid': '#28:0'}


    # update
    pytest.update('test1', {'age':90}, {'name':'안기모'})
    pytest.update('test1', {'nation': '대한민국'}, None) # 조건절이 없으면 모든 데이터에 적용됩니

    results = pytest.read('test1', {'age':90})
    showResult(results)
    #{'@test1':{'name': '안기모', 'age': 90, 'nation': '대한민국'},'version':2,'rid':'#28:0'}


    # delete
    pytest.delete('test1', {'name': '안기모'})

    # close
    pytest.close() # 재사용하려면 pytest.createConn(); pytest.openDB(); 후 사용



    # query_async
    # CorpusTest라는 데이터베이스는 제가 임의로 만든 데이터베이스입니다
    # 'CorpusTest'와 'noun_key'만 여러분의 데이터베이스와 클래스 이름으로 변경하면 query_async를 사용하실 수 있습니다 
    corpusTest = OrientHandler(id, pw, host, port, 'CorpusTest')
    corpusTest.async('noun_key')



4. 설명


query문에 대해 일일이 설명하진 않겠습니다.


우선 OrientHandler라는 python class를 제작하여 각 database name을 객체로 사용할 수 있도록 설계했습니다.


OrientDB의 sql query 중 몇가지 특이한 문법이 있었습니다.

line 57 : insert에서 content 문법을 사용하면 dict객체를 그대로 사용해서 values(insert into ... values ...)를 대신할 수 있습니다.

line 81 : update에서 merge 문법을 사용하면 dict객체를 그대로 사용해서 set(update set ...)을 대신할 수 있습니다.


line 109 : query_async는 다음과 같은 인자를 가지고 있습니다.

-> client.query_async(<query>, <limit>, <fetch-plan>, <callback>)

limit을 -1로 설정하면 모든 데이터를 읽을때까지 수행됩니다.



5. 마치며


이번 글에서는 Simple CRUD Class를 작성하여 삽입, 조회, 수정, 삭제를 수행하는 query를 다뤄봤습니다. 또한 query_async 메소드로 비동기 쿼리를 실행하는 방법을 다뤄봤습니다.



OrientDB에서도 join 등 관계형 데이터베이스에 사용되는 기능을 사용할 수 있는데, 일단 여기까지 잘 따라오셨다면 OrientDB를 사용하여 어플리케이션을 구현하는데에 큰 문제는 없을 것 같습니다.


다음은 기본적인 CRUD기능 외에 다양한 query문의 사용법을 알아보도록 하겠습니다.

Comments