sqlstore.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409
  1. package sqlstore
  2. import (
  3. "context"
  4. "fmt"
  5. "net/url"
  6. "os"
  7. "path"
  8. "path/filepath"
  9. "strings"
  10. "testing"
  11. "time"
  12. "github.com/grafana/grafana/pkg/bus"
  13. "github.com/grafana/grafana/pkg/log"
  14. m "github.com/grafana/grafana/pkg/models"
  15. "github.com/grafana/grafana/pkg/registry"
  16. "github.com/grafana/grafana/pkg/services/annotations"
  17. "github.com/grafana/grafana/pkg/services/cache"
  18. "github.com/grafana/grafana/pkg/services/sqlstore/migrations"
  19. "github.com/grafana/grafana/pkg/services/sqlstore/migrator"
  20. "github.com/grafana/grafana/pkg/services/sqlstore/sqlutil"
  21. "github.com/grafana/grafana/pkg/setting"
  22. "github.com/grafana/grafana/pkg/util"
  23. "github.com/go-sql-driver/mysql"
  24. "github.com/go-xorm/xorm"
  25. _ "github.com/grafana/grafana/pkg/tsdb/mssql"
  26. _ "github.com/lib/pq"
  27. sqlite3 "github.com/mattn/go-sqlite3"
  28. )
var (
	// x is the package-level xorm engine. Init assigns the engine it creates
	// to it ("temporarily still set global var").
	x *xorm.Engine
	// dialect is the package-level migrator dialect. It is assigned by Init
	// and by InitTestDB.
	dialect migrator.Dialect
	// sqlog is the package-level logger, created with the name "sqlstore".
	sqlog log.Logger = log.New("sqlstore")
)
// ContextSessionName is the string "db-session".
// NOTE(review): presumably the context key under which the active DB session
// is stored; its use is not visible in this section — confirm against
// startSession.
const ContextSessionName = "db-session"
  35. func init() {
  36. registry.Register(&registry.Descriptor{
  37. Name: "SqlStore",
  38. Instance: &SqlStore{},
  39. InitPriority: registry.High,
  40. })
  41. }
