Async API Documentation

New in version 5.0.

Warning

There is a known issue with Python 3.8 and the async driver where it gradually slows down. Generally, it’s recommended to use the latest supported version of Python for best performance, stability, and security.

AsyncGraphDatabase

Async Driver Construction

The neo4j.AsyncDriver construction is done via a classmethod on the neo4j.AsyncGraphDatabase class.

class neo4j.AsyncGraphDatabase

Accessor for neo4j.AsyncDriver construction.

classmethod driver(uri, *, auth=None, **config)

Create a driver.

Parameters:
  • uri – the connection URI for the driver, see URI for available URIs.

  • auth – the authentication details, see Auth for available authentication details.

  • config – driver configuration key-word arguments, see Async Driver Configuration for available key-word arguments.

Return type:

AsyncDriver

Driver creation example:

import asyncio

from neo4j import AsyncGraphDatabase

async def main():
    uri = "neo4j://example.com:7687"
    driver = AsyncGraphDatabase.driver(uri, auth=("neo4j", "password"))

    await driver.close()  # close the driver object

asyncio.run(main())

For basic authentication, auth can be a simple tuple, for example:

auth = ("neo4j", "password")

This will implicitly create a neo4j.Auth with a scheme="basic". Other authentication methods are described under Auth.

with block context example:

import asyncio

from neo4j import AsyncGraphDatabase

async def main():
    uri = "neo4j://example.com:7687"
    auth = ("neo4j", "password")
    async with AsyncGraphDatabase.driver(uri, auth=auth) as driver:
        # use the driver
        ...

asyncio.run(main())

classmethod bookmark_manager(initial_bookmarks=None, bookmarks_supplier=None, bookmarks_consumer=None)

Create an AsyncBookmarkManager with the default implementation.

Basic usage example to configure sessions with the built-in bookmark manager implementation so that all work is automatically causally chained (i.e., all reads can observe all previous writes even in a clustered setup):

import neo4j

driver = neo4j.AsyncGraphDatabase.driver(...)
bookmark_manager = neo4j.AsyncGraphDatabase.bookmark_manager(...)

async with driver.session(
    bookmark_manager=bookmark_manager
) as session1:
    async with driver.session(
        bookmark_manager=bookmark_manager
    ) as session2:
        result1 = await session1.run("<WRITE_QUERY>")
        await result1.consume()
        # READ_QUERY is guaranteed to see what WRITE_QUERY wrote.
        result2 = await session2.run("<READ_QUERY>")
        await result2.consume()

This is a very contrived example, and in this particular case, having both queries in the same session has the exact same effect and might even be more performant. However, when dealing with sessions spanning multiple threads, async Tasks, processes, or even hosts, the bookmark manager can come in handy as sessions are not safe to be used concurrently.

Parameters:
  • initial_bookmarks (Mapping[str, Union[Bookmarks, Iterable[str]]]) – The initial set of bookmarks. The returned bookmark manager will use this to initialize its internal bookmarks per database. If present, this parameter must be a mapping of database names to Bookmarks or an iterable of raw bookmark values (str).

  • bookmarks_supplier (Callable[[Optional[str]], Union[Bookmarks, Awaitable[Bookmarks]]]) – Function which will be called every time the default bookmark manager’s method AsyncBookmarkManager.get_bookmarks() or AsyncBookmarkManager.get_all_bookmarks() gets called. The function will be passed the name of the database (str) if .get_bookmarks is called or None if .get_all_bookmarks is called. The function must return a Bookmarks object. The result of bookmarks_supplier will then be concatenated with the internal set of bookmarks and used to configure the session in creation.

  • bookmarks_consumer (Callable[[str, Bookmarks], Union[None, Awaitable[None]]]) – Function which will be called whenever the set of bookmarks handled by the bookmark manager gets updated with the new internal bookmark set. It will receive the name of the database and the new set of bookmarks.

Returns:

A default implementation of AsyncBookmarkManager.

Return type:

AsyncBookmarkManager

This is experimental (see Filter Warnings). It might be changed or removed at any time, even without prior notice.

New in version 5.0.

URI

On construction, the scheme of the URI determines the type of neo4j.AsyncDriver object created.

Available valid URIs:

  • bolt://host[:port]

  • bolt+ssc://host[:port]

  • bolt+s://host[:port]

  • neo4j://host[:port][?routing_context]

  • neo4j+ssc://host[:port][?routing_context]

  • neo4j+s://host[:port][?routing_context]

uri = "bolt://example.com:7687"
uri = "neo4j://example.com:7687"

Each supported scheme maps to a particular neo4j.AsyncDriver subclass that implements a specific behaviour.

URI Scheme – Driver Object and Setting

  • bolt – AsyncBoltDriver with no encryption.

  • bolt+ssc – AsyncBoltDriver with encryption (accepts self-signed certificates).

  • bolt+s – AsyncBoltDriver with encryption (accepts only certificates signed by a certificate authority), full certificate checks.

  • neo4j – AsyncNeo4jDriver with no encryption.

  • neo4j+ssc – AsyncNeo4jDriver with encryption (accepts self-signed certificates).

  • neo4j+s – AsyncNeo4jDriver with encryption (accepts only certificates signed by a certificate authority), full certificate checks.
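The scheme-to-driver mapping above can be sketched as a small lookup. This is only an illustration of the table, not how the driver parses URIs internally; URI_SCHEMES and describe_uri are hypothetical names introduced here.

```python
from urllib.parse import urlsplit

# Copied from the table above:
# scheme -> (driver class name, encrypted, accepts self-signed certificates)
URI_SCHEMES = {
    "bolt": ("AsyncBoltDriver", False, False),
    "bolt+ssc": ("AsyncBoltDriver", True, True),
    "bolt+s": ("AsyncBoltDriver", True, False),
    "neo4j": ("AsyncNeo4jDriver", False, False),
    "neo4j+ssc": ("AsyncNeo4jDriver", True, True),
    "neo4j+s": ("AsyncNeo4jDriver", True, False),
}


def describe_uri(uri):
    """Return (driver class name, encrypted, accepts self-signed) for a URI."""
    scheme = urlsplit(uri).scheme
    try:
        return URI_SCHEMES[scheme]
    except KeyError:
        raise ValueError(f"unsupported URI scheme: {scheme!r}") from None
```

For example, describe_uri("neo4j+s://example.com:7687") reports an encrypted AsyncNeo4jDriver that rejects self-signed certificates.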

AsyncDriver

Every Neo4j-backed application will require a driver object.

This object holds the details required to establish connections with a Neo4j database, including server URIs, credentials and other configuration. neo4j.AsyncDriver objects hold a connection pool from which neo4j.AsyncSession objects can borrow connections. Closing a driver will immediately shut down all connections in the pool.

Note

Driver objects only open connections and pool them as needed. To verify that the driver is able to communicate with the database without executing any query, use neo4j.AsyncDriver.verify_connectivity().

class neo4j.AsyncDriver

Base class for all types of neo4j.AsyncDriver, instances of which are used as the primary access point to Neo4j.

property encrypted: bool

Indicate whether the driver was configured to use encryption.

session(**config)

Create a session, see AsyncSession Construction.

Parameters:

config – session configuration key-word arguments, see Session Configuration for available key-word arguments.

Returns:

new neo4j.AsyncSession object

Return type:

AsyncSession

async close()

Shut down, closing any open connections in the pool.

Return type:

None

async verify_connectivity(**config)

Verify that the driver can establish a connection to the server.

This verifies if the driver can establish a reading connection to a remote server or a cluster. Some data will be exchanged.

Note

Even if this method raises an exception, the driver still needs to be closed via close() to free up all resources.

Parameters:

config

accepts the same configuration key-word arguments as session().

Warning

All configuration key-word arguments are experimental. They might be changed or removed in any future version without prior notice.

Raises:

DriverError – if the driver cannot connect to the remote. Use the exception to further understand the cause of the connectivity problem.

Return type:

None

Changed in version 5.0: The undocumented return value has been removed. If you need information about the remote server, use get_server_info() instead.
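As the note above says, a driver whose connectivity check failed still has to be closed. A minimal sketch of that pattern follows; open_verified is a hypothetical helper name, and the function only relies on the verify_connectivity() and close() methods documented here.

```python
async def open_verified(driver):
    """Return the driver if it can reach the server, else close it and re-raise."""
    try:
        await driver.verify_connectivity()
    except BaseException:
        # The check failed (or the task was cancelled): the driver may still
        # hold resources, so release them before propagating the error.
        await driver.close()
        raise
    return driver


# Possible usage (requires a reachable server):
#     driver = await open_verified(AsyncGraphDatabase.driver(uri, auth=auth))
```

The caller remains responsible for closing the driver once it is done with it.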

async get_server_info(**config)

Get information about the connected Neo4j server.

Try to establish a working read connection to the remote server or a member of a cluster and exchange some data. Then return the contacted server’s information.

In a cluster, there is no guarantee about which server will be contacted.

Note

Even if this method raises an exception, the driver still needs to be closed via close() to free up all resources.

Parameters:

config

accepts the same configuration key-word arguments as session().

Warning

All configuration key-word arguments are experimental. They might be changed or removed in any future version without prior notice.

Raises:

DriverError – if the driver cannot connect to the remote. Use the exception to further understand the cause of the connectivity problem.

Return type:

ServerInfo

New in version 5.0.
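A short sketch of how the returned ServerInfo might be used for logging. It assumes the ServerInfo attributes agent, address, and protocol_version (a tuple of integers); describe_server is a hypothetical helper name.

```python
async def describe_server(driver):
    """Build a one-line description of the server the driver contacted."""
    info = await driver.get_server_info()
    # protocol_version is assumed to be a tuple such as (5, 0).
    version = ".".join(str(part) for part in info.protocol_version)
    return f"{info.agent} at {info.address} (Bolt {version})"
```

In a cluster the description refers to whichever member happened to be contacted, so it should not be treated as a stable identifier.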

Async Driver Configuration

neo4j.AsyncDriver is configured exactly like neo4j.Driver (see Driver Configuration). The only difference is that the async driver accepts an async custom resolver function:

resolver

A custom resolver function to resolve host and port values ahead of DNS resolution. This function is called with a 2-tuple of (host, port) and should return an iterable of 2-tuples (host, port).

If no custom resolver function is supplied, the internal resolver moves straight to regular DNS resolution.

The custom resolver function can but does not have to be a coroutine.

For example:

from neo4j import AsyncGraphDatabase

async def custom_resolver(socket_address):
    if socket_address == ("example.com", 9999):
        yield "::1", 7687
        yield "127.0.0.1", 7687
    else:
        from socket import gaierror
        raise gaierror("Unexpected socket address %r" % socket_address)

# alternatively
def custom_resolver(socket_address):
    ...

driver = AsyncGraphDatabase.driver("neo4j://example.com:9999",
                                   auth=("neo4j", "password"),
                                   resolver=custom_resolver)

Default:

None
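Since the resolver does not have to be a coroutine, a plain function is often enough. The sketch below builds one from a static mapping; make_static_resolver and the override table are hypothetical, and returning the unchanged address for unknown hosts assumes the driver then applies regular DNS resolution to it.

```python
def make_static_resolver(overrides):
    """Build a synchronous resolver from a {(host, port): [(host, port), ...]} map."""
    def resolver(socket_address):
        # Only host and port are needed, even if the address carries more fields.
        host, port = socket_address[:2]
        # Unknown addresses are passed through unchanged.
        return overrides.get((host, port), [(host, port)])
    return resolver


custom_resolver = make_static_resolver({
    ("example.com", 9999): [("::1", 7687), ("127.0.0.1", 7687)],
})
```

The resulting custom_resolver can be passed as resolver=custom_resolver in the same way as in the example above.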

Driver Object Lifetime

For general applications, it is recommended to create one top-level neo4j.AsyncDriver object that lives for the lifetime of the application.

For example:

from neo4j import AsyncGraphDatabase

class Application:

    def __init__(self, uri, user, password):
        self.driver = AsyncGraphDatabase.driver(uri, auth=(user, password))

    async def close(self):
        await self.driver.close()

Connection details held by the neo4j.AsyncDriver are immutable. Therefore if, for example, a password is changed, a replacement neo4j.AsyncDriver object must be created. More than one AsyncDriver may be required if connections to multiple databases, or connections as multiple users, are required, unless impersonation (impersonated_user) is used.

neo4j.AsyncDriver objects are safe to be used in concurrent coroutines. They are not thread-safe.
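One way to share a single driver across concurrent coroutines is to give each coroutine its own session, since sessions themselves are not concurrency-safe. This is a sketch only; the query, the Person label, and the function names are illustrative assumptions.

```python
import asyncio

QUERY = "MATCH (p:Person) WHERE p.age >= $min_age RETURN p.name AS name"


async def names_older_than(driver, min_age):
    # Each coroutine opens its own session; only the driver is shared.
    async with driver.session() as session:
        result = await session.run(QUERY, min_age=min_age)
        return [record["name"] async for record in result]


async def fetch_all(driver, ages):
    # Run one query per age concurrently on the shared driver.
    return await asyncio.gather(
        *(names_older_than(driver, age) for age in ages)
    )
```

asyncio.gather returns the per-age lists in the same order as the ages passed in.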

AsyncBoltDriver

URI schemes:

bolt, bolt+ssc, bolt+s

Will result in:

class neo4j.AsyncBoltDriver(pool, default_workspace_config)

AsyncBoltDriver is instantiated for bolt URIs and addresses a single database machine. This may be a standalone server or could be a specific member of a cluster.

Connections established by an AsyncBoltDriver are always made to the exact host and port detailed in the URI.

This class is not supposed to be instantiated externally. Use AsyncGraphDatabase.driver() instead.

AsyncNeo4jDriver

URI schemes:

neo4j, neo4j+ssc, neo4j+s

Will result in:

class neo4j.AsyncNeo4jDriver(pool, default_workspace_config)

AsyncNeo4jDriver is instantiated for neo4j URIs. The routing behaviour works in tandem with Neo4j’s Causal Clustering feature by directing read and write behaviour to appropriate cluster members.

This class is not supposed to be instantiated externally. Use AsyncGraphDatabase.driver() instead.

AsyncSessions & AsyncTransactions

All database activity is co-ordinated through two mechanisms: sessions (neo4j.AsyncSession) and transactions (neo4j.AsyncTransaction, neo4j.AsyncManagedTransaction).

A session is a logical container for any number of causally-related transactional units of work. Sessions automatically provide guarantees of causal consistency within a clustered environment but multiple sessions can also be causally chained if required. Sessions provide the top level of containment for database activity. Session creation is a lightweight operation and sessions are not thread safe.

Connections are drawn from the neo4j.AsyncDriver connection pool as required.

A transaction is a unit of work that is either committed in its entirety or is rolled back on failure.

AsyncSession Construction

To construct a neo4j.AsyncSession use the neo4j.AsyncDriver.session() method.

import asyncio

from neo4j import AsyncGraphDatabase

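async def main():
    # Placeholder connection details; replace with your own.
    uri = "neo4j://example.com:7687"
    user, password = "neo4j", "password"
    driver = AsyncGraphDatabase.driver(uri, auth=(user, password))
    session = driver.session()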
    result = await session.run("MATCH (a:Person) RETURN a.name AS name")
    names = [record["name"] async for record in result]
    await session.close()
    await driver.close()

asyncio.run(main())

Sessions will often be created and destroyed using an async with block. This is the recommended approach as it takes care of closing the session properly even when an exception is raised.

async with driver.session() as session:
    result = await session.run("MATCH (a:Person) RETURN a.name AS name")
    # do something with the result...

Sessions will often be created with some configuration settings, see Session Configuration.

async with driver.session(database="example_database",
                          fetch_size=100) as session:
    result = await session.run("MATCH (a:Person) RETURN a.name AS name")
    # do something with the result...

AsyncSession

class neo4j.AsyncSession

An AsyncSession is a logical context for transactional units of work. Connections are drawn from the AsyncDriver connection pool as required.

Session creation is a lightweight operation and sessions are not safe to be used in concurrent contexts (multiple threads/coroutines). Therefore, a session should generally be short-lived, and must not span multiple threads/asynchronous Tasks.

In general, sessions will be created and destroyed within an async with context. For example:

async with driver.session() as session:
    result = await session.run("MATCH (n:Person) RETURN n.name AS name")
    # do something with the result...

Parameters:
  • pool – connection pool instance

  • config – session config instance

Note

Some asyncio utility functions (e.g., asyncio.wait_for() and asyncio.shield()) will wrap work in an asyncio.Task. This introduces concurrency and can lead to undefined behavior as AsyncSession is not concurrency-safe.

Consider this wrong example:

async def dont_do_this(driver):
    async with driver.session() as session:
        await asyncio.shield(session.run("RETURN 1"))

If dont_do_this gets cancelled while waiting for session.run, session.run itself won’t get cancelled (it’s shielded), so it will continue to use the session in another Task. Concurrently, the async context manager (async with driver.session()) will clean up the session on exit. That’s two Tasks handling the session concurrently. Therefore, this yields undefined behavior.

In this particular example, the problem could be solved by shielding the whole coroutine dont_do_this instead of only the session.run. Like so:

async def thats_better(driver):
    async def inner():
        async with driver.session() as session:
            await session.run("RETURN 1")

    await asyncio.shield(inner())

async close()

Close the session.

This will release any borrowed resources, such as connections, and will roll back any outstanding transactions.

Return type:

None

cancel()

Cancel this session.

If the session is already closed, this method does nothing. Otherwise, it forcefully closes the connection the session holds, if there is one. This abruptly terminates all work in flight.

The primary purpose of this function is to handle asyncio.CancelledError.

session = driver.session()
try:
    ...  # do some work
except asyncio.CancelledError:
    session.cancel()
    raise

Return type:

None

closed()

Indicate whether the session has been closed.

Returns:

True if closed, False otherwise.

Return type:

bool

async run(query, parameters=None, **kwargs)

Run a Cypher query within an auto-commit transaction.

The query is sent and the result header received immediately, but the neo4j.AsyncResult content is fetched lazily as consumed by the client application.

If a query is executed before a previous neo4j.AsyncResult in the same AsyncSession has been fully consumed, the first result will be fully fetched and buffered. Note therefore that the generally recommended pattern of usage is to fully consume one result before executing a subsequent query. If two results need to be consumed in parallel, multiple AsyncSession objects can be used as an alternative to result buffering.

For more usage details, see AsyncTransaction.run().

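Parameters:
  • query – cypher query

  • parameters – dictionary of parameters

  • kwargs – additional keyword parameters

Raises: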

SessionError – if the session has been closed.

Returns:

a new neo4j.AsyncResult object

Return type:

AsyncResult

async last_bookmarks()

Return most recent bookmarks of the session.

Bookmarks can be used to causally chain sessions. For example, if a session (session1) wrote something that another session (session2) needs to read, use session2 = driver.session(bookmarks=await session1.last_bookmarks()) to achieve this.

Combine the bookmarks of multiple sessions like so:

bookmarks1 = await session1.last_bookmarks()
bookmarks2 = await session2.last_bookmarks()
session3 = driver.session(bookmarks=bookmarks1 + bookmarks2)

A session automatically manages bookmarks, so this method is rarely needed. If you need causal consistency, try to run the relevant queries in the same session.

“Most recent bookmarks” are either the bookmarks passed to the session on creation, or the last bookmark the session received after committing a transaction to the server.

Note: For auto-commit transactions (AsyncSession.run()), this will trigger AsyncResult.consume() for the current result.

Returns:

the session’s last known bookmarks

Return type:

Bookmarks
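The chaining described above can be sketched as a helper that writes in one session and reads in another. write_then_read is a hypothetical name, and the sketch assumes records expose data() as in the synchronous driver.

```python
async def write_then_read(driver, write_query, read_query):
    """Run a write, then a read that is guaranteed to observe it."""
    async with driver.session() as writer:
        result = await writer.run(write_query)
        await result.consume()
        # last_bookmarks() is a coroutine and must be awaited.
        bookmarks = await writer.last_bookmarks()

    # The second session starts from the bookmarks of the first one.
    async with driver.session(bookmarks=bookmarks) as reader:
        result = await reader.run(read_query)
        return [record.data() async for record in result]
```

As noted above, running both queries in one session achieves the same guarantee with less ceremony; explicit bookmarks are mainly useful when the sessions live in different tasks or processes.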

async last_bookmark()

Return the bookmark received following the last completed transaction.

Note: For auto-commit transactions (AsyncSession.run()), this will trigger AsyncResult.consume() for the current result.

Warning

This method can lead to unexpected behaviour if the session has not yet successfully completed a transaction.

Deprecated since version 5.0: last_bookmark() will be removed in version 6.0. Use last_bookmarks() instead.

Returns:

last bookmark

Return type:

Optional[str]

async begin_transaction(metadata=None, timeout=None)

Begin a new unmanaged transaction. Creates a new AsyncTransaction within this session.

At most one transaction may exist in a session at any point in time. To maintain multiple concurrent transactions, use multiple concurrent sessions.

Note: For auto-commit transactions (AsyncSession.run()), this will trigger a consume for the current result.

Parameters:
  • metadata (Optional[Dict[str, Any]]) – a dictionary with metadata. Specified metadata will be attached to the executing transaction and visible in the output of the dbms.listQueries and dbms.listTransactions procedures. It will also get logged to the query.log. This functionality makes it easier to tag transactions and is equivalent to the dbms.setTXMetaData procedure, see https://neo4j.com/docs/operations-manual/current/reference/procedures/ for procedure reference.

  • timeout (Optional[float]) – the transaction timeout in seconds. Transactions that execute longer than the configured timeout will be terminated by the database. This makes it possible to limit query/transaction execution time. The specified timeout overrides the default timeout configured in the database using the dbms.transaction.timeout setting. The value must be a positive duration, not zero or negative.

Raises:
Returns:

A new transaction instance.

Return type:

AsyncTransaction
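A minimal sketch of passing metadata and timeout, combined with the async with form so the transaction is committed on success and rolled back on error. The metadata key, the timeout value, and the function name count_nodes are illustrative assumptions.

```python
async def count_nodes(session):
    """Count all nodes in a tagged transaction limited to five seconds."""
    async with await session.begin_transaction(
        metadata={"app": "docs-example"},  # shows up in dbms.listTransactions
        timeout=5.0,                       # seconds; must be positive
    ) as tx:
        result = await tx.run("MATCH (n) RETURN count(n) AS n")
        record = await result.single()
        # Leaving the block without an exception commits the transaction.
        return record["n"]
```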

async read_transaction(transaction_function, *args, **kwargs)

Execute a unit of work in a managed read transaction.

Note

This does not necessarily imply access control, see the session configuration option default_access_mode.

This transaction will automatically be committed when the function returns, unless an exception is thrown during query execution or by the user code. Note that this function performs retries and that the supplied transaction_function might get invoked more than once. Therefore, it needs to be idempotent (i.e., have the same effect regardless of whether it is called once or many times).

Example:

async def do_cypher_tx(tx, cypher):
    result = await tx.run(cypher)
    values = [record.values() async for record in result]
    return values

async with driver.session() as session:
    values = await session.read_transaction(do_cypher_tx, "RETURN 1 AS x")

Example:

async def get_two_tx(tx):
    result = await tx.run("UNWIND [1,2,3,4] AS x RETURN x")
    values = []
    async for record in result:
        if len(values) >= 2:
            break
        values.append(record.values())
    # or shorter: values = [record.values()
    #                       for record in await result.fetch(2)]

    # discard the remaining records if there are any
    summary = await result.consume()
    # use the summary for logging etc.
    return values

async with driver.session() as session:
    values = await session.read_transaction(get_two_tx)

Parameters:
  • transaction_function (t.Callable[te.Concatenate[AsyncManagedTransaction, _P], t.Awaitable[_R]]) – a function that takes a transaction as an argument and does work with the transaction. transaction_function(tx, *args, **kwargs) where tx is an AsyncManagedTransaction.

  • args (_P.args) – additional arguments for the transaction_function

  • kwargs (_P.kwargs) – key word arguments for the transaction_function

Raises:

SessionError – if the session has been closed.

Returns:

a result as returned by the given unit of work

Return type:

_R

Deprecated since version 5.0: Method was renamed to execute_read().

async execute_read(transaction_function, *args, **kwargs)

Execute a unit of work in a managed read transaction.

Note

This does not necessarily imply access control, see the session configuration option default_access_mode.

This transaction will automatically be committed when the function returns, unless an exception is thrown during query execution or by the user code. Note that this function performs retries and that the supplied transaction_function might get invoked more than once. Therefore, it needs to be idempotent (i.e., have the same effect regardless of whether it is called once or many times).

Example:

async def do_cypher_tx(tx, cypher):
    result = await tx.run(cypher)
    values = [record.values() async for record in result]
    return values

async with driver.session() as session:
    values = await session.execute_read(do_cypher_tx, "RETURN 1 AS x")

Example:

async def get_two_tx(tx):
    result = await tx.run("UNWIND [1,2,3,4] AS x RETURN x")
    values = []
    async for record in result:
        if len(values) >= 2:
            break
        values.append(record.values())
    # or shorter: values = [record.values()
    #                       for record in await result.fetch(2)]

    # discard the remaining records if there are any
    summary = await result.consume()
    # use the summary for logging etc.
    return values

async with driver.session() as session:
    values = await session.execute_read(get_two_tx)

Parameters:
  • transaction_function (t.Callable[te.Concatenate[AsyncManagedTransaction, _P], t.Awaitable[_R]]) – a function that takes a transaction as an argument and does work with the transaction. transaction_function(tx, *args, **kwargs) where tx is an AsyncManagedTransaction.

  • args (_P.args) – additional arguments for the transaction_function

  • kwargs (_P.kwargs) – key word arguments for the transaction_function

Raises:

SessionError – if the session has been closed.

Returns:

a result as returned by the given unit of work

Return type:

_R

New in version 5.0.

async write_transaction(transaction_function, *args, **kwargs)

Execute a unit of work in a managed write transaction.

Note

This does not necessarily imply access control, see the session configuration option default_access_mode.

This transaction will automatically be committed when the function returns, unless an exception is thrown during query execution or by the user code. Note that this function performs retries and that the supplied transaction_function might get invoked more than once. Therefore, it needs to be idempotent (i.e., have the same effect regardless of whether it is called once or many times).

Example:

async def create_node_tx(tx, name):
    query = "CREATE (n:NodeExample { name: $name }) RETURN id(n) AS node_id"
    result = await tx.run(query, name=name)
    record = await result.single()
    return record["node_id"]

async with driver.session() as session:
    node_id = await session.write_transaction(create_node_tx, "example")

Parameters:
  • transaction_function (t.Callable[te.Concatenate[AsyncManagedTransaction, _P], t.Awaitable[_R]]) – a function that takes a transaction as an argument and does work with the transaction. transaction_function(tx, *args, **kwargs) where tx is an AsyncManagedTransaction.

  • args (_P.args) – additional arguments for the transaction_function

  • kwargs (_P.kwargs) – key word arguments for the transaction_function

Raises:

SessionError – if the session has been closed.

Returns:

a result as returned by the given unit of work

Return type:

_R

Deprecated since version 5.0: Method was renamed to execute_write().

async execute_write(transaction_function, *args, **kwargs)

Execute a unit of work in a managed write transaction.

Note

This does not necessarily imply access control, see the session configuration option default_access_mode.

This transaction will automatically be committed when the function returns, unless an exception is thrown during query execution or by the user code. Note that this function performs retries and that the supplied transaction_function might get invoked more than once. Therefore, it needs to be idempotent (i.e., have the same effect regardless of whether it is called once or many times).

Example:

async def create_node_tx(tx, name):
    query = "CREATE (n:NodeExample { name: $name }) RETURN id(n) AS node_id"
    result = await tx.run(query, name=name)
    record = await result.single()
    return record["node_id"]

async with driver.session() as session:
    node_id = await session.execute_write(create_node_tx, "example")

Parameters:
  • transaction_function (t.Callable[te.Concatenate[AsyncManagedTransaction, _P], t.Awaitable[_R]]) – a function that takes a transaction as an argument and does work with the transaction. transaction_function(tx, *args, **kwargs) where tx is an AsyncManagedTransaction.

  • args (_P.args) – additional arguments for the transaction_function

  • kwargs (_P.kwargs) – key word arguments for the transaction_function

Raises:

SessionError – if the session has been closed.

Returns:

a result as returned by the given unit of work

Return type:

_R

New in version 5.0.
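Because the transaction function may be invoked more than once, one common way to keep a write idempotent is to use MERGE rather than CREATE. The following is a sketch under that assumption; the Person label and the function name ensure_person_tx are illustrative.

```python
async def ensure_person_tx(tx, name):
    """Create the Person node only if it does not exist yet."""
    # MERGE matches an existing node or creates one, so a retried
    # invocation does not produce a duplicate node.
    query = "MERGE (p:Person { name: $name }) RETURN p.name AS name"
    result = await tx.run(query, name=name)
    record = await result.single()
    return record["name"]


# Possible usage:
#     async with driver.session() as session:
#         name = await session.execute_write(ensure_person_tx, "Alice")
```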

Session Configuration

neo4j.AsyncSession is configured exactly like neo4j.Session (see Session Configuration). The only difference is that the async session accepts either a neo4j.api.BookmarkManager object or a neo4j.api.AsyncBookmarkManager as its bookmark manager:

bookmark_manager

Specify a bookmark manager for the driver to use. If present, the bookmark manager is used to keep all work on the driver causally consistent.

See BookmarkManager for more information.

Warning

Enabling the BookmarkManager can have a negative impact on performance since all queries will wait for the latest changes to be propagated across the cluster.

For simpler use-cases, sessions (AsyncSession) can be used to group a series of queries together that will be causally chained automatically.

Type:

None, BookmarkManager, or AsyncBookmarkManager

Default:

None

This is experimental (see Filter Warnings). It might be changed or removed at any time, even without prior notice.

AsyncTransaction

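Neo4j supports three kinds of async transaction:

  • Auto-commit Transactions

  • Explicit Async Transactions

  • Managed Async Transactions (transaction functions)

Each has pros and cons, but if in doubt, use a managed transaction with a transaction function.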

Auto-commit Transactions

Auto-commit transactions are the simplest form of transaction, available via neo4j.AsyncSession.run(). These are easy to use but support only one statement per transaction and are not automatically retried on failure.

Auto-commit transactions are also the only way to run PERIODIC COMMIT (only Neo4j 4.4 and earlier) or CALL {...} IN TRANSACTIONS (Neo4j 4.4 and newer) statements, since those Cypher clauses manage their own transactions internally.

Example:

import neo4j

async def create_person(driver, name):
    async with driver.session(
        default_access_mode=neo4j.WRITE_ACCESS
    ) as session:
        query = "CREATE (a:Person { name: $name }) RETURN id(a) AS node_id"
        result = await session.run(query, name=name)
        record = await result.single()
        return record["node_id"]

Example:

import neo4j

async def get_numbers(driver):
    numbers = []
    async with driver.session(
        default_access_mode=neo4j.READ_ACCESS
    ) as session:
        result = await session.run("UNWIND [1, 2, 3] AS x RETURN x")
        async for record in result:
            numbers.append(record["x"])
    return numbers

Explicit Async Transactions

Explicit transactions support multiple statements and must be created with an explicit neo4j.AsyncSession.begin_transaction() call.

This creates a new neo4j.AsyncTransaction object that can be used to run Cypher.

It also gives applications the ability to directly control commit and rollback activity.

class neo4j.AsyncTransaction

Container for multiple Cypher queries to be executed within a single context. AsyncTransaction objects can be used as context managers (async with block), where the transaction is committed or rolled back depending on whether an exception is raised:

async with await session.begin_transaction() as tx:
    ...

async run(query, parameters=None, **kwparameters)

Run a Cypher query within the context of this transaction.

Cypher is typically expressed as a query template plus a set of named parameters. In Python, parameters may be expressed through a dictionary of parameters, through individual parameter arguments, or as a mixture of both. For example, the run queries below are all equivalent:

>>> query = "CREATE (a:Person { name: $name, age: $age })"
>>> result = await tx.run(query, {"name": "Alice", "age": 33})
>>> result = await tx.run(query, {"name": "Alice"}, age=33)
>>> result = await tx.run(query, name="Alice", age=33)

Parameter values can be of any type supported by the Neo4j type system. In Python, this includes bool, int, str, list and dict. Note, however, that list properties must be homogeneous.

Parameters:
  • query (str) – cypher query

  • parameters (Optional[Dict[str, Any]]) – dictionary of parameters

  • kwparameters (Any) – additional keyword parameters

Raises:

TransactionError – if the transaction is already closed

Returns:

a new neo4j.AsyncResult object

Return type:

AsyncResult

async commit()

Mark this transaction as successful and close in order to trigger a COMMIT.

Raises:

TransactionError – if the transaction is already closed

async rollback()

Mark this transaction as unsuccessful and close in order to trigger a ROLLBACK.

Raises:

TransactionError – if the transaction is already closed

async close()

Close this transaction, triggering a ROLLBACK if not closed.

cancel()

Cancel this transaction.

If the transaction is already closed, this method does nothing. Otherwise, it will close the connection without ROLLBACK or COMMIT in a non-blocking manner.

The primary purpose of this function is to handle asyncio.CancelledError.

tx = await session.begin_transaction()
try:
    ...  # do some work
except asyncio.CancelledError:
    tx.cancel()
    raise

Return type:

None

closed()

Indicate whether the transaction has been closed or cancelled.

Returns:

True if closed or cancelled, False otherwise.

Return type:

bool

Closing an explicit transaction can either happen automatically at the end of an async with block, or can be explicitly controlled through the neo4j.AsyncTransaction.commit(), neo4j.AsyncTransaction.rollback(), neo4j.AsyncTransaction.close() or neo4j.AsyncTransaction.cancel() methods.

Explicit transactions are most useful for applications that need to distribute Cypher execution across multiple functions for the same transaction.

Example:

import neo4j

async def create_person(driver, name):
    async with driver.session(
        default_access_mode=neo4j.WRITE_ACCESS
    ) as session:
        tx = await session.begin_transaction()
        node_id = await create_person_node(tx)
        await set_person_name(tx, node_id, name)
        await tx.commit()

async def create_person_node(tx):
    query = "CREATE (a:Person { name: $name }) RETURN id(a) AS node_id"
    name = "default_name"
    result = await tx.run(query, name=name)
    record = await result.single()
    return record["node_id"]

async def set_person_name(tx, node_id, name):
    query = "MATCH (a:Person) WHERE id(a) = $id SET a.name = $name"
    result = await tx.run(query, id=node_id, name=name)
    summary = await result.consume()
    # use the summary for logging etc.
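
As a variation, the explicit transaction can be scoped with an async with block so that it is always closed when the block ends. The following is a minimal sketch; the function name rename_person and the query are assumptions made for illustration. The commit is issued explicitly so that the outcome does not rely on how the block treats a clean exit.

```python
async def rename_person(session, old_name, new_name):
    """Rename a person inside an explicit transaction scoped by a block."""
    # begin_transaction() is a coroutine, hence the `await` after `async with`.
    async with await session.begin_transaction() as tx:
        result = await tx.run(
            "MATCH (a:Person {name: $old}) SET a.name = $new",
            old=old_name,
            new=new_name,
        )
        summary = await result.consume()
        # Commit explicitly; leaving the block afterwards only cleans up.
        await tx.commit()
    return summary
```

If an exception is raised before commit() is reached, the block still closes the transaction on the way out.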

Managed Async Transactions (transaction functions)

Transaction functions are the most powerful form of transaction, providing access mode override and retry capabilities.

These allow a function object representing the transactional unit of work to be passed as a parameter. This function is called one or more times, within a configurable time limit, until it succeeds. Results should be fully consumed within the function and only aggregate or status values should be returned. Returning a live result object would prevent the driver from correctly managing connections and would break retry guarantees.

This function will receive a neo4j.AsyncManagedTransaction object as its first parameter.

class neo4j.AsyncManagedTransaction

Transaction object provided to transaction functions.

Inside a transaction function, the driver is responsible for managing (committing / rolling back) the transaction. Therefore, AsyncManagedTransactions don’t offer such methods. Otherwise, they behave like AsyncTransaction.

  • To commit the transaction, return anything from the transaction function.

  • To rollback the transaction, raise any exception.

Note that transaction functions have to be idempotent (i.e., the result of running the function once has to be the same as running it any number of times). This is because the driver will retry the transaction function if the error is classified as retryable.
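
To see why idempotence matters, consider a heavily simplified retry loop. This is only a sketch of the general idea, not the driver's implementation: RetryableError, run_with_retries, and the attempt limit are hypothetical names invented for this illustration (the driver works with a time limit and its own error classification).

```python
import asyncio


class RetryableError(Exception):
    """Hypothetical stand-in for an error classified as retryable."""


async def run_with_retries(work, max_attempts=3):
    """Call `work` until it succeeds or the attempts are used up."""
    for attempt in range(1, max_attempts + 1):
        try:
            return await work()
        except RetryableError:
            if attempt == max_attempts:
                raise
            # A real retry loop would typically wait before trying again.


calls = []


async def flaky_work():
    # Every attempt runs the whole body again, so side effects repeat.
    calls.append("attempt")
    if len(calls) < 3:
        raise RetryableError("transient failure")
    return "done"


outcome = asyncio.run(run_with_retries(flaky_work))
```

In this sketch the body of flaky_work runs three times before it succeeds, which is why every effect of a transaction function must be safe to repeat.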

New in version 5.0: Previously, transaction functions received AsyncTransaction objects, which caused hard-to-interpret errors when the transaction was managed explicitly (committed or rolled back by user code).

async run(query, parameters=None, **kwparameters)

Run a Cypher query within the context of this transaction.

Cypher is typically expressed as a query template plus a set of named parameters. In Python, parameters may be expressed through a dictionary of parameters, through individual parameter arguments, or as a mixture of both. For example, the run queries below are all equivalent:

>>> query = "CREATE (a:Person { name: $name, age: $age })"
>>> result = await tx.run(query, {"name": "Alice", "age": 33})
>>> result = await tx.run(query, {"name": "Alice"}, age=33)
>>> result = await tx.run(query, name="Alice", age=33)

Parameter values can be of any type supported by the Neo4j type system. In Python, this includes bool, int, str, list and dict. Note, however, that list properties must be homogeneous.

Parameters:
  • query (str) – cypher query

  • parameters (Optional[Dict[str, Any]]) – dictionary of parameters

  • kwparameters (Any) – additional keyword parameters

Raises:

TransactionError – if the transaction is already closed

Returns:

a new neo4j.AsyncResult object

Return type:

AsyncResult

Example:

async def create_person(driver, name):
    async with driver.session() as session:
        node_id = await session.execute_write(create_person_tx, name)

async def create_person_tx(tx, name):
    query = "CREATE (a:Person { name: $name }) RETURN id(a) AS node_id"
    result = await tx.run(query, name=name)
    record = await result.single()
    return record["node_id"]

To exert more control over how a transaction function is carried out, the neo4j.unit_of_work() decorator can be used.

AsyncResult

Every time a query is executed, a neo4j.AsyncResult is returned.

This provides a handle to the result of the query, giving access to the records within it as well as the result metadata.

Results also contain a buffer that automatically stores unconsumed records when results are consumed out of order.

A neo4j.AsyncResult is attached to an active connection, through a neo4j.AsyncSession, until all its content has been buffered or consumed.

class neo4j.AsyncResult

A handler for the result of Cypher query execution. Instances of this class are typically constructed and returned by AsyncSession.run() and AsyncTransaction.run().

async result.__aiter__()
async result.__anext__()
keys()

The keys for the records in this result.

Returns:

tuple of key names

Return type:

tuple

async consume()

Consume the remainder of this result and return a neo4j.ResultSummary.

Example:

async def create_node_tx(tx, name):
    result = await tx.run(
        "CREATE (n:ExampleNode { name: $name }) RETURN n", name=name
    )
    record = await result.single()
    value = record.value()
    summary = await result.consume()
    return value, summary

async with driver.session() as session:
    node, summary = await session.execute_write(
        create_node_tx, "example"
    )

Example:

async def get_two_tx(tx):
    result = await tx.run("UNWIND [1,2,3,4] AS x RETURN x")
    values = []
    async for record in result:
        if len(values) >= 2:
            break
        values.append(record.values())
    # or shorter: values = [record.values()
    #                       for record in await result.fetch(2)]

    # discard the remaining records if there are any
    summary = await result.consume()
    # use the summary for logging etc.
    return values, summary

async with driver.session() as session:
    values, summary = await session.execute_read(get_two_tx)

Returns:

The neo4j.ResultSummary for this result

Raises:

ResultConsumedError – if the transaction from which this result was obtained has been closed.

Return type:

ResultSummary

Changed in version 5.0: Can raise ResultConsumedError.

async single(strict: te.Literal[False] = False) -> Optional[Record]
async single(strict: te.Literal[True]) -> Record

Obtain the next and only remaining record or None.

Calling this method always exhausts the result.

A warning is generated if more than one record is available but the first of these is still returned.

Parameters:

strict – If True, raise a neo4j.ResultNotSingleError instead of returning None when no record is available, and instead of issuing a warning when more than one record is available. False by default.

Warns:

if more than one record is available

Raises:
  • ResultNotSingleError – If strict=True and not exactly one record is available.

  • ResultConsumedError – if the transaction from which this result was obtained has been closed or the Result has been explicitly consumed.

Returns:

the next neo4j.Record or None if none remain

Changed in version 5.0: Added strict parameter.

Changed in version 5.0: Can raise ResultConsumedError.
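
A minimal sketch of handling the None case in the default, non-strict mode. The function name find_person_age, the Person label, and the age property are assumptions made for this example.

```python
async def find_person_age(tx, name):
    """Return the age of the person called `name`, or None if not found."""
    result = await tx.run(
        "MATCH (a:Person {name: $name}) RETURN a.age AS age LIMIT 1",
        name=name,
    )
    # With strict=False (the default), single() returns None when the
    # result holds no record instead of raising.
    record = await result.single()
    if record is None:
        return None
    return record["age"]
```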

async fetch(n)

Obtain up to n records from this result.

Parameters:

n (int) – the maximum number of records to fetch.

Returns:

list of neo4j.Record

Raises:

ResultConsumedError – if the transaction from which this result was obtained has been closed or the Result has been explicitly consumed.

Return type:

List[Record]

New in version 5.0.
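
Because fetch() returns at most n records, it lends itself to processing a large result in chunks. The sketch below assumes that an exhausted result yields an empty list; process_in_batches and handle_batch are hypothetical names.

```python
async def process_in_batches(result, batch_size, handle_batch):
    """Feed the records of `result` to `handle_batch` in chunks."""
    total = 0
    while True:
        batch = await result.fetch(batch_size)
        if not batch:
            # fetch() returned no records: the result is exhausted.
            break
        handle_batch(batch)
        total += len(batch)
    return total
```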

async peek()

Obtain the next record from this result without consuming it. This leaves the record in the buffer for further processing.

Returns:

the next neo4j.Record or None if none remain.

Raises:

ResultConsumedError – if the transaction from which this result was obtained has been closed or the Result has been explicitly consumed.

Return type:

Optional[Record]

Changed in version 5.0: Can raise ResultConsumedError.
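
One possible use of peek() is to inspect the first record, or detect an empty result, before deciding how to process the rest. The helper name first_value_or_default is an assumption for illustration.

```python
async def first_value_or_default(result, default=None):
    """Look at the next record without removing it from the result."""
    record = await result.peek()
    if record is None:
        # No records remain, so there is nothing to inspect.
        return default
    # The record stays in the buffer; a later fetch() or iteration
    # will still return it.
    return record[0]
```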

async graph()

Return a neo4j.graph.Graph instance containing all the graph objects in the result. After calling this method, the result becomes detached, buffering all remaining records.

Raises:

ResultConsumedError – if the transaction from which this result was obtained has been closed or the Result has been explicitly consumed.

Returns:

a result graph

Return type:

Graph

Changed in version 5.0: Can raise ResultConsumedError.

async value(key=0, default=None)

Helper function that returns the remainder of the result as a list of values.

See neo4j.Record.value

Parameters:
  • key (Union[int, str]) – field to return for each remaining record. Obtain a single value from the record by index or key.

  • default (Optional[object]) – default value, used if the index or key is unavailable

Raises:

ResultConsumedError – if the transaction from which this result was obtained has been closed or the Result has been explicitly consumed.

Returns:

list of individual values

Return type:

List[Any]

Changed in version 5.0: Can raise ResultConsumedError.

async values(*keys)

Helper function that returns the remainder of the result as a list of value lists.

See neo4j.Record.values

Parameters:

keys (Union[int, str]) – fields to return for each remaining record. Optionally filtering to include only certain values by index or key.

Raises:

ResultConsumedError – if the transaction from which this result was obtained has been closed or the Result has been explicitly consumed.

Returns:

list of value lists

Return type:

List[List[Any]]

Changed in version 5.0: Can raise ResultConsumedError.

async data(*keys)

Helper function that returns the remainder of the result as a list of dictionaries.

See neo4j.Record.data

Parameters:

keys (Union[int, str]) – fields to return for each remaining record. Optionally filtering to include only certain values by index or key.

Returns:

list of dictionaries

Return type:

list

Raises:

ResultConsumedError – if the transaction from which this result was obtained has been closed or the Result has been explicitly consumed.

Changed in version 5.0: Can raise ResultConsumedError.
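
As a short sketch, data() is convenient when the caller only needs plain Python structures. The function name list_people and the query are assumptions for this example.

```python
async def list_people(tx):
    """Return all Person nodes as plain dictionaries."""
    result = await tx.run(
        "MATCH (a:Person) RETURN a.name AS name, a.age AS age"
    )
    # data() exhausts the result and converts each record to a dict
    # keyed by field name; passing keys would filter the fields.
    return await result.data()
```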

async to_df(expand=False, parse_dates=False)

Convert (the rest of) the result to a pandas DataFrame.

This method is only available if the pandas library is installed.

res = await tx.run("UNWIND range(1, 10) AS n RETURN n, n+1 as m")
df = await res.to_df()

For instance, the code above returns a DataFrame with two columns (n and m) and 10 rows.

Parameters:
  • expand (bool) –

    If True, some structures in the result will be recursively expanded (flattened out into multiple columns) like so (everything inside <...> is a placeholder):

    • Node objects under any variable <n> will be expanded into columns (the recursion stops here)

      • <n>().prop.<property_name> (any) for each property of the node.

      • <n>().element_id (str) the node’s element id. See Node.element_id.

      • <n>().labels (frozenset of str) the node’s labels. See Node.labels.

    • Relationship objects under any variable <r> will be expanded into columns (the recursion stops here)

      • <r>->.prop.<property_name> (any) for each property of the relationship.

      • <r>->.element_id (str) the relationship’s element id. See Relationship.element_id.

      • <r>->.start.element_id (str) the relationship’s start node’s element id. See Relationship.start_node.

      • <r>->.end.element_id (str) the relationship’s end node’s element id. See Relationship.end_node.

      • <r>->.type (str) the relationship’s type. See Relationship.type.

    • list objects under any variable <l> will be expanded into

      • <l>[].0 (any) the 1st list element

      • <l>[].1 (any) the 2nd list element

    • dict objects under any variable <d> will be expanded into

      • <d>{}.<key1> (any) the 1st key of the dict

      • <d>{}.<key2> (any) the 2nd key of the dict

    • list and dict objects are expanded recursively. Example:

      variable x: [{"foo": "bar", "baz": [42, 0]}, "foobar"]
      

      will be expanded to:

      {
          "x[].0{}.foo": "bar",
          "x[].0{}.baz[].0": 42,
          "x[].0{}.baz[].1": 0,
          "x[].1": "foobar"
      }
      
    • Everything else (including Path objects) will not be flattened.

    dict keys and variable names that contain . or \ will be escaped with a backslash (\. and \\ respectively).

  • parse_dates (bool) – If True, columns that exclusively contain time.DateTime objects, time.Date objects, or None, will be converted to pandas.Timestamp.

Raises:
  • ImportError – if pandas library is not available.

  • ResultConsumedError – if the transaction from which this result was obtained has been closed or the Result has been explicitly consumed.

Return type:

pandas.DataFrame
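
The naming rules for expanded list and dict values can be mimicked with a small recursive function. This is an illustrative sketch only, not the driver's code: flatten and escape are hypothetical helpers, graph types (nodes, relationships) are left out, and empty containers simply produce no columns here.

```python
def escape(name):
    """Escape backslashes and dots as described for to_df(expand=True)."""
    # Backslashes are escaped first so that the backslashes added for
    # dots are not escaped a second time.
    return str(name).replace("\\", "\\\\").replace(".", "\\.")


def flatten(name, value):
    """Recursively expand lists and dicts into flat column names."""
    if isinstance(value, list):
        columns = {}
        for index, item in enumerate(value):
            columns.update(flatten(f"{name}[].{index}", item))
        return columns
    if isinstance(value, dict):
        columns = {}
        for key, item in value.items():
            columns.update(flatten(f"{name}{{}}.{escape(key)}", item))
        return columns
    # Anything else is kept as a single column.
    return {name: value}


# The nested value of variable x from the expansion example.
x = [{"foo": "bar", "baz": [42, 0]}, "foobar"]
columns = flatten(escape("x"), x)
```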

closed()

Return True if the result has been closed.

When a result gets consumed (via consume()) or the transaction that owns the result gets closed (committed, rolled back, closed), the result cannot be used to acquire further records.

In that case, all methods that need to access the result’s records will raise a ResultConsumedError when called.

Returns:

whether the result is closed.

Return type:

bool

New in version 5.0.

See https://neo4j.com/docs/python-manual/current/cypher-workflow/#python-driver-type-mapping for more about type mapping.

AsyncBookmarkManager

class neo4j.api.AsyncBookmarkManager

Same as BookmarkManager but with async methods.

The driver comes with a default implementation of the async bookmark manager accessible through AsyncGraphDatabase.bookmark_manager().

New in version 5.0.

abstract async update_bookmarks(database, previous_bookmarks, new_bookmarks)

Handle bookmark updates.

Parameters:
  • database (str) – The database which the bookmarks belong to

  • previous_bookmarks (Collection[str]) – The bookmarks used at the start of a transaction

  • new_bookmarks (Collection[str]) – The new bookmarks retrieved at the end of a transaction

Return type:

None

abstract async get_bookmarks(database)

Return the bookmarks for a given database.

Parameters:

database (str) – The database which the bookmarks belong to

Returns:

The bookmarks for the given database

Return type:

Collection[str]

abstract async get_all_bookmarks()

Return all bookmarks for all known databases.

Returns:

The collected bookmarks.

Return type:

Collection[str]

abstract async forget(databases)

Forget the bookmarks for the given databases.

This method is not called by the driver. Forgetting unused databases is the user’s responsibility.

Parameters:

databases (Iterable[str]) – The databases which the bookmarks will be removed for.

Return type:

None
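
To make the four methods concrete, here is a minimal in-memory sketch. It is a hypothetical, duck-typed stand-in (a real implementation would subclass neo4j.api.AsyncBookmarkManager), the bookmark strings are made up, and replacing the previous bookmarks with the new ones is an assumption about reasonable behaviour rather than a statement about the default implementation.

```python
import asyncio
from collections import defaultdict


class InMemoryBookmarkManager:
    """Hypothetical sketch of the four AsyncBookmarkManager methods."""

    def __init__(self):
        self._bookmarks = defaultdict(set)

    async def update_bookmarks(
        self, database, previous_bookmarks, new_bookmarks
    ):
        # Assumption: bookmarks a transaction started from are superseded
        # by the bookmarks it produced.
        current = self._bookmarks[database]
        current.difference_update(previous_bookmarks)
        current.update(new_bookmarks)

    async def get_bookmarks(self, database):
        return set(self._bookmarks.get(database, ()))

    async def get_all_bookmarks(self):
        return set().union(*self._bookmarks.values())

    async def forget(self, databases):
        for database in databases:
            self._bookmarks.pop(database, None)


async def demo():
    manager = InMemoryBookmarkManager()
    await manager.update_bookmarks("neo4j", [], ["bm1"])
    await manager.update_bookmarks("neo4j", ["bm1"], ["bm2"])
    await manager.update_bookmarks("other", [], ["bm3"])
    everything = await manager.get_all_bookmarks()
    await manager.forget(["other"])
    remaining = await manager.get_bookmarks("neo4j")
    forgotten = await manager.get_bookmarks("other")
    return everything, remaining, forgotten


all_bookmarks, neo4j_bookmarks, other_bookmarks = asyncio.run(demo())
```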

Async Cancellation

Async Python provides a mechanism for cancelling futures (asyncio.Future.cancel()). The driver and its components can handle this. However, generally, it’s not advised to rely on cancellation as it forces the driver to close affected connections to avoid leaving them in an undefined state. This makes the driver less efficient.

The easiest way to make sure your application code’s interaction with the driver is playing nicely with cancellation is to always use the async context manager provided by neo4j.AsyncSession like so:

async with driver.session() as session:
    ...  # do what you need to do with the session

If, for whatever reason, you need to handle the session manually, you can do it like so:

session = driver.session()
try:
    ...  # do what you need to do with the session
except asyncio.CancelledError:
    session.cancel()
    raise
finally:
    # this becomes a no-op if the session has been cancelled before
    await session.close()

As mentioned above, any cancellation of I/O work will cause the driver to close the affected connection. This will kill any neo4j.AsyncTransaction and neo4j.AsyncResult objects that are attached to that connection. Hence, after catching an asyncio.CancelledError, you should not try to use transactions or results created earlier. They are likely no longer valid.

Furthermore, there is no guarantee as to whether a piece of ongoing work got successfully executed on the server side or not, when a cancellation happens: await transaction.commit() and other methods can throw asyncio.CancelledError but still have managed to complete from the server’s perspective.
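
The manual handling pattern can be exercised without a database by substituting a stand-in for the session. In the sketch below, FakeSession is a hypothetical stub (not a driver class) that only records which methods were called, and the cancellation is triggered by an asyncio.wait_for timeout, one common source of cancellation in application code.

```python
import asyncio


class FakeSession:
    """Hypothetical stub that records calls; not part of the driver."""

    def __init__(self):
        self.calls = []

    def cancel(self):
        self.calls.append("cancel")

    async def close(self):
        self.calls.append("close")


async def do_work(session):
    try:
        await asyncio.sleep(10)  # stands in for slow I/O on the session
    except asyncio.CancelledError:
        session.cancel()
        raise
    finally:
        await session.close()


async def main():
    session = FakeSession()
    try:
        # The timeout cancels do_work(), raising CancelledError inside it.
        await asyncio.wait_for(do_work(session), timeout=0.01)
    except asyncio.TimeoutError:
        pass
    return session.calls


calls = asyncio.run(main())
```

The stub records cancel followed by close, mirroring the order in which the manual pattern releases the session.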