An UPSERT adds a record into a table (via INSERT) unless the record already exists... in which case it does an UPDATE. There are lots of web pages which described how to UPSERT between two SQL Server tables. In this article, we'll look at how to perform the operation into a MySQL database.
Most SQL Server examples use the Lookup transformation, like this:
Transform with OLEDB
But when you work with MySQL there is a limitation -- you cannot use OLEDB connections. You can use ODBC or ADO.NET Connectors from .NET to MySQL Database, as described briefly here:
Connecting to MySQL from SSIS
So... How you can UPSERT with MySQL tables? You will face a problem with Lookup transform because you have no OLEDB Connection to MySQL and you can not use an OLEDB Command on the other side.
The MySQL UPSERT (Update/Insert) Solution
In this solution, the Cache Connection Manager is used, and it is only available in SSIS 2008
Assume that the structure of the SQL Server table is :
CREATE TABLE [dbo].[SourceSqlServerTable](
[ID] [int] NOT NULL,
[FirstName] [varchar](255) NULL,
[LastName] [varchar](255) NULL)
and the structure of the MySQL table is:
CREATE TABLE mysqltable (
ID int(11) DEFAULT NULL,
FirstName varchar(255) DEFAULT NULL,
LastName varchar(255) DEFAULT NULL
The data in SQL Server Table is:
The data in MySQL Table is:
There are three steps as follows:
1. Fill a cache file from the MySQL data
Add a data flow task to control flow, name it as "Fill Cache Component"
Add an ADO.NET Source which points to the MySQL Table.
For finding how to create connection from SSIS to MySQL look at this article
Then add a Cache Transform and connect the data path (green arrow) from ado.net source to this cache transform.
Double click on Connection Manager
Click on New
to create a new cache connection.
In the Cache Connection Manager Editor, check the "Use file cache
And browse to set a path location to put the cache file.
In the Columns
tab, set the Primary Key of the MySQL table as index (set index position as 1), then click OK.
Map the columns to match between the tables.
That completes the first step of setting up to fill the cache file with data from MySQL Table (which is the lookup table).
2. INSERT new rows in MySQL table and fill an object datatype variable.
Go back to Control flow.
Create a new Variable of OBJECT data type in package scope and name it as UpdatedRows
Add another data flow task; name this one as "Lookup
Connect precedence constraint (green arrow) from the first data flow task to the second data flow task.
Then double click on second data flow task.
Add an OLEDB Source (setup a new OLEDB connection to the SQL Server database) and point it to SQL Server table.
Then add a Lookup transform: Connect a green arrow from the OLEDB source to Lookup.
Then double click on Lookup, and in the Lookup transformation editor...
Set Connection Type as "Cache Connection manager
And set "specify how to handle rows with no match entries" with "redirect no match rows to no match output
Then in the Connection
tab, select Cache Connection Manager from the drop-down list.
And in the Columns tab, map the joining fields
Add an ADO.NET Destination pointing to the MySQL database, and map the columns.
This will do the INSERT part of UPSERT.
Add a RecordSet Destination, and double click on it.
In the Component Properties tab, set VariableName with User::UpdatedRows.
That completes the second step, in which you INSERT new rows in MySQL table, and fill the UpdatedRows object datatype variable.
This is whole schema of this second Data flow task:
3. Setup a Foreach Loop to transfer the data
Go back to control flow,
Create these Variables:
Name Scope DataType Value
ID Package Int32 0
FirstName Package String
LastName Package String
Add a Foreach Loop Container, and connect precedence constraint from "Lookup" data flow to this container.
In the Foreach Loop Editor, Set Enumerator as ADO Enumerator.
And set ADO object source variable with User::UpdatedRows.
Then, in the Variable Mappings tab, do these mappings:
Then add an Execute SQL task inside the foreach loop container.
And set the connection to MySQL database there.
Write the update command in SQL Statement property,as below:
Then in the Parameter Mappings
tab, do these mappings:
That's all! Here is the whole schema of the package:
Now run the package, and the result in the MySQL table after the "upsert" will be:
This article is a reprint of my blog post here:
and posted on EE with full permission.