
This article shows you how to ingest JSON formatted data into an Azure Data Explorer database. You'll start with simple examples of raw and mapped JSON, continue to multi-lined JSON, and then tackle more complex JSON schemas containing arrays and dictionaries. The examples detail the process of ingesting JSON formatted data using Kusto Query Language (KQL), C#, or Python. The KQL ingestion management commands in these examples run directly against the engine endpoint. In production scenarios, ingestion goes through the Data Management service, using client libraries or data connections. For a walk-through of ingesting data with these client libraries, see Ingest data using the Azure Data Explorer Python library and Ingest data using the Azure Data Explorer .NET Standard SDK.

Prerequisites

  • A Microsoft account or an Azure Active Directory user identity. An Azure subscription isn't required.
  • An Azure Data Explorer cluster and database. Create a cluster and database.

The JSON format

    Azure Data Explorer supports two JSON file formats:

  • json: Line-separated JSON. Each line in the input data has exactly one JSON record. This format supports parsing of comments and single-quoted properties. For more information, see JSON Lines.
  • multijson: Multi-lined JSON. The parser ignores line separators and reads each record from the previous position to the end of a valid JSON record. This format supports parsing of comments, single-quoted properties, and newlines.
  • When you ingest using the ingestion wizard, the default format is multijson. This format can handle multi-line JSON records and arrays of JSON records. When a parsing error is encountered, the entire file is discarded.

    Suppose you're using the JSON Lines format, where each line is a single well-formed JSON record, and you want to handle records that aren't well-formed. In that case, select the "Ignore data format errors" option. Valid records are then ingested, and malformed ones are skipped.
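The following minimal local sketch in Python splits input text into records the way each format is described above, to make the difference concrete. The function names are hypothetical, and this is not how the Azure Data Explorer parser is implemented. Python's json module also rejects comments and single-quoted properties, which both Azure Data Explorer formats accept. Treating a top-level array as a list of records is an assumption based on the ingestion wizard note above.

```python
import json
from typing import Any, List


def parse_json_lines(text: str, ignore_errors: bool = False) -> List[Any]:
    """Line-separated JSON: exactly one record per non-empty line."""
    records: List[Any] = []
    for line in text.splitlines():
        if not line.strip():
            continue  # skip blank lines
        try:
            records.append(json.loads(line))
        except json.JSONDecodeError:
            # Like "Ignore data format errors": drop only the malformed line.
            if not ignore_errors:
                raise
    return records


def parse_multijson(text: str) -> List[Any]:
    """Multi-lined JSON: read consecutive JSON values, ignoring line breaks."""
    decoder = json.JSONDecoder()
    records: List[Any] = []
    pos = 0
    while True:
        # raw_decode doesn't skip leading whitespace, so skip it here.
        while pos < len(text) and text[pos].isspace():
            pos += 1
        if pos >= len(text):
            break
        value, pos = decoder.raw_decode(text, pos)
        # Assumption: a top-level array is treated as a list of records.
        if isinstance(value, list):
            records.extend(value)
        else:
            records.append(value)
    return records


lines_input = (
    '{"deviceId": "a", "temperature": 31.0}\n'
    '{"deviceId": "b", broken\n'
    '{"deviceId": "c", "temperature": 33.7}\n'
)
print(parse_json_lines(lines_input, ignore_errors=True))

multi_input = """
{
    "deviceId": "a",
    "temperature": 31.0
}
[{"deviceId": "b"}, {"deviceId": "c"}]
"""
print([r["deviceId"] for r in parse_multijson(multi_input)])
```

The sketch uses raw_decode for multijson because it reports where each JSON value ends. That lets records be read one after another regardless of line breaks.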

    Ingest and map JSON formatted data

    Ingestion of JSON formatted data requires you to specify the format using the format ingestion property. Ingestion of JSON data also requires a mapping, which maps a JSON source entry to its target column. When ingesting data, use the IngestionMapping property with either its ingestionMappingReference ingestion property (for a pre-defined mapping) or its IngestionMappings property. This article uses the ingestionMappingReference ingestion property, which refers to a mapping pre-defined on the table used for ingestion. In the examples below, we'll start by ingesting JSON records as raw data to a single-column table. Then we'll use the mapping to ingest each property to its mapped column.
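The following minimal Python sketch composes a KQL .ingest command string, to show how the format and ingestionMappingReference ingestion properties end up in it. build_ingest_command is a hypothetical helper, not part of any Kusto SDK. It assumes the table name, blob URI, and mapping name contain no single quotes, and it doesn't escape them.

```python
import json
from typing import Optional


def build_ingest_command(table: str, blob_uri: str, data_format: str,
                         mapping_reference: Optional[str] = None) -> str:
    """Compose an .ingest command whose 'with' clause carries the ingestion properties."""
    properties = {"format": data_format}
    if mapping_reference is not None:
        # Refers to a mapping pre-created on the target table.
        properties["ingestionMappingReference"] = mapping_reference
    # Compact separators keep the JSON on one line.
    # No escaping: names and the URI must not contain single quotes.
    props_json = json.dumps(properties, separators=(",", ":"))
    return f".ingest into table {table} ('{blob_uri}') with '{props_json}'"


cmd = build_ingest_command(
    "RawEvents",
    "https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/simple.json",
    "json",
    "RawEventMapping",
)
print(cmd)
```

Whitespace inside the JSON properties isn't significant. So this compact output should be equivalent to the spaced form used in the commands later in this article.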

    Simple JSON example

    The following example is a simple JSON record with a flat structure. The data has temperature and humidity information, collected by several devices. Each record is marked with an ID and a timestamp.

    {
        "timestamp": "2019-05-02 15:23:50.0369439",
        "deviceId": "2945c8aa-f13e-4c48-4473-b81440bb5ca2",
        "messageId": "7f316225-839a-4593-92b5-1812949279b3",
        "temperature": 31.0301639051317,
        "humidity": 62.0791099602725
    }

    Ingest raw JSON records

    In this example, you ingest JSON records as raw data to a single-column table. Data manipulation, using queries and update policies, is done after the data is ingested.

    KQL

    Use Kusto Query Language to ingest data in raw JSON format.

  • Sign in to https://dataexplorer.azure.com.

  • Select Add cluster.

  • In the Add cluster dialog box, enter your cluster URL in the form https://<ClusterName>.<Region>.kusto.windows.net/, then select Add.

  • Paste in the following command, and select Run to create the table.

    .create table RawEvents (Event: dynamic)
    

    This command creates a table with a single Event column of the dynamic data type.

  • Create the JSON mapping.

    .create table RawEvents ingestion json mapping 'RawEventMapping' '[{"column":"Event","Properties":{"path":"$"}}]'
    

    This command creates a mapping that maps the JSON root path $ to the Event column.

  • Ingest data into the RawEvents table.

    .ingest into table RawEvents ('https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/simple.json') with '{"format":"json", "ingestionMappingReference":"RawEventMapping"}'
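The mapping argument in these commands is a JSON array of column/path objects. The following minimal Python sketch builds the .create table ... ingestion json mapping command from a column-to-path dictionary. build_json_mapping_command is a hypothetical helper, and it only emits the path property of each column mapping.

```python
import json
from typing import Dict


def build_json_mapping_command(table: str, mapping_name: str,
                               column_paths: Dict[str, str]) -> str:
    """Compose '.create table ... ingestion json mapping' from column -> JSON path pairs."""
    mapping = [
        {"column": column, "Properties": {"path": path}}
        for column, path in column_paths.items()  # dict order = column order
    ]
    mapping_json = json.dumps(mapping, separators=(",", ":"))
    # Assumes names and paths contain no single quotes, which would end the KQL string literals.
    return f".create table {table} ingestion json mapping '{mapping_name}' '{mapping_json}'"


print(build_json_mapping_command("RawEvents", "RawEventMapping", {"Event": "$"}))
```

Passing {"Event": "$"} yields the RawEventMapping command above. Passing the five Events column paths yields the FlatEventMapping command used in the next example.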
    
    C#

  • Create the RawEvents table.

    using Kusto.Data;
    using Kusto.Data.Common;
    using Kusto.Data.Net.Client;

    // userId, password, tenantId, and databaseName are placeholders you define.
    var kustoUri = "https://<clusterName>.<region>.kusto.windows.net/";
    var connectionStringBuilder = new KustoConnectionStringBuilder(kustoUri)
    {
        FederatedSecurity = true,
        UserID = userId,
        Password = password,
        Authority = tenantId,
        InitialCatalog = databaseName
    };
    using var kustoClient = KustoClientFactory.CreateCslAdminProvider(connectionStringBuilder);
    var tableName = "RawEvents";
    var command = CslCommandGenerator.GenerateTableCreateCommand(
        tableName,
        new[] { Tuple.Create("Event", "System.Object") }
    );
    await kustoClient.ExecuteControlCommandAsync(databaseName, command);
    
  • Create the JSON mapping.

    var tableMappingName = "RawEventMapping";
    command = CslCommandGenerator.GenerateTableMappingCreateCommand(
        IngestionMappingKind.Json,
        tableName,
        tableMappingName,
        new ColumnMapping[]
        {
            new() { ColumnName = "Event", Properties = new Dictionary<string, string> { { "path", "$" } } }
        }
    );
    await kustoClient.ExecuteControlCommandAsync(databaseName, command);
    

    This command creates a mapping that maps the JSON root path $ to the Event column.

  • Ingest data into the RawEvents table.

    using Kusto.Ingest;

    // Queued ingestion goes through the Data Management (ingest-) endpoint.
    var ingestUri = "https://ingest-<clusterName>.<region>.kusto.windows.net/";
    var ingestConnectionStringBuilder = new KustoConnectionStringBuilder(ingestUri)
    {
        FederatedSecurity = true,
        UserID = userId,
        Password = password,
        Authority = tenantId,
        InitialCatalog = databaseName
    };
    using var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestConnectionStringBuilder);
    var blobPath = "https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/simple.json";
    var properties = new KustoQueuedIngestionProperties(databaseName, tableName)
    {
        Format = DataSourceFormat.json,
        IngestionMapping = new IngestionMapping { IngestionMappingReference = tableMappingName }
    };
    await ingestClient.IngestFromStorageAsync(blobPath, properties);
    
    Python

  • Create the RawEvents table.

    from azure.kusto.data import KustoClient, KustoConnectionStringBuilder
    from azure.kusto.data.helpers import dataframe_from_result_table

    # AAD_TENANT_ID and DATABASE are placeholders you define.
    KUSTO_URI = "https://<ClusterName>.<Region>.kusto.windows.net/"
    KCSB_DATA = KustoConnectionStringBuilder.with_aad_device_authentication(KUSTO_URI, AAD_TENANT_ID)
    KUSTO_CLIENT = KustoClient(KCSB_DATA)
    TABLE = "RawEvents"
    CREATE_TABLE_COMMAND = ".create table " + TABLE + " (Event: dynamic)"
    RESPONSE = KUSTO_CLIENT.execute_mgmt(DATABASE, CREATE_TABLE_COMMAND)
    dataframe_from_result_table(RESPONSE.primary_results[0])
    
  • Create the JSON mapping.

    MAPPING = "RawEventMapping"
    CREATE_MAPPING_COMMAND = ".create table " + TABLE + " ingestion json mapping '" + MAPPING + """' '[{"column":"Event","Properties":{"path":"$"}}]'"""
    RESPONSE = KUSTO_CLIENT.execute_mgmt(DATABASE, CREATE_MAPPING_COMMAND)
    dataframe_from_result_table(RESPONSE.primary_results[0])
    
  • Ingest data into the RawEvents table.

    from azure.kusto.data.data_format import DataFormat
    from azure.kusto.ingest import BlobDescriptor, IngestionProperties, QueuedIngestClient

    INGEST_URI = "https://ingest-<ClusterName>.<Region>.kusto.windows.net/"
    KCSB_INGEST = KustoConnectionStringBuilder.with_aad_device_authentication(INGEST_URI, AAD_TENANT_ID)
    INGESTION_CLIENT = QueuedIngestClient(KCSB_INGEST)
    BLOB_PATH = 'https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/simple.json'
    INGESTION_PROPERTIES = IngestionProperties(database=DATABASE, table=TABLE, data_format=DataFormat.JSON, ingestion_mapping_reference=MAPPING)
    # FILE_SIZE is a placeholder for the blob's size in bytes.
    BLOB_DESCRIPTOR = BlobDescriptor(BLOB_PATH, FILE_SIZE)
    INGESTION_CLIENT.ingest_from_blob(
        BLOB_DESCRIPTOR, ingestion_properties=INGESTION_PROPERTIES)
    

    Queued ingestion aggregates data according to the batching policy. As a result, ingested data can take a few minutes to become available for queries.

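    Ingest mapped JSON records

    In this example, you ingest JSON records and map each JSON property to its own column in the table.

    KQL

  • Create a new table with a schema similar to the JSON input data. We'll use this table for all the following examples and ingest commands.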

    .create table Events (Time: datetime, Device: string, MessageId: string, Temperature: double, Humidity: double)
    
  • Create the JSON mapping.

    .create table Events ingestion json mapping 'FlatEventMapping' '[{"column":"Time","Properties":{"path":"$.timestamp"}},{"column":"Device","Properties":{"path":"$.deviceId"}},{"column":"MessageId","Properties":{"path":"$.messageId"}},{"column":"Temperature","Properties":{"path":"$.temperature"}},{"column":"Humidity","Properties":{"path":"$.humidity"}}]'
    

    In this mapping, as defined by the table schema, the timestamp entries are ingested into the Time column as datetime values.

  • Ingest data into the Events table.

    .ingest into table Events ('https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/simple.json') with '{"format":"json", "ingestionMappingReference":"FlatEventMapping"}'
    

    The file 'simple.json' has a few line-separated JSON records. The format is json, and the mapping used in the ingest command is the FlatEventMapping you created.
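The following minimal Python sketch applies the same column-to-path pairs to a single record locally and converts each value to the Events column type. It illustrates what FlatEventMapping does to one record. The helpers are hypothetical, and the path resolver only handles the $ and $.property forms used in this article. Turning a missing property into None is an assumption about null handling, not documented behavior. Python datetimes hold at most six fractional-second digits, so the seventh digit in the sample timestamp is truncated.

```python
from datetime import datetime
from typing import Any, Callable, Dict, Tuple


def resolve_path(record: Dict[str, Any], path: str) -> Any:
    """Resolve '$' or '$.a.b' style paths; returns None if a property is missing."""
    if path == "$":
        return record
    value: Any = record
    for key in path[2:].split("."):  # drop the leading "$." and walk nested properties
        value = value.get(key) if isinstance(value, dict) else None
    return value


def to_datetime(text: str) -> datetime:
    """Parse 'YYYY-MM-DD HH:MM:SS[.fffffff]', keeping at most 6 fractional digits."""
    whole, _, fraction = text.partition(".")
    parsed = datetime.strptime(whole, "%Y-%m-%d %H:%M:%S")
    return parsed.replace(microsecond=int((fraction + "000000")[:6]))


# Column -> (JSON path, converter), mirroring FlatEventMapping and the Events table schema.
FLAT_EVENT_MAPPING: Dict[str, Tuple[str, Callable[[Any], Any]]] = {
    "Time": ("$.timestamp", to_datetime),
    "Device": ("$.deviceId", str),
    "MessageId": ("$.messageId", str),
    "Temperature": ("$.temperature", float),
    "Humidity": ("$.humidity", float),
}


def apply_mapping(record: Dict[str, Any],
                  mapping: Dict[str, Tuple[str, Callable[[Any], Any]]]) -> Dict[str, Any]:
    row: Dict[str, Any] = {}
    for column, (path, convert) in mapping.items():
        value = resolve_path(record, path)
        # Assumption: a missing property becomes None (null) instead of failing the record.
        row[column] = None if value is None else convert(value)
    return row


sample = {
    "timestamp": "2019-05-02 15:23:50.0369439",
    "deviceId": "2945c8aa-f13e-4c48-4473-b81440bb5ca2",
    "messageId": "7f316225-839a-4593-92b5-1812949279b3",
    "temperature": 31.0301639051317,
    "humidity": 62.0791099602725,
}
print(apply_mapping(sample, FLAT_EVENT_MAPPING))
```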

    C#

  • Create a new table with a schema similar to the JSON input data. We'll use this table for all the following examples and ingest commands.

    var tableName = "Events";
    var command = CslCommandGenerator.GenerateTableCreateCommand(
        tableName,
        new[]
        {
            Tuple.Create("Time", "System.DateTime"),
            Tuple.Create("Device", "System.String"),
            Tuple.Create("MessageId", "System.String"),
            Tuple.Create("Temperature", "System.Double"),
            Tuple.Create("Humidity", "System.Double")
        }
    );
    await kustoClient.ExecuteControlCommandAsync(databaseName, command);
    
  • Create the JSON mapping.

    var tableMappingName = "FlatEventMapping";
    command = CslCommandGenerator.GenerateTableMappingCreateCommand(
        IngestionMappingKind.Json,
        tableName,
        tableMappingName,
        new ColumnMapping[]
        {
            new() { ColumnName = "Time", Properties = new Dictionary<string, string> { { MappingConsts.Path, "$.timestamp" } } },
            new() { ColumnName = "Device", Properties = new Dictionary<string, string> { { MappingConsts.Path, "$.deviceId" } } },
            new() { ColumnName = "MessageId", Properties = new Dictionary<string, string> { { MappingConsts.Path, "$.messageId" } } },
            new() { ColumnName = "Temperature", Properties = new Dictionary<string, string> { { MappingConsts.Path, "$.temperature" } } },
            new() { ColumnName = "Humidity", Properties = new Dictionary<string, string> { { MappingConsts.Path, "$.humidity" } } }
        }
    );
    await kustoClient.ExecuteControlCommandAsync(databaseName, command);
    

    In this mapping, as defined by the table schema, the timestamp entries are ingested into the Time column as datetime values.

  • Ingest data into the Events table.

    var blobPath = "https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/simple.json";
    var properties = new KustoQueuedIngestionProperties(databaseName, tableName)
    {
        Format = DataSourceFormat.json,
        IngestionMapping = new IngestionMapping { IngestionMappingReference = tableMappingName }
    };
    await ingestClient.IngestFromStorageAsync(blobPath, properties).ConfigureAwait(false);
    

    The file 'simple.json' has a few line-separated JSON records. The format is json, and the mapping used in the ingest command is the FlatEventMapping you created.

    Python

  • Create a new table with a schema similar to the JSON input data. We'll use this table for all the following examples and ingest commands.

    TABLE = "Events"
    CREATE_TABLE_COMMAND = ".create table " + TABLE + " (Time: datetime, Device: string, MessageId: string, Temperature: double, Humidity: double)"
    RESPONSE = KUSTO_CLIENT.execute_mgmt(DATABASE, CREATE_TABLE_COMMAND)
    dataframe_from_result_table(RESPONSE.primary_results[0])
    
  • Create the JSON mapping.

    MAPPING = "FlatEventMapping"
    CREATE_MAPPING_COMMAND = ".create table Events ingestion json mapping '" + MAPPING + """' '[{"column":"Time","Properties":{"path":"$.timestamp"}},{"column":"Device","Properties":{"path":"$.deviceId"}},{"column":"MessageId","Properties":{"path":"$.messageId"}},{"column":"Temperature","Properties":{"path":"$.temperature"}},{"column":"Humidity","Properties":{"path":"$.humidity"}}]'"""
    RESPONSE = KUSTO_CLIENT.execute_mgmt(DATABASE, CREATE_MAPPING_COMMAND)
    dataframe_from_result_table(RESPONSE.primary_results[0])
    
  • Ingest data into the Events table.

    BLOB_PATH = 'https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/simple.json'
    INGESTION_PROPERTIES = IngestionProperties(database=DATABASE, table=TABLE, data_format=DataFormat.JSON, ingestion_mapping_reference=MAPPING)
    BLOB_DESCRIPTOR = BlobDescriptor(BLOB_PATH, FILE_SIZE)
    INGESTION_CLIENT.ingest_from_blob(
        BLOB_DESCRIPTOR, ingestion_properties=INGESTION_PROPERTIES)
    

    The file 'simple.json' has a few line-separated JSON records. The format is json, and the mapping used in the ingest command is the FlatEventMapping you created.

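    Ingest multi-lined JSON records

    In this example, you ingest multi-lined JSON records from the file 'multilined.json' into the Events table, reusing the FlatEventMapping. Because the records in this file span multiple lines, the format is multijson.

    C#

    Ingest data into the Events table.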

    var tableMappingName = "FlatEventMapping";
    var blobPath = "https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/multilined.json";
    var properties = new KustoQueuedIngestionProperties(databaseName, tableName)
    {
        Format = DataSourceFormat.multijson,
        IngestionMapping = new IngestionMapping { IngestionMappingReference = tableMappingName }
    };
    await ingestClient.IngestFromStorageAsync(blobPath, properties).ConfigureAwait(false);
    

    Python

    Ingest data into the Events table.

    MAPPING = "FlatEventMapping"
    BLOB_PATH = 'https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/multilined.json'
    INGESTION_PROPERTIES = IngestionProperties(database=DATABASE, table=TABLE, data_format=DataFormat.MULTIJSON, ingestion_mapping_reference=MAPPING)
    BLOB_DESCRIPTOR = BlobDescriptor(BLOB_PATH, FILE_SIZE)
    INGESTION_CLIENT.ingest_from_blob(
        BLOB_DESCRIPTOR, ingestion_properties=INGESTION_PROPERTIES)
    

    Ingest JSON records containing arrays

    An array is an ordered collection of values. JSON arrays are ingested by using an update policy. The JSON is first ingested as-is into an intermediate table (RawEvents). The update policy then runs a pre-defined function on that table and ingests the results into the target table. We'll ingest data with the following structure:

    {
        "records":
        [
            {
                "timestamp": "2019-05-02 15:23:50.0000000",
                "deviceId": "ddbc1bf5-096f-42c0-a771-bc3dca77ac71",
                "messageId": "7f316225-839a-4593-92b5-1812949279b3",
                "temperature": 31.0301639051317,
                "humidity": 62.0791099602725
            },
            {
                "timestamp": "2019-05-02 15:23:51.0000000",
                "deviceId": "ddbc1bf5-096f-42c0-a771-bc3dca77ac71",
                "messageId": "57de2821-7581-40e4-861e-ea3bde102364",
                "temperature": 33.7529423105311,
                "humidity": 75.4787976739364
            }
        ]
    }

    KQL

  • Create an update policy function that expands the collection of records so that each value in the collection receives a separate row, using the mv-expand operator. We'll use table RawEvents as a source table and Events as a target table.

    .create function EventRecordsExpand() {
        RawEvents
        | mv-expand records = Event.records
        | project
            Time = todatetime(records["timestamp"]),
            Device = tostring(records["deviceId"]),
            MessageId = tostring(records["messageId"]),
            Temperature = todouble(records["temperature"]),
            Humidity = todouble(records["humidity"])
    }

  • The schema returned by the function must match the schema of the target table. Use the getschema operator to review the schema.

    EventRecordsExpand() | getschema
    
  • Add the update policy to the target table. The policy automatically runs the query on any data newly ingested into the RawEvents intermediate table and ingests the results into the Events table. Set a zero-retention policy on RawEvents to avoid persisting the intermediate data.

    .alter table Events policy update @'[{"Source": "RawEvents", "Query": "EventRecordsExpand()", "IsEnabled": "True"}]'

    .alter-merge table RawEvents policy retention softdelete = 0s
    
  • Ingest data into the RawEvents table.

    .ingest into table RawEvents ('https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/array.json') with '{"format":"multijson", "ingestionMappingReference":"RawEventMapping"}'
    
  • Review data in the Events table.

    Events
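
The following minimal local Python simulation previews what the Events query should return. It applies EventRecordsExpand() (mv-expand followed by project) to the two sample records shown earlier. It also checks each output row against the Events schema, much as getschema lets you check the function's output. event_records_expand, matches_schema, and EVENTS_SCHEMA are hypothetical names. Mapping datetime/string/double to Python datetime/str/float is an assumption for illustration.

```python
from datetime import datetime
from typing import Any, Dict, List

# Assumed Python equivalents of the Events column types (datetime, string, string, double, double).
EVENTS_SCHEMA = {"Time": datetime, "Device": str, "MessageId": str,
                 "Temperature": float, "Humidity": float}


def to_datetime(text: str) -> datetime:
    """Parse 'YYYY-MM-DD HH:MM:SS[.fffffff]', keeping at most 6 fractional digits."""
    whole, _, fraction = text.partition(".")
    parsed = datetime.strptime(whole, "%Y-%m-%d %H:%M:%S")
    return parsed.replace(microsecond=int((fraction + "000000")[:6]))


def event_records_expand(raw_events: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Local stand-in for EventRecordsExpand(): one output row per element of Event.records."""
    rows = []
    for event in raw_events:              # each RawEvents row holds one dynamic Event value
        for record in event["records"]:   # mv-expand records = Event.records
            rows.append({                 # project, with the same type conversions
                "Time": to_datetime(record["timestamp"]),
                "Device": str(record["deviceId"]),
                "MessageId": str(record["messageId"]),
                "Temperature": float(record["temperature"]),
                "Humidity": float(record["humidity"]),
            })
    return rows


def matches_schema(row: Dict[str, Any], schema: Dict[str, type]) -> bool:
    """Same column names in the same order, and each value of the expected type."""
    return list(row) == list(schema) and all(isinstance(row[c], t) for c, t in schema.items())


raw_event = {"records": [
    {"timestamp": "2019-05-02 15:23:50.0000000", "deviceId": "ddbc1bf5-096f-42c0-a771-bc3dca77ac71",
     "messageId": "7f316225-839a-4593-92b5-1812949279b3", "temperature": 31.0301639051317,
     "humidity": 62.0791099602725},
    {"timestamp": "2019-05-02 15:23:51.0000000", "deviceId": "ddbc1bf5-096f-42c0-a771-bc3dca77ac71",
     "messageId": "57de2821-7581-40e4-861e-ea3bde102364", "temperature": 33.7529423105311,
     "humidity": 75.4787976739364},
]}
expanded = event_records_expand([raw_event])
print(len(expanded), all(matches_schema(row, EVENTS_SCHEMA) for row in expanded))
```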
    
    C#

  • Create an update policy function that expands the collection of records so that each value in the collection receives a separate row, using the mv-expand operator. We'll use table RawEvents as a source table and Events as a target table.

    var command = CslCommandGenerator.GenerateCreateFunctionCommand(
        "EventRecordsExpand",
        "UpdateFunctions",
        string.Empty,
        null,
        @"RawEvents
        | mv-expand records = Event.records
        | project
            Time = todatetime(records['timestamp']),
            Device = tostring(records['deviceId']),
            MessageId = tostring(records['messageId']),
            Temperature = todouble(records['temperature']),
            Humidity = todouble(records['humidity'])",
        ifNotExists: false
    );
    await kustoClient.ExecuteControlCommandAsync(databaseName, command);
    

    The schema received by the function must match the schema of the target table.

  • Add the update policy to the target table. The policy automatically runs the query on any data newly ingested into the RawEvents intermediate table and ingests the results into the Events table. Set a zero-retention policy on RawEvents to avoid persisting the intermediate data.

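    // The policy JSON uses double quotes, so it can sit inside the KQL @'...' literal.
    command = ".alter table Events policy update @'[{\"Source\": \"RawEvents\", \"Query\": \"EventRecordsExpand()\", \"IsEnabled\": \"True\"}]'";
    await kustoClient.ExecuteControlCommandAsync(databaseName, command);
    // Zero retention on the intermediate table, so raw records aren't persisted.
    command = ".alter-merge table RawEvents policy retention softdelete = 0s";
    await kustoClient.ExecuteControlCommandAsync(databaseName, command);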
    
  • Ingest data into the RawEvents table.

    var blobPath = "https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/array.json";
    var tableName = "RawEvents";
    var tableMappingName = "RawEventMapping";
    var properties = new KustoQueuedIngestionProperties(databaseName, tableName)
    {
        Format = DataSourceFormat.multijson,
        IngestionMapping = new IngestionMapping { IngestionMappingReference = tableMappingName }
    };
    await ingestClient.IngestFromStorageAsync(blobPath, properties);
    
  • Review data in the Events table.

    Python

  • Create an update policy function that expands the collection of records so that each value in the collection receives a separate row, using the mv-expand operator. We'll use table RawEvents as a source table and Events as a target table.

    CREATE_FUNCTION_COMMAND = '''.create function EventRecordsExpand() {
        RawEvents
        | mv-expand records = Event.records
        | project
            Time = todatetime(records["timestamp"]),
            Device = tostring(records["deviceId"]),
            MessageId = tostring(records["messageId"]),
            Temperature = todouble(records["temperature"]),
            Humidity = todouble(records["humidity"])
    }'''
    RESPONSE = KUSTO_CLIENT.execute_mgmt(DATABASE, CREATE_FUNCTION_COMMAND)
    dataframe_from_result_table(RESPONSE.primary_results[0])
    

    The schema received by the function has to match the schema of the target table.

  • Add the update policy to the target table. The policy automatically runs the query on any data newly ingested into the RawEvents intermediate table and ingests the results into the Events table. Set a zero-retention policy on RawEvents to avoid persisting the intermediate data.

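    CREATE_UPDATE_POLICY_COMMAND = """.alter table Events policy update @'[{"Source": "RawEvents", "Query": "EventRecordsExpand()", "IsEnabled": "True"}]'"""
    RESPONSE = KUSTO_CLIENT.execute_mgmt(DATABASE, CREATE_UPDATE_POLICY_COMMAND)
    dataframe_from_result_table(RESPONSE.primary_results[0])
    # Zero retention on the intermediate table, so raw records aren't persisted.
    CREATE_RETENTION_POLICY_COMMAND = ".alter-merge table RawEvents policy retention softdelete = 0s"
    RESPONSE = KUSTO_CLIENT.execute_mgmt(DATABASE, CREATE_RETENTION_POLICY_COMMAND)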
    
  • Ingest data into the RawEvents table.

    TABLE = "RawEvents"
    MAPPING = "RawEventMapping"
    BLOB_PATH = 'https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/array.json'
    INGESTION_PROPERTIES = IngestionProperties(database=DATABASE, table=TABLE, data_format=DataFormat.MULTIJSON, ingestion_mapping_reference=MAPPING)
    BLOB_DESCRIPTOR = BlobDescriptor(BLOB_PATH, FILE_SIZE)
    INGESTION_CLIENT.ingest_from_blob(
        BLOB_DESCRIPTOR, ingestion_properties=INGESTION_PROPERTIES)
    
  • Review data in the Events table.
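
The update policy commands in this section embed JSON inside a KQL verbatim string literal, where a stray single quote ends the literal early. The following minimal Python sketch avoids that by serializing the policy with json.dumps. It also composes the zero-retention command for the intermediate table. Both functions are hypothetical helpers. The sketch writes IsEnabled as a JSON boolean rather than the string "True" used in the commands above.

```python
import json


def build_update_policy_command(target_table: str, source_table: str, query: str,
                                enabled: bool = True) -> str:
    """Compose '.alter table <target> policy update' with the policy serialized by json.dumps."""
    policy = [{"Source": source_table, "Query": query, "IsEnabled": enabled}]
    policy_json = json.dumps(policy)  # double-quoted JSON; booleans become true/false
    # The policy sits inside a KQL verbatim string @'...', which a single quote would end.
    if "'" in policy_json:
        raise ValueError("update policy JSON must not contain single quotes")
    return f".alter table {target_table} policy update @'{policy_json}'"


def build_zero_retention_command(table: str) -> str:
    """Compose a command that sets a zero soft-delete period on an intermediate table."""
    return f".alter-merge table {table} policy retention softdelete = 0s"


print(build_update_policy_command("Events", "RawEvents", "EventRecordsExpand()"))
print(build_zero_retention_command("RawEvents"))
```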
