This article shows you how to ingest JSON formatted data into an Azure Data Explorer database. You'll start with simple examples of raw and mapped JSON, continue to multi-lined JSON, and then tackle more complex JSON schemas containing arrays and dictionaries. The examples detail the process of ingesting JSON formatted data using Kusto Query Language (KQL), C#, or Python.
The Kusto Query Language ingest management commands are executed directly against the engine endpoint. In production scenarios, ingestion is executed against the Data Management service using client libraries or data connections. Read Ingest data using the Azure Data Explorer Python library and Ingest data using the Azure Data Explorer .NET Standard SDK for a walk-through of ingesting data with these client libraries.
If you're using the JSON Line format, where each line is a single well-formatted JSON record, and you want to be able to handle records that are not well-formed, you can select the option to "Ignore data format errors." This will allow the valid records to be ingested while skipping the ones that are not well-formed.
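To illustrate the effect of this option, the following Python sketch mimics it locally: it parses JSON Lines text record by record and skips any line that isn't valid JSON. It's only an analogy for the service-side behavior, not how Azure Data Explorer implements it. The function name parse_json_lines and the sample records are hypothetical.

```python
import json


def parse_json_lines(text):
    """Return (valid_records, skipped_line_numbers) for JSON Lines text."""
    records, skipped = [], []
    for line_number, line in enumerate(text.splitlines(), start=1):
        if not line.strip():
            continue  # blank lines carry no record
        try:
            records.append(json.loads(line))
        except json.JSONDecodeError:
            skipped.append(line_number)  # malformed record: skip it and keep going
    return records, skipped


# Hypothetical sample: the second line is truncated and therefore not well-formed.
SAMPLE = "\n".join([
    '{"deviceId": "a", "temperature": 31.0}',
    '{"deviceId": "b", "temperature": ',
    '{"deviceId": "c", "temperature": 29.5}',
])

records, skipped = parse_json_lines(SAMPLE)
print(len(records), skipped)
```

The valid records on either side of the malformed line are kept, which is the outcome the option is meant to give you.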
The following example is a simple JSON, with a flat structure. The data has temperature and humidity information, collected by several devices. Each record is marked with an ID and timestamp.
{
    "timestamp": "2019-05-02 15:23:50.0369439",
    "deviceId": "2945c8aa-f13e-4c48-4473-b81440bb5ca2",
    "messageId": "7f316225-839a-4593-92b5-1812949279b3",
    "temperature": 31.0301639051317,
    "humidity": 62.0791099602725
}
In this example, you ingest JSON records as raw data to a single-column table. Any data manipulation, using queries and update policies, is done after the data is ingested.
Paste in the following command, and select Run to create the table.
.create table RawEvents (Event: dynamic)
This query creates a table with a single Event column of a dynamic data type.
Create the JSON mapping.
.create table RawEvents ingestion json mapping 'RawEventMapping' '[{"column":"Event","Properties":{"path":"$"}}]'
This command creates a mapping, and maps the JSON root path $ to the Event column.
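Because the mapping is passed as a JSON string inside a quoted command, it's easy to misplace a quote or bracket when writing it by hand. As a minimal sketch, the mapping command can instead be generated from a dictionary of column names and JSON paths. The helper name build_json_mapping_command is hypothetical and isn't part of any Kusto library; it only builds the command text and doesn't send it anywhere.

```python
import json


def build_json_mapping_command(table, mapping_name, column_paths):
    """Build a '.create table ... ingestion json mapping' command string."""
    mapping = [
        {"column": column, "Properties": {"path": path}}
        for column, path in column_paths.items()
    ]
    # Compact separators reproduce the spacing used in the command shown above.
    mapping_json = json.dumps(mapping, separators=(",", ":"))
    return f".create table {table} ingestion json mapping '{mapping_name}' '{mapping_json}'"


# The root path $ maps the whole JSON record to the Event column.
command = build_json_mapping_command("RawEvents", "RawEventMapping", {"Event": "$"})
print(command)
```

The same helper can produce a multi-column mapping by passing more entries in the dictionary.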
Ingest data into the RawEvents table.
.ingest into table RawEvents ('https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/simple.json') with '{"format":"json", "ingestionMappingReference":"RawEventMapping"}'
Create the RawEvents table.
var kustoUri = "https://<clusterName>.<region>.kusto.windows.net/";
var connectionStringBuilder = new KustoConnectionStringBuilder(kustoUri)
{
    FederatedSecurity = true,
    UserID = userId,
    Password = password,
    Authority = tenantId,
    InitialCatalog = databaseName
};
using var kustoClient = KustoClientFactory.CreateCslAdminProvider(connectionStringBuilder);
var tableName = "RawEvents";
// The column is named Event so that it matches the mapping and the update function used later.
var command = CslCommandGenerator.GenerateTableCreateCommand(
    tableName,
    new[] { Tuple.Create("Event", "System.Object") }
);
await kustoClient.ExecuteControlCommandAsync(command);
Create the JSON mapping.
var tableMappingName = "RawEventMapping";
command = CslCommandGenerator.GenerateTableMappingCreateCommand(
    IngestionMappingKind.Json,
    tableName,
    tableMappingName,
    new ColumnMapping[]
    {
        new() { ColumnName = "Event", Properties = new Dictionary<string, string> { { "path", "$" } } }
    }
);
await kustoClient.ExecuteControlCommandAsync(command);
This command creates a mapping, and maps the JSON root path $ to the Event column.
Ingest data into the RawEvents table.
var ingestUri = "https://ingest-<clusterName>.<region>.kusto.windows.net/";
var ingestConnectionStringBuilder = new KustoConnectionStringBuilder(ingestUri)
{
    FederatedSecurity = true,
    UserID = userId,
    Password = password,
    Authority = tenantId,
    InitialCatalog = databaseName
};
using var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(ingestConnectionStringBuilder);
var blobPath = "https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/simple.json";
var properties = new KustoQueuedIngestionProperties(databaseName, tableName)
{
    Format = DataSourceFormat.json,
    IngestionMapping = new IngestionMapping { IngestionMappingReference = tableMappingName }
};
await ingestClient.IngestFromStorageAsync(blobPath, properties);
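Create the RawEvents table.
from azure.kusto.data import KustoClient, KustoConnectionStringBuilder
from azure.kusto.data.helpers import dataframe_from_result_table

KUSTO_URI = "https://<ClusterName>.<Region>.kusto.windows.net/"
KCSB_DATA = KustoConnectionStringBuilder.with_aad_device_authentication(KUSTO_URI, AAD_TENANT_ID)
KUSTO_CLIENT = KustoClient(KCSB_DATA)
TABLE = "RawEvents"
# The column is named Event so that it matches the mapping and the update function used later.
CREATE_TABLE_COMMAND = ".create table " + TABLE + " (Event: dynamic)"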
RESPONSE = KUSTO_CLIENT.execute_mgmt(DATABASE, CREATE_TABLE_COMMAND)
dataframe_from_result_table(RESPONSE.primary_results[0])
Create the JSON mapping.
MAPPING = "RawEventMapping"
CREATE_MAPPING_COMMAND = ".create table " + TABLE + " ingestion json mapping '" + MAPPING + """' '[{"column":"Event","Properties":{"path":"$"}}]'"""
RESPONSE = KUSTO_CLIENT.execute_mgmt(DATABASE, CREATE_MAPPING_COMMAND)
dataframe_from_result_table(RESPONSE.primary_results[0])
Ingest data into the RawEvents table.
INGEST_URI = "https://ingest-<ClusterName>.<Region>.kusto.windows.net/"
KCSB_INGEST = KustoConnectionStringBuilder.with_aad_device_authentication(INGEST_URI, AAD_TENANT_ID)
INGESTION_CLIENT = KustoIngestClient(KCSB_INGEST)
BLOB_PATH = 'https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/simple.json'
INGESTION_PROPERTIES = IngestionProperties(database=DATABASE, table=TABLE, data_format=DataFormat.JSON, ingestion_mapping_reference=MAPPING)
# FILE_SIZE is the raw data size in bytes; define it before running this snippet.
BLOB_DESCRIPTOR = BlobDescriptor(BLOB_PATH, FILE_SIZE)
INGESTION_CLIENT.ingest_from_blob(
    BLOB_DESCRIPTOR, ingestion_properties=INGESTION_PROPERTIES)
Data is aggregated according to batching policy, resulting in a latency of a few minutes.
Create a new table, with a similar schema to the JSON input data. We'll use this table for all the following examples and ingest commands.
.create table Events (Time: datetime, Device: string, MessageId: string, Temperature: double, Humidity: double)
Create the JSON mapping.
.create table Events ingestion json mapping 'FlatEventMapping' '[{"column":"Time","Properties":{"path":"$.timestamp"}},{"column":"Device","Properties":{"path":"$.deviceId"}},{"column":"MessageId","Properties":{"path":"$.messageId"}},{"column":"Temperature","Properties":{"path":"$.temperature"}},{"column":"Humidity","Properties":{"path":"$.humidity"}}]'
In this mapping, as defined by the table schema, the timestamp entries will be ingested to the column Time as datetime data types.
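To make the effect of the path expressions concrete, here is a minimal local sketch that applies the FlatEventMapping paths to the sample record from earlier in this article. It only resolves simple paths such as $ and $.name and doesn't convert types; in the service, the conversion of timestamp to datetime follows from the table schema. The helper names resolve_path and apply_mapping are hypothetical.

```python
import json

# Column name -> JSON path, mirroring the FlatEventMapping definition.
FLAT_EVENT_MAPPING = {
    "Time": "$.timestamp",
    "Device": "$.deviceId",
    "MessageId": "$.messageId",
    "Temperature": "$.temperature",
    "Humidity": "$.humidity",
}


def resolve_path(record, path):
    """Resolve '$' or a dotted '$.a.b' path against a parsed JSON record."""
    if path == "$":
        return record  # the root path selects the whole record
    value = record
    for key in path[2:].split("."):
        value = value.get(key) if isinstance(value, dict) else None
    return value


def apply_mapping(record, mapping):
    """Produce one row (column -> value) from one JSON record."""
    return {column: resolve_path(record, path) for column, path in mapping.items()}


RECORD = json.loads("""{
    "timestamp": "2019-05-02 15:23:50.0369439",
    "deviceId": "2945c8aa-f13e-4c48-4473-b81440bb5ca2",
    "messageId": "7f316225-839a-4593-92b5-1812949279b3",
    "temperature": 31.0301639051317,
    "humidity": 62.0791099602725
}""")

row = apply_mapping(RECORD, FLAT_EVENT_MAPPING)
print(row)
```

A path that doesn't exist in the record resolves to None in this sketch, so a typo in a path shows up as an empty column rather than an error.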
Ingest data into the Events table.
.ingest into table Events ('https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/simple.json') with '{"format":"json", "ingestionMappingReference":"FlatEventMapping"}'
The file 'simple.json' has a few line-separated JSON records. The format is json, and the mapping used in the ingest command is the FlatEventMapping you created.
Create a new table, with a similar schema to the JSON input data. We'll use this table for all the following examples and ingest commands.
var tableName = "Events";
var command = CslCommandGenerator.GenerateTableCreateCommand(
    tableName,
    new[]
    {
        Tuple.Create("Time", "System.DateTime"),
        Tuple.Create("Device", "System.String"),
        Tuple.Create("MessageId", "System.String"),
        Tuple.Create("Temperature", "System.Double"),
        Tuple.Create("Humidity", "System.Double")
    }
);
await kustoClient.ExecuteControlCommandAsync(command);
Create the JSON mapping.
var tableMappingName = "FlatEventMapping";
command = CslCommandGenerator.GenerateTableMappingCreateCommand(
    IngestionMappingKind.Json,
    tableName,
    tableMappingName,
    new ColumnMapping[]
    {
        new() { ColumnName = "Time", Properties = new Dictionary<string, string> { { MappingConsts.Path, "$.timestamp" } } },
        new() { ColumnName = "Device", Properties = new Dictionary<string, string> { { MappingConsts.Path, "$.deviceId" } } },
        new() { ColumnName = "MessageId", Properties = new Dictionary<string, string> { { MappingConsts.Path, "$.messageId" } } },
        new() { ColumnName = "Temperature", Properties = new Dictionary<string, string> { { MappingConsts.Path, "$.temperature" } } },
        new() { ColumnName = "Humidity", Properties = new Dictionary<string, string> { { MappingConsts.Path, "$.humidity" } } }
    }
);
await kustoClient.ExecuteControlCommandAsync(command);
In this mapping, as defined by the table schema, the timestamp entries will be ingested to the column Time as datetime data types.
Ingest data into the Events table.
var blobPath = "https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/simple.json";
var properties = new KustoQueuedIngestionProperties(databaseName, tableName)
{
    Format = DataSourceFormat.json,
    IngestionMapping = new IngestionMapping { IngestionMappingReference = tableMappingName }
};
await ingestClient.IngestFromStorageAsync(blobPath, properties).ConfigureAwait(false);
The file 'simple.json' has a few line-separated JSON records. The format is json, and the mapping used in the ingest command is the FlatEventMapping you created.
Create a new table, with a similar schema to the JSON input data. We'll use this table for all the following examples and ingest commands.
TABLE = "Events"
CREATE_TABLE_COMMAND = ".create table " + TABLE + " (Time: datetime, Device: string, MessageId: string, Temperature: double, Humidity: double)"
RESPONSE = KUSTO_CLIENT.execute_mgmt(DATABASE, CREATE_TABLE_COMMAND)
dataframe_from_result_table(RESPONSE.primary_results[0])
Create the JSON mapping.
MAPPING = "FlatEventMapping"
CREATE_MAPPING_COMMAND = ".create table Events ingestion json mapping '" + MAPPING + """' '[{"column":"Time","Properties":{"path":"$.timestamp"}},{"column":"Device","Properties":{"path":"$.deviceId"}},{"column":"MessageId","Properties":{"path":"$.messageId"}},{"column":"Temperature","Properties":{"path":"$.temperature"}},{"column":"Humidity","Properties":{"path":"$.humidity"}}]'"""
RESPONSE = KUSTO_CLIENT.execute_mgmt(DATABASE, CREATE_MAPPING_COMMAND)
dataframe_from_result_table(RESPONSE.primary_results[0])
Ingest data into the Events table.
BLOB_PATH = 'https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/simple.json'
INGESTION_PROPERTIES = IngestionProperties(database=DATABASE, table=TABLE, data_format=DataFormat.JSON, ingestion_mapping_reference=MAPPING)
BLOB_DESCRIPTOR = BlobDescriptor(BLOB_PATH, FILE_SIZE)
INGESTION_CLIENT.ingest_from_blob(
    BLOB_DESCRIPTOR, ingestion_properties=INGESTION_PROPERTIES)
The file 'simple.json' has a few line-separated JSON records. The format is json, and the mapping used in the ingest command is the FlatEventMapping you created.
Ingest data into the Events table.
var tableMappingName = "FlatEventMapping";
var blobPath = "https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/multilined.json";
var properties = new KustoQueuedIngestionProperties(databaseName, tableName)
{
    Format = DataSourceFormat.multijson,
    IngestionMapping = new IngestionMapping { IngestionMappingReference = tableMappingName }
};
await ingestClient.IngestFromStorageAsync(blobPath, properties).ConfigureAwait(false);
Ingest data into the Events table.
MAPPING = "FlatEventMapping"
BLOB_PATH = 'https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/multilined.json'
INGESTION_PROPERTIES = IngestionProperties(database=DATABASE, table=TABLE, data_format=DataFormat.MULTIJSON, ingestion_mapping_reference=MAPPING)
BLOB_DESCRIPTOR = BlobDescriptor(BLOB_PATH, FILE_SIZE)
INGESTION_CLIENT.ingest_from_blob(
    BLOB_DESCRIPTOR, ingestion_properties=INGESTION_PROPERTIES)
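The two formats differ in how record boundaries are found: json expects each record on its own line, while multijson lets a record span several lines. The following Python sketch shows the same distinction locally with the standard json module. It's an illustration only, not the service's parser, and the sample text and the helper name iter_json_documents are hypothetical.

```python
import json


def iter_json_documents(text):
    """Yield consecutive JSON values from text, ignoring line boundaries."""
    decoder = json.JSONDecoder()
    position = 0
    while position < len(text):
        # raw_decode doesn't skip leading whitespace, so step over it here.
        while position < len(text) and text[position].isspace():
            position += 1
        if position >= len(text):
            break
        value, position = decoder.raw_decode(text, position)
        yield value


# Hypothetical multi-lined input: each record is spread over four lines.
MULTILINED = """{
    "deviceId": "a",
    "temperature": 31.0
}
{
    "deviceId": "b",
    "temperature": 29.5
}"""

# Line-by-line parsing fails because no single line is a complete record.
try:
    [json.loads(line) for line in MULTILINED.splitlines()]
    line_by_line_ok = True
except json.JSONDecodeError:
    line_by_line_ok = False

documents = list(iter_json_documents(MULTILINED))
print(line_by_line_ok, len(documents))
```

Reading the text as a stream of JSON values recovers both records, which is why multi-lined files need the multijson format rather than json.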
Ingest JSON records containing arrays
Array data types are an ordered collection of values. Ingestion of a JSON array is done by an update policy. The JSON is ingested as-is to an intermediate table. An update policy runs a pre-defined function on the RawEvents table, reingesting the results to the target table. We'll ingest data with the following structure:
{
    "records":
    [
        {
            "timestamp": "2019-05-02 15:23:50.0000000",
            "deviceId": "ddbc1bf5-096f-42c0-a771-bc3dca77ac71",
            "messageId": "7f316225-839a-4593-92b5-1812949279b3",
            "temperature": 31.0301639051317,
            "humidity": 62.0791099602725
        },
        {
            "timestamp": "2019-05-02 15:23:51.0000000",
            "deviceId": "ddbc1bf5-096f-42c0-a771-bc3dca77ac71",
            "messageId": "57de2821-7581-40e4-861e-ea3bde102364",
            "temperature": 33.7529423105311,
            "humidity": 75.4787976739364
        }
    ]
}
Create an update policy function that expands the collection of records so that each value in the collection receives a separate row, using the mv-expand operator. We'll use table RawEvents as a source table and Events as a target table.
.create function EventRecordsExpand() {
    RawEvents
    | mv-expand records = Event.records
    | project
        Time = todatetime(records["timestamp"]),
        Device = tostring(records["deviceId"]),
        MessageId = tostring(records["messageId"]),
        Temperature = todouble(records["temperature"]),
        Humidity = todouble(records["humidity"])
}
The schema received by the function must match the schema of the target table. Use the getschema operator to review the schema.
EventRecordsExpand() | getschema
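As a rough local illustration of what the function produces, the following Python sketch emulates the mv-expand and project steps on the sample document and lets you compare the resulting column names with the Events table schema. It isn't KQL and performs no type conversion (todatetime, tostring, and todouble are left out). The helper name expand_event_records is hypothetical.

```python
import json

# Column order of the Events target table.
TARGET_COLUMNS = ["Time", "Device", "MessageId", "Temperature", "Humidity"]


def expand_event_records(event):
    """Emulate 'mv-expand records = Event.records | project ...' for one source row."""
    rows = []
    for record in event.get("records", []):
        # One output row per array element, with the projected column names.
        rows.append({
            "Time": record.get("timestamp"),
            "Device": record.get("deviceId"),
            "MessageId": record.get("messageId"),
            "Temperature": record.get("temperature"),
            "Humidity": record.get("humidity"),
        })
    return rows


EVENT = json.loads("""{
    "records": [
        {
            "timestamp": "2019-05-02 15:23:50.0000000",
            "deviceId": "ddbc1bf5-096f-42c0-a771-bc3dca77ac71",
            "messageId": "7f316225-839a-4593-92b5-1812949279b3",
            "temperature": 31.0301639051317,
            "humidity": 62.0791099602725
        },
        {
            "timestamp": "2019-05-02 15:23:51.0000000",
            "deviceId": "ddbc1bf5-096f-42c0-a771-bc3dca77ac71",
            "messageId": "57de2821-7581-40e4-861e-ea3bde102364",
            "temperature": 33.7529423105311,
            "humidity": 75.4787976739364
        }
    ]
}""")

rows = expand_event_records(EVENT)
schema_matches = all(list(row) == TARGET_COLUMNS for row in rows)
print(len(rows), schema_matches)
```

One source row with a two-element array becomes two target rows, each carrying the same five columns as the Events table.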
Add the update policy to the target table. This policy will automatically run the query on any newly ingested data in the RawEvents intermediate table and ingest the results into the Events table. Define a zero-retention policy to avoid persisting the intermediate table.
.alter table Events policy update @'[{"Source": "RawEvents", "Query": "EventRecordsExpand()", "IsEnabled": "True"}]'
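The policy is a JSON array embedded in a quoted string, so its quoting is easy to get wrong when the command is assembled in client code. A minimal sketch of one way to avoid that is to serialize the policy with the json module. The helper name build_update_policy_command is hypothetical and only builds the command text; it keeps IsEnabled as the string "True" to match the command above.

```python
import json


def build_update_policy_command(target_table, source_table, query, is_enabled=True):
    """Build an '.alter table ... policy update' command string."""
    policy = [{
        "Source": source_table,
        "Query": query,
        # Kept as a string to match the command shown in this article.
        "IsEnabled": "True" if is_enabled else "False",
    }]
    # json.dumps emits double-quoted keys and values, as the policy JSON requires.
    return f".alter table {target_table} policy update @'{json.dumps(policy)}'"


command = build_update_policy_command("Events", "RawEvents", "EventRecordsExpand()")
print(command)
```

The resulting string can then be passed to whichever client call you use to run management commands.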
Ingest data into the RawEvents table.
.ingest into table RawEvents ('https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/array.json') with '{"format":"multijson", "ingestionMappingReference":"RawEventMapping"}'
Review data in the Events table.
Events
Create an update function that expands the collection of records so that each value in the collection receives a separate row, using the mv-expand operator. We'll use table RawEvents as a source table and Events as a target table.
var command = CslCommandGenerator.GenerateCreateFunctionCommand(
    "EventRecordsExpand",
    "UpdateFunctions",
    string.Empty,
    null,
    // Expand the records array inside the Event column, one row per element.
    @"RawEvents
    | mv-expand records = Event.records
    | project
        Time = todatetime(records['timestamp']),
        Device = tostring(records['deviceId']),
        MessageId = tostring(records['messageId']),
        Temperature = todouble(records['temperature']),
        Humidity = todouble(records['humidity'])",
    ifNotExists: false
);
await kustoClient.ExecuteControlCommandAsync(command);
The schema received by the function must match the schema of the target table.
Add the update policy to the target table. This policy will automatically run the query on any newly ingested data in the RawEvents intermediate table and ingest its results into the Events table. Define a zero-retention policy to avoid persisting the intermediate table.
command = @".alter table Events policy update @'[{""Source"": ""RawEvents"", ""Query"": ""EventRecordsExpand()"", ""IsEnabled"": ""True""}]'";
await kustoClient.ExecuteControlCommandAsync(command);
Ingest data into the RawEvents table.
var blobPath = "https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/array.json";
var tableName = "RawEvents";
var tableMappingName = "RawEventMapping";
var properties = new KustoQueuedIngestionProperties(databaseName, tableName)
{
    Format = DataSourceFormat.multijson,
    IngestionMapping = new IngestionMapping { IngestionMappingReference = tableMappingName }
};
await ingestClient.IngestFromStorageAsync(blobPath, properties);
Review data in the Events table.
Create an update function that expands the collection of records so that each value in the collection receives a separate row, using the mv-expand operator. We'll use table RawEvents as a source table and Events as a target table.
# Expand the records array inside the Event column, one row per element.
CREATE_FUNCTION_COMMAND = '''.create function EventRecordsExpand() {
    RawEvents
    | mv-expand records = Event.records
    | project
        Time = todatetime(records["timestamp"]),
        Device = tostring(records["deviceId"]),
        MessageId = tostring(records["messageId"]),
        Temperature = todouble(records["temperature"]),
        Humidity = todouble(records["humidity"])
}'''
RESPONSE = KUSTO_CLIENT.execute_mgmt(DATABASE, CREATE_FUNCTION_COMMAND)
dataframe_from_result_table(RESPONSE.primary_results[0])
The schema received by the function has to match the schema of the target table.
Add the update policy to the target table. This policy will automatically run the query on any newly ingested data in the RawEvents intermediate table and ingest its results into the Events table. Define a zero-retention policy to avoid persisting the intermediate table.
CREATE_UPDATE_POLICY_COMMAND = """.alter table Events policy update @'[{"Source": "RawEvents", "Query": "EventRecordsExpand()", "IsEnabled": "True"}]'"""
RESPONSE = KUSTO_CLIENT.execute_mgmt(DATABASE, CREATE_UPDATE_POLICY_COMMAND)
dataframe_from_result_table(RESPONSE.primary_results[0])
Ingest data into the RawEvents table.
TABLE = "RawEvents"
MAPPING = "RawEventMapping"
BLOB_PATH = 'https://kustosamplefiles.blob.core.windows.net/jsonsamplefiles/array.json'
INGESTION_PROPERTIES = IngestionProperties(database=DATABASE, table=TABLE, data_format=DataFormat.MULTIJSON, ingestion_mapping_reference=MAPPING)
BLOB_DESCRIPTOR = BlobDescriptor(BLOB_PATH, FILE_SIZE)
INGESTION_CLIENT.ingest_from_blob(
    BLOB_DESCRIPTOR, ingestion_properties=INGESTION_PROPERTIES)
Review data in the Events table.