// SqlStore is the SQL-backed store service that init registers with the
// service registry. It holds the xorm engine and the migrator dialect that
// Init creates.
type SqlStore struct {
	// Fields tagged `inject:""`.
	// NOTE(review): presumably populated by the registry's dependency
	// injection; InitTestDB assigns them by hand.
	Cfg          *setting.Cfg        `inject:""`
	Bus          bus.Bus             `inject:""`
	CacheService *cache.CacheService `inject:""`

	// dbCfg holds the database settings filled in by readConfig.
	dbCfg DatabaseConfig
	// engine is the xorm engine returned by getEngine during Init.
	engine *xorm.Engine
	// log is the instance logger, assigned at the start of Init.
	log log.Logger
	// Dialect is the migrator dialect built from the engine.
	Dialect migrator.Dialect
	// skipEnsureAdmin makes Init return without calling ensureAdminUser.
	skipEnsureAdmin bool
}
  52. // NewSession returns a new DBSession
  53. func (ss *SqlStore) NewSession() *DBSession {
  54. return &DBSession{Session: ss.engine.NewSession()}
  55. }
  56. // WithDbSession calls the callback with an session attached to the context.
  57. func (ss *SqlStore) WithDbSession(ctx context.Context, callback dbTransactionFunc) error {
  58. sess, err := startSession(ctx, ss.engine, false)
  59. if err != nil {
  60. return err
  61. }
  62. return callback(sess)
  63. }
  64. // WithTransactionalDbSession calls the callback with an session within a transaction
  65. func (ss *SqlStore) WithTransactionalDbSession(ctx context.Context, callback dbTransactionFunc) error {
  66. return ss.inTransactionWithRetryCtx(ctx, callback, 0)
  67. }
  68. func (ss *SqlStore) inTransactionWithRetryCtx(ctx context.Context, callback dbTransactionFunc, retry int) error {
  69. sess, err := startSession(ctx, ss.engine, true)
  70. if err != nil {
  71. return err
  72. }
  73. defer sess.Close()
  74. err = callback(sess)
  75. // special handling of database locked errors for sqlite, then we can retry 3 times
  76. if sqlError, ok := err.(sqlite3.Error); ok && retry < 5 {
  77. if sqlError.Code == sqlite3.ErrLocked {
  78. sess.Rollback()
  79. time.Sleep(time.Millisecond * time.Duration(10))
  80. sqlog.Info("Database table locked, sleeping then retrying", "retry", retry)
  81. return ss.inTransactionWithRetryCtx(ctx, callback, retry+1)
  82. }
  83. }
  84. if err != nil {
  85. sess.Rollback()
  86. return err
  87. } else if err = sess.Commit(); err != nil {
  88. return err
  89. }
  90. if len(sess.events) > 0 {
  91. for _, e := range sess.events {
  92. if err = bus.Publish(e); err != nil {
  93. log.Error(3, "Failed to publish event after commit. error: %v", err)
  94. }
  95. }
  96. }
  97. return nil
  98. }
  99. func (ss *SqlStore) Init() error {
  100. ss.log = log.New("sqlstore")
  101. ss.readConfig()
  102. engine, err := ss.getEngine()
  103. if err != nil {
  104. return fmt.Errorf("Fail to connect to database: %v", err)
  105. }
  106. ss.engine = engine
  107. ss.Dialect = migrator.NewDialect(ss.engine)
  108. // temporarily still set global var
  109. x = engine
  110. dialect = ss.Dialect
  111. migrator := migrator.NewMigrator(x)
  112. migrations.AddMigrations(migrator)
  113. for _, descriptor := range registry.GetServices() {
  114. sc, ok := descriptor.Instance.(registry.DatabaseMigrator)
  115. if ok {
  116. sc.AddMigration(migrator)
  117. }
  118. }
  119. if err := migrator.Start(); err != nil {
  120. return fmt.Errorf("Migration failed err: %v", err)
  121. }
  122. // Init repo instances
  123. annotations.SetRepository(&SqlAnnotationRepo{})
  124. ss.Bus.SetTransactionManager(ss)
  125. // Register handlers
  126. ss.addUserQueryAndCommandHandlers()
  127. // ensure admin user
  128. if ss.skipEnsureAdmin {
  129. return nil
  130. }
  131. return ss.ensureAdminUser()
  132. }
  133. func (ss *SqlStore) ensureAdminUser() error {
  134. systemUserCountQuery := m.GetSystemUserCountStatsQuery{}
  135. err := ss.InTransaction(context.Background(), func(ctx context.Context) error {
  136. err := bus.DispatchCtx(ctx, &systemUserCountQuery)
  137. if err != nil {
  138. return fmt.Errorf("Could not determine if admin user exists: %v", err)
  139. }
  140. if systemUserCountQuery.Result.Count > 0 {
  141. return nil
  142. }
  143. cmd := m.CreateUserCommand{}
  144. cmd.Login = setting.AdminUser
  145. cmd.Email = setting.AdminUser + "@localhost"
  146. cmd.Password = setting.AdminPassword
  147. cmd.IsAdmin = true
  148. if err := bus.DispatchCtx(ctx, &cmd); err != nil {
  149. return fmt.Errorf("Failed to create admin user: %v", err)
  150. }
  151. ss.log.Info("Created default admin", "user", setting.AdminUser)
  152. return nil
  153. })
  154. return err
  155. }
  156. func (ss *SqlStore) buildConnectionString() (string, error) {
  157. cnnstr := ss.dbCfg.ConnectionString
  158. // special case used by integration tests
  159. if cnnstr != "" {
  160. return cnnstr, nil
  161. }
  162. switch ss.dbCfg.Type {
  163. case migrator.MYSQL:
  164. protocol := "tcp"
  165. if strings.HasPrefix(ss.dbCfg.Host, "/") {
  166. protocol = "unix"
  167. }
  168. cnnstr = fmt.Sprintf("%s:%s@%s(%s)/%s?collation=utf8mb4_unicode_ci&allowNativePasswords=true",
  169. ss.dbCfg.User, ss.dbCfg.Pwd, protocol, ss.dbCfg.Host, ss.dbCfg.Name)
  170. if ss.dbCfg.SslMode == "true" || ss.dbCfg.SslMode == "skip-verify" {
  171. tlsCert, err := makeCert("custom", ss.dbCfg)
  172. if err != nil {
  173. return "", err
  174. }
  175. mysql.RegisterTLSConfig("custom", tlsCert)
  176. cnnstr += "&tls=custom"
  177. }
  178. case migrator.POSTGRES:
  179. host, port, err := util.SplitIpPort(ss.dbCfg.Host, "5432")
  180. if err != nil {
  181. return "", err
  182. }
  183. if ss.dbCfg.Pwd == "" {
  184. ss.dbCfg.Pwd = "''"
  185. }
  186. if ss.dbCfg.User == "" {
  187. ss.dbCfg.User = "''"
  188. }
  189. cnnstr = fmt.Sprintf("user=%s password=%s host=%s port=%s dbname=%s sslmode=%s sslcert=%s sslkey=%s sslrootcert=%s", ss.dbCfg.User, ss.dbCfg.Pwd, host, port, ss.dbCfg.Name, ss.dbCfg.SslMode, ss.dbCfg.ClientCertPath, ss.dbCfg.ClientKeyPath, ss.dbCfg.CaCertPath)
  190. case migrator.SQLITE:
  191. // special case for tests
  192. if !filepath.IsAbs(ss.dbCfg.Path) {
  193. ss.dbCfg.Path = filepath.Join(ss.Cfg.DataPath, ss.dbCfg.Path)
  194. }
  195. os.MkdirAll(path.Dir(ss.dbCfg.Path), os.ModePerm)
  196. cnnstr = fmt.Sprintf("file:%s?cache=%s&mode=rwc", ss.dbCfg.Path, ss.dbCfg.CacheMode)
  197. default:
  198. return "", fmt.Errorf("Unknown database type: %s", ss.dbCfg.Type)
  199. }
  200. return cnnstr, nil
  201. }
  202. func (ss *SqlStore) getEngine() (*xorm.Engine, error) {
  203. connectionString, err := ss.buildConnectionString()
  204. if err != nil {
  205. return nil, err
  206. }
  207. sqlog.Info("Connecting to DB", "dbtype", ss.dbCfg.Type)
  208. engine, err := xorm.NewEngine(ss.dbCfg.Type, connectionString)
  209. if err != nil {
  210. return nil, err
  211. }
  212. engine.SetMaxOpenConns(ss.dbCfg.MaxOpenConn)
  213. engine.SetMaxIdleConns(ss.dbCfg.MaxIdleConn)
  214. engine.SetConnMaxLifetime(time.Second * time.Duration(ss.dbCfg.ConnMaxLifetime))
  215. // configure sql logging
  216. debugSql := ss.Cfg.Raw.Section("database").Key("log_queries").MustBool(false)
  217. if !debugSql {
  218. engine.SetLogger(&xorm.DiscardLogger{})
  219. } else {
  220. engine.SetLogger(NewXormLogger(log.LvlInfo, log.New("sqlstore.xorm")))
  221. engine.ShowSQL(true)
  222. engine.ShowExecTime(true)
  223. }
  224. return engine, nil
  225. }
  226. func (ss *SqlStore) readConfig() {
  227. sec := ss.Cfg.Raw.Section("database")
  228. cfgURL := sec.Key("url").String()
  229. if len(cfgURL) != 0 {
  230. dbURL, _ := url.Parse(cfgURL)
  231. ss.dbCfg.Type = dbURL.Scheme
  232. ss.dbCfg.Host = dbURL.Host
  233. pathSplit := strings.Split(dbURL.Path, "/")
  234. if len(pathSplit) > 1 {
  235. ss.dbCfg.Name = pathSplit[1]
  236. }
  237. userInfo := dbURL.User
  238. if userInfo != nil {
  239. ss.dbCfg.User = userInfo.Username()
  240. ss.dbCfg.Pwd, _ = userInfo.Password()
  241. }
  242. } else {
  243. ss.dbCfg.Type = sec.Key("type").String()
  244. ss.dbCfg.Host = sec.Key("host").String()
  245. ss.dbCfg.Name = sec.Key("name").String()
  246. ss.dbCfg.User = sec.Key("user").String()
  247. ss.dbCfg.ConnectionString = sec.Key("connection_string").String()
  248. ss.dbCfg.Pwd = sec.Key("password").String()
  249. }
  250. ss.dbCfg.MaxOpenConn = sec.Key("max_open_conn").MustInt(0)
  251. ss.dbCfg.MaxIdleConn = sec.Key("max_idle_conn").MustInt(2)
  252. ss.dbCfg.ConnMaxLifetime = sec.Key("conn_max_lifetime").MustInt(14400)
  253. ss.dbCfg.SslMode = sec.Key("ssl_mode").String()
  254. ss.dbCfg.CaCertPath = sec.Key("ca_cert_path").String()
  255. ss.dbCfg.ClientKeyPath = sec.Key("client_key_path").String()
  256. ss.dbCfg.ClientCertPath = sec.Key("client_cert_path").String()
  257. ss.dbCfg.ServerCertName = sec.Key("server_cert_name").String()
  258. ss.dbCfg.Path = sec.Key("path").MustString("data/grafana.db")
  259. ss.dbCfg.CacheMode = sec.Key("cache_mode").MustString("private")
  260. }
  261. func InitTestDB(t *testing.T) *SqlStore {
  262. t.Helper()
  263. sqlstore := &SqlStore{}
  264. sqlstore.skipEnsureAdmin = true
  265. sqlstore.Bus = bus.New()
  266. sqlstore.CacheService = cache.New(5*time.Minute, 10*time.Minute)
  267. dbType := migrator.SQLITE
  268. // environment variable present for test db?
  269. if db, present := os.LookupEnv("GRAFANA_TEST_DB"); present {
  270. dbType = db
  271. }
  272. // set test db config
  273. sqlstore.Cfg = setting.NewCfg()
  274. sec, _ := sqlstore.Cfg.Raw.NewSection("database")
  275. sec.NewKey("type", dbType)
  276. switch dbType {
  277. case "mysql":
  278. sec.NewKey("connection_string", sqlutil.TestDB_Mysql.ConnStr)
  279. case "postgres":
  280. sec.NewKey("connection_string", sqlutil.TestDB_Postgres.ConnStr)
  281. default:
  282. sec.NewKey("connection_string", sqlutil.TestDB_Sqlite3.ConnStr)
  283. }
  284. // need to get engine to clean db before we init
  285. engine, err := xorm.NewEngine(dbType, sec.Key("connection_string").String())
  286. if err != nil {
  287. t.Fatalf("Failed to init test database: %v", err)
  288. }
  289. sqlstore.Dialect = migrator.NewDialect(engine)
  290. // temp global var until we get rid of global vars
  291. dialect = sqlstore.Dialect
  292. if err := dialect.CleanDB(); err != nil {
  293. t.Fatalf("Failed to clean test db %v", err)
  294. }
  295. if err := sqlstore.Init(); err != nil {
  296. t.Fatalf("Failed to init test database: %v", err)
  297. }
  298. sqlstore.engine.DatabaseTZ = time.UTC
  299. sqlstore.engine.TZLocation = time.UTC
  300. return sqlstore
  301. }
  302. func IsTestDbMySql() bool {
  303. if db, present := os.LookupEnv("GRAFANA_TEST_DB"); present {
  304. return db == migrator.MYSQL
  305. }
  306. return false
  307. }
  308. func IsTestDbPostgres() bool {
  309. if db, present := os.LookupEnv("GRAFANA_TEST_DB"); present {
  310. return db == migrator.POSTGRES
  311. }
  312. return false
  313. }
  314. type DatabaseConfig struct {
  315. Type string
  316. Host string
  317. Name string
  318. User string
  319. Pwd string
  320. Path string
  321. SslMode string
  322. CaCertPath string
  323. ClientKeyPath string
  324. ClientCertPath string
  325. ServerCertName string
  326. ConnectionString string
  327. MaxOpenConn int
  328. MaxIdleConn int
  329. ConnMaxLifetime int
  330. CacheMode string
  331. }