Use ADX for storing Azure SQL audit logs

SQL audit logs may be something you need to collect, but the easy options in Azure are limited to Log Analytics or a storage account. The huge volume of logs can get expensive to store in Log Analytics (or Sentinel), especially because Azure SQL auditing does not (yet) support resource-specific tables; instead, all data goes into the legacy "AzureDiagnostics" table, which doesn't support transformations or basic/auxiliary logs. A storage account, on the other hand, is pretty useless when you actually need to query the logs.

However, there is a third option: streaming the logs into an Event Hub.

In this short post I'll describe how to use Azure Data Explorer (ADX) to read those logs from the Event Hub and store them in ADX for much cheaper.

Let's go... these are the high-level steps:

1) Create Event Hub namespace

2) Configure Azure SQL audits to Event Hub

3) Create ADX cluster

4) Configure ADX tables, function and data connection


1) Create Event Hub namespace

This is very straightforward. Just follow these instructions to create an Event Hub namespace: Create an event hub using the Azure portal

Make sure the Event Hub namespace is in the same region as your SQL database and server.

(You don't need to create an event hub within the Event Hub namespace; one will be created automatically in a later phase.)


2) Configure Azure SQL audits to Event Hub

This is yet another super easy task: follow these instructions and choose Event Hub as the destination: Set up Auditing - Azure SQL Database

My config looks like this:

[Screenshot: auditing configuration with Event Hub as the destination]

If there's activity going on in your SQL, you'll shortly start seeing this in your Event Hub:

[Screenshot: incoming audit messages in the Event Hub namespace]

Eventually, as the data keeps coming into the Event Hub namespace, a new event hub named "insights-logs-sqlsecurityauditevents" will be created (it WILL take some time!).


3) Create ADX cluster

Creating an ADX cluster is rather easy; it just takes a bit of time to complete. Follow these instructions: Quickstart: Create an Azure Data Explorer cluster and database

For choosing the cluster specifications, I suggest taking a look at the Azure Pricing Calculator: estimate your daily data ingestion and it will suggest the "engine" you need. You'll also be able to estimate the pricing. Note that 1-year reservations will give you remarkable savings.

Once your cluster is done, you'll define the database name and retention periods.

[Screenshot: ADX database creation with retention settings]
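
By the way, you can also adjust the database retention later with KQL instead of the portal. A minimal sketch, assuming your database is named AzureDiagnostics (as in this post) and one year of retention fits your needs:

// Example values only: keep data for 365 days, adjust to your requirements
.alter-merge database AzureDiagnostics policy retention softdelete = 365d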


4) Configure ADX tables, function and data connection

Now the cluster is up and we need to do some configuration: open the newly created database from the left menu ("Data > Databases") and then choose "Query" to start.

The following commands will create (you can use different names if you wish):

  • The AzureDiagnosticsRaw table (used to ingest the raw data in the first place)
  • An ingestion mapping for the AzureDiagnosticsRaw table
  • A retention policy for the AzureDiagnosticsRaw table (0 days, since this is an intermediate table)
  • The AzureDiagnostics table and its schema (this is the table you'll use to query the data)
  • The AzureDiagnosticsExpand() function to read the raw table
  • An update policy that uses the AzureDiagnosticsExpand() function to write the data into the AzureDiagnostics table in the proper format

Copy-paste these commands into your "Query" window, then select each command one by one and "Run" it. You'll need to run 6 commands in total.

.create table AzureDiagnosticsRaw (Records:dynamic)

.create table AzureDiagnosticsRaw ingestion json mapping 'AzureDiagnosticsRawMapping' '[{"column":"Records","Properties":{"path":"$.records"}}]'

.alter-merge table AzureDiagnosticsRaw policy retention softdelete = 0d

.create table AzureDiagnostics (
    PartitionId: string,
    originalEventTimestamp: datetime,
    timeGenerated: datetime,
    resourceId: string,
    category: string,
    operationName: string,
    server_instance_name_s: string,
    obo_middle_tier_app_id_s: string,
    class_type_description_s: string,
    database_transaction_id_d: int,
    target_server_principal_name_s: string,
    is_server_level_audit_s: string,
    object_id_d: int,
    class_type_s: string,
    server_principal_name_s: string,
    server_principal_id_d: int,
    action_id_s: string,
    user_defined_event_id_d: int,
    is_local_secondary_replica_s: string,
    client_ip_s: string,
    duration_milliseconds_d: int,
    sequence_group_id_g: guid,
    session_context_s: string,
    event_id_g: guid,
    event_time_t: datetime,
    session_server_principal_name_s: string,
    is_column_permission_s: string,
    object_name_s: string,
    client_tls_version_d: int,
    target_server_principal_id_d: int,
    target_database_principal_name_s: string,
    database_principal_id_d: int,
    server_principal_sid_s: string,
    application_name_s: string,
    database_principal_name_s: string,
    session_id_d: int,
    succeeded_s: string,
    statement_s: string,
    permission_bitmask_s: string,
    response_rows_d: int,
    database_name_s: string,
    target_database_principal_id_d: int,
    connection_id_g: guid,
    additional_information_s: string,
    host_name_s: string,
    sequence_number_d: int,
    external_policy_permissions_checked_s: string,
    ledger_start_sequence_number_d: int,
    target_server_principal_sid_s: string,
    affected_rows_d: int,
    data_sensitivity_information_s: string,
    client_tls_version_name_s: string,
    audit_schema_version_d: int,
    action_name_s: string,
    transaction_id_d: int,
    user_defined_information_s: string,
    schema_name_s: string,
    securable_class_type_s: string,
    location_s: string,
    LogicalServerName_s: string,
    SubscriptionId_g: guid,
    ResourceGroup_s: string
)

.create-or-alter function AzureDiagnosticsExpand() {
    AzureDiagnosticsRaw
    | mv-expand events = Records
    | project 
        PartitionId = tostring(events.PartitionId),
        originalEventTimestamp = todatetime(events.originalEventTimestamp),
        timeGenerated = todatetime(events.['time']),
        resourceId = tostring(events.resourceId),
        category = tostring(events.category),
        operationName = tostring(events.operationName),
        server_instance_name_s = tostring(events.properties.server_instance_name),
        obo_middle_tier_app_id_s = tostring(events.properties.obo_middle_tier_app_id),
        class_type_description_s = tostring(events.properties.class_type_description),
        database_transaction_id_d = toint(events.properties.database_transaction_id),
        target_server_principal_name_s = tostring(events.properties.target_server_principal_name),
        is_server_level_audit_s = tostring(events.properties.is_server_level_audit),
        object_id_d = toint(events.properties.object_id),
        class_type_s = tostring(events.properties.class_type),
        server_principal_name_s = tostring(events.properties.server_principal_name),
        server_principal_id_d = toint(events.properties.server_principal_id),
        action_id_s = tostring(events.properties.action_id),
        user_defined_event_id_d = toint(events.properties.user_defined_event_id),
        is_local_secondary_replica_s = tostring(events.properties.is_local_secondary_replica),
        client_ip_s = tostring(events.properties.client_ip),
        duration_milliseconds_d = toint(events.properties.duration_milliseconds),
        sequence_group_id_g = toguid(events.properties.sequence_group_id),
        session_context_s = tostring(events.properties.session_context),
        event_id_g = toguid(events.properties.event_id),
        event_time_t = todatetime(events.properties.event_time),
        session_server_principal_name_s = tostring(events.properties.session_server_principal_name),
        is_column_permission_s = tostring(events.properties.is_column_permission),
        object_name_s = tostring(events.properties.object_name),
        client_tls_version_d = toint(events.properties.client_tls_version),
        target_server_principal_id_d = toint(events.properties.target_server_principal_id),
        target_database_principal_name_s = tostring(events.properties.target_database_principal_name),
        database_principal_id_d = toint(events.properties.database_principal_id),
        server_principal_sid_s = tostring(events.properties.server_principal_sid),
        application_name_s = tostring(events.properties.application_name),
        database_principal_name_s = tostring(events.properties.database_principal_name),
        session_id_d = toint(events.properties.session_id),
        succeeded_s = tostring(events.properties.succeeded),
        statement_s = tostring(events.properties.statement),
        permission_bitmask_s = tostring(events.properties.permission_bitmask),
        response_rows_d = toint(events.properties.response_rows),
        database_name_s = tostring(events.properties.database_name),
        target_database_principal_id_d = toint(events.properties.target_database_principal_id),
        connection_id_g = toguid(events.properties.connection_id),
        additional_information_s = tostring(events.properties.additional_information),
        host_name_s = tostring(events.properties.host_name),
        sequence_number_d = toint(events.properties.sequence_number),
        external_policy_permissions_checked_s = tostring(events.properties.external_policy_permissions_checked),
        ledger_start_sequence_number_d = toint(events.properties.ledger_start_sequence_number),
        target_server_principal_sid_s = tostring(events.properties.target_server_principal_sid),
        affected_rows_d = toint(events.properties.affected_rows),
        data_sensitivity_information_s = tostring(events.properties.data_sensitivity_information),
        client_tls_version_name_s = tostring(events.properties.client_tls_version_name),
        audit_schema_version_d = toint(events.properties.audit_schema_version),
        action_name_s = tostring(events.properties.action_name),
        transaction_id_d = toint(events.properties.transaction_id),
        user_defined_information_s = tostring(events.properties.user_defined_information),
        schema_name_s = tostring(events.properties.schema_name),
        securable_class_type_s = tostring(events.properties.securable_class_type),
        location_s = tostring(events.location),
        LogicalServerName_s = tostring(events.LogicalServerName),
        SubscriptionId_g = toguid(events.SubscriptionId),
        ResourceGroup_s = tostring(events.ResourceGroup)
}

.alter table AzureDiagnostics policy update @'[{"Source": "AzureDiagnosticsRaw", "Query": "AzureDiagnosticsExpand()", "IsEnabled": "True", "IsTransactional": true}]'        
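
Before wiring up the data connection, it's worth sanity-checking that the policies landed correctly. These .show commands simply read back what you just configured:

// Should show softdelete = 0d on the intermediate table
.show table AzureDiagnosticsRaw policy retention

// Should show the update policy pointing at AzureDiagnosticsExpand()
.show table AzureDiagnostics policy update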

Now the tables are ready and we just need the data: choose "Data connections" from the left menu, then "Add data connection" and "Event Hub".

The data connection config will look like this:

[Screenshot: Event Hub data connection configuration]

It'll take some time for data to start appearing in the AzureDiagnostics table, but eventually it'll look like this:

[Screenshot: expanded audit rows in the AzureDiagnostics table]
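
Once rows are flowing, a quick query along these lines gives you a feel for what's being audited (just a sketch; the time window and grouping columns are arbitrary examples):

AzureDiagnostics
| where timeGenerated > ago(1h)
| summarize Events = count() by action_name_s, database_name_s
| order by Events desc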

Of course you can query ADX from Sentinel as well, like this:

(vet-adx is the cluster name, northeurope the region, AzureDiagnostics the database name, and the table name is also AzureDiagnostics)

adx("vet-adx.northeurope/AzureDiagnostics").AzureDiagnostics        

You can also save the above query as a function named "SQLAudit". Then you can simply query SQLAudit, like this:

[Screenshot: querying the SQLAudit function from Sentinel]
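
For example, once the SQLAudit function exists in your workspace, a Sentinel-side hunt for failed statements could look like this (a sketch; the succeeded_s value and the projected columns are assumptions based on the table schema above):

SQLAudit
| where timeGenerated > ago(1d)
| where succeeded_s == "false"  // assumed: audit logs report success as a "true"/"false" string
| project timeGenerated, server_principal_name_s, client_ip_s, statement_s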


Enjoy! I didn't calculate exactly how much money you'll save compared to Log Analytics (or Sentinel), but it'll be a lot of 💰.

